Parse Peabody condition data xlsx 🟩

This commit is contained in:
Daniel Roth 2026-01-21 16:25:58 +00:00
parent d43d9d9069
commit 4e190328cc
4 changed files with 147 additions and 7 deletions

View file

@ -8,13 +8,13 @@ from backend.condition.parsing.records.lbwf.lbwf_house import LbwfHouse
from backend.condition.utils.date_utils import normalise_date
from utils.logger import setup_logger
# Module-level logger. `setup_logger` must be *called*: binding the bare
# function (as the overwritten first assignment did) yields a function object
# with no `.error` / `.warning` methods.
logger = setup_logger()
class LbwfParser(Parser):
def parse(self, file_stream: BinaryIO) -> Any:
wb: Workbook = load_workbook(file_stream)
address_to_uprn_map: Dict[str, int] = self._generate_address_to_uprn_dict(wb)
address_to_uprn_map: Dict[str, int] = LbwfParser._generate_address_to_uprn_dict(wb)
assets = self._parse_assets(wb)
houses = self._parse_houses(wb, address_to_uprn_map)
@ -132,7 +132,7 @@ class LbwfParser(Parser):
@staticmethod
def _generate_address_to_uprn_dict(wb: Workbook) -> Dict[str, int | None]:
sheet: Workbook = wb["All Energy Breakdown "]
sheet = wb["All Energy Breakdown "]
rows: Iterator[Tuple[object | None, ...]] = sheet.iter_rows(values_only=True)
@ -159,6 +159,7 @@ class LbwfParser(Parser):
return mapping
@staticmethod
def _get_column_indexes_by_name(
headers: Tuple[object | None, ...]
) -> Dict[str, int]:
@ -170,6 +171,7 @@ class LbwfParser(Parser):
return index
@staticmethod
def _get_uprn_from_address(address: str, address_to_uprn_map: Dict[str, int]) -> int | None:
pseudo_name = address.split(",")[0]

View file

@ -1,7 +1,143 @@
from typing import Any, BinaryIO
from backend.condition.parsing.parser import Parser
from typing import Any, BinaryIO, Dict, Iterator, List, Tuple, DefaultDict
from openpyxl import Workbook, load_workbook
from collections import defaultdict
from backend.condition.parsing.parser import Parser
from backend.condition.parsing.records.peabody.peabody_asset_condition import PeabodyAssetCondition
from backend.condition.parsing.records.peabody.peabody_property import PeabodyProperty
from utils.logger import setup_logger
logger = setup_logger()
class PeabodyParser(Parser):
    """Parse a Peabody condition-survey workbook into per-property records.

    Each data row of the survey worksheet is mapped to a
    ``PeabodyAssetCondition``. The assets are then grouped by their
    (whitespace-normalised) ``full_address`` and wrapped in one
    ``PeabodyProperty`` per address.
    """

    # Worksheet holding the survey rows; the first row is the header row.
    _SURVEY_SHEET_NAME = "Survey Records - D & Lower"

    def parse(self, file_stream: BinaryIO) -> Any:
        """Read the workbook from ``file_stream`` and return its properties.

        Args:
            file_stream: Binary stream containing the .xlsx workbook.

        Returns:
            A list of ``PeabodyProperty``, one per distinct address that has
            a UPRN in the generated address-to-UPRN map.

        Raises:
            KeyError: If the header row has no ``full_address`` column. A
                missing survey worksheet is also expected to surface as a
                ``KeyError`` from the workbook lookup.
        """
        wb: Workbook = load_workbook(file_stream)
        address_to_uprn_map: Dict[str, int] = PeabodyParser._generate_address_to_uprn_dict(wb)
        assets = self._parse_assets(wb)
        return self._group_assets_into_properties(
            assets=assets,
            address_to_uprn_map=address_to_uprn_map,
        )

    @staticmethod
    def _parse_assets(wb: Workbook) -> List[PeabodyAssetCondition]:
        """Map every data row of the survey sheet to an asset record.

        Rows that cannot be mapped are logged and skipped so that one bad row
        does not abort the whole import. An empty sheet yields an empty list.
        """
        assets_sheet = wb[PeabodyParser._SURVEY_SHEET_NAME]
        asset_rows = assets_sheet.iter_rows(values_only=True)

        # Guard against a completely empty sheet: a bare next() would leak
        # StopIteration to the caller.
        asset_headers = next(asset_rows, None)
        if asset_headers is None:
            return []
        asset_header_indexes = PeabodyParser._get_column_indexes_by_name(asset_headers)

        assets: List[PeabodyAssetCondition] = []
        for row in asset_rows:
            try:
                assets.append(
                    PeabodyParser._map_row_to_asset_record(row, asset_header_indexes)
                )
            except Exception as e:
                # Deliberately best-effort: log the failure and carry on.
                logger.error(f"Error mapping Peabody row to asset record: {e}")
                continue
        return assets

    @staticmethod
    def _group_assets_into_properties(
        assets: List[PeabodyAssetCondition],
        address_to_uprn_map: Dict[str, int],
    ) -> List[PeabodyProperty]:
        """Group assets by address and attach the UPRN for each address.

        Addresses are normalised with ``_normalise_address`` before grouping
        so the lookup keys match the keys built by
        ``_generate_address_to_uprn_dict``. Assets without a usable address
        are skipped. Addresses with no UPRN in the map are logged and
        dropped.
        """
        assets_by_address: DefaultDict[str, List[PeabodyAssetCondition]] = defaultdict(list)
        for asset in assets:
            address = PeabodyParser._normalise_address(asset.full_address)
            if address is None:
                continue
            assets_by_address[address].append(asset)

        properties: List[PeabodyProperty] = []
        for address, grouped_assets in assets_by_address.items():
            uprn = address_to_uprn_map.get(address)
            if uprn is None:
                logger.warning(f"No UPRN found for address: {address}")
                continue
            properties.append(
                PeabodyProperty(
                    uprn=uprn,
                    assets=grouped_assets,
                )
            )
        return properties

    @staticmethod
    def _map_row_to_asset_record(
        row: Any | Tuple[object | None, ...],
        header_indexes: Dict[str, int],
    ) -> PeabodyAssetCondition:
        """Build a ``PeabodyAssetCondition`` from one worksheet row.

        Args:
            row: Cell values of a single row, in column order.
            header_indexes: Header name -> column index, as produced by
                ``_get_column_indexes_by_name``.

        Raises:
            KeyError: If an expected header is missing from
                ``header_indexes``.
            IndexError: If the row is shorter than a referenced column index.
        """
        return PeabodyAssetCondition(
            lo_reference=row[header_indexes["Lo_Reference"]],
            full_address=row[header_indexes["full_address"]],
            location_type_code=row[header_indexes["location_type_code"]],
            parent_lo_reference=row[header_indexes["Parent_Lo_Reference"]],
            element_code=row[header_indexes["Element_Code"]],
            element=row[header_indexes["Element"]],
            sub_element_code=row[header_indexes["Sub_Element_Code"]],
            sub_element=row[header_indexes["Sub_Element"]],
            material_code=row[header_indexes["Material_Code"]],
            material_or_answer=row[header_indexes["material_or_answer"]],
            renewal_quantity=row[header_indexes["Renewal_Quantity"]],
            renewal_year=row[header_indexes["Renewal_Year"]],
            renewal_cost=row[header_indexes["Renewal_Cost"]],
            cloned=row[header_indexes["cloned"]],
            lo_type_code=row[header_indexes["lo_type_code"]],
            condition_survey_date=row[header_indexes["condition_survey_date"]],
        )

    @staticmethod
    def _generate_address_to_uprn_dict(wb: Workbook) -> Dict[str, int | None]:
        """Assign a placeholder UPRN to every distinct address in the sheet.

        UPRNs are sequential integers starting at 1, handed out in order of
        first appearance. They are stand-ins until real UPRNs are available.
        Cells that are empty, blank or not text are ignored.

        Raises:
            KeyError: If the header row has no ``full_address`` column.
        """
        sheet = wb[PeabodyParser._SURVEY_SHEET_NAME]
        rows: Iterator[Tuple[object | None, ...]] = sheet.iter_rows(values_only=True)

        address_to_uprn: Dict[str, int] = {}

        # An empty sheet has no header row and therefore no addresses.
        headers = next(rows, None)
        if headers is None:
            return address_to_uprn

        header_indexes: Dict[str, int] = PeabodyParser._get_column_indexes_by_name(headers)
        address_idx = header_indexes["full_address"]

        # Placeholder sequential UPRNs for now.
        next_uprn = 1  # TODO: get real UPRNs
        for row in rows:
            # Same normalisation as the grouping step, so keys always match.
            address = PeabodyParser._normalise_address(row[address_idx])
            if address is None:
                continue
            if address not in address_to_uprn:
                address_to_uprn[address] = next_uprn
                next_uprn += 1
        return address_to_uprn

    @staticmethod
    def _normalise_address(value: object) -> str | None:
        """Return ``value`` stripped of surrounding whitespace, or ``None``.

        ``None`` is returned for anything that is not a string (empty cells,
        numeric or date cells) and for strings that are blank after
        stripping, so callers can treat all of these as "no address".
        """
        if not isinstance(value, str):
            return None
        stripped = value.strip()
        return stripped or None

    @staticmethod
    def _get_column_indexes_by_name(
        headers: Tuple[object | None, ...]
    ) -> Dict[str, int]:
        """Map each string header to its zero-based column index.

        Non-string header cells are ignored. If a header name occurs more
        than once, the last occurrence wins.
        """
        return {
            header: position
            for position, header in enumerate(headers)
            if isinstance(header, str)
        }

View file

@ -15,7 +15,7 @@ class PeabodyAssetCondition:
material_code: int
material_or_answer: str
renewal_quantity: int
renewal: int
renewal_year: int
cloned: str
lo_type_code: int
renewal_cost: Optional[float] = None

View file

@ -5,5 +5,7 @@ from backend.condition.parsing.records.peabody.peabody_asset_condition import Pe
@dataclass
class PeabodyProperty:
    """A Peabody property: a UPRN together with its asset condition records."""

    # NOTE: a plain ``uprn -> assets`` dict would carry the same data. This is
    # kept as a dataclass for consistency with the other client models and
    # might change in future.
    uprn: int
    assets: List[PeabodyAssetCondition]