Model/backend/documents_parser/elmhurst_extractor.py
Khalim Conn-Kowlessar 36f2c7bbdf Slice 46a: Elmhurst mapper handles multi-bp Summary PDFs — Summary_000474 chain test flips green
ElmhurstSiteNotes had no representation for extensions: singular dimensions / walls / roof / floor fields could only describe the main bp. Summary PDFs lodge "1st Extension" / "2nd Extension" subsections in §4, §7, §8, §9 with optional "As Main: Yes" inheritance. This slice:

- Adds `ExtensionPart` dataclass and `ElmhurstSiteNotes.extensions: List[ExtensionPart]`.
- Adds `_split_section_by_bp` helper + per-bp parsing of dimensions / walls / roof / floor in the extractor; "As Main" inherits from the main bp.
- Refactors `_map_elmhurst_building_part` into a parameterised builder; adds `_map_elmhurst_building_parts` that yields Main + one SapBuildingPart per extension (capped at 4 per RdSAP10 §1.2).
- Scaffold test `test_summary_000474_mapper_produces_three_building_parts` flips from strict-xfail to passing.

Single-bp behaviour is unchanged (empty extensions list defaults). 752 existing tests stay green.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-24 17:55:13 +00:00

577 lines
24 KiB
Python

import re
from datetime import date, datetime
from typing import List, Optional
from datatypes.epc.surveys.elmhurst_site_notes import (
BathsAndShowers,
BuildingPartDimensions,
ElmhurstSiteNotes,
ExtensionPart,
FloorDetails,
FloorDimension,
Lighting,
MainHeating,
Meters,
PropertyDetails,
Renewables,
RoofDetails,
Shower,
SurveyorInfo,
VentilationAndCooling,
WallDetails,
WaterHeating,
Window,
)
class ElmhurstSiteNotesExtractor:
def __init__(self, pages: List[str]) -> None:
self._text = "\n".join(pages)
self._lines = [l.strip() for l in self._text.splitlines() if l.strip()]
# --- generic helpers ---
def _next_val(self, label: str) -> Optional[str]:
lc = label.rstrip(":") + ":"
lb = label.rstrip(":")
for i, line in enumerate(self._lines):
if line.startswith(lc) and len(line) > len(lc):
return line[len(lc):].strip() or None
if line == lc or line == lb:
for j in range(i + 1, min(i + 4, len(self._lines))):
v = self._lines[j]
if v.endswith(":") or v.startswith("©"):
return None
if v:
return v
return None
return None
def _str_val(self, label: str) -> str:
v = self._next_val(label)
return " ".join(v.split()) if v else ""
def _opt_str(self, label: str) -> Optional[str]:
v = self._next_val(label)
return " ".join(v.split()) if v else None
def _bool_val(self, label: str) -> bool:
v = self._next_val(label)
return v is not None and v.lower() == "yes"
def _int_val(self, label: str) -> int:
v = self._next_val(label)
try:
return int(v.split()[0]) if v else 0
except (ValueError, IndexError):
return 0
def _date_val(self, label: str) -> date:
v = self._next_val(label)
if not v:
raise ValueError(f"Missing date for label: {label}")
return datetime.strptime(v.strip(), "%d/%m/%Y").date()
def _between(self, start: str, end: str) -> str:
try:
s = self._text.index(start) + len(start)
e = self._text.index(end, s)
return self._text[s:e]
except ValueError:
return ""
# Multi-bp helpers: Summary PDFs subdivide §4/§7/§8/§9 with explicit
# "Main Property" / "1st Extension" / "2nd Extension" headers. The
# existing single-bp fixture also carries "Main Property" as a header
# before the body. `_split_section_by_bp` uses this pattern to cut a
# section into per-bp chunks.
#
# The pattern matches a line consisting solely of "Main Property" or an
# ordinal followed by " Extension" (e.g. "3rd Extension"), optionally
# followed by whitespace. Group 1 captures the building-part name.
# re.MULTILINE makes ^ and $ anchor at line boundaries, so headers are
# found anywhere inside a multi-line section string.
_BP_HEADER_RE = re.compile(
    r"^(Main Property|\d+(?:st|nd|rd|th) Extension)\s*$",
    re.MULTILINE,
)
def _split_section_by_bp(self, section_text: str) -> List[tuple[str, str]]:
"""Split a section's text into per-bp subsections.
Returns ``[(bp_name, body), ...]`` in document order. Body is
the text between this bp's header and the next bp's header
(exclusive). Returns ``[("Main Property", section_text)]`` when
no headers are found (defensive fallback for malformed PDFs).
"""
matches = list(self._BP_HEADER_RE.finditer(section_text))
if not matches:
return [("Main Property", section_text)]
result: List[tuple[str, str]] = []
for i, m in enumerate(matches):
name = m.group(1)
body_start = m.end()
body_end = (
matches[i + 1].start() if i + 1 < len(matches) else len(section_text)
)
result.append((name, section_text[body_start:body_end]))
return result
def _section_lines(self, start: str, end: str) -> List[str]:
text = self._between(start, end)
return [l.strip() for l in text.splitlines() if l.strip()]
def _local_val(self, lines: List[str], label: str) -> Optional[str]:
lb = label.rstrip(":")
lc = lb + ":"
for i, line in enumerate(lines):
if line.startswith(lc) and len(line) > len(lc):
return line[len(lc):].strip() or None
if line == lc or line == lb:
for j in range(i + 1, min(i + 4, len(lines))):
v = lines[j]
if v.endswith(":") or v.startswith("©"):
return None
if v:
return v
return None
return None
def _local_str(self, lines: List[str], label: str) -> str:
v = self._local_val(lines, label)
return " ".join(v.split()) if v else ""
def _local_bool(self, lines: List[str], label: str) -> bool:
v = self._local_val(lines, label)
return v is not None and v.lower() == "yes"
# --- section extractors ---
def _extract_surveyor_info(self) -> SurveyorInfo:
    """Collect the surveyor fields via document-wide label lookups.

    Every field is a required string (``""`` when missing) except
    ``my_reference``, which is ``None`` when missing.
    """
    # SurveyorInfo field name -> label in the document.
    text_fields = (
        ("surveyor_code", "Surveyor"),
        ("name", "Name"),
        ("title", "Title"),
        ("tel_number", "Tel Number"),
        ("survey_reference", "Survey Reference"),
    )
    values = {field: self._str_val(label) for field, label in text_fields}
    return SurveyorInfo(
        my_reference=self._opt_str("My Reference"),
        **values,
    )
def _extract_property_details(self) -> PropertyDetails:
    """Collect the property/address fields via document-wide label lookups.

    The "existence of an EPC" answer is matched with a dedicated regex
    because its label wraps across two lines in the source text.

    Raises:
        ValueError: Propagated from ``_date_val`` when "Inspection Date" or
            "Process date" is missing or not in ``DD/MM/YYYY`` form.
    """
    epc_hit = re.search(
        r"Check for the existence of\nan EPC:\n(Yes|No)", self._text
    )
    if epc_hit is None:
        has_epc = False
    else:
        has_epc = epc_hit.group(1).lower() == "yes"

    # PropertyDetails field name -> label; required strings default to "".
    required_text = {
        "rdsap_version": "RdSAP version",
        "reference_number": "Reference Number",
        "regs_region": "Regs Region",
        "epc_language": "EPC Language",
        "postcode": "Postcode",
        "region": "Region",
        "street": "Street",
        "town": "Town",
        "tenure": "Property Tenure",
        "transaction_type": "Transaction Type",
    }
    # Optional strings are None when the label has no value.
    optional_text = {
        "uprn": "UPRN",
        "house_name": "House Name",
        "house_number": "House No",
        "locality": "Locality",
        "county": "County",
    }
    fields = {
        name: self._str_val(label) for name, label in required_text.items()
    }
    for name, label in optional_text.items():
        fields[name] = self._opt_str(label)

    return PropertyDetails(
        lodgement_required=self._bool_val("Lodgement Required"),
        inspection_date=self._date_val("Inspection Date"),
        process_date=self._date_val("Process date"),
        epc_exists=has_epc,
        **fields,
    )
def _extract_attachment(self) -> str:
m = re.search(r"1\.0 Property type:\n[^\n]+\n([^\n]+)", self._text)
return " ".join(m.group(1).strip().split()) if m else ""
def _floors_from_dimensions_body(self, body: str) -> List[FloorDimension]:
"""Parse FloorDimension entries from a single bp's §4 body."""
matches = re.findall(
r"([A-Za-z ]+Floor):\n([\d.]+)\n([\d.]+)\n([\d.]+)\n([\d.]+)",
body,
)
return [
FloorDimension(
name=name.strip(),
area_m2=float(area),
room_height_m=float(height),
heat_loss_perimeter_m=float(hlp),
party_wall_length_m=float(pwl),
)
for name, area, height, hlp, pwl in matches
]
def _extract_dimensions(self) -> BuildingPartDimensions:
    """Return the dimensions of the main property only.

    Floors are parsed from the first building-part chunk of section 4;
    extension chunks are handled by ``_extract_extensions``.
    """
    section = self._between("4.0 Dimensions:", "5.0 Conservatory:")
    chunks = self._split_section_by_bp(section)
    if chunks:
        _, first_body = chunks[0]
    else:
        first_body = section
    return BuildingPartDimensions(
        dimension_type=self._str_val("Dimension type"),
        floors=self._floors_from_dimensions_body(first_body),
    )
def _wall_details_from_lines(self, lines: List[str]) -> WallDetails:
    """Build ``WallDetails`` from the lines of one building part's wall subsection.

    Args:
        lines: Stripped, non-blank lines of the subsection.

    Returns:
        The wall details. ``thickness_mm`` is the leading token of the
        "Wall Thickness" value as an int, or ``None`` when the label is
        absent or the token is not an integer (e.g. free text), so a
        malformed thickness no longer aborts the whole extraction.
    """
    thickness_raw = self._local_val(lines, "Wall Thickness")
    thickness_mm: Optional[int] = None
    if thickness_raw:
        try:
            thickness_mm = int(thickness_raw.split()[0])
        except (ValueError, IndexError):
            # Non-numeric thickness: treat as not recorded, in line with
            # the lenient numeric parsing used elsewhere in this class.
            thickness_mm = None
    return WallDetails(
        wall_type=self._local_str(lines, "Type"),
        insulation=self._local_str(lines, "Insulation"),
        thickness_unknown=self._local_bool(lines, "Wall Thickness Unknown"),
        u_value_known=self._local_bool(lines, "U-value Known"),
        party_wall_type=self._local_str(lines, "Party Wall Type"),
        thickness_mm=thickness_mm,
    )
def _extract_walls(self) -> WallDetails:
    """Return wall details parsed from the first building-part chunk of section 7."""
    walls_text = self._between("7.0 Walls:", "8.0 Roofs:")
    chunks = self._split_section_by_bp(walls_text)
    body = chunks[0][1] if chunks else walls_text
    stripped = (raw.strip() for raw in body.splitlines())
    return self._wall_details_from_lines([ln for ln in stripped if ln])
def _roof_details_from_lines(self, lines: List[str]) -> RoofDetails:
    """Build ``RoofDetails`` from the lines of one building part's roof subsection.

    ``insulation_thickness_mm`` is set only when the first token of the
    "Insulation Thickness" value consists of digits; otherwise it is ``None``.
    """
    raw_thickness = self._local_val(lines, "Insulation Thickness")
    thickness: Optional[int] = None
    if raw_thickness:
        leading = raw_thickness.split()[0]
        if leading.isdigit():
            thickness = int(leading)
    return RoofDetails(
        roof_type=self._local_str(lines, "Type"),
        insulation=self._local_str(lines, "Insulation"),
        u_value_known=self._local_bool(lines, "U-value Known"),
        insulation_thickness_mm=thickness,
    )
def _extract_roof(self) -> RoofDetails:
    """Return roof details parsed from the first building-part chunk of section 8."""
    roofs_text = self._between("8.0 Roofs:", "8.1 Rooms in Roof:")
    chunks = self._split_section_by_bp(roofs_text)
    body = chunks[0][1] if chunks else roofs_text
    stripped = (raw.strip() for raw in body.splitlines())
    return self._roof_details_from_lines([ln for ln in stripped if ln])
def _floor_details_from_lines(self, lines: List[str]) -> FloorDetails:
    """Build ``FloorDetails`` from the lines of one building part's floor subsection.

    Args:
        lines: Stripped, non-blank lines of the subsection.

    Returns:
        The floor details. ``default_u_value`` is the leading token of the
        "Default U-value" value as a float, or ``None`` when the label is
        absent or the token is not numeric. Reading only the leading token
        tolerates a trailing unit after the number.
    """
    u_val_raw = self._local_val(lines, "Default U-value")
    default_u: Optional[float] = None
    if u_val_raw:
        try:
            default_u = float(u_val_raw.split()[0])
        except (ValueError, IndexError):
            # Unparseable U-value: treat as not recorded instead of
            # aborting the whole extraction.
            default_u = None
    return FloorDetails(
        location=self._local_str(lines, "Location"),
        floor_type=self._local_str(lines, "Type"),
        insulation=self._local_str(lines, "Insulation"),
        u_value_known=self._local_bool(lines, "U-value Known"),
        default_u_value=default_u,
    )
def _extract_floor(self) -> FloorDetails:
    """Return floor details parsed from the first building-part chunk of section 9."""
    floors_text = self._between("9.0 Floors:", "10.0 Doors:")
    chunks = self._split_section_by_bp(floors_text)
    body = chunks[0][1] if chunks else floors_text
    stripped = (raw.strip() for raw in body.splitlines())
    return self._floor_details_from_lines([ln for ln in stripped if ln])
def _extract_extensions(self) -> List[ExtensionPart]:
    """Build one ``ExtensionPart`` per non-Main building part.

    Extension names and their order come from the §4 (dimensions) chunks.
    The §7/§8/§9 chunk with the same name supplies walls, roof and floor.
    A chunk flagged "As Main Wall: Yes" (walls) or "As Main: Yes" (roof,
    floor) reuses the main property's details for that element; any other
    chunk, including a missing one, is parsed on its own.
    """

    def nonblank(body: str) -> List[str]:
        # Stripped, blank-free lines of a chunk body.
        return [s for s in (raw.strip() for raw in body.splitlines()) if s]

    dim_pairs = self._split_section_by_bp(
        self._between("4.0 Dimensions:", "5.0 Conservatory:")
    )
    dim_by_bp = dict(dim_pairs)
    wall_by_bp = dict(
        self._split_section_by_bp(self._between("7.0 Walls:", "8.0 Roofs:"))
    )
    roof_by_bp = dict(
        self._split_section_by_bp(
            self._between("8.0 Roofs:", "8.1 Rooms in Roof:")
        )
    )
    floor_by_bp = dict(
        self._split_section_by_bp(self._between("9.0 Floors:", "10.0 Doors:"))
    )

    dimension_type = self._str_val("Dimension type")
    inherited_walls = self._extract_walls()
    inherited_roof = self._extract_roof()
    inherited_floor = self._extract_floor()

    # Per-bp age-band lookup. Section 3 contains lines like
    # "1st Extension B 1900-1929" — the band sits after the name.
    band_pattern = re.compile(
        r"^(\d+(?:st|nd|rd|th) Extension)\s+([A-M] [^\n]+)$",
        re.MULTILINE,
    )
    band_by_bp = {}
    for hit in band_pattern.finditer(self._text):
        band_by_bp[hit.group(1)] = hit.group(2).strip()

    parts: List[ExtensionPart] = []
    for bp_name, _ in dim_pairs:
        if bp_name == "Main Property":
            continue
        wall_lines = nonblank(wall_by_bp.get(bp_name, ""))
        roof_lines = nonblank(roof_by_bp.get(bp_name, ""))
        floor_lines = nonblank(floor_by_bp.get(bp_name, ""))

        if self._local_bool(wall_lines, "As Main Wall"):
            walls = inherited_walls
        else:
            walls = self._wall_details_from_lines(wall_lines)
        if self._local_bool(roof_lines, "As Main"):
            roof = inherited_roof
        else:
            roof = self._roof_details_from_lines(roof_lines)
        if self._local_bool(floor_lines, "As Main"):
            floor = inherited_floor
        else:
            floor = self._floor_details_from_lines(floor_lines)

        dimensions = BuildingPartDimensions(
            dimension_type=dimension_type,
            floors=self._floors_from_dimensions_body(
                dim_by_bp.get(bp_name, "")
            ),
        )
        parts.append(
            ExtensionPart(
                name=bp_name,
                construction_age_band=band_by_bp.get(bp_name, ""),
                dimensions=dimensions,
                walls=walls,
                roof=roof,
                floor=floor,
            )
        )
    return parts
def _extract_windows(self) -> List[Window]:
m = re.search(
r"Permanent\s+Shutters\n(.*?)Draught Proofing",
self._text,
re.DOTALL,
)
if not m:
return []
tokens = [t.strip() for t in m.group(1).splitlines() if t.strip()]
windows: List[Window] = []
i = 0
while i + 12 < len(tokens):
try:
width_m = float(tokens[i])
height_m = float(tokens[i + 1])
area_m2 = float(tokens[i + 2])
except (ValueError, IndexError):
i += 1
continue
i += 3
# Collect glazing type tokens until frame_factor (0 < v ≤ 1.0)
glazing_parts: List[str] = []
while i < len(tokens):
try:
v = float(tokens[i])
if 0.0 < v <= 1.0:
break
glazing_parts.append(tokens[i])
except ValueError:
glazing_parts.append(tokens[i])
i += 1
# If last glazing token is a single word (no spaces, not numeric) it's the frame_type
frame_type: Optional[str] = None
if glazing_parts and " " not in glazing_parts[-1] and not glazing_parts[-1].replace(".", "").isdigit():
frame_type = glazing_parts.pop()
glazing_type = " ".join(glazing_parts).strip()
if i >= len(tokens):
break
frame_factor = float(tokens[i]); i += 1
# Consume glazing_gap if present ("mm" token, possibly multi-token e.g. "16 mm or more")
glazing_gap: Optional[str] = None
if i < len(tokens) and "mm" in tokens[i]:
gap_parts = [tokens[i]]; i += 1
while i < len(tokens) and tokens[i].lower() in {"or", "more"}:
gap_parts.append(tokens[i]); i += 1
glazing_gap = " ".join(gap_parts)
building_part = tokens[i]; i += 1
location = tokens[i]; i += 1
orientation = tokens[i]; i += 1
data_source = tokens[i]; i += 1
u_value = float(tokens[i]); i += 1
g_value = float(tokens[i]); i += 1
draught_proofed = tokens[i].lower() == "yes"; i += 1
permanent_shutters = tokens[i]; i += 1
windows.append(
Window(
width_m=width_m,
height_m=height_m,
area_m2=area_m2,
glazing_type=glazing_type,
frame_factor=frame_factor,
building_part=building_part,
location=location,
orientation=orientation,
data_source=data_source,
u_value=u_value,
g_value=g_value,
draught_proofed=draught_proofed,
permanent_shutters=permanent_shutters,
frame_type=frame_type,
glazing_gap=glazing_gap,
)
)
return windows
def _extract_ventilation(self) -> VentilationAndCooling:
    """Collect the ventilation and cooling fields via document-wide label lookups.

    Count fields fall back to ``0`` and text fields to ``""`` when the
    label has no usable value.
    """
    # VentilationAndCooling count field -> label in the document.
    count_labels = {
        "open_chimneys_count": "No. of open chimneys",
        "open_flues_count": "No. of open flues",
        "open_chimneys_closed_fire_count": (
            "No. of open chimneys/open flues attached to closed fire"
        ),
        "solid_fuel_boiler_flues_count": (
            "No. of flues attached to solid fuel boiler"
        ),
        "other_heater_flues_count": (
            "No. of open flues attached to other heater"
        ),
        "blocked_chimneys_count": "No. of blocked chimneys",
        "extract_fans_count": "No. of intermittent extract fans",
        "passive_vents_count": "No. of passive vents",
        "flueless_gas_fires_count": "No. of flueless gas fires",
    }
    counts = {
        field: self._int_val(label) for field, label in count_labels.items()
    }
    return VentilationAndCooling(
        fixed_space_cooling=self._bool_val("Fixed Space Cooling"),
        draught_lobby=self._str_val("Draught Lobby"),
        mechanical_ventilation=self._bool_val("Mechanical Ventilation"),
        pressure_test_method=self._str_val("Test Method"),
        **counts,
    )
def _extract_lighting(self) -> Lighting:
    """Collect the lighting counts via document-wide label lookups.

    When the LED/CFL split is flagged as known, ``low_energy_count`` is set
    to ``0``; otherwise it is read from "Total number of Low Energy".
    """
    split_known = self._bool_val("Number of LED and CFL Known")
    if split_known:
        low_energy = 0
    else:
        low_energy = self._int_val("Total number of Low Energy")
    return Lighting(
        total_bulbs=self._int_val("Total number of bulbs"),
        led_cfl_count_known=split_known,
        led_count=self._int_val("Number of LED lights"),
        cfl_count=self._int_val("Number of CFL lights"),
        incandescent_count=self._int_val("Total number of incandescents"),
        low_energy_count=low_energy,
    )
def _extract_main_heating(self) -> MainHeating:
    """Parse the first main-heating system from section 14.0.

    All lookups are restricted to the lines between "14.0 Main Heating1"
    and "14.1 Main Heating2", so labels repeated in other sections cannot
    leak in.

    Returns:
        The heating details. ``percentage_of_heat`` is the leading token
        of the "Percentage of Heat" value as an int, or ``0`` when the
        label is absent or the token is not an integer. A malformed
        percentage no longer aborts the whole extraction.
    """
    lines = self._section_lines("14.0 Main Heating1", "14.1 Main Heating2")
    pct_raw = self._local_val(lines, "Percentage of Heat")
    pct = 0
    if pct_raw:
        try:
            pct = int(pct_raw.split()[0])
        except (ValueError, IndexError):
            # Same lenient fallback as the generic integer lookup.
            pct = 0
    return MainHeating(
        heat_emitter=self._local_str(lines, "Heat Emitter"),
        fuel_type=self._local_str(lines, "Fuel Type"),
        flue_type=self._local_str(lines, "Flue Type"),
        fan_assisted_flue=self._local_bool(lines, "Fan Assisted Flue"),
        design_flow_temperature=self._local_str(lines, "Design flow temperature"),
        heating_controls_ees=self._local_str(lines, "Main Heating Controls EES"),
        heating_controls_sap=self._local_str(lines, "Main Heating Controls Sap"),
        percentage_of_heat=pct,
        pcdf_boiler_reference=self._local_val(lines, "PCDF boiler Reference"),
        heat_pump_age=self._local_val(lines, "Heat pump age"),
    )
def _extract_meters(self) -> Meters:
    """Collect the meter fields via document-wide label lookups."""
    # Meters boolean field -> label in the document.
    flag_labels = {
        "main_gas": "Main gas",
        "electricity_smart_meter": "Electricity Smart Meter Present",
        "gas_smart_meter": "Gas Smart Meter Present",
    }
    flags = {
        field: self._bool_val(label) for field, label in flag_labels.items()
    }
    return Meters(
        electricity_meter_type=self._str_val("Electricity meter type"),
        **flags,
    )
def _extract_water_heating(self) -> WaterHeating:
    """Collect the water-heating fields via document-wide label lookups."""
    code = self._str_val("Water Heating Code")
    sap_code = self._int_val("Water Heating SapCode")
    fuel = self._str_val("Water Heating Fuel Type")
    has_cylinder = self._bool_val("Hot Water Cylinder Present")
    return WaterHeating(
        water_heating_code=code,
        water_heating_sap_code=sap_code,
        water_heating_fuel_type=fuel,
        hot_water_cylinder_present=has_cylinder,
    )
def _extract_baths_and_showers(self) -> BathsAndShowers:
    """Collect the bath counts and the rows of the shower table.

    Shower rows are read as consecutive triples (number, outlet type,
    connected) starting on the line after the first line that is exactly
    "Connected". Reading stops at the first triple whose leading line is
    not all digits, or when fewer than three lines remain.
    """
    baths_total = self._int_val("Total Number of Baths")
    baths_connected = self._int_val("Number of Baths Connected")

    rows: List[Shower] = []
    if "Connected" in self._lines:
        cursor = self._lines.index("Connected") + 1
        last_index = len(self._lines) - 1
        while cursor + 2 <= last_index and self._lines[cursor].isdigit():
            number, outlet, connected = self._lines[cursor:cursor + 3]
            rows.append(
                Shower(
                    shower_number=int(number),
                    outlet_type=outlet,
                    connected=connected,
                )
            )
            cursor += 3

    return BathsAndShowers(
        number_of_baths=baths_total,
        number_of_baths_connected=baths_connected,
        showers=rows,
    )
def _rating_val(self, label: str) -> int:
v = self._next_val(label)
try:
return int(v.split()[-1]) if v else 0
except (ValueError, IndexError):
return 0
def _extract_renewables(self) -> Renewables:
    """Collect the renewables fields.

    The flue-gas heat recovery flag is looked up only within section 18.0
    (up to "19.0 Photovoltaic Panel"), because it is read from the generic
    label "Present". All other fields use document-wide lookups.

    Returns:
        The renewables details. ``hydro_electricity_generated_kwh`` is the
        leading token of the "Electricity generated [kWh/year]" value as a
        float, or ``0.0`` when the label is absent or the token is not
        numeric. A malformed figure no longer aborts the whole extraction.
    """
    fghrs_lines = self._section_lines(
        "18.0 Flue Gas Heat Recovery System", "19.0 Photovoltaic Panel"
    )
    fghrs = self._local_bool(fghrs_lines, "Present")
    terrain = self._str_val("Terrain Type")
    hydro_raw = self._next_val("Electricity generated [kWh/year]")
    hydro = 0.0
    if hydro_raw:
        try:
            hydro = float(hydro_raw.split()[0])
        except (ValueError, IndexError):
            # Unparseable figure: fall back to the "nothing generated" default.
            hydro = 0.0
    return Renewables(
        solar_water_heating=self._bool_val("Solar Water Heating"),
        wwhrs_present=self._bool_val("Is WWHRS present in the property?"),
        flue_gas_heat_recovery_present=fghrs,
        photovoltaic_panel=self._str_val("Photovoltaic Panel"),
        export_capable_meter=self._bool_val("Export capable meter"),
        wind_turbine_present=self._bool_val("Wind turbine present?"),
        wind_turbines_terrain_type=terrain,
        hydro_electricity_generated_kwh=hydro,
    )
def extract(self) -> ElmhurstSiteNotes:
    """Run every section extractor and assemble the full site-notes record.

    Returns:
        The populated ``ElmhurstSiteNotes``. ``co2_emissions_current_t``
        is the leading token of the "Emissions (t/year)" value as a float,
        or ``0.0`` when the label is absent or the token is not numeric.

    Raises:
        ValueError: Propagated from the property-details extractor when a
            required date is missing or malformed, and from the window
            parser when a U-value or g-value cell is not numeric.
    """
    emissions_raw = self._next_val("Emissions (t/year)")
    co2 = 0.0
    if emissions_raw:
        try:
            co2 = float(emissions_raw.split()[0])
        except (ValueError, IndexError):
            # Same lenient fallback as the rating lookups: an unparseable
            # figure must not abort the whole extraction.
            co2 = 0.0
    return ElmhurstSiteNotes(
        surveyor_info=self._extract_surveyor_info(),
        property_details=self._extract_property_details(),
        current_sap_rating=self._rating_val("Current SAP rating"),
        potential_sap_rating=self._rating_val("Potential SAP rating"),
        current_ei_rating=self._rating_val("Current EI rating"),
        potential_ei_rating=self._rating_val("Potential EI rating"),
        co2_emissions_current_t=co2,
        property_type=self._str_val("1.0 Property type"),
        attachment=self._extract_attachment(),
        number_of_storeys=self._int_val("Storeys"),
        habitable_rooms=self._int_val("Habitable Rooms"),
        heated_habitable_rooms=self._int_val("Heated Habitable Rooms"),
        # The main property's age band is the value that follows the first
        # "Main Property" label line in the document.
        construction_age_band=self._str_val("Main Property"),
        dimensions=self._extract_dimensions(),
        has_conservatory=self._bool_val("Is there a conservatory?"),
        walls=self._extract_walls(),
        roof=self._extract_roof(),
        floor=self._extract_floor(),
        door_count=self._int_val("Total Number of Doors"),
        insulated_door_count=self._int_val("Number of Insulated Doors"),
        windows=self._extract_windows(),
        draught_proofing_percent=self._int_val("Draught Proofing"),
        ventilation=self._extract_ventilation(),
        lighting=self._extract_lighting(),
        main_heating=self._extract_main_heating(),
        meters=self._extract_meters(),
        water_heating=self._extract_water_heating(),
        baths_and_showers=self._extract_baths_and_showers(),
        renewables=self._extract_renewables(),
        extensions=self._extract_extensions(),
    )