import csv
from collections import defaultdict
from pathlib import Path
from typing import Any, BinaryIO, DefaultDict, Dict, List, Optional, Tuple

from openpyxl import Workbook, load_workbook

from backend.condition.lookups.uprn_lookup import UprnLookup
from backend.condition.parsing.parser import Parser
from backend.condition.parsing.records.peabody.peabody_asset_condition import (
    PeabodyAssetCondition,
)
from backend.condition.parsing.records.peabody.peabody_property import PeabodyProperty
from utils.logger import setup_logger

logger = setup_logger()


class PeabodyParser(Parser):
    """Parse a Peabody condition-survey workbook into per-property records.

    Asset rows are read from a single worksheet, block-level rows are
    dropped, and the remaining assets are grouped by their location
    reference. Each group is then attached to the UPRN that the injected
    lookup returns for that location reference.
    """

    # Name of the worksheet that holds the asset condition rows.
    _ASSETS_SHEET_NAME = "Survey Records - D & Lower"

    def __init__(self, uprn_lookup: UprnLookup):
        self.uprn_lookup: UprnLookup = uprn_lookup  # TODO: move this to the ABC?

    def parse(
        self,
        file_stream: BinaryIO,
    ) -> Any:
        """Parse an Excel workbook stream into a list of ``PeabodyProperty``.

        Args:
            file_stream: Seekable binary stream containing the workbook. It is
                rewound to the start before reading.

        Returns:
            The list built by ``_group_assets_into_properties``: one
            ``PeabodyProperty`` per location reference that has a UPRN in the
            lookup.

        Raises:
            KeyError: If the expected worksheet is missing from the workbook.
        """
        file_stream.seek(0)

        logger.debug("[PeabodyParser] Loading workbook...")
        wb: Workbook = load_workbook(file_stream, read_only=True, data_only=True)
        try:
            logger.debug(
                "[PeabodyParser] Successfully loaded workbook. Parsing assets..."
            )
            assets = PeabodyParser._parse_assets(wb)
        finally:
            # Read-only workbooks keep the underlying archive open until they
            # are explicitly closed. All rows have been materialised into
            # `assets` by this point, so it is safe to release it.
            wb.close()

        logger.debug(
            "[PeabodyParser] Successfully parsed assets. Parsing UPRN lookup..."
        )
        location_ref_to_uprn_map = self.uprn_lookup.get_property_ref_to_uprn_lookup()
        logger.debug("[PeabodyParser] Successfully parsed UPRN lookup")

        return PeabodyParser._group_assets_into_properties(
            assets=assets,
            location_ref_to_uprn_map=location_ref_to_uprn_map,
        )

    @staticmethod
    def _parse_assets(wb: Workbook) -> List[PeabodyAssetCondition]:
        """Read every non-block-level asset row from the assets worksheet.

        The first row is treated as the header row. Rows that fail to map to
        a ``PeabodyAssetCondition`` are logged and skipped rather than
        aborting the whole parse.

        Args:
            wb: Loaded workbook containing the assets worksheet.

        Returns:
            The successfully mapped assets whose ``is_block_level`` is falsy.
            An empty list if the worksheet has no rows at all.
        """
        assets_sheet = wb[PeabodyParser._ASSETS_SHEET_NAME]
        asset_rows = assets_sheet.iter_rows(values_only=True)

        # An empty sheet has no header row; treat it as "no assets" instead
        # of letting a bare StopIteration escape to the caller.
        asset_headers = next(asset_rows, None)
        if asset_headers is None:
            logger.warning(
                f"[PeabodyParser] Sheet '{PeabodyParser._ASSETS_SHEET_NAME}' "
                "is empty; no assets parsed"
            )
            return []

        asset_header_indexes = PeabodyParser._get_column_indexes_by_name(asset_headers)

        assets: List[PeabodyAssetCondition] = []
        for row in asset_rows:
            # Deliberately best-effort: one malformed row must not discard
            # the rest of the survey, so any mapping failure is logged and
            # the row is skipped.
            try:
                asset = PeabodyParser._map_row_to_asset_record(
                    row, asset_header_indexes
                )
                if not asset.is_block_level:
                    # Block-level condition surveys are out of scope for now
                    # until we have a wider think on how to handle blocks
                    assets.append(asset)
                # TODO: handle block-level assets
            except Exception as e:
                logger.error(f"Error mapping Peabody row to asset record: {e}")

        return assets

    @staticmethod
    def _group_assets_into_properties(
        assets: List[PeabodyAssetCondition],
        location_ref_to_uprn_map: Dict[str, int],
    ) -> List[PeabodyProperty]:
        """Group assets by location reference and attach each group's UPRN.

        Assets whose ``lo_reference`` is ``None`` are ignored. Groups whose
        location reference is absent from ``location_ref_to_uprn_map`` are
        dropped and counted; the count is logged as a warning when non-zero.

        Args:
            assets: Asset records to group.
            location_ref_to_uprn_map: Mapping from location reference to UPRN.

        Returns:
            One ``PeabodyProperty`` per location reference that has a UPRN.
        """
        assets_by_location_reference: DefaultDict[str, List[PeabodyAssetCondition]] = (
            defaultdict(list)
        )
        for asset in assets:
            if asset.lo_reference is None:
                continue
            assets_by_location_reference[asset.lo_reference].append(asset)

        properties: List[PeabodyProperty] = []
        failed_mappings_count = 0
        for location_ref, grouped_assets in assets_by_location_reference.items():
            uprn = location_ref_to_uprn_map.get(location_ref)
            if uprn is None:
                failed_mappings_count += 1
                continue

            properties.append(
                PeabodyProperty(
                    uprn=uprn,
                    assets=grouped_assets,
                )
            )

        # Only warn when something actually failed to map; an unconditional
        # "0 Location References" warning is just noise.
        if failed_mappings_count:
            logger.warning(
                f"No UPRN found for {failed_mappings_count} Location References"
            )

        return properties

    @staticmethod
    def _map_row_to_asset_record(
        row: Any | Tuple[object | None, ...],
        header_indexes: Dict[str, int],
    ) -> PeabodyAssetCondition:
        """Build a ``PeabodyAssetCondition`` from one worksheet row.

        Args:
            row: Cell values of a single row, indexable by column position.
            header_indexes: Mapping from header text to column position.

        Returns:
            The asset record populated from the named columns.

        Raises:
            KeyError: If a required header is missing from ``header_indexes``.
            IndexError: If ``row`` is shorter than a required column index.
        """

        def cell(header: str) -> Any:
            # Look up a cell by its header name.
            return row[header_indexes[header]]

        return PeabodyAssetCondition(
            lo_reference=cell("Lo_Reference"),
            full_address=cell("full_address"),
            location_type_code=cell("location_type_code"),
            parent_lo_reference=cell("Parent_Lo_Reference"),
            element_code=cell("Element_Code"),
            element=cell("Element"),
            sub_element_code=cell("Sub_Element_Code"),
            sub_element=cell("Sub_Element"),
            material_code=cell("Material_Code"),
            material_or_answer=cell("material_or_answer"),
            renewal_quantity=cell("Renewal_Quantity"),
            renewal_year=cell("Renewal_Year"),
            renewal_cost=cell("Renewal_Cost"),
            cloned=cell("cloned"),
            lo_type_code=cell("lo_type_code"),
            condition_survey_date=cell("condition_survey_date"),
        )

    @staticmethod
    def _get_column_indexes_by_name(
        headers: Tuple[object | None, ...],
    ) -> Dict[str, int]:
        """Map each string header to its zero-based column position.

        Non-string header cells (e.g. ``None`` for blank cells) are skipped.
        If a header text appears more than once, the last position wins.

        Args:
            headers: Cell values of the header row.

        Returns:
            Mapping from header text to column index.
        """
        return {
            header: position
            for position, header in enumerate(headers)
            if isinstance(header, str)
        }