Model/backend/documents_parser/extractor.py
2026-04-21 15:17:34 +00:00

692 lines
28 KiB
Python

from datetime import datetime
from typing import List, Optional
from datatypes.epc.surveys.pashub_rdsap_site_notes import (
BuildingConstruction,
InspectionMetadata,
BuildingMeasurements,
Conservatories,
CustomerResponse,
ExtensionConstruction,
ExtensionMeasurements,
ExtensionRoofSpace,
FloorConstruction,
FloorMeasurement,
General,
HeatingAndHotWater,
MainBuildingConstruction,
MainBuildingMeasurements,
MainHeating,
PasHubRdSapSiteNotes,
Renewables,
RoomCountElements,
RoofSpace,
RoofSpaceDetail,
SecondaryHeating,
Shower,
SurveyAddendum,
Ventilation,
WaterHeating,
WaterUse,
Window,
)
class PasHubRdSapSiteNotesExtractor:
    """Build a ``PasHubRdSapSiteNotes`` record from a flat list of text cells.

    Every lookup in this class works the same way: find a label (for example
    ``"Tenure:"``) by exact match in a list of strings and read the cell a
    fixed number of positions after it.  Section headings such as
    ``"Building Construction"`` are list items of their own and are used to
    slice the document into per-section sub-lists.

    NOTE(review): the producer of ``text_list`` is not visible in this file;
    the label/value layout described above is what the lookups assume.
    """

    def __init__(self, text_list: list[str]) -> None:
        """Store the document cells that all ``extract_*`` methods read."""
        self.text_list = text_list

    # --- generic helpers ---

    def _get_in_doc(self, key: str, offset: int = 1) -> Optional[str]:
        """Look ``key`` up in the whole document; see :meth:`_get_in`."""
        return self._get_in(self.text_list, key, offset)

    def _bool(self, key: str, offset: int = 1) -> bool:
        """Look ``key`` up in the whole document; see :meth:`_bool_in`."""
        return self._bool_in(self.text_list, key, offset)

    def _get_in(
        self, list_to_process: List[str], key: str, offset: int = 1
    ) -> Optional[str]:
        """Return the stripped cell ``offset`` positions after ``key``.

        Only the first exact occurrence of ``key`` is considered.  Returns
        ``None`` when the key is absent, when the target position is past the
        end of the list, or when the cell is empty after stripping.
        """
        try:
            idx = list_to_process.index(key)
            return list_to_process[idx + offset].strip() or None
        except (ValueError, IndexError):
            return None

    def _bool_in(self, list_to_process: List[str], key: str, offset: int = 1) -> bool:
        """Return ``True`` only when the value for ``key`` is "yes" (any case).

        A missing value and any other text both yield ``False``.
        """
        val = self._get_in(list_to_process, key, offset)
        return val is not None and val.lower() == "yes"

    def _optional_bool_in(self, list_to_process: List[str], key: str) -> Optional[bool]:
        """Like :meth:`_bool_in`, but a missing value yields ``None``."""
        val = self._get_in(list_to_process, key)
        return None if val is None else val.lower() == "yes"

    def _first_optional_bool_in(
        self, list_to_process: List[str], keys
    ) -> Optional[bool]:
        """Return the tri-state answer of the first key in ``keys`` that has one.

        ``keys`` is an iterable of alternative labels for the same question.
        The fallback is taken only when a key has *no* value; an explicit
        ``False`` is a real answer and must not fall through to the next key
        (which is what chaining the lookups with ``or`` would do).
        """
        for key in keys:
            answer = self._optional_bool_in(list_to_process, key)
            if answer is not None:
                return answer
        return None

    def _int_in(
        self,
        list_to_process: List[str],
        key: str,
        default: int = 0,
        offset: int = 1,
    ) -> int:
        """Return the value for ``key`` as an ``int``, or ``default`` if missing.

        Raises:
            ValueError: if a value is present but is not a plain integer.
        """
        return int(self._get_in(list_to_process, key, offset) or default)

    def _percent_in(self, list_to_process: List[str], key: str) -> Optional[int]:
        """Return the leading integer of the value for ``key``, or ``None``.

        Only the first whitespace-separated token is used and a trailing
        ``%`` on it is ignored, so "50", "50 %" and "50%" all give ``50``.

        Raises:
            ValueError: if the token is not an integer once ``%`` is removed.
        """
        val = self._get_in(list_to_process, key)
        if val is None:
            return None
        return int(val.split()[0].rstrip("%"))

    def _is_known_in(self, list_to_process: List[str], key: str) -> bool:
        """Return ``True`` when ``key`` has a value other than "not known"."""
        val = self._get_in(list_to_process, key)
        return val is not None and val.lower() != "not known"

    def _wall_thickness_in(self, list_to_process: List[str]) -> int:
        """Return the leading integer of the "Wall thickness:" value, or ``0``.

        Raises:
            ValueError: if a value is present but its first token is not an
                integer.
        """
        val = self._get_in(list_to_process, "Wall thickness:")
        return int(val.split()[0]) if val else 0

    def _section(self, start: str, end: str) -> List[str]:
        """Return the document cells from ``start`` up to, not including, ``end``.

        ``end`` is searched for from the position of ``start`` onwards.  An
        empty list is returned when either heading cannot be found.
        """
        try:
            start_idx = self.text_list.index(start)
            end_idx = self.text_list.index(end, start_idx)
            return self.text_list[start_idx:end_idx]
        except ValueError:
            return []

    @staticmethod
    def _numbered_starts(section: List[str], prefix: str) -> List[int]:
        """Return the positions of ``"<prefix> 1"``, ``"<prefix> 2"``, ...

        Numbering must be contiguous: the scan stops at the first number whose
        marker is not present in ``section``.
        """
        starts: List[int] = []
        number = 1
        while True:
            try:
                starts.append(section.index(f"{prefix} {number}"))
            except ValueError:
                return starts
            number += 1

    @classmethod
    def _numbered_blocks(cls, section: List[str], prefix: str) -> List[List[str]]:
        """Split ``section`` into one slice per ``"<prefix> N"`` marker.

        Each slice starts at its own marker and runs to the next marker, or to
        the end of ``section`` for the last one.
        """
        starts = cls._numbered_starts(section, prefix)
        ends = starts[1:] + [len(section)]
        return [section[begin:stop] for begin, stop in zip(starts, ends)]

    def _split_main_and_extensions(
        self, section: List[str], section_name: str
    ) -> tuple[List[str], List[List[str]]]:
        """Split a section into main-building cells and per-extension cells.

        The main-building slice runs from the "Main Building" marker to the
        first "Extension N" marker (or the end of the section).  Extension
        slices are produced by :meth:`_numbered_blocks`.

        Args:
            section: cells of one document section.
            section_name: heading used only to make the error message useful.

        Raises:
            ValueError: if ``section`` has no "Main Building" marker, which
                includes the case where the section itself was not found.
        """
        try:
            main_start = section.index("Main Building")
        except ValueError:
            raise ValueError(
                f"'Main Building' not found in '{section_name}' section"
            ) from None
        ext_starts = self._numbered_starts(section, "Extension")
        main_end = ext_starts[0] if ext_starts else len(section)
        return section[main_start:main_end], self._numbered_blocks(section, "Extension")

    # --- public extract methods ---

    def extract_inspection_metadata(self) -> InspectionMetadata:
        """Extract the cover-page fields (surveyor, reference, dates, address).

        The address is every cell between "Property Address:" and
        "Property Photo", joined with ", " after dropping trailing commas; it
        is ``""`` when either marker is missing.

        Raises:
            ValueError: if "Date of Inspection:" has no value, or if either
                date is not in ``"%d %B %Y"`` form.
        """
        try:
            addr_start = self.text_list.index("Property Address:") + 1
            addr_end = self.text_list.index("Property Photo", addr_start)
        except ValueError:
            property_address = ""
        else:
            property_address = ", ".join(
                cell.rstrip(",") for cell in self.text_list[addr_start:addr_end]
            )

        created_on_raw = self._get_in_doc("Created On:")
        if created_on_raw:
            # Stored as an ISO date string, unlike date_of_inspection below.
            created_on = datetime.strptime(created_on_raw, "%d %B %Y").strftime(
                "%Y-%m-%d"
            )
        else:
            created_on = ""

        date_of_inspection_raw = self._get_in_doc("Date of Inspection:")
        if not date_of_inspection_raw:
            raise ValueError("Date of Inspection not found in document")
        date_of_inspection = datetime.strptime(
            date_of_inspection_raw, "%d %B %Y"
        ).date()

        return InspectionMetadata(
            inspection_surveyor=self._get_in_doc("Inspection Surveyor:") or "",
            email_address=self._get_in_doc("E-Mail Address:") or "",
            report_reference=self._get_in_doc("Report Reference:") or "",
            created_on=created_on,
            date_of_inspection=date_of_inspection,
            property_address=property_address,
            property_photo="Property Photo" in self.text_list,
        )

    def extract(self) -> PasHubRdSapSiteNotes:
        """Run every section extractor and assemble the full site-notes record.

        Raises:
            ValueError: propagated from any section extractor.
        """
        return PasHubRdSapSiteNotes(
            inspection_metadata=self.extract_inspection_metadata(),
            general=self.extract_general(),
            building_construction=self.extract_building_construction(),
            building_measurements=self.extract_building_measurements(),
            roof_space=self.extract_roof_space(),
            windows=self.extract_windows(),
            heating_and_hot_water=self.extract_heating_and_hot_water(),
            ventilation=self.extract_ventilation(),
            conservatories=self.extract_conservatories(),
            renewables=self.extract_renewables(),
            room_count_elements=self.extract_room_count_elements(),
            water_use=self.extract_water_use(),
            customer_response=self.extract_customer_response(),
            addendum=self.extract_addendum(),
        )

    def extract_general(self) -> General:
        """Extract the general property fields, searched in the whole document.

        Raises:
            ValueError: if "Inspection Date:" has no value or is not in
                ``"%d/%m/%Y"`` form, or if the first token of
                "Number of storeys:" is not an integer.
        """
        inspection_date_raw = self._get_in_doc("Inspection Date:")
        if not inspection_date_raw:
            raise ValueError("Inspection Date not found in document")
        inspection_date = datetime.strptime(inspection_date_raw, "%d/%m/%Y").date()

        storeys_raw = self._get_in_doc("Number of storeys:") or "0"
        extensions_raw = self._get_in_doc("Number of Extensions:") or "0"
        # A non-numeric extensions value is treated as zero extensions.
        extensions_token = extensions_raw.split()[0]
        extensions_count = int(extensions_token) if extensions_token.isdigit() else 0

        return General(
            # These two questions wrap onto a second line, hence offset=2.
            epc_checked_before_assessment=self._bool(
                "Confirm you have checked for the existence of an", offset=2
            ),
            epc_exists_at_point_of_assessment=self._bool(
                "Does an EPC exist at the point of carrying out this", offset=2
            ),
            inspection_date=inspection_date,
            transaction_type=self._get_in_doc("Transaction Type:") or "",
            tenure=self._get_in_doc("Tenure:") or "",
            property_type=self._get_in_doc("Type of Property:") or "",
            detachment_type=self._get_in_doc("Detachment Type:") or "",
            number_of_storeys=int(storeys_raw.split()[0]),
            terrain_type=self._get_in_doc("Terrain Type:") or "",
            number_of_extensions=extensions_count,
            electricity_smart_meter=self._bool(
                "Is an electricity smart meter present?"
            ),
            electric_meter_type=self._get_in_doc("Electric meter type:") or "",
            dwelling_export_capable=self._bool("Is the dwelling export-capable?"),
            mains_gas_available=self._bool("Is mains gas available?"),
            gas_smart_meter=self._bool("Is there a gas smart meter?"),
            gas_meter_accessible=self._bool("Is the gas meter accessible?"),
            measurements_location=self._get_in_doc("Select Measurements Location:")
            or "",
        )

    def extract_building_construction(self) -> BuildingConstruction:
        """Extract wall and floor construction for the main building and extensions.

        ``extensions`` is ``None`` when the section has no "Extension N"
        markers.

        Raises:
            ValueError: if the section has no "Main Building" marker.
        """
        section = self._section("Building Construction", "Building Measurements")
        main_data, ext_blocks = self._split_main_and_extensions(
            section, "Building Construction"
        )
        extensions = [
            self._parse_extension_construction(ext_id, block)
            for ext_id, block in enumerate(ext_blocks, start=1)
        ]
        return BuildingConstruction(
            main_building=self._parse_main_building_construction(main_data),
            floor=self._parse_floor_construction(main_data),
            extensions=extensions or None,
        )

    # --- private parsing helpers ---

    def _wall_construction_fields(self, data: List[str]) -> dict:
        """Return the construction fields shared by main building and extensions.

        The result is keyword arguments for ``MainBuildingConstruction`` and,
        together with ``id``, for ``ExtensionConstruction``.
        """
        return {
            "age_range": self._get_in(data, "Age Range:") or "",
            "age_indicators": self._get_in(
                data, "Record indicators of property age:"
            )
            or "",
            "walls_construction_type": self._get_in(
                data, "Walls - Construction Type:"
            )
            or "",
            "cavity_construction_indicators": self._get_in(
                data, "Record external indicators of Cavity Construction:"
            )
            or "",
            "walls_insulation_type": self._get_in(data, "Walls - Insulation Type:")
            or "",
            # Left as None (not "") when absent.
            "filled_cavity_indicators": self._get_in(
                data, "Record indicators of filled cavity:"
            ),
            "thermal_conductivity_of_wall_insulation": self._get_in(
                data, "Thermal conductivity of wall insulation:"
            )
            or "",
            "wall_u_value_known": self._is_known_in(data, "Wall U-Value known?"),
            "wall_thickness_mm": self._wall_thickness_in(data),
            "party_wall_construction_type": self._get_in(
                data, "Party wall construction type:"
            )
            or "",
        }

    def _parse_main_building_construction(
        self, data: List[str]
    ) -> MainBuildingConstruction:
        """Build the main-building construction record from its cells."""
        return MainBuildingConstruction(**self._wall_construction_fields(data))

    def _parse_extension_construction(
        self, ext_id: int, data: List[str]
    ) -> ExtensionConstruction:
        """Build the construction record for extension number ``ext_id``."""
        return ExtensionConstruction(
            id=ext_id, **self._wall_construction_fields(data)
        )

    def extract_building_measurements(self) -> BuildingMeasurements:
        """Extract per-floor measurements for the main building and extensions.

        ``extensions`` is ``None`` when the section has no "Extension N"
        markers.

        Raises:
            ValueError: if the section has no "Main Building" marker.
        """
        section = self._section("Building Measurements", "Roof Space")
        main_data, ext_blocks = self._split_main_and_extensions(
            section, "Building Measurements"
        )
        main_floors = self._parse_floor_measurements(main_data)
        extensions = [
            ExtensionMeasurements(
                id=ext_id, floors=self._parse_floor_measurements(block)
            )
            for ext_id, block in enumerate(ext_blocks, start=1)
        ]
        return BuildingMeasurements(
            main_building=MainBuildingMeasurements(floors=main_floors),
            extensions=extensions or None,
        )

    def extract_roof_space(self) -> RoofSpace:
        """Extract roof details for the main building and extensions.

        ``extensions`` is ``None`` when the section has no "Extension N"
        markers.

        Raises:
            ValueError: if the section has no "Main Building" marker.
        """
        section = self._section("Roof Space", "Windows")
        main_data, ext_blocks = self._split_main_and_extensions(section, "Roof Space")
        extensions = [
            self._parse_extension_roof_space(ext_id, block)
            for ext_id, block in enumerate(ext_blocks, start=1)
        ]
        return RoofSpace(
            main_building=self._parse_roof_space_detail(main_data),
            extensions=extensions or None,
        )

    def extract_windows(self) -> List[Window]:
        """Extract one ``Window`` per "Window N" marker; may be empty."""
        section = self._section("Windows", "Heating & Hot Water")
        return [
            self._parse_window(window_id, block)
            for window_id, block in enumerate(
                self._numbered_blocks(section, "Window"), start=1
            )
        ]

    def extract_heating_and_hot_water(self) -> HeatingAndHotWater:
        """Extract main, secondary and water heating from one shared section."""
        section = self._section("Heating & Hot Water", "Ventilation")
        return HeatingAndHotWater(
            main_heating=self._parse_main_heating(section),
            secondary_heating=self._parse_secondary_heating(section),
            water_heating=self._parse_water_heating(section),
        )

    def extract_ventilation(self) -> Ventilation:
        """Extract ventilation fields; missing counts default to ``0``.

        Raises:
            ValueError: if a count is present but is not a plain integer.
        """
        section = self._section("Ventilation", "Conservatories")
        return Ventilation(
            ventilation_type=self._get_in(section, "Ventilation type:") or "",
            has_fixed_air_conditioning=self._bool_in(
                section, "Has fixed air conditioning?"
            ),
            number_of_open_flues=self._int_in(section, "Number of open flues:"),
            number_of_closed_flues=self._int_in(section, "Number of closed flues:"),
            number_of_boiler_flues=self._int_in(section, "Number of boiler flues:"),
            number_of_other_flues=self._int_in(section, "Number of other flues:"),
            number_of_extract_fans=self._int_in(section, "Number of extract fans:"),
            number_of_passive_vents=self._int_in(
                section, "Number of passive vents:"
            ),
            number_of_flueless_gas_fires=self._int_in(
                section, "Number of flueless gas fires:"
            ),
            pressure_test=self._get_in(section, "Pressure test:") or "",
            draught_lobby=self._bool_in(section, "Is there a draught lobby?"),
            ventilation_in_pcdf_database=self._optional_bool_in(
                section, "Is the ventilation in the PCDF database?"
            ),
        )

    def extract_conservatories(self) -> Conservatories:
        """Report a conservatory when the answer exists and is not "no conservatory"."""
        section = self._section("Conservatories", "Renewables")
        answer = self._get_in(section, "Is there conservatory?")
        return Conservatories(
            has_conservatory=answer is not None
            and answer.lower() != "no conservatory"
        )

    def extract_renewables(self) -> Renewables:
        """Extract renewable-energy fields.

        A missing battery count, or the literal "none" in any case, is
        reported as ``0`` batteries.

        Raises:
            ValueError: if the battery count or roof percentage is present
                but not numeric.
        """
        section = self._section("Renewables", "Room Count Elements")
        batteries_raw = self._get_in(section, "Number of PV batteries:")
        if batteries_raw is None or batteries_raw.lower() == "none":
            batteries = 0
        else:
            batteries = int(batteries_raw)
        return Renewables(
            wind_turbines=self._bool_in(section, "Has wind turbines?"),
            solar_hot_water=self._bool_in(section, "Has solar hot water?"),
            photovoltaic_array=self._bool_in(section, "Has photovoltaic array?"),
            number_of_pv_batteries=batteries,
            hydro=self._bool_in(section, "Is the dwelling connected to Hydro?"),
            pv_connection=self._get_in(section, "PV Connection:"),
            percent_roof_covered_pv=self._percent_in(
                section, "Percentage of roof covered with photovoltaic array?"
            ),
        )

    def extract_room_count_elements(self) -> RoomCountElements:
        """Extract room, door, chimney and lighting counts.

        Missing counts default to ``0``, except ``number_of_heated_rooms``
        which is ``None`` when absent.

        Raises:
            ValueError: if a count is present but is not a plain integer.
        """
        section = self._section("Room Count Elements", "Customer Response")
        heated_rooms_raw = self._get_in(section, "Number of heated rooms?")
        return RoomCountElements(
            number_of_habitable_rooms=self._int_in(
                section, "Number of habitable rooms?"
            ),
            any_unheated_rooms=self._bool_in(
                section, "Are any of these rooms unheated?"
            ),
            number_of_heated_rooms=int(heated_rooms_raw) if heated_rooms_raw else None,
            number_of_external_doors=self._int_in(
                section, "Number of external doors?"
            ),
            number_of_insulated_external_doors=self._int_in(
                section, "Number of insulated external doors?"
            ),
            number_of_draughtproofed_external_doors=self._int_in(
                section, "Number of draughtproofed external doors?"
            ),
            number_of_open_chimneys=self._int_in(section, "Number of open chimneys?"),
            number_of_blocked_chimneys=self._int_in(
                section, "Number of blocked chimneys?"
            ),
            number_of_fixed_incandescent_bulbs=self._int_in(
                section, "Number of fixed incandescent bulbs:"
            ),
            exact_led_cfl_count_known=self._bool_in(
                section, "Is the exact number of LED and CFL bulbs known?"
            ),
            number_of_fixed_led_bulbs=self._int_in(
                section, "Number of fixed LED bulbs:"
            ),
            number_of_fixed_cfl_bulbs=self._int_in(
                section, "Number of fixed CFL bulbs:"
            ),
            waste_water_heat_recovery=self._get_in(
                section, "Are there any waste water heat recovery systems?"
            )
            or "",
        )

    def extract_water_use(self) -> WaterUse:
        """Extract bath, special-feature and shower information.

        NOTE(review): this reads the same "Room Count Elements" to
        "Customer Response" slice as :meth:`extract_room_count_elements`;
        presumably the water-use fields sit inside that span - confirm
        against a sample document.

        Raises:
            ValueError: if a count is present but is not a plain integer.
        """
        section = self._section("Room Count Elements", "Customer Response")
        showers = [
            Shower(
                id=shower_id,
                outlet_type=self._get_in(block, "Shower outlet type:") or "",
            )
            for shower_id, block in enumerate(
                self._numbered_blocks(section, "Shower"), start=1
            )
        ]
        return WaterUse(
            number_of_baths=self._int_in(section, "Number of baths:"),
            # The question wraps onto a second line, hence offset=2.
            number_of_special_features=self._int_in(
                section, "How many special features are there at the", offset=2
            ),
            showers=showers,
        )

    def extract_customer_response(self) -> CustomerResponse:
        """Extract the two customer yes/no answers."""
        section = self._section(
            "Customer Response", "Addendum + Related Party Disclosure"
        )
        return CustomerResponse(
            customer_present=self._bool_in(section, "Customer present?"),
            willing_to_answer_satisfaction_survey=self._bool_in(
                section, "Customer willing to answer satisfaction survey?"
            ),
        )

    def extract_addendum(self) -> SurveyAddendum:
        """Extract addendum text, disclosure and hard-to-treat cavity flags."""
        section = self._section(
            "Addendum + Related Party Disclosure", "Photographs Required"
        )
        return SurveyAddendum(
            addendum=self._get_in(section, "Addendum") or "",
            related_party_disclosure=self._get_in(section, "Related party disclosure")
            or "",
            # Each of these labels wraps onto a second line, hence offset=2.
            hard_to_treat_cavity_access_issues=self._bool_in(
                section,
                "Hard to treat cavity walls: Property has access",
                offset=2,
            ),
            hard_to_treat_cavity_high_exposure=self._bool_in(
                section,
                "Hard to treat cavity walls: Property has high",
                offset=2,
            ),
            hard_to_treat_cavity_narrow_cavities=self._bool_in(
                section,
                "Hard to treat cavity walls: Property has narrow",
                offset=2,
            ),
        )

    def _parse_main_heating(self, data: List[str]) -> MainHeating:
        """Build the main-heating record from the heating section cells.

        Raises:
            ValueError: if "Product Id" or "S. Efficiency" is present but not
                numeric.
        """
        return MainHeating(
            selection_method=self._get_in(
                data, "How would you like to select the Heating System?"
            )
            or "",
            system_type=self._get_in(data, "System type:") or "",
            product_id=self._int_in(data, "Product Id"),
            manufacturer=self._get_in(data, "Manufacturer") or "",
            model=self._get_in(data, "Model") or "",
            orig_manufacturer=self._get_in(data, "Orig Manuf") or "",
            fuel=self._get_in(data, "Fuel") or "",
            summer_efficiency=float(self._get_in(data, "S. Efficiency") or 0),
            type=self._get_in(data, "Type") or "",
            condensing=self._bool_in(data, "Condensing"),
            year=self._get_in(data, "Year") or "",
            mount=self._get_in(data, "Mount") or "",
            open_flue=self._get_in(data, "Open Flue") or "",
            fan_assist=self._bool_in(data, "Fan Assist"),
            status=self._get_in(data, "Status") or "",
            central_heating_pump_age=self._get_in(data, "Central heating pump age:")
            or "",
            controls=self._get_in(data, "Controls:") or "",
            # The question wraps onto a second line, hence offset=2.
            flue_gas_heat_recovery_system=self._bool_in(
                data, "Does the boiler have a Flue Gas Heat Recover", offset=2
            ),
            weather_compensator=self._bool_in(data, "Is there a weather compensator?"),
            emitter=self._get_in(data, "Emitter:") or "",
            emitter_temperature=self._get_in(data, "Emitter Temperature:") or "",
        )

    def _parse_secondary_heating(self, data: List[str]) -> SecondaryHeating:
        """Build the secondary-heating record; the system is ``None`` if absent."""
        return SecondaryHeating(
            secondary_fuel=self._get_in(data, "Secondary Fuel") or "",
            # _get_in already maps an empty cell to None.
            secondary_system=self._get_in(data, "Secondary System:"),
        )

    def _parse_water_heating(self, data: List[str]) -> WaterHeating:
        """Build the water-heating record from the heating section cells.

        Thickness and thermostat each have two alternative labels; the second
        is consulted only when the first has no value.

        Raises:
            ValueError: if the insulation thickness is present but its first
                token is not an integer.
        """
        thickness_raw = self._get_in(
            data, "Insulation Thickness (mm):"
        ) or self._get_in(data, "Thickness:")
        thickness_mm = int(thickness_raw.split()[0]) if thickness_raw else None
        return WaterHeating(
            type=self._get_in(data, "Water Heating Type:") or "",
            system=self._get_in(data, "Water Heating System:") or "",
            cylinder_size=self._get_in(data, "Cylinder Size:") or "",
            cylinder_measured_heat_loss=self._get_in(
                data, "Cylinder Measured Heat Loss:"
            ),
            insulation_type=self._get_in(data, "Insulation Type:"),
            insulation_thickness_mm=thickness_mm,
            # An explicit "No" under the first label must stay False rather
            # than collapsing to None when the second label is missing.
            has_thermostat=self._first_optional_bool_in(
                data, ("Cylinder Thermostat:", "Has thermostat?")
            ),
            immersion_type=self._get_in(data, "Immersion:"),
        )

    def _parse_window(self, window_id: int, data: List[str]) -> Window:
        """Build the record for window number ``window_id``.

        Height and width take the first token of their value and default to
        ``0.0`` when missing.

        Raises:
            ValueError: if a height or width token is not a number.
        """
        height_raw = self._get_in(data, "Window height:")
        width_raw = self._get_in(data, "Window width:")
        return Window(
            id=window_id,
            location=self._get_in(data, "Window location:") or "",
            wall_type=self._get_in(data, "Window wall type:") or "",
            glazing_type=self._get_in(data, "Glazing Type:") or "",
            window_type=self._get_in(data, "Window type:") or "",
            frame_type=self._get_in(data, "Window frame type:") or "",
            glazing_gap=self._get_in(data, "What size is the glazing gap?") or "",
            draught_proofed=self._bool_in(data, "Is the window draught proofed?"),
            permanent_shutters=self._bool_in(
                data, "Are there permanent shutters present?"
            ),
            height_m=float(height_raw.split()[0]) if height_raw else 0.0,
            width_m=float(width_raw.split()[0]) if width_raw else 0.0,
            orientation=self._get_in(data, "Orientation:") or "",
        )

    def _parse_insulation_thickness(
        self, val: Optional[str]
    ) -> tuple[Optional[int], Optional[str]]:
        """Split a thickness value into ``(millimetres, free_text)``.

        Exactly one element of the pair is set when ``val`` is given: the
        integer when the first token is numeric, otherwise the original text.
        ``(None, None)`` is returned for ``None``.
        """
        if val is None:
            return None, None
        try:
            return int(val.split()[0]), None
        except (ValueError, IndexError):
            return None, val

    def _roof_space_fields(self, data: List[str]) -> dict:
        """Return the roof fields shared by main building and extensions.

        The result is keyword arguments for ``RoofSpaceDetail`` and, together
        with ``id``, for ``ExtensionRoofSpace``.
        """
        thickness_mm, thickness_text = self._parse_insulation_thickness(
            self._get_in(data, "Roofs - Insulation Thickness:")
        )
        return {
            "construction_type": self._get_in(data, "Roofs - Construction Type:")
            or "",
            "insulation_at": self._get_in(data, "Roofs - Insulation At:") or "",
            "roof_u_value_known": self._is_known_in(data, "Roof U-Value:"),
            # The label wraps onto a second line, hence offset=2.
            "cavity_wall_construction_indicators": self._get_in(
                data,
                "Record indicators of Cavity Wall Construction in roof",
                offset=2,
            )
            or "",
            "rooms_in_roof": self._bool_in(data, "Are there rooms in the roof?"),
            "insulation_thickness_mm": thickness_mm,
            "insulation_thickness": thickness_text,
        }

    def _parse_roof_space_detail(self, data: List[str]) -> RoofSpaceDetail:
        """Build the main-building roof record from its cells."""
        return RoofSpaceDetail(**self._roof_space_fields(data))

    def _parse_extension_roof_space(
        self, ext_id: int, data: List[str]
    ) -> ExtensionRoofSpace:
        """Build the roof record for extension number ``ext_id``."""
        return ExtensionRoofSpace(id=ext_id, **self._roof_space_fields(data))

    @staticmethod
    def _floor_rows(
        data: List[str],
    ) -> List[tuple[str, float, float, float, float]]:
        """Find floor rows: a "Floor..." cell followed by four numeric cells.

        Returns ``(name, area, height, heat_loss_perimeter, pwl)`` tuples in
        document order.  A cell that starts with "Floor" but is not followed
        by four numbers (a column header, for instance) is not a row and is
        skipped instead of aborting the whole extraction.
        """
        rows = []
        i = 0
        # A row needs the name cell plus four value cells.
        while i + 4 < len(data):
            if data[i].startswith("Floor"):
                try:
                    values = tuple(float(cell) for cell in data[i + 1 : i + 5])
                except ValueError:
                    values = None
                if values is not None:
                    rows.append((data[i], *values))
                    i += 5
                    continue
            i += 1
        return rows

    def _parse_floor_measurements(self, data: List[str]) -> List[FloorMeasurement]:
        """Build one ``FloorMeasurement`` per row found by :meth:`_floor_rows`."""
        return [
            FloorMeasurement(
                name=name,
                area_m2=area,
                height_m=height,
                heat_loss_perimeter_m=perimeter,
                pwl_m=pwl,
            )
            for name, area, height, perimeter, pwl in self._floor_rows(data)
        ]

    def _parse_floor_construction(self, data: List[str]) -> FloorConstruction:
        """Build the floor-construction record from the main-building cells."""
        return FloorConstruction(
            floor_type=self._get_in(data, "Floor type:") or "",
            floor_construction=self._get_in(data, "Floor Construction:") or "",
            floor_insulation_type=self._get_in(data, "Floor Insulation Type:") or "",
            floor_u_value_known=self._is_known_in(data, "Floor U-Value known?"),
        )