# Model/backend/condition/parsing/peabody_parser.py

# 147 lines
# 5.2 KiB
# Python

import csv
from pathlib import Path
from typing import Any, BinaryIO, Dict, List, Optional, Tuple, DefaultDict
from openpyxl import Workbook, load_workbook
from collections import defaultdict
from backend.condition.parsing.parser import Parser
from backend.condition.parsing.records.peabody.peabody_asset_condition import (
PeabodyAssetCondition,
)
from backend.condition.parsing.records.peabody.peabody_property import PeabodyProperty
from utils.logger import setup_logger
# Module-level logger used by the PeabodyParser methods below; it is created
# once at import time by the project's setup_logger helper.
logger = setup_logger()
class PeabodyParser(Parser):
    """Parse a Peabody condition-survey workbook into per-property records.

    The workbook's asset sheet is read row by row into
    ``PeabodyAssetCondition`` records, block-level records are dropped, and
    the remaining assets are grouped by their location reference. Each group
    whose location reference resolves to a UPRN becomes one
    ``PeabodyProperty``.
    """

    # Name of the worksheet that holds the asset-level survey rows.
    _ASSETS_SHEET_NAME = "Survey Records - D & Lower"

    # PeabodyAssetCondition keyword argument -> worksheet column header.
    # Single source of truth for both header validation and row mapping.
    _ASSET_FIELD_TO_COLUMN = {
        "lo_reference": "Lo_Reference",
        "full_address": "full_address",
        "location_type_code": "location_type_code",
        "parent_lo_reference": "Parent_Lo_Reference",
        "element_code": "Element_Code",
        "element": "Element",
        "sub_element_code": "Sub_Element_Code",
        "sub_element": "Sub_Element",
        "material_code": "Material_Code",
        "material_or_answer": "material_or_answer",
        "renewal_quantity": "Renewal_Quantity",
        "renewal_year": "Renewal_Year",
        "renewal_cost": "Renewal_Cost",
        "cloned": "cloned",
        "lo_type_code": "lo_type_code",
        "condition_survey_date": "condition_survey_date",
    }

    def parse(
        self,
        file_stream: BinaryIO,
        location_ref_to_uprn_map: Optional[Dict[str, int]] = None,
    ) -> Any:
        """Parse an uploaded Peabody workbook.

        Args:
            file_stream: Binary stream of the Excel workbook.
            location_ref_to_uprn_map: Optional mapping from location
                reference to UPRN. When ``None`` the mapping is loaded from
                the bundled CSV; an explicitly supplied mapping (even an
                empty one) is used as given.

        Returns:
            A list of ``PeabodyProperty`` objects, one per location reference
            that has at least one non-block-level asset and a known UPRN.
        """
        wb: Workbook = load_workbook(file_stream)
        if location_ref_to_uprn_map is None:
            location_ref_to_uprn_map = (
                PeabodyParser._build_location_ref_to_uprn_map()
            )
        assets = PeabodyParser._parse_assets(wb)
        return PeabodyParser._group_assets_into_properties(
            assets=assets,
            location_ref_to_uprn_map=location_ref_to_uprn_map,
        )

    @staticmethod
    def _build_location_ref_to_uprn_map() -> Dict[str, int]:
        """Load the location-reference -> UPRN lookup from the bundled CSV.

        The CSV lives under ``sample_data/peabody`` in the directory one
        level above this module's directory. Its ``reference`` column is the
        key and its ``out_uprn`` column is converted to ``int``.

        Raises:
            FileNotFoundError: If the CSV file does not exist.
            KeyError: If a row lacks the ``reference`` or ``out_uprn`` column.
            ValueError: If an ``out_uprn`` value is not an integer string.
        """
        location_ref_to_uprn_filepath: Path = (
            Path(__file__).resolve().parents[1]
            / "sample_data"
            / "peabody"
            / "PeabodyPropertymatched_Dec25_propref_UPRN.csv"
        )
        location_ref_to_uprn_map: Dict[str, int] = {}
        # newline="" is what the csv module requires for correct handling of
        # embedded newlines in quoted fields.
        with location_ref_to_uprn_filepath.open(newline="") as f:
            reader: Any = csv.DictReader(f)
            for row in reader:
                location_ref_to_uprn_map[row["reference"]] = int(row["out_uprn"])
        return location_ref_to_uprn_map

    @staticmethod
    def _parse_assets(wb: Workbook) -> List[PeabodyAssetCondition]:
        """Read the asset sheet into non-block-level asset records.

        The first row is treated as the header row. Rows that fail to map
        are logged and skipped so one bad row does not abort the whole file.

        Args:
            wb: The loaded Peabody workbook.

        Returns:
            The asset records whose ``is_block_level`` flag is falsy. An
            empty list is returned when the sheet is empty or lacks any of
            the expected columns.

        Raises:
            KeyError: If the workbook has no asset sheet.
        """
        sheet_name = PeabodyParser._ASSETS_SHEET_NAME
        asset_rows = wb[sheet_name].iter_rows(values_only=True)

        # A sheet without any cells yields no rows at all; a bare next()
        # would leak StopIteration to the caller.
        asset_headers = next(asset_rows, None)
        if asset_headers is None:
            logger.warning(f"Peabody sheet '{sheet_name}' is empty; no assets parsed")
            return []

        asset_header_indexes = PeabodyParser._get_column_indexes_by_name(asset_headers)

        # Validate the header row once. Without this, a missing column makes
        # every data row fail with the same KeyError and floods the log.
        missing_columns = [
            column
            for column in PeabodyParser._ASSET_FIELD_TO_COLUMN.values()
            if column not in asset_header_indexes
        ]
        if missing_columns:
            logger.error(
                f"Peabody sheet '{sheet_name}' is missing expected columns: "
                f"{missing_columns}"
            )
            return []

        assets: List[PeabodyAssetCondition] = []
        for row in asset_rows:
            # Entirely blank rows carry no location reference and could never
            # end up in a property, so skip them before building a record.
            if all(cell is None for cell in row):
                continue
            try:
                asset = PeabodyParser._map_row_to_asset_record(
                    row, asset_header_indexes
                )
                # Block-level condition surveys are out of scope for now, so
                # only non-block-level assets are kept.
                # TODO: handle block-level assets
                if not asset.is_block_level:
                    assets.append(asset)
            except Exception as e:
                # Deliberate best-effort: log the bad row and carry on.
                logger.error(f"Error mapping Peabody row to asset record: {e}")
                continue
        return assets

    @staticmethod
    def _group_assets_into_properties(
        assets: List[PeabodyAssetCondition],
        location_ref_to_uprn_map: Dict[str, int],
    ) -> List[PeabodyProperty]:
        """Group assets by location reference and attach each group's UPRN.

        Assets without a location reference are ignored. Groups whose
        location reference is absent from ``location_ref_to_uprn_map`` are
        logged as a warning and dropped.

        Args:
            assets: Asset records to group.
            location_ref_to_uprn_map: Location reference -> UPRN lookup.

        Returns:
            One ``PeabodyProperty`` per location reference with a known UPRN,
            in order of first appearance of the reference in ``assets``.
        """
        assets_by_location_reference: DefaultDict[str, List[PeabodyAssetCondition]] = (
            defaultdict(list)
        )
        for asset in assets:
            if asset.lo_reference is None:
                continue
            assets_by_location_reference[asset.lo_reference].append(asset)

        properties: List[PeabodyProperty] = []
        for location_ref, grouped_assets in assets_by_location_reference.items():
            # NOTE(review): keys of the bundled CSV lookup are always str; if
            # the worksheet yields a numeric Lo_Reference this lookup would
            # miss. Confirm PeabodyAssetCondition normalises it to str.
            uprn = location_ref_to_uprn_map.get(location_ref)
            if uprn is None:
                logger.warning(f"No UPRN found for Location Reference: {location_ref}")
                continue
            properties.append(
                PeabodyProperty(
                    uprn=uprn,
                    assets=grouped_assets,
                )
            )
        return properties

    @staticmethod
    def _map_row_to_asset_record(
        row: Any | Tuple[object | None, ...],
        header_indexes: Dict[str, int],
    ) -> PeabodyAssetCondition:
        """Build a ``PeabodyAssetCondition`` from one worksheet row.

        Args:
            row: Cell values of a single row, positionally aligned with the
                header row.
            header_indexes: Column header -> zero-based column position.

        Returns:
            The asset record populated from the mapped columns.

        Raises:
            KeyError: If an expected column is absent from ``header_indexes``.
            IndexError: If ``row`` is shorter than a mapped column position.
        """
        field_values = {
            field_name: row[header_indexes[column_name]]
            for field_name, column_name in PeabodyParser._ASSET_FIELD_TO_COLUMN.items()
        }
        return PeabodyAssetCondition(**field_values)

    @staticmethod
    def _get_column_indexes_by_name(
        headers: Tuple[object | None, ...],
    ) -> Dict[str, int]:
        """Map each string header to its zero-based column position.

        Non-string header cells (for example empty cells) are ignored. If a
        header text occurs more than once, the right-most occurrence wins.
        """
        return {
            header: position
            for position, header in enumerate(headers)
            if isinstance(header, str)
        }