mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
147 lines
5.2 KiB
Python
147 lines
5.2 KiB
Python
import csv
|
|
from pathlib import Path
|
|
from typing import Any, BinaryIO, Dict, List, Optional, Tuple, DefaultDict
|
|
from openpyxl import Workbook, load_workbook
|
|
from collections import defaultdict
|
|
|
|
from backend.condition.parsing.parser import Parser
|
|
from backend.condition.parsing.records.peabody.peabody_asset_condition import (
|
|
PeabodyAssetCondition,
|
|
)
|
|
from backend.condition.parsing.records.peabody.peabody_property import PeabodyProperty
|
|
from utils.logger import setup_logger
|
|
|
|
# Module-level logger; used below to report rows that fail to map to an asset
# record and location references that have no UPRN.
logger = setup_logger()
|
|
|
|
|
|
class PeabodyParser(Parser):
    """Parse a Peabody condition-survey workbook into per-property asset groups.

    Rows of the asset worksheet are mapped to ``PeabodyAssetCondition``
    records, grouped by location reference, and each group is attached to the
    UPRN looked up for that reference to form a ``PeabodyProperty``.
    """

    # Name of the worksheet that holds the asset condition rows.
    _ASSETS_SHEET_NAME = "Survey Records - D & Lower"

    # ``PeabodyAssetCondition`` keyword argument -> worksheet column header.
    # Single source of truth for both the up-front header check and the
    # row-to-record mapping, so the two cannot drift apart.
    _ASSET_FIELD_TO_HEADER = {
        "lo_reference": "Lo_Reference",
        "full_address": "full_address",
        "location_type_code": "location_type_code",
        "parent_lo_reference": "Parent_Lo_Reference",
        "element_code": "Element_Code",
        "element": "Element",
        "sub_element_code": "Sub_Element_Code",
        "sub_element": "Sub_Element",
        "material_code": "Material_Code",
        "material_or_answer": "material_or_answer",
        "renewal_quantity": "Renewal_Quantity",
        "renewal_year": "Renewal_Year",
        "renewal_cost": "Renewal_Cost",
        "cloned": "cloned",
        "lo_type_code": "lo_type_code",
        "condition_survey_date": "condition_survey_date",
    }

    def parse(
        self,
        file_stream: BinaryIO,
        location_ref_to_uprn_map: Optional[Dict[str, int]] = None,
    ) -> Any:
        """Parse a workbook stream into a list of ``PeabodyProperty`` objects.

        Args:
            file_stream: Binary stream of the workbook, passed to
                ``openpyxl.load_workbook``.
            location_ref_to_uprn_map: Optional mapping of location reference
                to UPRN. When ``None`` the mapping is loaded from the bundled
                CSV; an explicitly supplied mapping (even an empty one) is
                used as given.

        Returns:
            The properties built from the workbook's non-block-level assets.
            Location references without a UPRN are logged and omitted.
        """
        wb: Workbook = load_workbook(file_stream)

        # Bind to a new local instead of re-annotating the parameter, which
        # type checkers reject as a redefinition.
        if location_ref_to_uprn_map is None:
            uprn_by_location_ref: Dict[str, int] = (
                PeabodyParser._build_location_ref_to_uprn_map()
            )
        else:
            uprn_by_location_ref = location_ref_to_uprn_map

        assets = PeabodyParser._parse_assets(wb)

        return PeabodyParser._group_assets_into_properties(
            assets=assets,
            location_ref_to_uprn_map=uprn_by_location_ref,
        )

    @staticmethod
    def _build_location_ref_to_uprn_map() -> Dict[str, int]:
        """Load the location-reference -> UPRN mapping from the bundled CSV.

        The CSV is read from ``sample_data/peabody`` relative to the parent
        package of this module. Each row's ``reference`` column becomes the
        key and its ``out_uprn`` column, converted with ``int``, the value.

        Raises:
            KeyError: If a row lacks the ``reference`` or ``out_uprn`` column.
            ValueError: If an ``out_uprn`` value is not a valid integer.
        """
        csv_path: Path = (
            Path(__file__).resolve().parents[1]
            / "sample_data"
            / "peabody"
            / "PeabodyPropertymatched_Dec25_propref_UPRN.csv"
        )
        uprn_by_reference: Dict[str, int] = {}

        # Explicit encoding keeps the read independent of the host locale;
        # "utf-8-sig" additionally drops a leading BOM, which would otherwise
        # become part of the first column name.
        with csv_path.open(newline="", encoding="utf-8-sig") as f:
            reader: Any = csv.DictReader(f)
            for row in reader:
                uprn_by_reference[row["reference"]] = int(row["out_uprn"])

        return uprn_by_reference

    @staticmethod
    def _parse_assets(wb: Workbook) -> List[PeabodyAssetCondition]:
        """Read the asset worksheet into ``PeabodyAssetCondition`` records.

        The first row is treated as the header row. Rows that fail to map are
        logged and skipped. Block-level records are dropped.

        Returns:
            The non-block-level asset records, in sheet order. An empty list
            is returned when the sheet has no rows or lacks a required column.
        """
        sheet_name = PeabodyParser._ASSETS_SHEET_NAME
        rows = wb[sheet_name].iter_rows(values_only=True)

        # A sheet with no rows at all would otherwise leak a bare
        # StopIteration out of this function.
        header_row = next(rows, None)
        if header_row is None:
            logger.warning(f"Peabody sheet '{sheet_name}' is empty")
            return []

        header_indexes = PeabodyParser._get_column_indexes_by_name(header_row)

        # Without this check every data row would fail with the same KeyError
        # and be logged individually; report the problem once instead.
        missing_headers = [
            header
            for header in PeabodyParser._ASSET_FIELD_TO_HEADER.values()
            if header not in header_indexes
        ]
        if missing_headers:
            logger.error(
                f"Peabody sheet '{sheet_name}' is missing required columns: "
                f"{missing_headers}"
            )
            return []

        assets: List[PeabodyAssetCondition] = []
        for row in rows:
            # Deliberately best-effort: one bad row must not abort the sheet.
            try:
                asset = PeabodyParser._map_row_to_asset_record(row, header_indexes)
                # Block-level condition surveys are out of scope for now.
                # TODO: handle block-level assets
                if not asset.is_block_level:
                    assets.append(asset)
            except Exception as e:
                logger.error(f"Error mapping Peabody row to asset record: {e}")
                continue

        return assets

    @staticmethod
    def _group_assets_into_properties(
        assets: List[PeabodyAssetCondition],
        location_ref_to_uprn_map: Dict[str, int],
    ) -> List[PeabodyProperty]:
        """Group assets by location reference and attach each group's UPRN.

        Assets whose ``lo_reference`` is ``None`` are ignored. A location
        reference absent from ``location_ref_to_uprn_map`` is logged as a
        warning and produces no property.
        """
        assets_by_location_reference: DefaultDict[str, List[PeabodyAssetCondition]] = (
            defaultdict(list)
        )
        for asset in assets:
            if asset.lo_reference is not None:
                assets_by_location_reference[asset.lo_reference].append(asset)

        properties: List[PeabodyProperty] = []
        for location_ref, grouped_assets in assets_by_location_reference.items():
            uprn = location_ref_to_uprn_map.get(location_ref)
            if uprn is None:
                logger.warning(f"No UPRN found for Location Reference: {location_ref}")
                continue

            properties.append(PeabodyProperty(uprn=uprn, assets=grouped_assets))

        return properties

    @staticmethod
    def _map_row_to_asset_record(
        row: Any | Tuple[object | None, ...],
        header_indexes: Dict[str, int],
    ) -> PeabodyAssetCondition:
        """Build a ``PeabodyAssetCondition`` from one worksheet row.

        Args:
            row: Cell values of the row, indexable by column position.
            header_indexes: Column header -> position in ``row``.

        Raises:
            KeyError: If a required header is absent from ``header_indexes``.
            IndexError: If ``row`` is shorter than a required column position.
        """
        field_values = {
            field: row[header_indexes[header]]
            for field, header in PeabodyParser._ASSET_FIELD_TO_HEADER.items()
        }
        return PeabodyAssetCondition(**field_values)

    @staticmethod
    def _get_column_indexes_by_name(
        headers: Tuple[object | None, ...],
    ) -> Dict[str, int]:
        """Map each string header to its column position.

        Non-string cells (for example empty header cells) are ignored. If a
        header text occurs more than once, the last occurrence wins.
        """
        return {
            header: position
            for position, header in enumerate(headers)
            if isinstance(header, str)
        }
|