Model/backend/condition/parsing/lbwf_parser.py
2026-01-19 16:51:18 +00:00

112 lines
4.2 KiB
Python

from typing import BinaryIO, Any, Dict, Iterator, List, Tuple
from openpyxl import Workbook, load_workbook
from datetime import date
from backend.condition.parsing.parser import Parser
from backend.condition.parsing.records.lbwf_asset_condition import LbwfAssetCondition
from backend.condition.utils.date_utils import normalise_date
from utils.logger import setup_logger
# NOTE(review): this binds the imported `setup_logger` object itself rather
# than the result of calling it. If `setup_logger` is a factory function (as
# its name suggests), `logger.error(...)` in this module will fail with
# AttributeError - confirm against utils.logger and call it with the
# appropriate arguments if so.
logger = setup_logger
class LbwfParser(Parser):
    """Parser for LBWF asset-condition workbooks.

    Reads the "Houses Asset Data" sheet and maps every data row to an
    ``LbwfAssetCondition``. The UPRN of each asset is looked up by address
    in the "All Energy Breakdown " sheet of the same workbook.
    """

    def parse(self, file_stream: BinaryIO) -> Any:
        """Parse an LBWF workbook into a list of asset-condition records.

        Args:
            file_stream: Binary stream of the workbook, handed to
                ``openpyxl.load_workbook``.

        Returns:
            A list of ``LbwfAssetCondition``, one per row of the
            "Houses Asset Data" sheet that could be mapped. Rows that fail
            to map are logged and skipped.

        Raises:
            ValueError: If either sheet has no header row, or if the UPRN
                sheet contains a UPRN that is neither ``None`` nor an int.
        """
        wb = load_workbook(file_stream)
        address_to_uprn_map = LbwfParser._generate_address_to_uprn_dict(wb)

        rows = wb["Houses Asset Data"].iter_rows(values_only=True)
        headers = next(rows, None)
        if headers is None:
            # A bare StopIteration from next() is easy to swallow by accident
            # further up the stack; fail with an explicit error instead.
            raise ValueError("Sheet 'Houses Asset Data' has no header row")
        header_indexes = LbwfParser._get_column_indexes_by_name(headers)

        assets: List[LbwfAssetCondition] = []
        for row in rows:
            try:
                assets.append(
                    LbwfParser._map_row_to_asset_record(
                        row, header_indexes, address_to_uprn_map
                    )
                )
            except Exception as e:
                # Deliberately best-effort: one bad row must not abort the
                # whole import, so log it and carry on with the next row.
                logger.error(f"Error mapping LBWF row to asset record: {e}")
        return assets

    @staticmethod
    def _map_row_to_asset_record(
        row: Any | Tuple[object | None, ...],
        header_indexes: Dict[str, int],
        address_to_uprn_map: Dict[str, int | None],
    ) -> LbwfAssetCondition:
        """Build one ``LbwfAssetCondition`` from a worksheet row.

        Args:
            row: Cell values of a single data row.
            header_indexes: Column name -> position in ``row``.
            address_to_uprn_map: Address -> UPRN lookup used to fill ``uprn``.

        Returns:
            The record populated from the row; ``install_date`` is passed
            through ``normalise_date`` and ``uprn`` is ``None`` when the
            address has no match in ``address_to_uprn_map``.

        Raises:
            KeyError: If an expected column name is missing from
                ``header_indexes``.
        """

        def cell(column: str) -> Any:
            return row[header_indexes[column]]

        address: str = cell("ADDRESS")
        return LbwfAssetCondition(
            uprn=LbwfParser._get_uprn_from_address(address, address_to_uprn_map),
            prop_ref=cell("PROP REF"),
            domna=cell("Domna"),
            address=address,
            ownership=cell("OWNERSHIP"),
            prop_status=cell("PROP STATUS"),
            prop_type=cell("PROP TYPE"),
            prop_sub_type=cell("PROP SUB TYPE"),
            element_group=cell("ELEMENT GROUP"),
            element_code=cell("ELEMENT CODE"),
            element_code_description=cell("ELEMENT CODE DESCRIPTION"),
            attribute_code=cell("ATTRIBUTE CODE"),
            attribute_code_description=cell("ATTRIBUTE CODE DESCRIPTION"),
            element_date_value=cell("ELEMENT DATE VALUE"),
            element_numerical_value=cell("ELEMENT NUMERIC VALUE"),
            element_text_value=cell("ELEMENT TEXT VALUE"),
            quantity=cell("QUANTITY"),
            install_date=normalise_date(cell("INSTALL DATE")),
            remaining_life=cell("REMAINING LIFE"),
            element_comments=cell("ELEMENT COMMENTS"),
        )

    @staticmethod
    def _generate_address_to_uprn_dict(wb: Workbook) -> Dict[str, int | None]:
        """Build the address -> UPRN lookup from the energy-breakdown sheet.

        Args:
            wb: The loaded workbook.

        Returns:
            A dict keyed by the "Address" column with the "UPRN" column as
            value (``None`` where the cell is empty). Rows whose address is
            not a string are ignored; a later duplicate address overwrites
            an earlier one.

        Raises:
            ValueError: If the sheet has no header row, or a UPRN cell holds
                something other than ``None`` or an int.
            KeyError: If the "Address" or "UPRN" column is missing.
        """
        # The trailing space is part of the sheet name used for the lookup.
        rows = wb["All Energy Breakdown "].iter_rows(values_only=True)
        headers = next(rows, None)
        if headers is None:
            raise ValueError("Sheet 'All Energy Breakdown ' has no header row")
        header_indexes = LbwfParser._get_column_indexes_by_name(headers)
        address_idx = header_indexes["Address"]
        uprn_idx = header_indexes["UPRN"]

        mapping: Dict[str, int | None] = {}
        for row in rows:
            address = row[address_idx]
            uprn = row[uprn_idx]
            if not isinstance(address, str):
                continue
            if uprn is not None and not isinstance(uprn, int):
                raise ValueError(f"Unexpected UPRN value: {uprn!r}")
            mapping[address] = uprn
        return mapping

    @staticmethod
    def _get_column_indexes_by_name(
        headers: Tuple[object | None, ...]
    ) -> Dict[str, int]:
        """Map each string header cell to its column position.

        Non-string header cells are ignored. If a name occurs more than
        once, the right-most column wins.
        """
        return {
            header: position
            for position, header in enumerate(headers)
            if isinstance(header, str)
        }

    @staticmethod
    def _get_uprn_from_address(
        address: str, address_to_uprn_map: Dict[str, int | None]
    ) -> int | None:
        """Look up the UPRN for an address, ignoring letter case.

        Only the part of ``address`` before the first comma is compared
        against the keys of ``address_to_uprn_map``.

        Args:
            address: Full address string from the asset sheet.
            address_to_uprn_map: Address -> UPRN lookup.

        Returns:
            The mapped UPRN, or ``None`` when no key matches (or the matching
            key maps to ``None``).
        """
        pseudo_name = address.split(",")[0]

        # Fast path: keys stored fully upper-cased resolve with one lookup.
        upper_key = pseudo_name.upper()
        if upper_key in address_to_uprn_map:
            return address_to_uprn_map[upper_key]

        # Otherwise return the value of the key that actually matched.
        # Indexing with the upper-cased name here would raise KeyError for
        # any key that is not stored in upper case.
        wanted = pseudo_name.lower()
        for key, uprn in address_to_uprn_map.items():
            if key.lower() == wanted:
                return uprn
        return None