import re
from datetime import date, datetime
from typing import List, Optional

from datatypes.epc.surveys.elmhurst_site_notes import (
    BathsAndShowers,
    BuildingPartDimensions,
    ElmhurstSiteNotes,
    FloorDetails,
    FloorDimension,
    Lighting,
    MainHeating,
    Meters,
    PropertyDetails,
    Renewables,
    RoofDetails,
    Shower,
    SurveyorInfo,
    VentilationAndCooling,
    WallDetails,
    WaterHeating,
    Window,
)


class ElmhurstSiteNotesExtractor:
    """Build an ``ElmhurstSiteNotes`` record from the text of site-notes pages.

    The page texts are joined with newlines and reduced to a list of stripped,
    non-empty lines.  Most fields are then found by label lookup: either
    ``"Label: value"`` on one line, or a ``"Label:"`` / ``"Label"`` line
    followed by the value on the next line.
    """

    # Number of fixed columns that follow the optional glazing gap in each
    # window row (building part .. permanent shutters).
    _WINDOW_TRAILING_COLUMNS = 8

    def __init__(self, pages: List[str]) -> None:
        """Store the joined page text and its stripped, non-empty lines."""
        self._text = "\n".join(pages)
        self._lines = [
            ln.strip() for ln in self._text.splitlines() if ln.strip()
        ]

    # --- generic helpers ---

    @staticmethod
    def _first_int(raw: Optional[str]) -> Optional[int]:
        """Return the first whitespace-separated token of *raw* as an int.

        Returns ``None`` when *raw* is empty/``None`` or the token is not an
        integer literal.
        """
        if not raw:
            return None
        try:
            return int(raw.split()[0])
        except (ValueError, IndexError):
            return None

    @staticmethod
    def _first_float(raw: Optional[str]) -> Optional[float]:
        """Return the first whitespace-separated token of *raw* as a float.

        Returns ``None`` when *raw* is empty/``None`` or the token is not a
        float literal.
        """
        if not raw:
            return None
        try:
            return float(raw.split()[0])
        except (ValueError, IndexError):
            return None

    def _next_val(self, label: str) -> Optional[str]:
        """Look up *label* across all lines of the document.

        Same matching rules as ``_local_val``, applied to ``self._lines``.
        """
        return self._local_val(self._lines, label)

    def _str_val(self, label: str) -> str:
        """Return the whitespace-normalised value for *label*, or ``""``."""
        v = self._next_val(label)
        return " ".join(v.split()) if v else ""

    def _opt_str(self, label: str) -> Optional[str]:
        """Return the whitespace-normalised value for *label*, or ``None``."""
        v = self._next_val(label)
        return " ".join(v.split()) if v else None

    def _bool_val(self, label: str) -> bool:
        """Return True only when the value for *label* is "yes" (any case)."""
        v = self._next_val(label)
        return v is not None and v.lower() == "yes"

    def _int_val(self, label: str) -> int:
        """Return the leading integer of the value for *label*, or 0."""
        parsed = self._first_int(self._next_val(label))
        return 0 if parsed is None else parsed

    def _rating_val(self, label: str) -> int:
        """Return the last token of the value for *label* as an int, or 0."""
        v = self._next_val(label)
        try:
            return int(v.split()[-1]) if v else 0
        except (ValueError, IndexError):
            return 0

    def _date_val(self, label: str) -> date:
        """Return the ``dd/mm/YYYY`` value for *label* as a ``date``.

        Raises:
            ValueError: if the label has no value, or the value does not
                match the ``%d/%m/%Y`` format.
        """
        v = self._next_val(label)
        if not v:
            raise ValueError(f"Missing date for label: {label}")
        return datetime.strptime(v.strip(), "%d/%m/%Y").date()

    def _between(self, start: str, end: str) -> str:
        """Return the raw text after the first *start* up to the next *end*.

        Returns ``""`` when either marker is not found.
        """
        try:
            s = self._text.index(start) + len(start)
            e = self._text.index(end, s)
        except ValueError:
            return ""
        return self._text[s:e]

    def _section_lines(self, start: str, end: str) -> List[str]:
        """Return the stripped, non-empty lines between *start* and *end*."""
        text = self._between(start, end)
        return [ln.strip() for ln in text.splitlines() if ln.strip()]

    def _local_val(self, lines: List[str], label: str) -> Optional[str]:
        """Find the value for *label* within *lines*.

        A trailing colon on *label* is ignored.  The first line that either
        starts with ``"<label>:"`` and has trailing text, or equals
        ``"<label>:"`` / ``"<label>"`` exactly, decides the result:

        * inline form -> the text after the colon;
        * label-only form -> the next non-empty line within the following
          three lines, unless a line ending in ":" (another label) or
          starting with "©" is met first, in which case ``None``.

        Returns ``None`` when the label is not present at all.
        """
        lb = label.rstrip(":")
        lc = lb + ":"
        for i, line in enumerate(lines):
            if line.startswith(lc) and len(line) > len(lc):
                return line[len(lc):].strip() or None
            if line == lc or line == lb:
                for candidate in lines[i + 1:i + 4]:
                    if candidate.endswith(":") or candidate.startswith("©"):
                        return None
                    if candidate:
                        return candidate
                return None
        return None

    def _local_str(self, lines: List[str], label: str) -> str:
        """Return the whitespace-normalised local value for *label*, or ``""``."""
        v = self._local_val(lines, label)
        return " ".join(v.split()) if v else ""

    def _local_bool(self, lines: List[str], label: str) -> bool:
        """Return True only when the local value for *label* is "yes"."""
        v = self._local_val(lines, label)
        return v is not None and v.lower() == "yes"

    # --- section extractors ---

    def _extract_surveyor_info(self) -> SurveyorInfo:
        """Collect the surveyor identification fields."""
        return SurveyorInfo(
            surveyor_code=self._str_val("Surveyor"),
            name=self._str_val("Name"),
            title=self._str_val("Title"),
            tel_number=self._str_val("Tel Number"),
            survey_reference=self._str_val("Survey Reference"),
            my_reference=self._opt_str("My Reference"),
        )

    def _extract_property_details(self) -> PropertyDetails:
        """Collect address, tenure and inspection metadata.

        Raises:
            ValueError: if "Inspection Date" or "Process date" is missing or
                malformed (see ``_date_val``).
        """
        # This label wraps over two lines in the text, so the generic
        # single-line label lookup cannot match it.
        epc_m = re.search(
            r"Check for the existence of\nan EPC:\n(Yes|No)", self._text
        )
        epc_exists = epc_m.group(1).lower() == "yes" if epc_m else False
        return PropertyDetails(
            rdsap_version=self._str_val("RdSAP version"),
            reference_number=self._str_val("Reference Number"),
            lodgement_required=self._bool_val("Lodgement Required"),
            regs_region=self._str_val("Regs Region"),
            epc_language=self._str_val("EPC Language"),
            postcode=self._str_val("Postcode"),
            region=self._str_val("Region"),
            street=self._str_val("Street"),
            town=self._str_val("Town"),
            tenure=self._str_val("Property Tenure"),
            transaction_type=self._str_val("Transaction Type"),
            inspection_date=self._date_val("Inspection Date"),
            process_date=self._date_val("Process date"),
            epc_exists=epc_exists,
            uprn=self._opt_str("UPRN"),
            house_name=self._opt_str("House Name"),
            house_number=self._opt_str("House No"),
            locality=self._opt_str("Locality"),
            county=self._opt_str("County"),
        )

    def _extract_attachment(self) -> str:
        """Return the second line after "1.0 Property type:", normalised."""
        m = re.search(r"1\.0 Property type:\n[^\n]+\n([^\n]+)", self._text)
        return " ".join(m.group(1).strip().split()) if m else ""

    def _extract_dimensions(self) -> BuildingPartDimensions:
        """Parse the per-floor dimension rows of section 4.0.

        Each floor is a "<name> Floor:" line followed by four numeric lines:
        area, room height, heat-loss perimeter and party-wall length.
        """
        dim_type = self._str_val("Dimension type")
        section = self._between("4.0 Dimensions:", "5.0 Conservatory:")
        floor_matches = re.findall(
            r"([A-Za-z ]+Floor):\n([\d.]+)\n([\d.]+)\n([\d.]+)\n([\d.]+)",
            section,
        )
        floors = [
            FloorDimension(
                name=name.strip(),
                area_m2=float(area),
                room_height_m=float(height),
                heat_loss_perimeter_m=float(hlp),
                party_wall_length_m=float(pwl),
            )
            for name, area, height, hlp, pwl in floor_matches
        ]
        return BuildingPartDimensions(dimension_type=dim_type, floors=floors)

    def _extract_walls(self) -> WallDetails:
        """Parse section 7.0; thickness is None when absent or non-numeric."""
        lines = self._section_lines("7.0 Walls:", "8.0 Roofs:")
        thickness_mm = self._first_int(
            self._local_val(lines, "Wall Thickness")
        )
        return WallDetails(
            wall_type=self._local_str(lines, "Type"),
            insulation=self._local_str(lines, "Insulation"),
            thickness_unknown=self._local_bool(lines, "Wall Thickness Unknown"),
            u_value_known=self._local_bool(lines, "U-value Known"),
            party_wall_type=self._local_str(lines, "Party Wall Type"),
            thickness_mm=thickness_mm,
        )

    def _extract_roof(self) -> RoofDetails:
        """Parse section 8.0; thickness is None when absent or non-numeric."""
        lines = self._section_lines("8.0 Roofs:", "8.1 Rooms in Roof:")
        thickness_mm = self._first_int(
            self._local_val(lines, "Insulation Thickness")
        )
        return RoofDetails(
            roof_type=self._local_str(lines, "Type"),
            insulation=self._local_str(lines, "Insulation"),
            u_value_known=self._local_bool(lines, "U-value Known"),
            insulation_thickness_mm=thickness_mm,
        )

    def _extract_floor(self) -> FloorDetails:
        """Parse section 9.0; default U-value is None when absent/non-numeric."""
        lines = self._section_lines("9.0 Floors:", "10.0 Doors:")
        default_u = self._first_float(
            self._local_val(lines, "Default U-value")
        )
        return FloorDetails(
            location=self._local_str(lines, "Location"),
            floor_type=self._local_str(lines, "Type"),
            insulation=self._local_str(lines, "Insulation"),
            u_value_known=self._local_bool(lines, "U-value Known"),
            default_u_value=default_u,
        )

    def _extract_windows(self) -> List[Window]:
        """Parse the window table into ``Window`` records.

        The table is the text between the "Permanent Shutters" header and
        the following "Draught Proofing" label, one cell per line.  Each row
        is read as: width, height, area, one or more glazing-type tokens
        (the last of which is taken as the frame type if it is a single
        non-numeric word), frame factor, an optional glazing gap, then eight
        fixed columns.

        Parsing stops, returning the windows read so far, when the table
        ends part-way through a row.

        Raises:
            ValueError: if a U-value or g-value cell is not numeric.
        """
        m = re.search(
            r"Permanent\s+Shutters\n(.*?)Draught Proofing",
            self._text,
            re.DOTALL,
        )
        if not m:
            return []
        tokens = [t.strip() for t in m.group(1).splitlines() if t.strip()]
        total = len(tokens)

        windows: List[Window] = []
        i = 0
        while i + 12 < total:
            # A row starts at three consecutive numeric cells; otherwise
            # slide forward one token and try again.
            try:
                width_m = float(tokens[i])
                height_m = float(tokens[i + 1])
                area_m2 = float(tokens[i + 2])
            except (ValueError, IndexError):
                i += 1
                continue
            i += 3

            # Collect glazing type tokens until frame_factor (0 < v <= 1.0).
            glazing_parts: List[str] = []
            while i < total:
                try:
                    numeric = float(tokens[i])
                except ValueError:
                    numeric = None
                if numeric is not None and 0.0 < numeric <= 1.0:
                    break
                glazing_parts.append(tokens[i])
                i += 1

            # If the last glazing token is a single word (no spaces, not
            # numeric) it is the frame type.
            frame_type: Optional[str] = None
            if glazing_parts:
                last = glazing_parts[-1]
                if " " not in last and not last.replace(".", "").isdigit():
                    frame_type = glazing_parts.pop()
            glazing_type = " ".join(glazing_parts).strip()

            if i >= total:
                break
            frame_factor = float(tokens[i])
            i += 1

            # Consume glazing_gap if present ("mm" token, possibly
            # multi-token e.g. "16 mm or more").
            glazing_gap: Optional[str] = None
            if i < total and "mm" in tokens[i]:
                gap_parts = [tokens[i]]
                i += 1
                while i < total and tokens[i].lower() in {"or", "more"}:
                    gap_parts.append(tokens[i])
                    i += 1
                glazing_gap = " ".join(gap_parts)

            # The variable-length glazing/gap columns may have consumed more
            # tokens than the loop guard anticipated; a truncated final row
            # ends parsing instead of raising IndexError.
            if total - i < self._WINDOW_TRAILING_COLUMNS:
                break
            (
                building_part,
                location,
                orientation,
                data_source,
                u_raw,
                g_raw,
                draught_raw,
                permanent_shutters,
            ) = tokens[i:i + self._WINDOW_TRAILING_COLUMNS]
            i += self._WINDOW_TRAILING_COLUMNS

            windows.append(
                Window(
                    width_m=width_m,
                    height_m=height_m,
                    area_m2=area_m2,
                    glazing_type=glazing_type,
                    frame_factor=frame_factor,
                    building_part=building_part,
                    location=location,
                    orientation=orientation,
                    data_source=data_source,
                    u_value=float(u_raw),
                    g_value=float(g_raw),
                    draught_proofed=draught_raw.lower() == "yes",
                    # NOTE(review): passed through as the raw cell text, not
                    # converted to bool - confirm against the Window type.
                    permanent_shutters=permanent_shutters,
                    frame_type=frame_type,
                    glazing_gap=glazing_gap,
                )
            )
        return windows

    def _extract_ventilation(self) -> VentilationAndCooling:
        """Collect chimney/flue/fan counts and cooling/ventilation flags."""
        return VentilationAndCooling(
            open_chimneys_count=self._int_val("No. of open chimneys"),
            open_flues_count=self._int_val("No. of open flues"),
            open_chimneys_closed_fire_count=self._int_val(
                "No. of open chimneys/open flues attached to closed fire"
            ),
            solid_fuel_boiler_flues_count=self._int_val(
                "No. of flues attached to solid fuel boiler"
            ),
            other_heater_flues_count=self._int_val(
                "No. of open flues attached to other heater"
            ),
            blocked_chimneys_count=self._int_val("No. of blocked chimneys"),
            extract_fans_count=self._int_val(
                "No. of intermittent extract fans"
            ),
            passive_vents_count=self._int_val("No. of passive vents"),
            flueless_gas_fires_count=self._int_val(
                "No. of flueless gas fires"
            ),
            fixed_space_cooling=self._bool_val("Fixed Space Cooling"),
            draught_lobby=self._str_val("Draught Lobby"),
            mechanical_ventilation=self._bool_val("Mechanical Ventilation"),
            pressure_test_method=self._str_val("Test Method"),
        )

    def _extract_lighting(self) -> Lighting:
        """Collect bulb counts.

        The low-energy total is forced to 0 when the LED/CFL split is known.
        """
        led_cfl_count_known = self._bool_val("Number of LED and CFL Known")
        if led_cfl_count_known:
            low_energy_count = 0
        else:
            low_energy_count = self._int_val("Total number of Low Energy")
        return Lighting(
            total_bulbs=self._int_val("Total number of bulbs"),
            led_cfl_count_known=led_cfl_count_known,
            led_count=self._int_val("Number of LED lights"),
            cfl_count=self._int_val("Number of CFL lights"),
            incandescent_count=self._int_val("Total number of incandescents"),
            low_energy_count=low_energy_count,
        )

    def _extract_main_heating(self) -> MainHeating:
        """Parse section 14.0; percentage is 0 when absent or non-numeric."""
        lines = self._section_lines("14.0 Main Heating1", "14.1 Main Heating2")
        pct = self._first_int(self._local_val(lines, "Percentage of Heat"))
        return MainHeating(
            heat_emitter=self._local_str(lines, "Heat Emitter"),
            fuel_type=self._local_str(lines, "Fuel Type"),
            flue_type=self._local_str(lines, "Flue Type"),
            fan_assisted_flue=self._local_bool(lines, "Fan Assisted Flue"),
            design_flow_temperature=self._local_str(
                lines, "Design flow temperature"
            ),
            heating_controls_ees=self._local_str(
                lines, "Main Heating Controls EES"
            ),
            heating_controls_sap=self._local_str(
                lines, "Main Heating Controls Sap"
            ),
            percentage_of_heat=0 if pct is None else pct,
            pcdf_boiler_reference=self._local_val(
                lines, "PCDF boiler Reference"
            ),
            heat_pump_age=self._local_val(lines, "Heat pump age"),
        )

    def _extract_meters(self) -> Meters:
        """Collect meter type and smart-meter flags."""
        return Meters(
            electricity_meter_type=self._str_val("Electricity meter type"),
            main_gas=self._bool_val("Main gas"),
            electricity_smart_meter=self._bool_val(
                "Electricity Smart Meter Present"
            ),
            gas_smart_meter=self._bool_val("Gas Smart Meter Present"),
        )

    def _extract_water_heating(self) -> WaterHeating:
        """Collect water-heating codes, fuel type and cylinder flag."""
        return WaterHeating(
            water_heating_code=self._str_val("Water Heating Code"),
            water_heating_sap_code=self._int_val("Water Heating SapCode"),
            water_heating_fuel_type=self._str_val("Water Heating Fuel Type"),
            hot_water_cylinder_present=self._bool_val(
                "Hot Water Cylinder Present"
            ),
        )

    def _extract_baths_and_showers(self) -> BathsAndShowers:
        """Collect bath counts and the shower table.

        Shower rows are read as consecutive triples (number, outlet type,
        connected) starting after the first line that is exactly
        "Connected", and stop at the first triple whose leading line is not
        all digits.
        """
        n_baths = self._int_val("Total Number of Baths")
        n_connected = self._int_val("Number of Baths Connected")

        showers: List[Shower] = []
        try:
            j = self._lines.index("Connected") + 1
        except ValueError:
            j = None  # no shower table header in the document

        if j is not None:
            while j + 2 < len(self._lines) and self._lines[j].isdigit():
                showers.append(
                    Shower(
                        shower_number=int(self._lines[j]),
                        outlet_type=self._lines[j + 1],
                        # NOTE(review): raw cell text, not a bool - confirm
                        # against the Shower type.
                        connected=self._lines[j + 2],
                    )
                )
                j += 3

        return BathsAndShowers(
            number_of_baths=n_baths,
            number_of_baths_connected=n_connected,
            showers=showers,
        )

    def _extract_renewables(self) -> Renewables:
        """Collect renewable-technology flags.

        Hydro generation is 0.0 when absent or non-numeric.
        """
        # "Present" is a generic label, so restrict the lookup to section 18.
        fghrs_lines = self._section_lines(
            "18.0 Flue Gas Heat Recovery System", "19.0 Photovoltaic Panel"
        )
        fghrs = self._local_bool(fghrs_lines, "Present")
        terrain = self._str_val("Terrain Type")
        hydro = self._first_float(
            self._next_val("Electricity generated [kWh/year]")
        )
        return Renewables(
            solar_water_heating=self._bool_val("Solar Water Heating"),
            wwhrs_present=self._bool_val("Is WWHRS present in the property?"),
            flue_gas_heat_recovery_present=fghrs,
            photovoltaic_panel=self._str_val("Photovoltaic Panel"),
            export_capable_meter=self._bool_val("Export capable meter"),
            wind_turbine_present=self._bool_val("Wind turbine present?"),
            wind_turbines_terrain_type=terrain,
            hydro_electricity_generated_kwh=0.0 if hydro is None else hydro,
        )

    def extract(self) -> ElmhurstSiteNotes:
        """Run every section extractor and assemble the full record.

        CO2 emissions are 0.0 when absent or non-numeric.

        Raises:
            ValueError: if a required date is missing or malformed, or a
                window/dimension numeric cell cannot be parsed.
        """
        co2 = self._first_float(self._next_val("Emissions (t/year)"))
        return ElmhurstSiteNotes(
            surveyor_info=self._extract_surveyor_info(),
            property_details=self._extract_property_details(),
            current_sap_rating=self._rating_val("Current SAP rating"),
            potential_sap_rating=self._rating_val("Potential SAP rating"),
            current_ei_rating=self._rating_val("Current EI rating"),
            potential_ei_rating=self._rating_val("Potential EI rating"),
            co2_emissions_current_t=0.0 if co2 is None else co2,
            property_type=self._str_val("1.0 Property type"),
            attachment=self._extract_attachment(),
            number_of_storeys=self._int_val("Storeys"),
            habitable_rooms=self._int_val("Habitable Rooms"),
            heated_habitable_rooms=self._int_val("Heated Habitable Rooms"),
            construction_age_band=self._str_val("Main Property"),
            dimensions=self._extract_dimensions(),
            has_conservatory=self._bool_val("Is there a conservatory?"),
            walls=self._extract_walls(),
            roof=self._extract_roof(),
            floor=self._extract_floor(),
            door_count=self._int_val("Total Number of Doors"),
            insulated_door_count=self._int_val("Number of Insulated Doors"),
            windows=self._extract_windows(),
            draught_proofing_percent=self._int_val("Draught Proofing"),
            ventilation=self._extract_ventilation(),
            lighting=self._extract_lighting(),
            main_heating=self._extract_main_heating(),
            meters=self._extract_meters(),
            water_heating=self._extract_water_heating(),
            baths_and_showers=self._extract_baths_and_showers(),
            renewables=self._extract_renewables(),
        )