from datetime import datetime
from typing import List, Optional

from datatypes.epc.surveys.pashub_rdsap_site_notes import (
    BuildingConstruction,
    InspectionMetadata,
    BuildingMeasurements,
    Conservatories,
    CustomerResponse,
    ExtensionConstruction,
    ExtensionMeasurements,
    ExtensionRoofSpace,
    FloorConstruction,
    FloorMeasurement,
    General,
    HeatingAndHotWater,
    MainBuildingConstruction,
    MainBuildingMeasurements,
    MainHeating,
    PasHubRdSapSiteNotes,
    Renewables,
    RoomCountElements,
    RoofSpace,
    RoofSpaceDetail,
    SecondaryHeating,
    Shower,
    SurveyAddendum,
    Ventilation,
    WaterHeating,
    WaterUse,
    Window,
)


class PasHubRdSapSiteNotesExtractor:
    """Build site-note records from a flat list of text items.

    Values are located by searching the list for an exact label string and
    reading the item a fixed number of positions (``offset``) after it.
    Section-specific lookups first slice the list between two heading
    strings so that labels repeated in several sections do not collide.
    """

    def __init__(self, text_list: list[str]) -> None:
        self.text_list = text_list

    # --- generic helpers ---

    def _get(self, key: str, offset: int = 1) -> Optional[str]:
        """Look up ``key`` in the whole document; see :meth:`_get_in`."""
        return self._get_in(self.text_list, key, offset)

    def _bool(self, key: str, offset: int = 1) -> bool:
        """Look up ``key`` in the whole document; see :meth:`_bool_in`."""
        return self._bool_in(self.text_list, key, offset)

    def _text(self, key: str, offset: int = 1) -> str:
        """Like :meth:`_get`, but return ``""`` instead of ``None``."""
        return self._get(key, offset) or ""

    def _get_in(self, lst: List[str], key: str, offset: int = 1) -> Optional[str]:
        """Return the stripped item ``offset`` places after ``key`` in ``lst``.

        Returns ``None`` when ``key`` is absent, when the target position is
        past the end of ``lst``, or when the stripped value is empty.
        """
        try:
            idx = lst.index(key)
            return lst[idx + offset].strip() or None
        except (ValueError, IndexError):
            return None

    def _text_in(self, lst: List[str], key: str, offset: int = 1) -> str:
        """Like :meth:`_get_in`, but return ``""`` instead of ``None``."""
        return self._get_in(lst, key, offset) or ""

    def _int_in(self, lst: List[str], key: str, offset: int = 1) -> int:
        """Return the value for ``key`` as an int, or 0 when it is missing.

        Raises:
            ValueError: if a value is present but is not an integer literal.
        """
        return int(self._get_in(lst, key, offset) or 0)

    def _bool_in(self, lst: List[str], key: str, offset: int = 1) -> bool:
        """Return True only when the value for ``key`` is "yes" (any case)."""
        val = self._get_in(lst, key, offset)
        return val is not None and val.lower() == "yes"

    def _optional_bool_in(self, lst: List[str], key: str) -> Optional[bool]:
        """Return None when ``key`` has no value, else whether it is "yes"."""
        val = self._get_in(lst, key)
        return None if val is None else val.lower() == "yes"

    def _is_known_in(self, lst: List[str], key: str) -> bool:
        """Return True when ``key`` has a value other than "not known"."""
        val = self._get_in(lst, key)
        return val is not None and val.lower() != "not known"

    def _wall_thickness_in(self, lst: List[str]) -> int:
        """Return the leading integer of the "Wall thickness:" value, or 0.

        Raises:
            ValueError: if the first whitespace-separated token is not an int.
        """
        val = self._get_in(lst, "Wall thickness:")
        return int(val.split()[0]) if val else 0

    def _section(self, start: str, end: str) -> List[str]:
        """Return the items from ``start`` up to (not including) ``end``.

        ``end`` is searched for from the position of ``start`` onwards.
        Returns an empty list when either heading is missing.
        """
        try:
            start_idx = self.text_list.index(start)
            end_idx = self.text_list.index(end, start_idx)
            return self.text_list[start_idx:end_idx]
        except ValueError:
            return []

    def _numbered_slices(self, section: List[str], label: str) -> List[List[str]]:
        """Split ``section`` at consecutive "<label> 1", "<label> 2", ... markers.

        Numbering stops at the first missing marker. Each slice starts at its
        own marker and runs to the next marker, or to the end of ``section``
        for the last one.
        """
        slices = []
        n = 1
        while f"{label} {n}" in section:
            start = section.index(f"{label} {n}")
            next_marker = f"{label} {n + 1}"
            end = (
                section.index(next_marker)
                if next_marker in section
                else len(section)
            )
            slices.append(section[start:end])
            n += 1
        return slices

    def _split_main_and_extensions(
        self, section: List[str], section_name: str
    ) -> tuple[List[str], List[List[str]]]:
        """Split a section into its main-building slice and extension slices.

        The main slice runs from "Main Building" to "Extension 1" (or the end
        of the section when there are no extensions).

        Raises:
            ValueError: if ``section`` has no "Main Building" marker, which
                includes the case where the section itself was not found.
        """
        try:
            main_start = section.index("Main Building")
        except ValueError as err:
            raise ValueError(
                f"'Main Building' not found in the '{section_name}' section"
            ) from err
        extension_slices = self._numbered_slices(section, "Extension")
        main_end = (
            section.index("Extension 1") if extension_slices else len(section)
        )
        return section[main_start:main_end], extension_slices

    # --- public extract methods ---

    def extract_inspection_metadata(self) -> InspectionMetadata:
        """Build the inspection metadata record from the document header.

        Raises:
            ValueError: if "Date of Inspection:" has no value, or if a date
                value does not match the "%d %B %Y" format.
        """
        try:
            addr_start = self.text_list.index("Property Address:") + 1
            addr_end = self.text_list.index("Property Photo", addr_start)
        except ValueError:
            property_address = ""
        else:
            # Address lines may carry their own trailing commas; drop them
            # before re-joining so the separator is not doubled.
            property_address = ", ".join(
                t.rstrip(",") for t in self.text_list[addr_start:addr_end]
            )

        created_on_raw = self._get("Created On:")
        created_on = (
            datetime.strptime(created_on_raw, "%d %B %Y").strftime("%Y-%m-%d")
            if created_on_raw
            else ""
        )

        date_of_inspection_raw = self._get("Date of Inspection:")
        if not date_of_inspection_raw:
            raise ValueError("Date of Inspection not found in document")
        date_of_inspection = datetime.strptime(
            date_of_inspection_raw, "%d %B %Y"
        ).date()

        return InspectionMetadata(
            inspection_surveyor=self._text("Inspection Surveyor:"),
            email_address=self._text("E-Mail Address:"),
            report_reference=self._text("Report Reference:"),
            created_on=created_on,
            date_of_inspection=date_of_inspection,
            property_address=property_address,
            property_photo="Property Photo" in self.text_list,
        )

    def extract(self) -> PasHubRdSapSiteNotes:
        """Run every section extractor and assemble the full record."""
        return PasHubRdSapSiteNotes(
            inspection_metadata=self.extract_inspection_metadata(),
            general=self.extract_general(),
            building_construction=self.extract_building_construction(),
            building_measurements=self.extract_building_measurements(),
            roof_space=self.extract_roof_space(),
            windows=self.extract_windows(),
            heating_and_hot_water=self.extract_heating_and_hot_water(),
            ventilation=self.extract_ventilation(),
            conservatories=self.extract_conservatories(),
            renewables=self.extract_renewables(),
            room_count_elements=self.extract_room_count_elements(),
            water_use=self.extract_water_use(),
            customer_response=self.extract_customer_response(),
            addendum=self.extract_addendum(),
        )

    def extract_general(self) -> General:
        """Build the general-information record from whole-document lookups.

        Raises:
            ValueError: if "Inspection Date:" has no value or does not match
                "%d/%m/%Y", or if a count does not start with an integer.
        """
        inspection_date_raw = self._get("Inspection Date:")
        if not inspection_date_raw:
            raise ValueError("Inspection Date not found in document")
        inspection_date = datetime.strptime(inspection_date_raw, "%d/%m/%Y").date()

        storeys_raw = self._get("Number of storeys:") or "0"
        extensions_raw = self._get("Number of Extensions:") or "0"

        return General(
            # offset=2: these two answers sit two items after the matched label.
            epc_checked_before_assessment=self._bool(
                "Confirm you have checked for the existence of an", offset=2
            ),
            epc_exists_at_point_of_assessment=self._bool(
                "Does an EPC exist at the point of carrying out this", offset=2
            ),
            inspection_date=inspection_date,
            transaction_type=self._text("Transaction Type:"),
            tenure=self._text("Tenure:"),
            property_type=self._text("Type of Property:"),
            detachment_type=self._text("Detachment Type:"),
            number_of_storeys=int(storeys_raw.split()[0]),
            terrain_type=self._text("Terrain Type:"),
            number_of_extensions=int(extensions_raw.split()[0]),
            electricity_smart_meter=self._bool(
                "Is an electricity smart meter present?"
            ),
            electric_meter_type=self._text("Electric meter type:"),
            dwelling_export_capable=self._bool("Is the dwelling export-capable?"),
            mains_gas_available=self._bool("Is mains gas available?"),
            gas_smart_meter=self._bool("Is there a gas smart meter?"),
            gas_meter_accessible=self._bool("Is the gas meter accessible?"),
            measurements_location=self._text("Select Measurements Location:"),
        )

    def extract_building_construction(self) -> BuildingConstruction:
        """Build construction details for the main building and extensions.

        Raises:
            ValueError: if the section has no "Main Building" marker.
        """
        bc_section = self._section("Building Construction", "Building Measurements")
        main_data, extension_slices = self._split_main_and_extensions(
            bc_section, "Building Construction"
        )
        extensions = [
            self._parse_extension_construction(ext_id, ext_data)
            for ext_id, ext_data in enumerate(extension_slices, start=1)
        ]
        return BuildingConstruction(
            main_building=self._parse_main_building_construction(main_data),
            floor=self._parse_floor_construction(main_data),
            extensions=extensions or None,
        )

    # --- private parsing helpers ---

    def _wall_construction_fields(self, data: List[str]) -> dict[str, object]:
        """Return the wall-construction fields shared by main and extensions."""
        return {
            "age_range": self._text_in(data, "Age Range:"),
            "age_indicators": self._text_in(
                data, "Record indicators of property age:"
            ),
            "walls_construction_type": self._text_in(
                data, "Walls - Construction Type:"
            ),
            "cavity_construction_indicators": self._text_in(
                data, "Record external indicators of Cavity Construction:"
            ),
            "walls_insulation_type": self._text_in(data, "Walls - Insulation Type:"),
            # Deliberately left as None (not "") when absent.
            "filled_cavity_indicators": self._get_in(
                data, "Record indicators of filled cavity:"
            ),
            "thermal_conductivity_of_wall_insulation": self._text_in(
                data, "Thermal conductivity of wall insulation:"
            ),
            "wall_u_value_known": self._is_known_in(data, "Wall U-Value known?"),
            "wall_thickness_mm": self._wall_thickness_in(data),
            "party_wall_construction_type": self._text_in(
                data, "Party wall construction type:"
            ),
        }

    def _parse_main_building_construction(
        self, data: List[str]
    ) -> MainBuildingConstruction:
        """Parse the main building's wall-construction fields from ``data``."""
        return MainBuildingConstruction(**self._wall_construction_fields(data))

    def _parse_extension_construction(
        self, ext_id: int, data: List[str]
    ) -> ExtensionConstruction:
        """Parse one extension's wall-construction fields from ``data``."""
        return ExtensionConstruction(
            id=ext_id, **self._wall_construction_fields(data)
        )

    def extract_building_measurements(self) -> BuildingMeasurements:
        """Build floor measurements for the main building and extensions.

        Raises:
            ValueError: if the section has no "Main Building" marker, or if a
                floor row contains a non-numeric measurement.
        """
        bm_section = self._section("Building Measurements", "Roof Space")
        main_data, extension_slices = self._split_main_and_extensions(
            bm_section, "Building Measurements"
        )
        main_floors = self._parse_floor_measurements(main_data)
        extensions = [
            ExtensionMeasurements(
                id=ext_id,
                floors=self._parse_floor_measurements(ext_data),
            )
            for ext_id, ext_data in enumerate(extension_slices, start=1)
        ]
        return BuildingMeasurements(
            main_building=MainBuildingMeasurements(floors=main_floors),
            extensions=extensions or None,
        )

    def extract_roof_space(self) -> RoofSpace:
        """Build roof-space details for the main building and extensions.

        Raises:
            ValueError: if the section has no "Main Building" marker.
        """
        rs_section = self._section("Roof Space", "Windows")
        main_data, extension_slices = self._split_main_and_extensions(
            rs_section, "Roof Space"
        )
        extensions = [
            self._parse_extension_roof_space(ext_id, ext_data)
            for ext_id, ext_data in enumerate(extension_slices, start=1)
        ]
        return RoofSpace(
            main_building=self._parse_roof_space_detail(main_data),
            extensions=extensions or None,
        )

    def extract_windows(self) -> List[Window]:
        """Parse every consecutively numbered "Window n" entry."""
        w_section = self._section("Windows", "Heating & Hot Water")
        return [
            self._parse_window(window_id, window_data)
            for window_id, window_data in enumerate(
                self._numbered_slices(w_section, "Window"), start=1
            )
        ]

    def extract_heating_and_hot_water(self) -> HeatingAndHotWater:
        """Build the main, secondary and water heating records."""
        hhw_section = self._section("Heating & Hot Water", "Ventilation")
        return HeatingAndHotWater(
            main_heating=self._parse_main_heating(hhw_section),
            secondary_heating=self._parse_secondary_heating(hhw_section),
            water_heating=self._parse_water_heating(hhw_section),
        )

    def extract_ventilation(self) -> Ventilation:
        """Build the ventilation record; missing counts default to 0."""
        v_section = self._section("Ventilation", "Conservatories")
        return Ventilation(
            ventilation_type=self._text_in(v_section, "Ventilation type:"),
            has_fixed_air_conditioning=self._bool_in(
                v_section, "Has fixed air conditioning?"
            ),
            number_of_open_flues=self._int_in(v_section, "Number of open flues:"),
            number_of_closed_flues=self._int_in(
                v_section, "Number of closed flues:"
            ),
            number_of_boiler_flues=self._int_in(
                v_section, "Number of boiler flues:"
            ),
            number_of_other_flues=self._int_in(v_section, "Number of other flues:"),
            number_of_extract_fans=self._int_in(
                v_section, "Number of extract fans:"
            ),
            number_of_passive_vents=self._int_in(
                v_section, "Number of passive vents:"
            ),
            number_of_flueless_gas_fires=self._int_in(
                v_section, "Number of flueless gas fires:"
            ),
            pressure_test=self._text_in(v_section, "Pressure test:"),
            draught_lobby=self._bool_in(v_section, "Is there a draught lobby?"),
            ventilation_in_pcdf_database=self._optional_bool_in(
                v_section, "Is the ventilation in the PCDF database?"
            ),
        )

    def extract_conservatories(self) -> Conservatories:
        """Report a conservatory unless the answer is missing or "no conservatory"."""
        c_section = self._section("Conservatories", "Renewables")
        val = self._get_in(c_section, "Is there conservatory?")
        return Conservatories(
            has_conservatory=val is not None and val.lower() != "no conservatory"
        )

    def extract_renewables(self) -> Renewables:
        """Build the renewables record.

        Raises:
            ValueError: if the battery count is neither "none" nor an integer.
        """
        r_section = self._section("Renewables", "Room Count Elements")
        batteries_raw = self._get_in(r_section, "Number of PV batteries:")
        # The document may spell a zero count as "None".
        batteries = (
            0
            if batteries_raw is None or batteries_raw.lower() == "none"
            else int(batteries_raw)
        )
        return Renewables(
            wind_turbines=self._bool_in(r_section, "Has wind turbines?"),
            solar_hot_water=self._bool_in(r_section, "Has solar hot water?"),
            photovoltaic_array=self._bool_in(r_section, "Has photovoltaic array?"),
            number_of_pv_batteries=batteries,
            hydro=self._bool_in(r_section, "Is the dwelling connected to Hydro?"),
        )

    def extract_room_count_elements(self) -> RoomCountElements:
        """Build the room/door/chimney/bulb counts record.

        Missing counts default to 0, except the heated-room count, which is
        None when absent.
        """
        rce_section = self._section("Room Count Elements", "Customer Response")
        heated_rooms_raw = self._get_in(rce_section, "Number of heated rooms?")
        return RoomCountElements(
            number_of_habitable_rooms=self._int_in(
                rce_section, "Number of habitable rooms?"
            ),
            any_unheated_rooms=self._bool_in(
                rce_section, "Are any of these rooms unheated?"
            ),
            number_of_heated_rooms=(
                int(heated_rooms_raw) if heated_rooms_raw else None
            ),
            number_of_external_doors=self._int_in(
                rce_section, "Number of external doors?"
            ),
            number_of_insulated_external_doors=self._int_in(
                rce_section, "Number of insulated external doors?"
            ),
            number_of_draughtproofed_external_doors=self._int_in(
                rce_section, "Number of draughtproofed external doors?"
            ),
            number_of_open_chimneys=self._int_in(
                rce_section, "Number of open chimneys?"
            ),
            number_of_blocked_chimneys=self._int_in(
                rce_section, "Number of blocked chimneys?"
            ),
            number_of_fixed_incandescent_bulbs=self._int_in(
                rce_section, "Number of fixed incandescent bulbs:"
            ),
            exact_led_cfl_count_known=self._bool_in(
                rce_section, "Is the exact number of LED and CFL bulbs known?"
            ),
            number_of_fixed_led_bulbs=self._int_in(
                rce_section, "Number of fixed LED bulbs:"
            ),
            number_of_fixed_cfl_bulbs=self._int_in(
                rce_section, "Number of fixed CFL bulbs:"
            ),
            waste_water_heat_recovery=self._text_in(
                rce_section, "Are there any waste water heat recovery systems?"
            ),
        )

    def extract_water_use(self) -> WaterUse:
        """Build the bath/shower/special-feature record.

        Raises:
            ValueError: if a present count is not an integer literal.
        """
        # NOTE(review): this reads the same "Room Count Elements" ..
        # "Customer Response" span as extract_room_count_elements -- confirm
        # the water-use fields really live inside that span.
        wu_section = self._section("Room Count Elements", "Customer Response")
        baths_raw = self._get_in(wu_section, "Number of baths:") or "0"
        special_raw = (
            self._get_in(
                wu_section, "How many special features are there at the", offset=2
            )
            or "0"
        )

        showers = [
            Shower(
                id=shower_id,
                outlet_type=self._text_in(shower_data, "Shower outlet type:"),
            )
            for shower_id, shower_data in enumerate(
                self._numbered_slices(wu_section, "Shower"), start=1
            )
        ]

        return WaterUse(
            number_of_baths=int(baths_raw),
            number_of_special_features=int(special_raw),
            showers=showers,
        )

    def extract_customer_response(self) -> CustomerResponse:
        """Build the customer-response record."""
        cr_section = self._section(
            "Customer Response", "Addendum + Related Party Disclosure"
        )
        return CustomerResponse(
            customer_present=self._bool_in(cr_section, "Customer present?"),
            willing_to_answer_satisfaction_survey=self._bool_in(
                cr_section, "Customer willing to answer satisfaction survey?"
            ),
        )

    def extract_addendum(self) -> SurveyAddendum:
        """Build the addendum / related-party disclosure record."""
        a_section = self._section(
            "Addendum + Related Party Disclosure", "Photographs Required"
        )
        return SurveyAddendum(
            addendum=self._text_in(a_section, "Addendum"),
            related_party_disclosure=self._text_in(
                a_section, "Related party disclosure"
            ),
            # offset=2: each answer sits two items after the matched label.
            hard_to_treat_cavity_access_issues=self._bool_in(
                a_section,
                "Hard to treat cavity walls: Property has access",
                offset=2,
            ),
            hard_to_treat_cavity_high_exposure=self._bool_in(
                a_section,
                "Hard to treat cavity walls: Property has high",
                offset=2,
            ),
            hard_to_treat_cavity_narrow_cavities=self._bool_in(
                a_section,
                "Hard to treat cavity walls: Property has narrow",
                offset=2,
            ),
        )

    def _parse_main_heating(self, data: List[str]) -> MainHeating:
        """Parse the main heating system fields from ``data``.

        Raises:
            ValueError: if "Product Id" or "S. Efficiency" is present but
                not numeric.
        """
        return MainHeating(
            selection_method=self._text_in(
                data, "How would you like to select the Heating System?"
            ),
            system_type=self._text_in(data, "System type:"),
            product_id=self._int_in(data, "Product Id"),
            manufacturer=self._text_in(data, "Manufacturer"),
            model=self._text_in(data, "Model"),
            orig_manufacturer=self._text_in(data, "Orig Manuf"),
            fuel=self._text_in(data, "Fuel"),
            summer_efficiency=float(self._get_in(data, "S. Efficiency") or 0),
            type=self._text_in(data, "Type"),
            condensing=self._bool_in(data, "Condensing"),
            year=self._text_in(data, "Year"),
            mount=self._text_in(data, "Mount"),
            open_flue=self._text_in(data, "Open Flue"),
            fan_assist=self._bool_in(data, "Fan Assist"),
            status=self._text_in(data, "Status"),
            central_heating_pump_age=self._text_in(
                data, "Central heating pump age:"
            ),
            controls=self._text_in(data, "Controls:"),
            flue_gas_heat_recovery_system=self._bool_in(
                data, "Does the boiler have a Flue Gas Heat Recover", offset=2
            ),
            weather_compensator=self._bool_in(
                data, "Is there a weather compensator?"
            ),
            emitter=self._text_in(data, "Emitter:"),
            emitter_temperature=self._text_in(data, "Emitter Temperature:"),
        )

    def _parse_secondary_heating(self, data: List[str]) -> SecondaryHeating:
        """Parse the secondary heating fuel from ``data``."""
        return SecondaryHeating(
            secondary_fuel=self._text_in(data, "Secondary Fuel"),
        )

    def _parse_water_heating(self, data: List[str]) -> WaterHeating:
        """Parse the water heating fields; cylinder details may be None.

        Raises:
            ValueError: if the insulation thickness is present but not an int.
        """
        thickness_raw = self._get_in(data, "Insulation Thickness (mm):")
        return WaterHeating(
            type=self._text_in(data, "Water Heating Type:"),
            system=self._text_in(data, "Water Heating System:"),
            cylinder_size=self._text_in(data, "Cylinder Size:"),
            cylinder_measured_heat_loss=self._get_in(
                data, "Cylinder Measured Heat Loss:"
            ),
            insulation_type=self._get_in(data, "Insulation Type:"),
            insulation_thickness_mm=int(thickness_raw) if thickness_raw else None,
            has_thermostat=self._optional_bool_in(data, "Cylinder Thermostat:"),
        )

    def _parse_window(self, window_id: int, data: List[str]) -> Window:
        """Parse one window's fields; missing dimensions default to 0.0.

        Raises:
            ValueError: if a dimension's first token is not a number.
        """
        height_raw = self._get_in(data, "Window height:")
        width_raw = self._get_in(data, "Window width:")
        return Window(
            id=window_id,
            location=self._text_in(data, "Window location:"),
            wall_type=self._text_in(data, "Window wall type:"),
            glazing_type=self._text_in(data, "Glazing Type:"),
            window_type=self._text_in(data, "Window type:"),
            frame_type=self._text_in(data, "Window frame type:"),
            glazing_gap=self._text_in(data, "What size is the glazing gap?"),
            draught_proofed=self._bool_in(data, "Is the window draught proofed?"),
            permanent_shutters=self._bool_in(
                data, "Are there permanent shutters present?"
            ),
            # Only the first whitespace-separated token is parsed, so a
            # trailing unit token is ignored.
            height_m=float(height_raw.split()[0]) if height_raw else 0.0,
            width_m=float(width_raw.split()[0]) if width_raw else 0.0,
            orientation=self._text_in(data, "Orientation:"),
        )

    def _parse_insulation_thickness(
        self, val: Optional[str]
    ) -> tuple[Optional[int], Optional[str]]:
        """Split a thickness value into ``(millimetres, free_text)``.

        Exactly one element is set when ``val`` is given: the leading integer
        if the first token parses as one, otherwise the original text. Both
        are None when ``val`` is None.
        """
        if val is None:
            return None, None
        try:
            return int(val.split()[0]), None
        except (ValueError, IndexError):
            return None, val

    def _roof_space_fields(self, data: List[str]) -> dict[str, object]:
        """Return the roof-space fields shared by main building and extensions."""
        thickness_mm, thickness_str = self._parse_insulation_thickness(
            self._get_in(data, "Roofs - Insulation Thickness:")
        )
        return {
            "construction_type": self._text_in(data, "Roofs - Construction Type:"),
            "insulation_at": self._text_in(data, "Roofs - Insulation At:"),
            "roof_u_value_known": self._is_known_in(data, "Roof U-Value:"),
            "cavity_wall_construction_indicators": self._text_in(
                data,
                "Record indicators of Cavity Wall Construction in roof",
                offset=2,
            ),
            "rooms_in_roof": self._bool_in(data, "Are there rooms in the roof?"),
            "insulation_thickness_mm": thickness_mm,
            "insulation_thickness": thickness_str,
        }

    def _parse_roof_space_detail(self, data: List[str]) -> RoofSpaceDetail:
        """Parse the main building's roof-space fields from ``data``."""
        return RoofSpaceDetail(**self._roof_space_fields(data))

    def _parse_extension_roof_space(
        self, ext_id: int, data: List[str]
    ) -> ExtensionRoofSpace:
        """Parse one extension's roof-space fields from ``data``."""
        return ExtensionRoofSpace(id=ext_id, **self._roof_space_fields(data))

    def _parse_floor_measurements(self, data: List[str]) -> List[FloorMeasurement]:
        """Collect floor rows from ``data``.

        A row is an item starting with "Floor" that has at least four items
        after it; those four are read as area, height, heat-loss perimeter
        and PWL, in that order.

        Raises:
            ValueError: if any of the four following items is not a number.
        """
        floors = []
        i = 0
        while i < len(data):
            if data[i].startswith("Floor") and i + 4 < len(data):
                floors.append(
                    FloorMeasurement(
                        name=data[i],
                        area_m2=float(data[i + 1]),
                        height_m=float(data[i + 2]),
                        heat_loss_perimeter_m=float(data[i + 3]),
                        pwl_m=float(data[i + 4]),
                    )
                )
                # Skip past the label and its four values.
                i += 5
            else:
                i += 1
        return floors

    def _parse_floor_construction(self, data: List[str]) -> FloorConstruction:
        """Parse the floor construction fields from ``data``."""
        return FloorConstruction(
            floor_type=self._text_in(data, "Floor type:"),
            floor_construction=self._text_in(data, "Floor Construction:"),
            floor_insulation_type=self._text_in(data, "Floor Insulation Type:"),
            floor_u_value_known=self._is_known_in(data, "Floor U-Value known?"),
        )