from datetime import datetime
from typing import List, Optional

from datatypes.epc.surveys.pashub_rdsap_site_notes import (
    BuildingConstruction,
    BuildingMeasurements,
    Conservatories,
    CustomerResponse,
    ExtensionConstruction,
    ExtensionMeasurements,
    ExtensionRoofSpace,
    FloorConstruction,
    FloorMeasurement,
    General,
    HeatingAndHotWater,
    MainBuildingConstruction,
    MainBuildingMeasurements,
    MainHeating,
    PasHubRdSapSiteNotes,
    Renewables,
    RoomCountElements,
    RoofSpace,
    RoofSpaceDetail,
    SecondaryHeating,
    Shower,
    SurveyAddendum,
    Ventilation,
    WaterHeating,
    WaterUse,
    Window,
)


class PasHubRdSapSiteNotesExtractor:
    """Build site-notes objects from a flat list of text entries.

    Values are located by finding an entry that equals a label string and
    reading the entry a fixed offset after it. Sections are the slices of the
    list between two heading entries.
    """

    def __init__(self, text_list: list[str]) -> None:
        """Store the list of text entries that all lookups run against."""
        self.text_list = text_list

    # --- generic helpers ---

    def _get(self, key: str, offset: int = 1) -> Optional[str]:
        """Look up ``key`` in the full text list; see ``_get_in``."""
        return self._get_in(self.text_list, key, offset)

    def _bool(self, key: str, offset: int = 1) -> bool:
        """Look up ``key`` in the full text list; see ``_bool_in``."""
        return self._bool_in(self.text_list, key, offset)

    def _get_in(self, lst: List[str], key: str, offset: int = 1) -> Optional[str]:
        """Return the stripped entry ``offset`` positions after ``key`` in ``lst``.

        Returns ``None`` when ``key`` is not an entry of ``lst``, when the
        target position is past the end of ``lst``, or when the stripped value
        is empty.
        """
        try:
            idx = lst.index(key)
            return lst[idx + offset].strip() or None
        except (ValueError, IndexError):
            return None

    def _bool_in(self, lst: List[str], key: str, offset: int = 1) -> bool:
        """Return ``True`` only when the value for ``key`` is "yes" (any case)."""
        val = self._get_in(lst, key, offset)
        return val is not None and val.lower() == "yes"

    def _optional_bool_in(self, lst: List[str], key: str) -> Optional[bool]:
        """Like ``_bool_in`` but return ``None`` when no value is found."""
        val = self._get_in(lst, key)
        return None if val is None else val.lower() == "yes"

    def _is_known_in(self, lst: List[str], key: str) -> bool:
        """Return ``True`` when a value exists and is not "not known" (any case)."""
        val = self._get_in(lst, key)
        return val is not None and val.lower() != "not known"

    def _int_in(self, lst: List[str], key: str) -> int:
        """Return the value for ``key`` as an ``int``, or 0 when it is missing.

        Raises:
            ValueError: if a value is present but is not a valid integer.
        """
        return int(self._get_in(lst, key) or 0)

    def _wall_thickness_in(self, lst: List[str]) -> int:
        """Return the first whitespace-separated token of the wall thickness.

        Returns 0 when the "Wall thickness:" value is missing.

        Raises:
            ValueError: if the first token is not a valid integer.
        """
        val = self._get_in(lst, "Wall thickness:")
        return int(val.split()[0]) if val else 0

    def _section(self, start: str, end: str) -> List[str]:
        """Return the entries from ``start`` (inclusive) up to ``end`` (exclusive).

        ``end`` is searched for from the position of ``start`` onwards. An
        empty list is returned when either entry cannot be found.
        """
        try:
            start_idx = self.text_list.index(start)
            end_idx = self.text_list.index(end, start_idx)
            return self.text_list[start_idx:end_idx]
        except ValueError:
            return []

    def _split_main_and_extensions(
        self, section: List[str], section_name: str
    ) -> tuple[List[str], List[List[str]]]:
        """Split ``section`` into main-building data and per-extension data.

        Extension markers are the consecutive entries "Extension 1",
        "Extension 2", ... that are present in ``section``. The main-building
        slice runs from the "Main Building" entry to the first extension
        marker (or the end of the section); each extension slice runs from its
        marker to the next marker (or the end of the section).

        Args:
            section: entries of one section, as returned by ``_section``.
            section_name: name used only in the error message.

        Returns:
            ``(main_data, extension_slices)`` with extension slices in
            marker order.

        Raises:
            ValueError: if ``section`` has no "Main Building" entry, which
                includes the case of an empty section.
        """
        markers = []
        i = 1
        while f"Extension {i}" in section:
            markers.append(f"Extension {i}")
            i += 1

        try:
            main_start = section.index("Main Building")
        except ValueError as exc:
            raise ValueError(
                f'"Main Building" entry not found in the "{section_name}" section'
            ) from exc

        # Each slice ends where the next marker starts; the last one ends at
        # the end of the section.
        boundaries = [section.index(marker) for marker in markers]
        boundaries.append(len(section))

        main_data = section[main_start:boundaries[0]]
        extension_slices = [
            section[begin:stop] for begin, stop in zip(boundaries, boundaries[1:])
        ]
        return main_data, extension_slices

    # --- public extract methods ---

    def extract(self) -> PasHubRdSapSiteNotes:
        """Not implemented yet."""
        raise NotImplementedError

    def extract_general(self) -> General:
        """Build the ``General`` record from labels in the full text list.

        The inspection date is read as ``DD/MM/YYYY`` and emitted as
        ``YYYY-MM-DD``; it is an empty string when the label has no value.
        Storey and extension counts use the first whitespace-separated token
        of their value and default to 0 when missing.

        Raises:
            ValueError: if the inspection date does not match ``DD/MM/YYYY``
                or a count's first token is not a valid integer.
        """
        inspection_date_raw = self._get("Inspection Date:")
        inspection_date = (
            datetime.strptime(inspection_date_raw, "%d/%m/%Y").strftime("%Y-%m-%d")
            if inspection_date_raw
            else ""
        )
        storeys_raw = self._get("Number of storeys:") or "0"
        extensions_raw = self._get("Number of Extensions:") or "0"
        return General(
            # offset=2: the value sits two entries after these labels.
            epc_checked_before_assessment=self._bool(
                "Confirm you have checked for the existence of an", offset=2
            ),
            epc_exists_at_point_of_assessment=self._bool(
                "Does an EPC exist at the point of carrying out this", offset=2
            ),
            inspection_date=inspection_date,
            transaction_type=self._get("Transaction Type:") or "",
            tenure=self._get("Tenure:") or "",
            property_type=self._get("Type of Property:") or "",
            detachment_type=self._get("Detachment Type:") or "",
            number_of_storeys=int(storeys_raw.split()[0]),
            terrain_type=self._get("Terrain Type:") or "",
            number_of_extensions=int(extensions_raw.split()[0]),
            electricity_smart_meter=self._bool("Is an electricity smart meter present?"),
            electric_meter_type=self._get("Electric meter type:") or "",
            dwelling_export_capable=self._bool("Is the dwelling export-capable?"),
            mains_gas_available=self._bool("Is mains gas available?"),
            gas_smart_meter=self._bool("Is there a gas smart meter?"),
            gas_meter_accessible=self._bool("Is the gas meter accessible?"),
            measurements_location=self._get("Select Measurements Location:") or "",
        )

    def extract_building_construction(self) -> BuildingConstruction:
        """Build construction details for the main building and any extensions.

        Reads the section between "Building Construction" and
        "Building Measurements". ``extensions`` is ``None`` when the section
        has no extension markers.

        Raises:
            ValueError: if the section has no "Main Building" entry.
        """
        bc_section = self._section("Building Construction", "Building Measurements")
        main_data, extension_slices = self._split_main_and_extensions(
            bc_section, "Building Construction"
        )
        extensions = [
            self._parse_extension_construction(ext_id, ext_data)
            for ext_id, ext_data in enumerate(extension_slices, start=1)
        ]
        return BuildingConstruction(
            main_building=self._parse_main_building_construction(main_data),
            floor=self._parse_floor_construction(main_data),
            extensions=extensions or None,
        )

    # --- private parsing helpers ---

    def _parse_main_building_construction(
        self, data: List[str]
    ) -> MainBuildingConstruction:
        """Build ``MainBuildingConstruction`` from the main-building entries.

        Missing text values become "" except ``filled_cavity_indicators``,
        which stays ``None``.
        """
        return MainBuildingConstruction(
            age_range=self._get_in(data, "Age Range:") or "",
            age_indicators=self._get_in(data, "Record indicators of property age:")
            or "",
            walls_construction_type=self._get_in(data, "Walls - Construction Type:")
            or "",
            cavity_construction_indicators=self._get_in(
                data, "Record external indicators of Cavity Construction:"
            )
            or "",
            walls_insulation_type=self._get_in(data, "Walls - Insulation Type:") or "",
            filled_cavity_indicators=self._get_in(
                data, "Record indicators of filled cavity:"
            ),
            thermal_conductivity_of_wall_insulation=self._get_in(
                data, "Thermal conductivity of wall insulation:"
            )
            or "",
            wall_u_value_known=self._is_known_in(data, "Wall U-Value known?"),
            wall_thickness_mm=self._wall_thickness_in(data),
            party_wall_construction_type=self._get_in(
                data, "Party wall construction type:"
            )
            or "",
        )

    def _parse_extension_construction(
        self, ext_id: int, data: List[str]
    ) -> ExtensionConstruction:
        """Build ``ExtensionConstruction`` number ``ext_id`` from its entries.

        Reads the same labels as ``_parse_main_building_construction``.
        """
        return ExtensionConstruction(
            id=ext_id,
            age_range=self._get_in(data, "Age Range:") or "",
            age_indicators=self._get_in(data, "Record indicators of property age:")
            or "",
            walls_construction_type=self._get_in(data, "Walls - Construction Type:")
            or "",
            cavity_construction_indicators=self._get_in(
                data, "Record external indicators of Cavity Construction:"
            )
            or "",
            walls_insulation_type=self._get_in(data, "Walls - Insulation Type:") or "",
            filled_cavity_indicators=self._get_in(
                data, "Record indicators of filled cavity:"
            ),
            thermal_conductivity_of_wall_insulation=self._get_in(
                data, "Thermal conductivity of wall insulation:"
            )
            or "",
            wall_u_value_known=self._is_known_in(data, "Wall U-Value known?"),
            wall_thickness_mm=self._wall_thickness_in(data),
            party_wall_construction_type=self._get_in(
                data, "Party wall construction type:"
            )
            or "",
        )

    def extract_building_measurements(self) -> BuildingMeasurements:
        """Build floor measurements for the main building and any extensions.

        Reads the section between "Building Measurements" and "Roof Space".
        ``extensions`` is ``None`` when the section has no extension markers.

        Raises:
            ValueError: if the section has no "Main Building" entry, or a
                floor row holds a non-numeric value.
        """
        bm_section = self._section("Building Measurements", "Roof Space")
        main_data, extension_slices = self._split_main_and_extensions(
            bm_section, "Building Measurements"
        )
        main_floors = self._parse_floor_measurements(main_data)
        extensions = [
            ExtensionMeasurements(
                id=ext_id,
                floors=self._parse_floor_measurements(ext_data),
            )
            for ext_id, ext_data in enumerate(extension_slices, start=1)
        ]
        return BuildingMeasurements(
            main_building=MainBuildingMeasurements(floors=main_floors),
            extensions=extensions or None,
        )

    def extract_roof_space(self) -> RoofSpace:
        """Build roof-space details for the main building and any extensions.

        Reads the section between "Roof Space" and "Windows". ``extensions``
        is ``None`` when the section has no extension markers.

        Raises:
            ValueError: if the section has no "Main Building" entry.
        """
        rs_section = self._section("Roof Space", "Windows")
        main_data, extension_slices = self._split_main_and_extensions(
            rs_section, "Roof Space"
        )
        extensions = [
            self._parse_extension_roof_space(ext_id, ext_data)
            for ext_id, ext_data in enumerate(extension_slices, start=1)
        ]
        return RoofSpace(
            main_building=self._parse_roof_space_detail(main_data),
            extensions=extensions or None,
        )

    def extract_windows(self) -> List[Window]:
        """Build one ``Window`` per consecutive "Window N" marker.

        Reads the section between "Windows" and "Heating & Hot Water".
        Numbering starts at 1 and stops at the first missing number. Each
        window's entries run from its marker to the next marker, or to the
        end of the section.
        """
        w_section = self._section("Windows", "Heating & Hot Water")
        windows = []
        n = 1
        while f"Window {n}" in w_section:
            start = w_section.index(f"Window {n}")
            next_marker = f"Window {n + 1}"
            end = (
                w_section.index(next_marker)
                if next_marker in w_section
                else len(w_section)
            )
            windows.append(self._parse_window(n, w_section[start:end]))
            n += 1
        return windows

    def extract_heating_and_hot_water(self) -> HeatingAndHotWater:
        """Build heating details from the "Heating & Hot Water" section.

        The section ends at the "Ventilation" entry. All three sub-records
        are parsed from the same section entries.
        """
        hhw_section = self._section("Heating & Hot Water", "Ventilation")
        return HeatingAndHotWater(
            main_heating=self._parse_main_heating(hhw_section),
            secondary_heating=self._parse_secondary_heating(hhw_section),
            water_heating=self._parse_water_heating(hhw_section),
        )

    def extract_ventilation(self) -> Ventilation:
        """Build ``Ventilation`` from the section ending at "Conservatories".

        Counts default to 0 when missing.

        Raises:
            ValueError: if a count value is present but not a valid integer.
        """
        v_section = self._section("Ventilation", "Conservatories")
        return Ventilation(
            ventilation_type=self._get_in(v_section, "Ventilation type:") or "",
            has_fixed_air_conditioning=self._bool_in(
                v_section, "Has fixed air conditioning?"
            ),
            number_of_open_flues=self._int_in(v_section, "Number of open flues:"),
            number_of_closed_flues=self._int_in(v_section, "Number of closed flues:"),
            number_of_boiler_flues=self._int_in(v_section, "Number of boiler flues:"),
            number_of_other_flues=self._int_in(v_section, "Number of other flues:"),
            number_of_extract_fans=self._int_in(v_section, "Number of extract fans:"),
            number_of_passive_vents=self._int_in(
                v_section, "Number of passive vents:"
            ),
            number_of_flueless_gas_fires=self._int_in(
                v_section, "Number of flueless gas fires:"
            ),
            pressure_test=self._get_in(v_section, "Pressure test:") or "",
            draught_lobby=self._bool_in(v_section, "Is there a draught lobby?"),
            ventilation_in_pcdf_database=self._optional_bool_in(
                v_section, "Is the ventilation in the PCDF database?"
            ),
        )

    def extract_conservatories(self) -> Conservatories:
        """Not implemented yet."""
        raise NotImplementedError

    def extract_renewables(self) -> Renewables:
        """Not implemented yet."""
        raise NotImplementedError

    def extract_room_count_elements(self) -> RoomCountElements:
        """Not implemented yet."""
        raise NotImplementedError

    def extract_water_use(self) -> WaterUse:
        """Not implemented yet."""
        raise NotImplementedError

    def extract_customer_response(self) -> CustomerResponse:
        """Not implemented yet."""
        raise NotImplementedError

    def extract_addendum(self) -> SurveyAddendum:
        """Not implemented yet."""
        raise NotImplementedError

    def _parse_main_heating(self, data: List[str]) -> MainHeating:
        """Build ``MainHeating`` from the heating section entries.

        Missing text values become "", the product id defaults to 0 and the
        summer efficiency to 0.0.

        Raises:
            ValueError: if the product id or summer efficiency is present but
                not numeric.
        """
        return MainHeating(
            selection_method=self._get_in(
                data, "How would you like to select the Heating System?"
            )
            or "",
            system_type=self._get_in(data, "System type:") or "",
            product_id=self._int_in(data, "Product Id"),
            manufacturer=self._get_in(data, "Manufacturer") or "",
            model=self._get_in(data, "Model") or "",
            orig_manufacturer=self._get_in(data, "Orig Manuf") or "",
            fuel=self._get_in(data, "Fuel") or "",
            summer_efficiency=float(self._get_in(data, "S. Efficiency") or 0),
            type=self._get_in(data, "Type") or "",
            condensing=self._bool_in(data, "Condensing"),
            year=self._get_in(data, "Year") or "",
            mount=self._get_in(data, "Mount") or "",
            open_flue=self._get_in(data, "Open Flue") or "",
            fan_assist=self._bool_in(data, "Fan Assist"),
            status=self._get_in(data, "Status") or "",
            central_heating_pump_age=self._get_in(data, "Central heating pump age:")
            or "",
            controls=self._get_in(data, "Controls:") or "",
            # offset=2: the value sits two entries after this label.
            flue_gas_heat_recovery_system=self._bool_in(
                data, "Does the boiler have a Flue Gas Heat Recover", offset=2
            ),
            weather_compensator=self._bool_in(data, "Is there a weather compensator?"),
            emitter=self._get_in(data, "Emitter:") or "",
            emitter_temperature=self._get_in(data, "Emitter Temperature:") or "",
        )

    def _parse_secondary_heating(self, data: List[str]) -> SecondaryHeating:
        """Build ``SecondaryHeating``; the fuel is "" when missing."""
        return SecondaryHeating(
            secondary_fuel=self._get_in(data, "Secondary Fuel") or "",
        )

    def _parse_water_heating(self, data: List[str]) -> WaterHeating:
        """Build ``WaterHeating`` from the heating section entries.

        The insulation thickness is ``None`` when missing.

        Raises:
            ValueError: if the insulation thickness is present but is not a
                valid integer.
        """
        thickness_raw = self._get_in(data, "Insulation Thickness (mm):")
        return WaterHeating(
            type=self._get_in(data, "Water Heating Type:") or "",
            system=self._get_in(data, "Water Heating System:") or "",
            cylinder_size=self._get_in(data, "Cylinder Size:") or "",
            cylinder_measured_heat_loss=self._get_in(
                data, "Cylinder Measured Heat Loss:"
            ),
            insulation_type=self._get_in(data, "Insulation Type:"),
            insulation_thickness_mm=int(thickness_raw) if thickness_raw else None,
            has_thermostat=self._optional_bool_in(data, "Cylinder Thermostat:"),
        )

    def _parse_window(self, window_id: int, data: List[str]) -> Window:
        """Build ``Window`` number ``window_id`` from its entries.

        Height and width use the first whitespace-separated token of their
        value and default to 0.0 when missing.

        Raises:
            ValueError: if a height or width token is not numeric.
        """
        height_raw = self._get_in(data, "Window height:")
        width_raw = self._get_in(data, "Window width:")
        return Window(
            id=window_id,
            location=self._get_in(data, "Window location:") or "",
            wall_type=self._get_in(data, "Window wall type:") or "",
            glazing_type=self._get_in(data, "Glazing Type:") or "",
            window_type=self._get_in(data, "Window type:") or "",
            frame_type=self._get_in(data, "Window frame type:") or "",
            glazing_gap=self._get_in(data, "What size is the glazing gap?") or "",
            draught_proofed=self._bool_in(data, "Is the window draught proofed?"),
            permanent_shutters=self._bool_in(
                data, "Are there permanent shutters present?"
            ),
            height_m=float(height_raw.split()[0]) if height_raw else 0.0,
            width_m=float(width_raw.split()[0]) if width_raw else 0.0,
            orientation=self._get_in(data, "Orientation:") or "",
        )

    def _parse_insulation_thickness(
        self, val: Optional[str]
    ) -> tuple[Optional[int], Optional[str]]:
        """Split a thickness value into a numeric and a free-text form.

        Returns ``(int, None)`` when the first whitespace-separated token is
        an integer, ``(None, val)`` when it is not, and ``(None, None)`` when
        ``val`` is ``None``.
        """
        if val is None:
            return None, None
        try:
            return int(val.split()[0]), None
        except (ValueError, IndexError):
            return None, val

    def _parse_roof_space_detail(self, data: List[str]) -> RoofSpaceDetail:
        """Build ``RoofSpaceDetail`` from the main-building roof entries."""
        thickness_mm, thickness_str = self._parse_insulation_thickness(
            self._get_in(data, "Roofs - Insulation Thickness:")
        )
        return RoofSpaceDetail(
            construction_type=self._get_in(data, "Roofs - Construction Type:") or "",
            insulation_at=self._get_in(data, "Roofs - Insulation At:") or "",
            roof_u_value_known=self._is_known_in(data, "Roof U-Value:"),
            # offset=2: the value sits two entries after this label.
            cavity_wall_construction_indicators=self._get_in(
                data,
                "Record indicators of Cavity Wall Construction in roof",
                offset=2,
            )
            or "",
            rooms_in_roof=self._bool_in(data, "Are there rooms in the roof?"),
            insulation_thickness_mm=thickness_mm,
            insulation_thickness=thickness_str,
        )

    def _parse_extension_roof_space(
        self, ext_id: int, data: List[str]
    ) -> ExtensionRoofSpace:
        """Build ``ExtensionRoofSpace`` number ``ext_id`` from its entries.

        Reads the same labels as ``_parse_roof_space_detail``.
        """
        thickness_mm, thickness_str = self._parse_insulation_thickness(
            self._get_in(data, "Roofs - Insulation Thickness:")
        )
        return ExtensionRoofSpace(
            id=ext_id,
            construction_type=self._get_in(data, "Roofs - Construction Type:") or "",
            insulation_at=self._get_in(data, "Roofs - Insulation At:") or "",
            roof_u_value_known=self._is_known_in(data, "Roof U-Value:"),
            # offset=2: the value sits two entries after this label.
            cavity_wall_construction_indicators=self._get_in(
                data,
                "Record indicators of Cavity Wall Construction in roof",
                offset=2,
            )
            or "",
            rooms_in_roof=self._bool_in(data, "Are there rooms in the roof?"),
            insulation_thickness_mm=thickness_mm,
            insulation_thickness=thickness_str,
        )

    def _parse_floor_measurements(self, data: List[str]) -> List[FloorMeasurement]:
        """Build one ``FloorMeasurement`` per "Floor..." row in ``data``.

        A row is an entry starting with "Floor" that has at least four
        entries after it; those four are read as area, height, heat-loss
        perimeter and PWL, in that order.

        Raises:
            ValueError: if any of the four entries is not numeric.
        """
        floors = []
        i = 0
        while i < len(data):
            if data[i].startswith("Floor") and i + 4 < len(data):
                floors.append(
                    FloorMeasurement(
                        name=data[i],
                        area_m2=float(data[i + 1]),
                        height_m=float(data[i + 2]),
                        heat_loss_perimeter_m=float(data[i + 3]),
                        pwl_m=float(data[i + 4]),
                    )
                )
                # Skip the name and the four values just consumed.
                i += 5
            else:
                i += 1
        return floors

    def _parse_floor_construction(self, data: List[str]) -> FloorConstruction:
        """Build ``FloorConstruction``; missing text values become ""."""
        return FloorConstruction(
            floor_type=self._get_in(data, "Floor type:") or "",
            floor_construction=self._get_in(data, "Floor Construction:") or "",
            floor_insulation_type=self._get_in(data, "Floor Insulation Type:") or "",
            floor_u_value_known=self._is_known_in(data, "Floor U-Value known?"),
        )