Map peabody data to new structure 🟪

This commit is contained in:
Daniel Roth 2026-01-27 14:47:17 +00:00
parent 0bd5106cb4
commit dc5b43d453

View file

@ -1,4 +1,4 @@
from typing import Any, List, Optional
from typing import Any, Dict, List, Optional, Tuple
from datetime import date
from backend.condition.domain.aspect_condition import AspectCondition
@ -24,57 +24,85 @@ class PeabodyMapper(Mapper):
client_property_data, PeabodyProperty
) # TODO: think of a better way to do this
mapped_elements: List[Element] = []
elements_by_key: dict[tuple[ElementType, int], Element] = {}
for raw_asset in client_property_data.assets:
try:
element_mapping: ElementMapping = PeabodyMapper._map_element(
raw_asset.element_code, raw_asset.sub_element_code
)
except:
logger.warning(
f"""Unrecognised Peabody Asset Element: {raw_asset.element} ({raw_asset.element_code}),
Sub-Element: {raw_asset.sub_element} ({raw_asset.sub_element_code}). Skipping record"""
)
continue
element_mapping = PeabodyMapper._safe_map_element(raw_asset)
aspect_condition = AspectCondition(
aspect_type=element_mapping.aspect_type,
aspect_instance=element_mapping.aspect_instance or 1,
value=raw_asset.material_or_answer,
quantity=raw_asset.renewal_quantity,
install_date=None, # Not available in peabody data
renewal_year=raw_asset.renewal_year,
comments=None, # Not available in peabody data
)
matching_element_type_instance: Optional[Element] = (
PeabodyMapper._check_for_element_type_and_instance(
mapped_elements,
element_mapping.element,
element_mapping.element_instance or 1,
)
aspect_condition = PeabodyMapper._build_aspect_condition(
raw_asset, element_mapping
)
if not matching_element_type_instance:
mapped_elements.append(
Element(
element_type=element_mapping.element,
element_instance=element_mapping.element_instance or 1,
aspect_conditions=[aspect_condition],
)
)
else:
matching_element_type_instance.aspect_conditions.append(
aspect_condition
)
element_key = (
element_mapping.element,
element_mapping.element_instance or 1,
)
PeabodyMapper._attach_aspect_condition_to_element(
elements_by_key,
element_key,
aspect_condition,
)
return PropertyConditionSurvey(
uprn=client_property_data.uprn,
elements=mapped_elements,
date=date(2000, 1, 1), # Temp. Not sure how to get this
elements=list(elements_by_key.values()),
date=date(2000, 1, 1), # Temp - not sure how to get this
source="Peabody", # TODO: Make this the system, not the client
)
@staticmethod
def _safe_map_element(raw_asset) -> Optional[ElementMapping]:
try:
return PeabodyMapper._map_element(
raw_asset.element_code,
raw_asset.sub_element_code,
)
except KeyError:
logger.warning(
f"Unrecognised Peabody Asset Element: "
f"{raw_asset.element} ({raw_asset.element_code}), "
f"Sub-Element: {raw_asset.sub_element} ({raw_asset.sub_element_code}). "
"Skipping record"
)
return None
@staticmethod
def _map_element(element_code: int, sub_element_code: int) -> ElementMapping:
return PEABODY_ELEMENT_MAP[(element_code, sub_element_code)]
@staticmethod
def _attach_aspect_condition_to_element(
elements_by_key: Dict[Tuple[ElementType, int], Element],
element_key: Tuple[ElementType, int],
aspect_condition: AspectCondition,
) -> None:
element = elements_by_key.get(element_key)
if element is None:
element = Element(
element_type=element_key[0],
element_instance=element_key[1],
aspect_conditions=[],
)
elements_by_key[element_key] = element
element.aspect_conditions.append(aspect_condition)
@staticmethod
def _build_aspect_condition(
raw_asset, element_mapping: ElementMapping
) -> AspectCondition:
return AspectCondition(
aspect_type=element_mapping.aspect_type,
aspect_instance=element_mapping.aspect_instance or 1,
value=raw_asset.material_or_answer,
quantity=raw_asset.renewal_quantity,
install_date=None,
renewal_year=raw_asset.renewal_year,
comments=None,
)
@staticmethod
def _check_for_element_type_and_instance(
elements: List[Element], type: ElementType, instance: int
@ -83,7 +111,3 @@ class PeabodyMapper(Mapper):
if e.element_type == type and e.element_instance == instance:
return e
return None
@staticmethod
def _map_element(element_code: int, sub_element_code: int) -> ElementMapping:
return PEABODY_ELEMENT_MAP[(element_code, sub_element_code)]