mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
182 lines
6.4 KiB
Python
182 lines
6.4 KiB
Python
from typing import BinaryIO, Any, Dict, Iterator, List, Tuple
|
|
from openpyxl import Workbook, load_workbook
|
|
from collections import defaultdict
|
|
|
|
from backend.condition.parsing.parser import Parser
|
|
from backend.condition.parsing.records.lbwf.lbwf_asset_condition import LbwfAssetCondition
|
|
from backend.condition.parsing.records.lbwf.lbwf_house import LbwfHouse
|
|
from backend.condition.utils.date_utils import normalise_date
|
|
from utils.logger import setup_logger
|
|
|
|
# Module-level logger; the parser in this file uses it to report rows that fail to map.
logger = setup_logger()
|
|
|
|
class LbwfParser(Parser):
    """Parse an LBWF workbook into house records with their asset conditions.

    Three sheets are read, each expected to start with a header row:
    ``"Houses"``, ``"Houses Asset Data"`` and ``"All Energy Breakdown "``.
    Sheet and column names are matched verbatim (including trailing spaces).
    """

    def parse(self, file_stream: BinaryIO) -> List[LbwfHouse]:
        """Load the workbook from *file_stream* and return its houses.

        Each returned house has its ``assets`` list filled with the asset
        rows whose ``prop_ref`` equals the house's ``reference``.

        Raises:
            KeyError: if a required sheet is missing from the workbook, or
                the energy-breakdown sheet lacks an "Address"/"UPRN" column.
            ValueError: if a required sheet has no header row, or the
                energy-breakdown sheet holds a non-integer UPRN.
        """
        wb: Workbook = load_workbook(file_stream)
        address_to_uprn_map = LbwfParser._generate_address_to_uprn_dict(wb)

        assets = self._parse_assets(wb)
        houses = self._parse_houses(wb, address_to_uprn_map)

        self._merge_assets_into_houses(assets, houses)

        return houses

    @staticmethod
    def _read_sheet(
        wb: Workbook, sheet_name: str
    ) -> Tuple[Dict[str, int], Iterator[Tuple[object | None, ...]]]:
        """Return ``(header name -> column index, remaining data rows)`` for a sheet."""
        rows = wb[sheet_name].iter_rows(values_only=True)

        # Use a default so an empty sheet does not leak a bare StopIteration,
        # which could silently terminate an enclosing iteration in the caller.
        headers = next(rows, None)
        if headers is None:
            raise ValueError(
                f"LBWF sheet {sheet_name!r} is empty: no header row found"
            )

        return LbwfParser._get_column_indexes_by_name(headers), rows

    @staticmethod
    def _parse_assets(wb: Workbook) -> List[LbwfAssetCondition]:
        """Map every data row of the "Houses Asset Data" sheet to an asset record.

        Rows that fail to map are logged and skipped (best effort).
        """
        asset_header_indexes, asset_rows = LbwfParser._read_sheet(
            wb, "Houses Asset Data"
        )

        assets: List[LbwfAssetCondition] = []
        for row in asset_rows:
            try:
                assets.append(
                    LbwfParser._map_row_to_asset_record(row, asset_header_indexes)
                )
            except Exception as e:
                # Deliberate best effort: one bad row must not abort the import.
                logger.error(f"Error mapping LBWF row to asset record: {e}")
                continue

        return assets

    @staticmethod
    def _parse_houses(
        wb: Workbook,
        address_to_uprn_map: Dict[str, int | None],
    ) -> List[LbwfHouse]:
        """Map every data row of the "Houses" sheet to a house record.

        Rows that fail to map are logged and skipped (best effort).
        """
        house_header_indexes, house_rows = LbwfParser._read_sheet(wb, "Houses")

        houses: List[LbwfHouse] = []
        for row in house_rows:
            try:
                houses.append(
                    LbwfParser._map_row_to_house_record(
                        row,
                        house_header_indexes,
                        address_to_uprn_map,
                    )
                )
            except Exception as e:
                # Deliberate best effort: one bad row must not abort the import.
                logger.error(f"Error mapping LBWF row to house record: {e}")
                continue

        return houses

    @staticmethod
    def _merge_assets_into_houses(
        assets: List[LbwfAssetCondition],
        houses: List[LbwfHouse],
    ) -> None:
        """Attach to each house the assets whose ``prop_ref`` equals its ``reference``.

        Mutates *houses* in place; a house without matching assets gets ``[]``.
        """
        assets_by_ref: Dict[int, List[LbwfAssetCondition]] = defaultdict(list)
        for asset in assets:
            assets_by_ref[asset.prop_ref].append(asset)

        for house in houses:
            # .get() avoids inserting empty entries into the defaultdict.
            house.assets = assets_by_ref.get(house.reference, [])

    @staticmethod
    def _map_row_to_house_record(
        row: Any | Tuple[object | None, ...],
        header_indexes: Dict[str, int],
        address_to_uprn_map: Dict[str, int | None],
    ) -> LbwfHouse:
        """Build an ``LbwfHouse`` from one "Houses" row.

        Raises:
            KeyError: if an expected column header is absent.
            AttributeError: if the address cell is not a string.
        """
        address: str = row[header_indexes["Address"]]

        # NOTE(review): "EPC " (trailing space) and "HOSUE" are kept verbatim;
        # presumably they mirror the spreadsheet's actual headers - confirm
        # against a sample workbook before "correcting" them.
        return LbwfHouse(
            uprn=LbwfParser._get_uprn_from_address(address, address_to_uprn_map),
            reference=row[header_indexes["Reference"]],
            address=address,
            epc=row[header_indexes["EPC "]],
            shdf=row[header_indexes["SHDF"]],
            house=row[header_indexes["HOSUE"]],
            fail_decency=row[header_indexes["Fail Decency"]],
            assets=[],
        )

    @staticmethod
    def _map_row_to_asset_record(
        row: Any | Tuple[object | None, ...],
        header_indexes: Dict[str, int],
    ) -> LbwfAssetCondition:
        """Build an ``LbwfAssetCondition`` from one "Houses Asset Data" row.

        Cell values are passed through unchanged, except "INSTALL DATE",
        which goes through ``normalise_date``.

        Raises:
            KeyError: if an expected column header is absent.
        """
        return LbwfAssetCondition(
            prop_ref=row[header_indexes["PROP REF"]],
            domna=row[header_indexes["Domna"]],
            address=row[header_indexes["ADDRESS"]],
            ownership=row[header_indexes["OWNERSHIP"]],
            prop_status=row[header_indexes["PROP STATUS"]],
            prop_type=row[header_indexes["PROP TYPE"]],
            prop_sub_type=row[header_indexes["PROP SUB TYPE"]],
            element_group=row[header_indexes["ELEMENT GROUP"]],
            element_code=row[header_indexes["ELEMENT CODE"]],
            element_code_description=row[header_indexes["ELEMENT CODE DESCRIPTION"]],
            attribute_code=row[header_indexes["ATTRIBUTE CODE"]],
            attribute_code_description=row[header_indexes["ATTRIBUTE CODE DESCRIPTION"]],
            element_date_value=row[header_indexes["ELEMENT DATE VALUE"]],
            element_numerical_value=row[header_indexes["ELEMENT NUMERIC VALUE"]],
            element_text_value=row[header_indexes["ELEMENT TEXT VALUE"]],
            quantity=row[header_indexes["QUANTITY"]],
            install_date=normalise_date(row[header_indexes["INSTALL DATE"]]),
            remaining_life=row[header_indexes["REMAINING LIFE"]],
            element_comments=row[header_indexes["ELEMENT COMMENTS"]],
        )

    @staticmethod
    def _generate_address_to_uprn_dict(wb: Workbook) -> Dict[str, int | None]:
        """Build an address -> UPRN mapping from the "All Energy Breakdown " sheet.

        Rows whose address cell is not a string are skipped. A later row with
        the same address overwrites an earlier one.

        Raises:
            KeyError: if the sheet or its "Address"/"UPRN" column is missing.
            ValueError: if the sheet is empty, or a UPRN cell is neither
                ``None`` nor an ``int``.
        """
        # The trailing space is part of the sheet name.
        header_indexes, rows = LbwfParser._read_sheet(wb, "All Energy Breakdown ")

        address_idx = header_indexes["Address"]
        uprn_idx = header_indexes["UPRN"]

        mapping: Dict[str, int | None] = {}

        for row in rows:
            address = row[address_idx]
            uprn = row[uprn_idx]

            if not isinstance(address, str):
                continue

            if uprn is not None and not isinstance(uprn, int):
                raise ValueError(f"Unexpected UPRN value: {uprn!r}")

            mapping[address] = uprn

        return mapping

    @staticmethod
    def _get_column_indexes_by_name(
        headers: Tuple[object | None, ...]
    ) -> Dict[str, int]:
        """Map each string header to its column position.

        Non-string header cells are ignored; if a header name repeats, the
        right-most column wins.
        """
        return {
            header: position
            for position, header in enumerate(headers)
            if isinstance(header, str)
        }

    @staticmethod
    def _get_uprn_from_address(
        address: str, address_to_uprn_map: Dict[str, int | None]
    ) -> int | None:
        """Look up the UPRN for *address*, matching case-insensitively.

        Only the part of *address* before the first comma is compared with
        the map's keys. Returns ``None`` when no key matches (or when the
        matching entry itself holds ``None``).
        """
        pseudo_name = address.split(",")[0]

        # Fast path: keys stored upper-cased resolve with a single dict lookup.
        upper_name = pseudo_name.upper()
        if upper_name in address_to_uprn_map:
            return address_to_uprn_map[upper_name]

        # Otherwise return the value of the key that actually matched; the
        # previous code always indexed with the upper-cased name and raised
        # KeyError whenever the stored key used any other casing.
        wanted = pseudo_name.lower()
        for key, uprn in address_to_uprn_map.items():
            if key.lower() == wanted:
                return uprn

        return None
|
|
|