from collections import defaultdict
from typing import Any, BinaryIO, DefaultDict, Dict, Iterator, List, Tuple

from openpyxl import Workbook, load_workbook

from backend.condition.parsing.parser import Parser
from backend.condition.parsing.records.peabody.peabody_asset_condition import (
    PeabodyAssetCondition,
)
from backend.condition.parsing.records.peabody.peabody_property import PeabodyProperty
from utils.logger import setup_logger

logger = setup_logger()


class PeabodyParser(Parser):
    """Parse a Peabody condition-survey workbook into ``PeabodyProperty`` records.

    All data is read from a single worksheet (see ``_SURVEY_SHEET_NAME``) whose
    first row is a header row. Columns are located by header name, so their
    order in the sheet does not matter.
    """

    # Worksheet holding the survey rows, and the header of its address column.
    _SURVEY_SHEET_NAME = "Survey Records - D & Lower"
    _ADDRESS_COLUMN = "full_address"

    def parse(self, file_stream: BinaryIO) -> Any:
        """Load the workbook from *file_stream* and return its properties.

        Args:
            file_stream: Binary stream containing the Excel workbook.

        Returns:
            A list of ``PeabodyProperty``, one per distinct address that has
            at least one non-block-level asset row.

        Raises:
            KeyError: If the survey worksheet, or its ``full_address`` column,
                is missing from the workbook.
        """
        wb: Workbook = load_workbook(file_stream)
        address_to_uprn_map: Dict[str, int] = PeabodyParser._generate_address_to_uprn_dict(wb)
        assets = self._parse_assets(wb)
        return self._group_assets_into_properties(
            assets=assets,
            address_to_uprn_map=address_to_uprn_map,
        )

    @staticmethod
    def _parse_assets(wb: Workbook) -> List[PeabodyAssetCondition]:
        """Map every data row of the survey sheet to an asset record.

        Rows that fail to map are logged and skipped so one bad row does not
        abort the whole import. Block-level assets are dropped.

        Returns:
            The successfully mapped, non-block-level assets in sheet order;
            an empty list when the sheet yields no rows at all.
        """
        asset_rows = wb[PeabodyParser._SURVEY_SHEET_NAME].iter_rows(values_only=True)

        # A default keeps a sheet without rows from leaking a raw StopIteration.
        asset_headers = next(asset_rows, None)
        if asset_headers is None:
            logger.warning(
                f"Peabody sheet '{PeabodyParser._SURVEY_SHEET_NAME}' has no header row"
            )
            return []
        asset_header_indexes = PeabodyParser._get_column_indexes_by_name(asset_headers)

        assets: List[PeabodyAssetCondition] = []
        for row in asset_rows:
            try:
                asset = PeabodyParser._map_row_to_asset_record(row, asset_header_indexes)
                if not asset.is_block_level:
                    # Block-level condition surveys are out of scope for now
                    # until we have a wider think on how to handle block
                    assets.append(asset)
                # TODO: handle block-level assets
            except Exception as e:
                # Deliberately broad: mapping is best-effort on a per-row basis.
                logger.error(f"Error mapping Peabody row to asset record: {e}")
                continue

        return assets

    @staticmethod
    def _group_assets_into_properties(
        assets: List[PeabodyAssetCondition],
        address_to_uprn_map: Dict[str, int],
    ) -> List[PeabodyProperty]:
        """Group *assets* by stripped address and attach each group's UPRN.

        Assets without an address are skipped silently. Assets whose address
        is not a string are skipped with a warning. Addresses that have no
        entry in *address_to_uprn_map* are logged and produce no property.
        """
        assets_by_address: DefaultDict[str, List[PeabodyAssetCondition]] = defaultdict(list)
        for asset in assets:
            full_address = asset.full_address
            if full_address is None:
                continue
            if not isinstance(full_address, str):
                # Mirrors the UPRN map, which only ever indexes string addresses.
                logger.warning(f"Skipping asset with non-text address: {full_address!r}")
                continue
            assets_by_address[full_address.strip()].append(asset)

        properties: List[PeabodyProperty] = []
        for address, grouped_assets in assets_by_address.items():
            uprn = address_to_uprn_map.get(address)
            if uprn is None:
                logger.warning(f"No UPRN found for address: {address}")
                continue
            properties.append(
                PeabodyProperty(
                    uprn=uprn,
                    assets=grouped_assets,
                )
            )

        return properties

    @staticmethod
    def _map_row_to_asset_record(
        row: Any | Tuple[object | None, ...],
        header_indexes: Dict[str, int],
    ) -> PeabodyAssetCondition:
        """Build a ``PeabodyAssetCondition`` from one worksheet row.

        Args:
            row: Cell values of a single row, indexable by column position.
            header_indexes: Header name -> column position for the sheet.

        Raises:
            KeyError: If an expected header is absent from *header_indexes*.
        """
        return PeabodyAssetCondition(
            lo_reference=row[header_indexes["Lo_Reference"]],
            full_address=row[header_indexes["full_address"]],
            location_type_code=row[header_indexes["location_type_code"]],
            parent_lo_reference=row[header_indexes["Parent_Lo_Reference"]],
            element_code=row[header_indexes["Element_Code"]],
            element=row[header_indexes["Element"]],
            sub_element_code=row[header_indexes["Sub_Element_Code"]],
            sub_element=row[header_indexes["Sub_Element"]],
            material_code=row[header_indexes["Material_Code"]],
            material_or_answer=row[header_indexes["material_or_answer"]],
            renewal_quantity=row[header_indexes["Renewal_Quantity"]],
            renewal_year=row[header_indexes["Renewal_Year"]],
            renewal_cost=row[header_indexes["Renewal_Cost"]],
            cloned=row[header_indexes["cloned"]],
            lo_type_code=row[header_indexes["lo_type_code"]],
            condition_survey_date=row[header_indexes["condition_survey_date"]],
        )

    @staticmethod
    def _generate_address_to_uprn_dict(wb: Workbook) -> Dict[str, int]:
        """Assign a placeholder UPRN to every distinct address in the sheet.

        UPRNs are sequential integers starting at 1, handed out in the order
        addresses first appear. Addresses are stripped of surrounding
        whitespace before comparison. Rows whose address cell is empty or not
        a string are ignored.

        Returns:
            Stripped address -> placeholder UPRN; empty when the sheet yields
            no rows at all.

        Raises:
            KeyError: If the sheet has no ``full_address`` header.
        """
        sheet = wb[PeabodyParser._SURVEY_SHEET_NAME]
        rows: Iterator[Tuple[object | None, ...]] = sheet.iter_rows(values_only=True)

        headers = next(rows, None)
        if headers is None:
            return {}
        header_indexes: Dict[str, int] = PeabodyParser._get_column_indexes_by_name(headers)
        address_idx = header_indexes[PeabodyParser._ADDRESS_COLUMN]

        address_to_uprn: Dict[str, int] = {}

        # Placeholder UPRNs for now
        next_uprn = 1
        # TODO: get real UPRNs

        for row in rows:
            address = row[address_idx]
            # Empty cells arrive as None; numeric/date cells have no .strip().
            if not isinstance(address, str):
                continue

            address = address.strip()
            if address not in address_to_uprn:
                address_to_uprn[address] = next_uprn
                next_uprn += 1

        return address_to_uprn

    @staticmethod
    def _get_column_indexes_by_name(
        headers: Tuple[object | None, ...]
    ) -> Dict[str, int]:
        """Return header text -> zero-based column position.

        Non-string header cells are ignored. When a header text occurs more
        than once, the right-most column wins.
        """
        return {
            header: position
            for position, header in enumerate(headers)
            if isinstance(header, str)
        }