Model/backend/ecmk_fetcher/xml_processor.py
2026-04-15 14:29:03 +00:00

221 lines
6.7 KiB
Python

import xml.etree.ElementTree as ET
from typing import Any, List, Optional, TypedDict
from etl.xml_survey_extraction.XmlParser import PROPERTY_TYPE_LOOKUP
from backend.ecmk_fetcher.reports import build_property_id
# NOTE: This module should probably live somewhere else in the long term.
class Floor(TypedDict):
    """Measurements held for one floor of a building part.

    All four keys are required.  ``parse_rdsap`` fills them from the children
    of a ``SAP-Floor-Dimension`` element, storing each value as parsed.
    """

    # Parsed from the Total-Floor-Area element.
    area_m2: float
    # Parsed from the Room-Height element.
    height_m: float
    # Parsed from the Heat-Loss-Perimeter element.
    heat_loss_perimeter_m: float
    # Parsed from the Party-Wall-Length element.
    party_wall_length_m: float
class Roof(TypedDict, total=False):
construction: int # TODO: map to str
insulation_location: int # TODO: map to str
insulation_thickness_mm: float | str
class BuildingPart(TypedDict):
    """One ``SAP-Building-Part`` of a property: its label, floors and roof."""

    # Text of the part's Identifier element.
    identifier: str  # e.g. "Main Dwelling", "Extension"
    # One entry per SAP-Floor-Dimension found under the part (may be empty).
    floors: List[Floor]
    # None when the part carries none of the roof elements.
    roof: Optional[Roof]
class SapPropertyDetails(TypedDict):
    """Structured result of parsing one RdSAP XML document."""

    # Identifier produced by build_property_id(address line 1, postcode).
    reference: str
    # Non-empty address components joined with ", ".
    address: str
    # Value looked up in PROPERTY_TYPE_LOOKUP for the document's type code.
    property_type: str
    # Building parts in the order they were found in the document.
    building_parts: List[BuildingPart]
def _get_namespace(tag: str) -> str:
return tag.split("}")[0].strip("{")
def _require_text(value: Optional[str], field: str) -> str:
if value is None:
raise ValueError(f"Missing required field: {field}")
return value
def _parse_float(value: Optional[str], field: str) -> float:
if value is None:
raise ValueError(f"Missing float field: {field}")
return float(value)
def _parse_int(value: Optional[str], field: str) -> int:
if value is None:
raise ValueError(f"Missing int field: {field}")
return int(value)
def _parse_thickness_mm(value: Optional[str]) -> Optional[float | str]:
if value is None:
return None
stripped = value.replace("mm", "").strip()
try:
return float(stripped)
except ValueError:
return stripped
def _parse_floor_element(floor_elem: ET.Element, ns: dict[str, str]) -> Floor:
    """Read the four required measurements of one SAP-Floor-Dimension element."""
    # Dict values are evaluated top to bottom, so a missing measurement is
    # reported in the order area, height, perimeter, party wall.
    return {
        "area_m2": _parse_float(
            floor_elem.findtext("r:Total-Floor-Area", namespaces=ns),
            "Total-Floor-Area",
        ),
        "height_m": _parse_float(
            floor_elem.findtext("r:Room-Height", namespaces=ns),
            "Room-Height",
        ),
        "heat_loss_perimeter_m": _parse_float(
            floor_elem.findtext("r:Heat-Loss-Perimeter", namespaces=ns),
            "Heat-Loss-Perimeter",
        ),
        "party_wall_length_m": _parse_float(
            floor_elem.findtext("r:Party-Wall-Length", namespaces=ns),
            "Party-Wall-Length",
        ),
    }


def _parse_roof_fields(part_elem: ET.Element, ns: dict[str, str]) -> Optional[Roof]:
    """Collect the roof fields present on a building part, or None if it has none."""
    construction_text = part_elem.findtext("r:Roof-Construction", namespaces=ns)
    location_text = part_elem.findtext("r:Roof-Insulation-Location", namespaces=ns)
    thickness_text = part_elem.findtext("r:Roof-Insulation-Thickness", namespaces=ns)

    if construction_text is None and location_text is None and thickness_text is None:
        return None

    roof: Roof = {}
    if construction_text is not None:
        roof["construction"] = _parse_int(construction_text, "Roof-Construction")
    if location_text is not None:
        roof["insulation_location"] = _parse_int(
            location_text, "Roof-Insulation-Location"
        )
    thickness = _parse_thickness_mm(thickness_text)
    if thickness is not None:
        roof["insulation_thickness_mm"] = thickness
    return roof


def _parse_building_part(part_elem: ET.Element, ns: dict[str, str]) -> BuildingPart:
    """Build the identifier/floors/roof record for one SAP-Building-Part element."""
    identifier = _require_text(
        part_elem.findtext("r:Identifier", namespaces=ns), "Identifier"
    )
    floors: List[Floor] = [
        _parse_floor_element(floor_elem, ns)
        for floor_elem in part_elem.findall(".//r:SAP-Floor-Dimension", ns)
    ]
    return {
        "identifier": identifier,
        "floors": floors,
        "roof": _parse_roof_fields(part_elem, ns),
    }


def parse_rdsap(xml_string: str) -> SapPropertyDetails:
    """Parse an RdSAP XML document into a ``SapPropertyDetails`` dict.

    The namespace of the root element is bound to the prefix ``r`` and used
    for every lookup.  The function reads, in this order: the first
    ``Address`` element, the first ``Property-Type`` element, and every
    ``SAP-Building-Part`` element with its floors and roof fields.

    Args:
        xml_string: The XML document as text.

    Returns:
        A dict with ``reference``, ``address``, ``property_type`` and
        ``building_parts`` keys.

    Raises:
        xml.etree.ElementTree.ParseError: If ``xml_string`` is not
            well-formed XML.
        ValueError: If no ``Address`` element exists, if ``Property-Type``
            or a building part's ``Identifier`` or floor measurement is
            missing, or if a numeric field cannot be converted.
        KeyError: Presumably, if the property-type code is absent from
            ``PROPERTY_TYPE_LOOKUP`` (assumes it is a plain mapping; confirm
            against its definition).
    """
    # NOTE(review): ElementTree is not hardened against maliciously
    # constructed XML; confirm that the documents come from a trusted source.
    root = ET.fromstring(xml_string)
    ns: dict[str, str] = {"r": _get_namespace(root.tag)}

    # --- Address ---
    addr_elem = root.find(".//r:Address", ns)
    if addr_elem is None:
        raise ValueError("Address element not found")

    address_line_1: str = addr_elem.findtext(
        "r:Address-Line-1", default="", namespaces=ns
    )
    postcode: str = addr_elem.findtext("r:Postcode", default="", namespaces=ns)
    address_line_2: str = addr_elem.findtext(
        "r:Address-Line-2", default="", namespaces=ns
    )
    post_town: str = addr_elem.findtext("r:Post-Town", default="", namespaces=ns)

    # Empty components are dropped so the joined address has no blank slots.
    address: str = ", ".join(
        part
        for part in (address_line_1, address_line_2, post_town, postcode)
        if part
    )
    reference: str = build_property_id(address_line_1, postcode)

    # --- Property Type ---
    # Round-tripping through int validates the code and normalises its
    # spelling before it is used as a lookup key.
    prop_type_code: str = str(
        _parse_int(
            root.findtext(".//r:Property-Type", namespaces=ns), "Property-Type"
        )
    )
    property_type: str = PROPERTY_TYPE_LOOKUP[prop_type_code]

    # --- Building Parts ---
    building_parts: List[BuildingPart] = [
        _parse_building_part(part_elem, ns)
        for part_elem in root.findall(".//r:SAP-Building-Part", ns)
    ]

    return {
        "reference": reference,
        "address": address,
        "property_type": property_type,
        "building_parts": building_parts,
    }
def _normalise_identifier(identifier: str) -> str:
return identifier.lower().replace(" ", "_").replace("-", "_")
def flatten_sap_property(details: SapPropertyDetails) -> dict[str, Any]:
    """Flatten parsed property details into a single-level dict.

    Top-level fields are copied as-is.  Each building part contributes keys
    prefixed with its normalised identifier: floors become
    ``<part>_floor_<n>_<field>`` (numbered from 1) and any roof fields that
    are present become ``<part>_roof_<field>``.

    Args:
        details: The structure produced by ``parse_rdsap``.

    Returns:
        A flat mapping of column name to value.
    """
    top_level_fields = ("reference", "address", "property_type")
    floor_fields = (
        "area_m2",
        "height_m",
        "heat_loss_perimeter_m",
        "party_wall_length_m",
    )
    roof_fields = ("construction", "insulation_location", "insulation_thickness_mm")

    flat: dict[str, Any] = {name: details[name] for name in top_level_fields}

    for part in details["building_parts"]:
        stem = _normalise_identifier(part["identifier"])

        for number, dimensions in enumerate(part["floors"], start=1):
            for name in floor_fields:
                flat[f"{stem}_floor_{number}_{name}"] = dimensions[name]

        roof = part.get("roof")
        if not roof:
            # Covers both a missing roof (None) and one with no fields set.
            continue
        for name in roof_fields:
            if name in roof:
                flat[f"{stem}_roof_{name}"] = roof[name]

    return flat