fixes so it actually runs

This commit is contained in:
Daniel Roth 2026-01-28 09:54:33 +00:00
parent 60241f947e
commit 751032e666
3 changed files with 26 additions and 14 deletions

View file

@ -29,8 +29,15 @@ class LbwfMapper(Mapper):
elements_by_key: dict[tuple[ElementType, int], Element] = {}
for raw_asset in client_property_data.assets:
if raw_asset.element_code in ["DECNTHMINC", "EICINSFREQ"]:
# skip metadata rows
continue
element_mapping = LbwfMapper._safe_map_element(raw_asset)
if not element_mapping:
continue
aspect_condition = LbwfMapper._build_aspect_condition(
raw_asset, element_mapping, survey_year
)

View file

@ -3,18 +3,23 @@ from openpyxl import Workbook, load_workbook
from collections import defaultdict
from backend.condition.parsing.parser import Parser
from backend.condition.parsing.records.lbwf.lbwf_asset_condition import LbwfAssetCondition
from backend.condition.parsing.records.lbwf.lbwf_asset_condition import (
LbwfAssetCondition,
)
from backend.condition.parsing.records.lbwf.lbwf_house import LbwfHouse
from backend.condition.utils.date_utils import normalise_date
from utils.logger import setup_logger
logger = setup_logger()
class LbwfParser(Parser):
def parse(self, file_stream: BinaryIO) -> Any:
wb: Workbook = load_workbook(file_stream)
address_to_uprn_map: Dict[str, int] = LbwfParser._generate_address_to_uprn_dict(wb)
address_to_uprn_map: Dict[str, int] = LbwfParser._generate_address_to_uprn_dict(
wb
)
assets = self._parse_assets(wb)
houses = self._parse_houses(wb, address_to_uprn_map)
@ -82,7 +87,6 @@ class LbwfParser(Parser):
for house in houses:
house.assets = assets_by_ref.get(house.reference, [])
@staticmethod
def _map_row_to_house_record(
row: Any | Tuple[object | None, ...],
@ -100,8 +104,8 @@ class LbwfParser(Parser):
house=row[header_indexes["HOSUE"]],
fail_decency=row[header_indexes["Fail Decency"]],
assets=[],
)
)
@staticmethod
def _map_row_to_asset_record(
row: Any | Tuple[object | None, ...],
@ -119,7 +123,9 @@ class LbwfParser(Parser):
element_code=row[header_indexes["ELEMENT CODE"]],
element_code_description=row[header_indexes["ELEMENT CODE DESCRIPTION"]],
attribute_code=row[header_indexes["ATTRIBUTE CODE"]],
attribute_code_description=row[header_indexes["ATTRIBUTE CODE DESCRIPTION"]],
attribute_code_description=row[
header_indexes["ATTRIBUTE CODE DESCRIPTION"]
],
element_date_value=row[header_indexes["ELEMENT DATE VALUE"]],
element_numerical_value=row[header_indexes["ELEMENT NUMERIC VALUE"]],
element_text_value=row[header_indexes["ELEMENT TEXT VALUE"]],
@ -128,7 +134,6 @@ class LbwfParser(Parser):
remaining_life=row[header_indexes["REMAINING LIFE"]],
element_comments=row[header_indexes["ELEMENT COMMENTS"]],
)
@staticmethod
def _generate_address_to_uprn_dict(wb: Workbook) -> Dict[str, int | None]:
@ -158,10 +163,9 @@ class LbwfParser(Parser):
return mapping
@staticmethod
def _get_column_indexes_by_name(
headers: Tuple[object | None, ...]
headers: Tuple[object | None, ...],
) -> Dict[str, int]:
index: Dict[str, int] = {}
@ -170,13 +174,14 @@ class LbwfParser(Parser):
index[header] = i
return index
@staticmethod
def _get_uprn_from_address(address: str, address_to_uprn_map: Dict[str, int]) -> int | None:
def _get_uprn_from_address(
address: str, address_to_uprn_map: Dict[str, int]
) -> int | None:
pseudo_name = address.split(",")[0]
if pseudo_name.lower() in (k.lower() for k in address_to_uprn_map.keys()):
return address_to_uprn_map[pseudo_name.upper()]
return None

View file

@ -25,7 +25,7 @@ def process_file(file_stream: BinaryIO, source_key: str) -> None:
property_condition_surveys: List[PropertyConditionSurvey] = []
for p in raw_properties:
property_condition_surveys.push(
property_condition_surveys.append(
mapper.map_asset_conditions_for_property(p, survey_year)
)