Model/backend/documents_parser/extractor.py
2026-04-16 14:11:44 +00:00

258 lines
10 KiB
Python

from datetime import datetime
from typing import List, Optional
from datatypes.epc.surveys.pashub_rdsap_site_notes import (
BuildingConstruction,
BuildingMeasurements,
ExtensionConstruction,
ExtensionMeasurements,
ExtensionRoofSpace,
FloorConstruction,
FloorMeasurement,
General,
MainBuildingConstruction,
MainBuildingMeasurements,
PasHubRdSapSiteNotes,
RoofSpace,
RoofSpaceDetail,
)
class PasHubRdSapSiteNotesExtractor:
    """Extract ``PasHubRdSapSiteNotes`` sub-structures from a flat token list.

    The input is a list of strings in which a label (for example
    ``"Tenure:"``) is followed, a fixed number of positions later, by its
    value. Labels are matched by exact string equality, so they must appear
    in the list exactly as they are spelled in this class.
    """

    def __init__(self, text_list: list[str]) -> None:
        """Store the token list that all document-wide lookups read from."""
        self.text_list = text_list

    # --- generic helpers ---

    @staticmethod
    def _leading_int(value: Optional[str]) -> int:
        """Return the integer at the start of *value*, or 0 if there is none.

        Only the first whitespace-separated token is considered. A token
        that is a valid integer literal is converted as-is; otherwise its
        leading run of ASCII digits is used, so ``"300mm"`` yields ``300``.
        ``None``, empty and non-numeric values yield ``0``.
        """
        if not value:
            return 0
        tokens = value.split()
        if not tokens:
            return 0
        token = tokens[0]
        try:
            return int(token)
        except ValueError:
            # Tolerate a unit glued to the number ("300mm") or free text
            # ("Unknown") instead of aborting the whole extraction.
            digits = token[: len(token) - len(token.lstrip("0123456789"))]
            return int(digits) if digits else 0

    def _get(self, key: str, offset: int = 1) -> Optional[str]:
        """Look up *key* in the whole document; see :meth:`_get_in`."""
        return self._get_in(self.text_list, key, offset)

    def _bool(self, key: str, offset: int = 1) -> bool:
        """Return True if the document-wide value for *key* is "yes"."""
        return self._bool_in(self.text_list, key, offset)

    def _get_in(self, lst: list[str], key: str, offset: int = 1) -> Optional[str]:
        """Return the stripped token *offset* positions after *key* in *lst*.

        Only the first occurrence of *key* is used. Returns ``None`` when
        *key* is absent, when the target position is past the end of *lst*,
        or when the token is empty after stripping.
        """
        try:
            value = lst[lst.index(key) + offset]
        except (ValueError, IndexError):
            return None
        return value.strip() or None

    def _bool_in(self, lst: list[str], key: str, offset: int = 1) -> bool:
        """Return True if the value for *key* in *lst* is "yes" (any case)."""
        value = self._get_in(lst, key, offset)
        return value is not None and value.lower() == "yes"

    def _is_known_in(self, lst: list[str], key: str) -> bool:
        """Return True if *key* has a value other than "not known" (any case).

        A missing or empty value counts as not known.
        """
        value = self._get_in(lst, key)
        return value is not None and value.lower() != "not known"

    def _wall_thickness_in(self, lst: list[str]) -> int:
        """Return the number that starts the "Wall thickness:" value, else 0."""
        return self._leading_int(self._get_in(lst, "Wall thickness:"))

    def _section(self, start: str, end: str) -> list[str]:
        """Return the tokens from *start* (inclusive) up to *end* (exclusive).

        *end* is searched for from the position of *start* onwards. An empty
        list is returned when either marker cannot be found.
        """
        try:
            start_idx = self.text_list.index(start)
            end_idx = self.text_list.index(end, start_idx)
        except ValueError:
            return []
        return self.text_list[start_idx:end_idx]

    def _split_main_and_extensions(
        self, section: list[str], section_name: str
    ) -> tuple[list[str], list[list[str]]]:
        """Split *section* into main-building tokens and per-extension tokens.

        Extensions are discovered by probing for the exact tokens
        ``"Extension 1"``, ``"Extension 2"``, ... and stopping at the first
        number that is absent. The main-building slice runs from the
        ``"Main Building"`` token to the first extension marker (or the end
        of the section); each extension slice runs from its own marker to
        the next marker (or the end of the section).

        Args:
            section: Tokens of one document section.
            section_name: Human-readable section name, used only in the
                error message.

        Returns:
            ``(main_tokens, [extension_1_tokens, extension_2_tokens, ...])``.

        Raises:
            ValueError: If *section* contains no ``"Main Building"`` token.
        """
        markers = []
        number = 1
        while f"Extension {number}" in section:
            markers.append(f"Extension {number}")
            number += 1

        try:
            main_start = section.index("Main Building")
        except ValueError as exc:
            raise ValueError(
                f"'Main Building' marker not found in the "
                f"'{section_name}' section"
            ) from exc

        # Each slice ends where the next marker begins; the last one ends at
        # the end of the section.
        boundaries = [section.index(marker) for marker in markers]
        boundaries.append(len(section))

        main_tokens = section[main_start:boundaries[0]]
        extension_tokens = [
            section[begin:finish]
            for begin, finish in zip(boundaries, boundaries[1:])
        ]
        return main_tokens, extension_tokens

    # --- public extract methods ---

    def extract(self) -> PasHubRdSapSiteNotes:
        """Not implemented yet; always raises ``NotImplementedError``."""
        raise NotImplementedError

    def extract_general(self) -> General:
        """Build the ``General`` record from document-wide label lookups.

        Missing text fields become ``""``, missing yes/no answers become
        ``False`` and missing counts become ``0``. The inspection date is
        converted from ``DD/MM/YYYY`` to ``YYYY-MM-DD``.

        Raises:
            ValueError: If an inspection date is present but does not match
                ``DD/MM/YYYY``.
        """
        raw_date = self._get("Inspection Date:")
        inspection_date = (
            datetime.strptime(raw_date, "%d/%m/%Y").strftime("%Y-%m-%d")
            if raw_date
            else ""
        )

        return General(
            # offset=2: the answer sits two tokens after these labels,
            # presumably because the question text wraps onto a second
            # token -- confirm against a sample document.
            epc_checked_before_assessment=self._bool(
                "Confirm you have checked for the existence of an", offset=2
            ),
            epc_exists_at_point_of_assessment=self._bool(
                "Does an EPC exist at the point of carrying out this", offset=2
            ),
            inspection_date=inspection_date,
            transaction_type=self._get("Transaction Type:") or "",
            tenure=self._get("Tenure:") or "",
            property_type=self._get("Type of Property:") or "",
            detachment_type=self._get("Detachment Type:") or "",
            number_of_storeys=self._leading_int(self._get("Number of storeys:")),
            terrain_type=self._get("Terrain Type:") or "",
            number_of_extensions=self._leading_int(
                self._get("Number of Extensions:")
            ),
            electricity_smart_meter=self._bool(
                "Is an electricity smart meter present?"
            ),
            electric_meter_type=self._get("Electric meter type:") or "",
            dwelling_export_capable=self._bool("Is the dwelling export-capable?"),
            mains_gas_available=self._bool("Is mains gas available?"),
            gas_smart_meter=self._bool("Is there a gas smart meter?"),
            gas_meter_accessible=self._bool("Is the gas meter accessible?"),
            measurements_location=self._get("Select Measurements Location:") or "",
        )

    def extract_building_construction(self) -> BuildingConstruction:
        """Build ``BuildingConstruction`` from the "Building Construction" section.

        The section spans from the ``"Building Construction"`` token up to
        the ``"Building Measurements"`` token. ``extensions`` is ``None``
        when the section has no extension markers.

        Raises:
            ValueError: If the section is missing or has no
                ``"Main Building"`` token.
        """
        section = self._section("Building Construction", "Building Measurements")
        main_data, extension_data = self._split_main_and_extensions(
            section, "Building Construction"
        )
        extensions = [
            self._parse_extension_construction(ext_id, tokens)
            for ext_id, tokens in enumerate(extension_data, start=1)
        ]
        return BuildingConstruction(
            main_building=self._parse_main_building_construction(main_data),
            floor=self._parse_floor_construction(main_data),
            extensions=extensions or None,
        )

    def extract_building_measurements(self) -> BuildingMeasurements:
        """Build ``BuildingMeasurements`` from the "Building Measurements" section.

        The section spans from the ``"Building Measurements"`` token up to
        the ``"Roof Space"`` token. ``extensions`` is ``None`` when the
        section has no extension markers.

        Raises:
            ValueError: If the section is missing or has no
                ``"Main Building"`` token, or if a floor row contains a
                non-numeric measurement.
        """
        section = self._section("Building Measurements", "Roof Space")
        main_data, extension_data = self._split_main_and_extensions(
            section, "Building Measurements"
        )
        main_floors = self._parse_floor_measurements(main_data)
        extensions = [
            ExtensionMeasurements(
                id=ext_id,
                floors=self._parse_floor_measurements(tokens),
            )
            for ext_id, tokens in enumerate(extension_data, start=1)
        ]
        return BuildingMeasurements(
            main_building=MainBuildingMeasurements(floors=main_floors),
            extensions=extensions or None,
        )

    def extract_roof_space(self) -> RoofSpace:
        """Not implemented yet; always raises ``NotImplementedError``."""
        raise NotImplementedError

    # --- private parsing helpers ---

    def _wall_construction_fields(self, data: list[str]) -> dict[str, object]:
        """Return the construction fields shared by main building and extensions.

        The keys are the keyword arguments accepted by both
        ``MainBuildingConstruction`` and ``ExtensionConstruction``. Missing
        text values become ``""``, except ``filled_cavity_indicators`` which
        stays ``None`` when absent.
        """
        return {
            "age_range": self._get_in(data, "Age Range:") or "",
            "age_indicators": (
                self._get_in(data, "Record indicators of property age:") or ""
            ),
            "walls_construction_type": (
                self._get_in(data, "Walls - Construction Type:") or ""
            ),
            "cavity_construction_indicators": (
                self._get_in(
                    data, "Record external indicators of Cavity Construction:"
                )
                or ""
            ),
            "walls_insulation_type": (
                self._get_in(data, "Walls - Insulation Type:") or ""
            ),
            "filled_cavity_indicators": self._get_in(
                data, "Record indicators of filled cavity:"
            ),
            "thermal_conductivity_of_wall_insulation": (
                self._get_in(data, "Thermal conductivity of wall insulation:") or ""
            ),
            "wall_u_value_known": self._is_known_in(data, "Wall U-Value known?"),
            "wall_thickness_mm": self._wall_thickness_in(data),
            "party_wall_construction_type": (
                self._get_in(data, "Party wall construction type:") or ""
            ),
        }

    def _parse_main_building_construction(
        self, data: list[str]
    ) -> MainBuildingConstruction:
        """Build ``MainBuildingConstruction`` from the main-building tokens."""
        return MainBuildingConstruction(**self._wall_construction_fields(data))

    def _parse_extension_construction(
        self, ext_id: int, data: list[str]
    ) -> ExtensionConstruction:
        """Build ``ExtensionConstruction`` number *ext_id* from its tokens."""
        return ExtensionConstruction(
            id=ext_id, **self._wall_construction_fields(data)
        )

    def _parse_floor_measurements(self, data: list[str]) -> list[FloorMeasurement]:
        """Collect one ``FloorMeasurement`` per floor row found in *data*.

        A row is a token starting with ``"Floor"`` followed by four numeric
        tokens: area, height, heat-loss perimeter and PWL, in that order. A
        ``"Floor"`` token with fewer than four tokens after it is skipped.

        Raises:
            ValueError: If one of the four tokens after a floor name is not
                a valid float.
        """
        floors = []
        pos = 0
        total = len(data)
        while pos < total:
            is_floor_row = data[pos].startswith("Floor") and pos + 4 < total
            if not is_floor_row:
                pos += 1
                continue
            area, height, perimeter, pwl = data[pos + 1 : pos + 5]
            floors.append(
                FloorMeasurement(
                    name=data[pos],
                    area_m2=float(area),
                    height_m=float(height),
                    heat_loss_perimeter_m=float(perimeter),
                    pwl_m=float(pwl),
                )
            )
            # Jump past the name and its four measurements.
            pos += 5
        return floors

    def _parse_floor_construction(self, data: list[str]) -> FloorConstruction:
        """Build ``FloorConstruction`` from the main-building tokens."""
        return FloorConstruction(
            floor_type=self._get_in(data, "Floor type:") or "",
            floor_construction=self._get_in(data, "Floor Construction:") or "",
            floor_insulation_type=self._get_in(data, "Floor Insulation Type:") or "",
            floor_u_value_known=self._is_known_in(data, "Floor U-Value known?"),
        )