Model/backend/condition/parsing/lbwf_parser.py

180 lines
6.3 KiB
Python

from typing import BinaryIO, Any, Dict, Iterator, List, Tuple
from openpyxl import Workbook, load_workbook
from collections import defaultdict
from backend.condition.parsing.parser import Parser
from backend.condition.parsing.records.lbwf.lbwf_asset_condition import LbwfAssetCondition
from backend.condition.parsing.records.lbwf.lbwf_house import LbwfHouse
from backend.condition.utils.date_utils import normalise_date
from utils.logger import setup_logger
# Module-level logger used by LbwfParser to report rows that fail to map.
# NOTE(review): this binds the imported `setup_logger` name itself rather than
# the result of calling it, while the parser below calls `logger.error(...)`.
# If `setup_logger` is a factory function, it presumably needs to be invoked
# here — confirm against utils.logger before changing.
logger = setup_logger
class LbwfParser(Parser):
    """Parse an LBWF condition workbook into ``LbwfHouse`` records.

    Three worksheets are read by name: ``"All Energy Breakdown "`` (address to
    UPRN lookup), ``"Houses Asset Data"`` (one row per asset condition) and
    ``"Houses"`` (one row per house). Assets are attached to the house whose
    ``reference`` equals the asset's ``prop_ref``.
    """

    def parse(self, file_stream: BinaryIO) -> Any:
        """Load the workbook from *file_stream* and return the parsed houses.

        Args:
            file_stream: Binary stream handed to ``openpyxl.load_workbook``.

        Returns:
            The list of ``LbwfHouse`` records, each with its ``assets`` list
            populated (empty when no asset shares the house reference).

        Raises:
            KeyError: If an expected worksheet or lookup-sheet header is
                missing.
            ValueError: If the lookup sheet holds a UPRN that is neither
                ``None`` nor an ``int``.
        """
        wb: Workbook = load_workbook(file_stream)
        address_to_uprn_map = self._generate_address_to_uprn_dict(wb)
        assets = self._parse_assets(wb)
        houses = self._parse_houses(wb, address_to_uprn_map)
        self._merge_assets_into_houses(assets, houses)
        return houses

    @staticmethod
    def _parse_assets(wb: Workbook) -> List[LbwfAssetCondition]:
        """Map every data row of the "Houses Asset Data" sheet to a record.

        The first row is treated as the header row. Rows that fail to map are
        logged and skipped so one bad row does not abort the whole sheet.
        """
        assets_sheet = wb["Houses Asset Data"]
        asset_rows = assets_sheet.iter_rows(values_only=True)
        asset_headers = next(asset_rows)
        asset_header_indexes = LbwfParser._get_column_indexes_by_name(asset_headers)

        assets: List[LbwfAssetCondition] = []
        for row in asset_rows:
            try:
                assets.append(
                    LbwfParser._map_row_to_asset_record(row, asset_header_indexes)
                )
            except Exception as e:
                # Best effort: report the row and carry on with the next one.
                logger.error(f"Error mapping LBWF row to asset record: {e}")
        return assets

    @staticmethod
    def _parse_houses(
        wb: Workbook,
        address_to_uprn_map: Dict[str, int | None],
    ) -> List[LbwfHouse]:
        """Map every data row of the "Houses" sheet to an ``LbwfHouse``.

        The first row is treated as the header row. Rows that fail to map
        (for example because the Address cell is not a string) are logged and
        skipped.
        """
        houses_sheet = wb["Houses"]
        house_rows = houses_sheet.iter_rows(values_only=True)
        house_headers = next(house_rows)
        house_header_indexes = LbwfParser._get_column_indexes_by_name(house_headers)

        houses: List[LbwfHouse] = []
        for row in house_rows:
            try:
                houses.append(
                    LbwfParser._map_row_to_house_record(
                        row,
                        house_header_indexes,
                        address_to_uprn_map,
                    )
                )
            except Exception as e:
                # Best effort: report the row and carry on with the next one.
                logger.error(f"Error mapping LBWF row to house record: {e}")
        return houses

    @staticmethod
    def _merge_assets_into_houses(
        assets: List[LbwfAssetCondition],
        houses: List[LbwfHouse],
    ) -> None:
        """Attach to each house the assets whose ``prop_ref`` equals its ``reference``.

        Mutates *houses* in place; houses without matching assets get an
        empty list.
        """
        assets_by_ref: Dict[int, List[LbwfAssetCondition]] = defaultdict(list)
        for asset in assets:
            assets_by_ref[asset.prop_ref].append(asset)

        for house in houses:
            # .get() (not indexing) so the defaultdict does not grow an entry
            # for every house that has no assets.
            house.assets = assets_by_ref.get(house.reference, [])

    @staticmethod
    def _map_row_to_house_record(
        row: Any | Tuple[object | None, ...],
        header_indexes: Dict[str, int],
        address_to_uprn_map: Dict[str, int | None],
    ) -> LbwfHouse:
        """Build an ``LbwfHouse`` from one "Houses" row.

        Raises:
            KeyError: If an expected header is absent from *header_indexes*.
            AttributeError: If the Address cell has no ``split`` method
                (e.g. it is ``None``).
        """
        address: str = row[header_indexes["Address"]]
        return LbwfHouse(
            uprn=LbwfParser._get_uprn_from_address(address, address_to_uprn_map),
            reference=row[header_indexes["Reference"]],
            address=address,
            # NOTE(review): "EPC " (trailing space) and "HOSUE" presumably
            # mirror the spreadsheet's headers verbatim — confirm before
            # "correcting" them.
            epc=row[header_indexes["EPC "]],
            shdf=row[header_indexes["SHDF"]],
            house=row[header_indexes["HOSUE"]],
            fail_decency=row[header_indexes["Fail Decency"]],
            assets=[],
        )

    @staticmethod
    def _map_row_to_asset_record(
        row: Any | Tuple[object | None, ...],
        header_indexes: Dict[str, int],
    ) -> LbwfAssetCondition:
        """Build an ``LbwfAssetCondition`` from one "Houses Asset Data" row.

        Cell values are passed through unchanged except ``INSTALL DATE``,
        which goes through ``normalise_date``.

        Raises:
            KeyError: If an expected header is absent from *header_indexes*.
        """
        return LbwfAssetCondition(
            prop_ref=row[header_indexes["PROP REF"]],
            domna=row[header_indexes["Domna"]],
            address=row[header_indexes["ADDRESS"]],
            ownership=row[header_indexes["OWNERSHIP"]],
            prop_status=row[header_indexes["PROP STATUS"]],
            prop_type=row[header_indexes["PROP TYPE"]],
            prop_sub_type=row[header_indexes["PROP SUB TYPE"]],
            element_group=row[header_indexes["ELEMENT GROUP"]],
            element_code=row[header_indexes["ELEMENT CODE"]],
            element_code_description=row[header_indexes["ELEMENT CODE DESCRIPTION"]],
            attribute_code=row[header_indexes["ATTRIBUTE CODE"]],
            attribute_code_description=row[header_indexes["ATTRIBUTE CODE DESCRIPTION"]],
            element_date_value=row[header_indexes["ELEMENT DATE VALUE"]],
            element_numerical_value=row[header_indexes["ELEMENT NUMERIC VALUE"]],
            element_text_value=row[header_indexes["ELEMENT TEXT VALUE"]],
            quantity=row[header_indexes["QUANTITY"]],
            install_date=normalise_date(row[header_indexes["INSTALL DATE"]]),
            remaining_life=row[header_indexes["REMAINING LIFE"]],
            element_comments=row[header_indexes["ELEMENT COMMENTS"]],
        )

    @staticmethod
    def _generate_address_to_uprn_dict(wb: Workbook) -> Dict[str, int | None]:
        """Build an address -> UPRN lookup from the "All Energy Breakdown " sheet.

        Rows whose Address cell is not a string are ignored. When an address
        appears more than once, the last row wins.

        Raises:
            KeyError: If the sheet or its "Address"/"UPRN" header is missing.
            ValueError: If a UPRN cell is neither ``None`` nor an ``int``.
        """
        # The trailing space in the sheet name is intentional here; it is the
        # exact key looked up in the workbook.
        sheet = wb["All Energy Breakdown "]
        rows: Iterator[Tuple[object | None, ...]] = sheet.iter_rows(values_only=True)
        headers = next(rows)
        header_indexes: Dict[str, int] = LbwfParser._get_column_indexes_by_name(headers)
        address_idx = header_indexes["Address"]
        uprn_idx = header_indexes["UPRN"]

        mapping: Dict[str, int | None] = {}
        for row in rows:
            address = row[address_idx]
            uprn = row[uprn_idx]
            if not isinstance(address, str):
                continue
            if uprn is not None and not isinstance(uprn, int):
                raise ValueError(f"Unexpected UPRN value: {uprn!r}")
            mapping[address] = uprn
        return mapping

    @staticmethod
    def _get_column_indexes_by_name(
        headers: Tuple[object | None, ...]
    ) -> Dict[str, int]:
        """Return ``{header text: column index}`` for every string header cell.

        Non-string cells (such as empty ``None`` headers) are skipped; if a
        header text repeats, the right-most column wins.
        """
        return {
            header: position
            for position, header in enumerate(headers)
            if isinstance(header, str)
        }

    @staticmethod
    def _get_uprn_from_address(
        address: str,
        address_to_uprn_map: Dict[str, int | None],
    ) -> int | None:
        """Look up the UPRN for the first comma-separated part of *address*.

        Matching is case-insensitive. Returns ``None`` when no key matches
        (or when the matching key maps to ``None``).

        Raises:
            AttributeError: If *address* has no ``split`` method (e.g. ``None``).
        """
        pseudo_name = address.split(",")[0]

        # Fast path: direct dict hits. The upper-cased form is tried first
        # because that is the key the previous implementation returned.
        for candidate in (pseudo_name.upper(), pseudo_name):
            if candidate in address_to_uprn_map:
                return address_to_uprn_map[candidate]

        # Slow path: keys stored in some other casing. Return the value of
        # the key that actually matched instead of indexing with a guessed
        # casing, which used to raise KeyError for non-uppercase keys.
        wanted = pseudo_name.lower()
        for key, uprn in address_to_uprn_map.items():
            if key.lower() == wanted:
                return uprn
        return None