Model/backend/documents_parser/elmhurst_extractor.py
Khalim Conn-Kowlessar 2b1afa7339 S0380.204: extract Main Heating2's own emitter + control (§14.1)
Prerequisite for the SAP 10.2 p.186 two-systems-different-parts MIT.
When two main systems heat different parts of a dwelling, §14.1 Main
Heating2 lodges its OWN "Heat Emitter" + "Main Heating Controls Sap"
(simulated case 6: Main 1 radiators / control 2106 serving the living
area, Main 2 underfloor / control 2110 serving elsewhere). The extractor
+ mapper dropped both — `MainHeatingDetail.heat_emitter_type` and
`main_heating_control` came through as empty-string sentinels, so the
cascade saw system 2 as having no responsiveness (defaulted R=1.0) and no
control type.

- `MainHeating2` datatype gains `heat_emitter` + `heating_controls_sap`.
- The extractor reads them from the §14.1 block.
- `_map_elmhurst_main_heating_2` maps them via the same helpers as Main 1
  (`_elmhurst_heat_emitter_int` → underfloor-in-screed = emitter 2, Table
  4d R=0.75; `_elmhurst_sap_control_code` → 2110, Table 4e type 3),
  threading the dwelling floor + age band for the underfloor subtype.

Empty-string fallback preserved for the legacy DHW-only Main 2 (cert
000565 §14.1 omits emitter/control). No cascade output changes yet — the
MIT consumer lands in S0380.205. Full suite 2358 pass + 0 fail.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 15:53:32 +00:00

1679 lines
76 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
from datetime import date, datetime
from typing import List, Optional
from datatypes.epc.surveys.elmhurst_site_notes import (
AlternativeWall,
BathsAndShowers,
BuildingPartDimensions,
CommunityHeating,
ElmhurstSiteNotes,
ExtensionPart,
FloorDetails,
FloorDimension,
Lighting,
MainHeating,
MainHeating2,
Meters,
PropertyDetails,
Renewables,
RoofDetails,
RoomInRoof,
RoomInRoofSurface,
Shower,
SurveyorInfo,
VentilationAndCooling,
ElmhurstPvArray,
WallDetails,
WaterHeating,
Window,
)
def _parse_solar_pitch_deg(raw: Optional[str]) -> Optional[int]:
"""Parse the §16.0 "Collector elevation" lodgement (e.g. "30°", "60°",
or a bare integer). Returns None when absent or unparseable."""
if not raw:
return None
m = re.search(r"(\d+)", raw)
return int(m.group(1)) if m else None
class ElmhurstSiteNotesExtractor:
def __init__(self, pages: List[str]) -> None:
self._text = "\n".join(pages)
self._lines = [l.strip() for l in self._text.splitlines() if l.strip()]
# --- generic helpers ---
def _next_val(self, label: str) -> Optional[str]:
lc = label.rstrip(":") + ":"
lb = label.rstrip(":")
for i, line in enumerate(self._lines):
if line.startswith(lc) and len(line) > len(lc):
return line[len(lc):].strip() or None
if line == lc or line == lb:
for j in range(i + 1, min(i + 4, len(self._lines))):
v = self._lines[j]
if v.endswith(":") or v.startswith("©"):
return None
if v:
return v
return None
return None
def _str_val(self, label: str) -> str:
v = self._next_val(label)
return " ".join(v.split()) if v else ""
def _opt_str(self, label: str) -> Optional[str]:
v = self._next_val(label)
return " ".join(v.split()) if v else None
def _bool_val(self, label: str) -> bool:
v = self._next_val(label)
return v is not None and v.lower() == "yes"
def _int_val(self, label: str) -> int:
v = self._next_val(label)
try:
return int(v.split()[0]) if v else 0
except (ValueError, IndexError):
return 0
def _date_val(self, label: str) -> date:
v = self._next_val(label)
if not v:
raise ValueError(f"Missing date for label: {label}")
return datetime.strptime(v.strip(), "%d/%m/%Y").date()
def _between(self, start: str, end: str) -> str:
try:
s = self._text.index(start) + len(start)
e = self._text.index(end, s)
return self._text[s:e]
except ValueError:
return ""
# Multi-bp helpers: Summary PDFs subdivide §4/§7/§8/§9 with explicit
# "Main Property" / "1st Extension" / "2nd Extension" headers. The
# existing single-bp fixture also carries "Main Property" as a header
# before the body. `_split_section_by_bp` uses this pattern to split a
# section into per-bp chunks.
#
# The pattern matches a header only when it is alone on its line
# (MULTILINE anchors, optional trailing whitespace); group 1 is the
# building-part name.
_BP_HEADER_RE = re.compile(
r"^(Main Property|\d+(?:st|nd|rd|th) Extension)\s*$",
re.MULTILINE,
)
def _split_section_by_bp(self, section_text: str) -> List[tuple[str, str]]:
"""Split a section's text into per-bp subsections.
Returns ``[(bp_name, body), ...]`` in document order. Body is
the text between this bp's header and the next bp's header
(exclusive). Returns ``[("Main Property", section_text)]`` when
no headers are found (defensive fallback for malformed PDFs).
"""
matches = list(self._BP_HEADER_RE.finditer(section_text))
if not matches:
return [("Main Property", section_text)]
result: List[tuple[str, str]] = []
for i, m in enumerate(matches):
name = m.group(1)
body_start = m.end()
body_end = (
matches[i + 1].start() if i + 1 < len(matches) else len(section_text)
)
result.append((name, section_text[body_start:body_end]))
return result
def _section_lines(self, start: str, end: str) -> List[str]:
text = self._between(start, end)
return [l.strip() for l in text.splitlines() if l.strip()]
def _section_lines_first_end(
self, start: str, ends: tuple[str, ...],
) -> List[str]:
"""Like `_section_lines` but accepts multiple end-marker candidates
and uses whichever appears first after `start`. Defends against
Summary-shape variants where the next-section heading differs
(e.g. §14.0 Main Heating1 closes at "14.1 Main Heating2" on
boiler/HP certs but at "14.1 Community Heating" on community-
heated certs)."""
try:
s = self._text.index(start) + len(start)
except ValueError:
return []
earliest: int | None = None
for end in ends:
try:
idx = self._text.index(end, s)
except ValueError:
continue
if earliest is None or idx < earliest:
earliest = idx
if earliest is None:
return []
text = self._text[s:earliest]
return [l.strip() for l in text.splitlines() if l.strip()]
def _local_val(self, lines: List[str], label: str) -> Optional[str]:
lb = label.rstrip(":")
lc = lb + ":"
for i, line in enumerate(lines):
if line.startswith(lc) and len(line) > len(lc):
return line[len(lc):].strip() or None
if line == lc or line == lb:
for j in range(i + 1, min(i + 4, len(lines))):
v = lines[j]
if v.endswith(":") or v.startswith("©"):
return None
if v:
return v
return None
return None
def _local_str(self, lines: List[str], label: str) -> str:
v = self._local_val(lines, label)
return " ".join(v.split()) if v else ""
def _local_bool(self, lines: List[str], label: str) -> bool:
v = self._local_val(lines, label)
return v is not None and v.lower() == "yes"
# --- section extractors ---
def _extract_surveyor_info(self) -> SurveyorInfo:
    """Build the ``SurveyorInfo`` record from document-wide labels.

    Every field falls back to "" when its label has no value, except
    ``my_reference`` which is None in that case.
    """
    text_of = self._str_val
    return SurveyorInfo(
        surveyor_code=text_of("Surveyor"),
        name=text_of("Name"),
        title=text_of("Title"),
        tel_number=text_of("Tel Number"),
        survey_reference=text_of("Survey Reference"),
        my_reference=self._opt_str("My Reference"),
    )
def _extract_property_details(self) -> PropertyDetails:
    """Build the ``PropertyDetails`` record from document-wide labels.

    ``epc_exists`` is read from the wrapped "Check for the existence of
    / an EPC:" label and is False when that pattern is not found.
    Propagates ValueError from `_date_val` when "Inspection Date" or
    "Process date" is missing or malformed.
    """
    epc_match = re.search(
        r"Check for the existence of\nan EPC:\n(Yes|No)", self._text
    )
    epc_exists = (
        epc_match is not None and epc_match.group(1).lower() == "yes"
    )
    text_of = self._str_val
    maybe = self._opt_str
    return PropertyDetails(
        rdsap_version=text_of("RdSAP version"),
        reference_number=text_of("Reference Number"),
        lodgement_required=self._bool_val("Lodgement Required"),
        regs_region=text_of("Regs Region"),
        epc_language=text_of("EPC Language"),
        postcode=text_of("Postcode"),
        region=text_of("Region"),
        street=text_of("Street"),
        town=text_of("Town"),
        tenure=text_of("Property Tenure"),
        transaction_type=text_of("Transaction Type"),
        inspection_date=self._date_val("Inspection Date"),
        process_date=self._date_val("Process date"),
        epc_exists=epc_exists,
        uprn=maybe("UPRN"),
        house_name=maybe("House Name"),
        house_number=maybe("House No"),
        locality=maybe("Locality"),
        county=maybe("County"),
    )
def _extract_attachment(self) -> str:
"""Extract the Summary's "attachment" line — the §1.0 built-form
descriptor (e.g. "M Mid-Terrace", "D Detached") that sits
between the property-type value and the §2.0 section header
for HOUSES.
Flats DON'T lodge an attachment line in the Elmhurst Summary;
the §2.0 Number of Storeys header follows immediately after
the "F Flat" property-type value. Detect that case and return
"" so the mapper's `built_form` doesn't capture section-
header noise.
"""
m = re.search(r"1\.0 Property type:\n[^\n]+\n([^\n]+)", self._text)
if not m:
return ""
candidate = " ".join(m.group(1).strip().split())
if re.match(r"^\d+\.\d+\s", candidate) or "Number of Storeys" in candidate:
return ""
return candidate
def _floors_from_dimensions_body(self, body: str) -> List[FloorDimension]:
"""Parse FloorDimension entries from a single bp's §4 body."""
matches = re.findall(
r"([A-Za-z ]+Floor):\n([\d.]+)\n([\d.]+)\n([\d.]+)\n([\d.]+)",
body,
)
return [
FloorDimension(
name=name.strip(),
area_m2=float(area),
room_height_m=float(height),
heat_loss_perimeter_m=float(hlp),
party_wall_length_m=float(pwl),
)
for name, area, height, hlp, pwl in matches
]
def _extract_dimensions(self) -> BuildingPartDimensions:
    """Main-property dimensions only; extensions are handled by
    `_extract_extensions`.

    Uses the first building-part chunk of §4.0 (falling back to the
    whole section if the splitter returns nothing).
    """
    dimension_type = self._str_val("Dimension type")
    section = self._between("4.0 Dimensions:", "5.0 Conservatory:")
    chunks = self._split_section_by_bp(section)
    if chunks:
        _, main_body = chunks[0]
    else:
        main_body = section
    main_floors = self._floors_from_dimensions_body(main_body)
    return BuildingPartDimensions(
        dimension_type=dimension_type,
        floors=main_floors,
    )
def _wall_details_from_lines(self, lines: List[str]) -> WallDetails:
    """Build ``WallDetails`` from one building part's §7.0 Walls lines.

    Args:
        lines: stripped, non-blank lines of a single bp's §7 chunk. All
            lookups are scoped to these lines, so labels shared with
            the §8 Roofs / §9 Floors blocks cannot collide.

    Returns:
        The populated ``WallDetails``. ``thickness_mm`` and
        ``insulation_thickness_mm`` are None when their cell is absent
        or carries no leading number.
    """
    # Route "Wall Thickness" through the shared leading-digits parser
    # rather than a bare int(): a cell such as "Unknown" or a
    # bucket-cap "300+ mm" then yields None / 300 instead of raising
    # ValueError and aborting the whole extraction.
    thickness_mm = self._parse_thickness_mm(
        self._local_val(lines, "Wall Thickness")
    )
    # Composite / retrofit insulation thickness — Summary §7.0
    # writes the value on the line pair "Insulation Thickness" /
    # "100 mm" when a composite filled-cavity-plus-external (or
    # equivalent) wall is lodged. None when the PDF omits the line
    # (no retrofit lodged).
    insulation_thickness_mm = self._parse_thickness_mm(
        self._local_val(lines, "Insulation Thickness")
    )
    return WallDetails(
        wall_type=self._local_str(lines, "Type"),
        insulation=self._local_str(lines, "Insulation"),
        thickness_unknown=self._local_bool(lines, "Wall Thickness Unknown"),
        u_value_known=self._local_bool(lines, "U-value Known"),
        party_wall_type=self._local_str(lines, "Party Wall Type"),
        thickness_mm=thickness_mm,
        insulation_thickness_mm=insulation_thickness_mm,
        alternative_walls=self._alternative_walls_from_lines(lines),
        # Summary §7 lodges the per-BP "Curtain Wall Age" line only
        # when `Type: CW Curtain Wall`. Per RdSAP 10 §5.18 (PDF
        # p.48) this drives the curtain-wall U-value (Post 2023 →
        # 1.4; Pre 2023 → 2.0) independent of the dwelling-wide
        # age band. Use `_local_val` (Optional[str]) so absent
        # lines surface as None, not the empty-string sentinel
        # `_local_str` returns.
        curtain_wall_age=self._local_val(lines, "Curtain Wall Age"),
    )
def _alternative_walls_from_lines(self, lines: List[str]) -> List[AlternativeWall]:
"""Parse up to two §7 "Alternative Wall N" sub-area lodgements.
The Elmhurst Summary PDF lays them out as a contiguous block of
prefixed labels ("Alternative Wall 1 Area", "Alternative Wall 1
Type", …); we read each numbered slot independently and drop
slots whose Area is missing/zero."""
result: List[AlternativeWall] = []
for n in (1, 2):
area_raw = self._local_val(lines, f"Alternative Wall {n} Area")
if not area_raw:
continue
try:
area = float(area_raw.split()[0])
except (ValueError, IndexError):
continue
if area <= 0:
continue
thickness_raw = self._local_val(lines, f"Alternative Wall {n} Thickness")
thickness_mm = self._parse_thickness_mm(thickness_raw)
result.append(AlternativeWall(
area_m2=area,
wall_type=self._local_str(lines, f"Alternative Wall {n} Type"),
insulation=self._local_str(lines, f"Alternative Wall {n} Insulation"),
thickness_unknown=self._local_bool(
lines, f"Alternative Wall {n} Thickness Unknown"
),
thickness_mm=thickness_mm,
u_value_known=self._local_bool(
lines, f"Alternative Wall {n} U-value Known"
),
# RdSAP10 §5.8 + Table 14: dry-lined uninsulated wall adds
# R = 0.17 m²K/W to base U. Cohort fixture: cert 7700
# Alt 1 "CavityWallPlasterOnDabs" lodges Dry-lining: Yes →
# U = 1/(1/1.5 + 0.17) ≈ 1.20.
dry_lined=self._local_bool(
lines, f"Alternative Wall {n} Dry-lining"
),
))
return result
def _extract_walls(self) -> WallDetails:
section = self._between("7.0 Walls:", "8.0 Roofs:")
bp_chunks = self._split_section_by_bp(section)
main_body = bp_chunks[0][1] if bp_chunks else section
lines = [l.strip() for l in main_body.splitlines() if l.strip()]
return self._wall_details_from_lines(lines)
@staticmethod
def _parse_thickness_mm(raw: Optional[str]) -> Optional[int]:
"""Parse an Elmhurst "Insulation Thickness" cell ("100 mm",
"400+ mm") to integer mm. The bucket-cap "400+ mm" (Table 17/18
max tabulated row) carries a trailing "+" that a bare
`.split()[0].isdigit()` test rejects — strip to the leading
digits so the cap parses through to the cascade with its numeric
value (simulated case 5: roof "400+ mm" was silently dropped →
u_roof fell back to the age-J default 0.16 instead of the
300mm+ value 0.11). Returns None when the cell is absent or
carries no leading number ("As Built", "N None")."""
if not raw:
return None
match = re.match(r"\d+", raw.strip())
return int(match.group()) if match else None
def _roof_details_from_lines(self, lines: List[str]) -> RoofDetails:
    """Build ``RoofDetails`` from one building part's §8.0 Roofs lines."""
    thickness_mm = self._parse_thickness_mm(
        self._local_val(lines, "Insulation Thickness")
    )
    insulation = self._local_str(lines, "Insulation")
    # The Summary PDF omits the "Insulation Thickness" line entirely
    # when no retrofit insulation is lodged (e.g. "Insulation: N None"
    # on 000516). Treat that case as 0 mm so the cascade picks Table
    # 16 row 0 (U=2.30) rather than the age-band default — the
    # surveyor explicitly recorded "None".
    insulation_code = insulation.partition(" ")[0]
    if thickness_mm is None and insulation_code == "N":
        thickness_mm = 0
    return RoofDetails(
        roof_type=self._local_str(lines, "Type"),
        insulation=insulation,
        u_value_known=self._local_bool(lines, "U-value Known"),
        insulation_thickness_mm=thickness_mm,
    )
def _extract_roof(self) -> RoofDetails:
section = self._between("8.0 Roofs:", "8.1 Rooms in Roof:")
bp_chunks = self._split_section_by_bp(section)
main_body = bp_chunks[0][1] if bp_chunks else section
lines = [l.strip() for l in main_body.splitlines() if l.strip()]
return self._roof_details_from_lines(lines)
def _floor_details_from_lines(self, lines: List[str]) -> FloorDetails:
    """Build ``FloorDetails`` from one building part's §9.0 Floors lines.

    Args:
        lines: stripped, non-blank lines of a single bp's §9 chunk.

    Returns:
        The populated ``FloorDetails``. ``default_u_value`` is None when
        the "Default U-value" cell is absent or does not start with a
        number; ``insulation_thickness_mm`` is None when its cell is
        absent or carries no leading number.
    """
    # Parse only the first token and tolerate failure, matching the §10
    # door U-value reader: a cell with a trailing unit or a placeholder
    # no longer raises ValueError out of the whole extraction.
    u_cell = self._local_val(lines, "Default U-value")
    default_u: Optional[float] = None
    if u_cell:
        try:
            default_u = float(u_cell.split()[0])
        except (ValueError, IndexError):
            default_u = None
    # RdSAP 10 §5.13 Table 20 — retro-fitted upper floors lodge an
    # "Insulation Thickness: NNN mm" cell so the cascade can route
    # via the per-thickness column. Mirror of the §8 roof extractor
    # at `_roof_details_from_lines`.
    thickness_mm = self._parse_thickness_mm(
        self._local_val(lines, "Insulation Thickness")
    )
    return FloorDetails(
        location=self._local_str(lines, "Location"),
        floor_type=self._local_str(lines, "Type"),
        insulation=self._local_str(lines, "Insulation"),
        u_value_known=self._local_bool(lines, "U-value Known"),
        default_u_value=default_u,
        insulation_thickness_mm=thickness_mm,
    )
def _extract_floor(self) -> FloorDetails:
section = self._between("9.0 Floors:", "10.0 Doors:")
bp_chunks = self._split_section_by_bp(section)
main_body = bp_chunks[0][1] if bp_chunks else section
lines = [l.strip() for l in main_body.splitlines() if l.strip()]
return self._floor_details_from_lines(lines)
def _extract_door_u_value(self) -> Optional[float]:
"""Read the §10 Doors block's "Average U-value" lodging.
Scoped to the §10..§11 slice so the global "U-value" labels in
Walls/Roofs/Floors can't shadow the door reading. None when the
PDF omits the line (e.g. all doors recorded as uninsulated)."""
lines = self._section_lines("10.0 Doors:", "11.0 Windows:")
raw = self._local_val(lines, "Average U-value")
if not raw:
return None
try:
return float(raw.split()[0])
except (ValueError, IndexError):
return None
# RIR surface row: `<name> <length> <height> [<insulation> [<ins_type>]
# [<gable_type>] <default_u> <known> <u>]`. The middle slot
# widths vary by surface kind; we match the four leading numerics
# robustly (length, height, default_u, u_value) and slot the
# remaining textual fields by position. The layout preprocessor
# collapses multi-space-separated cells into single newlines, so
# each row in the dump occupies multiple lines per cell.
#
# The names below are compared against whole stripped lines (exact
# equality), both to find a surface's row and to detect where the next
# row begins. Tuple order is the order in which surfaces are emitted.
_RIR_SURFACE_NAMES: tuple[str, ...] = (
"Flat Ceiling 1", "Flat Ceiling 2",
"Stud Wall 1", "Stud Wall 2",
"Slope 1", "Slope 2",
"Gable Wall 1", "Gable Wall 2",
"Common Wall 1", "Common Wall 2",
)
def _extract_room_in_roof(
self, main_dim_body: str, age_band_text: str
) -> Optional[RoomInRoof]:
"""Parse the §8.1 Rooms in Roof block for the Main bp."""
section = self._between("8.1 Rooms in Roof:", "9.0 Floors:")
bp_chunks = self._split_section_by_bp(section) if section.strip() else []
main_body = bp_chunks[0][1] if bp_chunks else ""
# Age band from §3: "Main Prop. Room(s) in Roof H 1991-1995"
age_m = re.search(
r"Main Prop\. Room\(s\) in Roof\s+([A-M] [^\n]+)", age_band_text
)
age_band = age_m.group(1).strip() if age_m else None
return self._room_in_roof_from_bodies(
dim_body=main_dim_body,
rir_body=main_body,
age_band=age_band,
)
def _room_in_roof_from_bodies(
self,
dim_body: str,
rir_body: str,
age_band: Optional[str],
) -> Optional[RoomInRoof]:
"""Parse a single-BP Room(s) in Roof from the §4 dimension body
(floor area) and §8.1 construction body (assessment + surfaces).
Used for both Main and each extension — extensions get their
own per-BP slice of §4 and §8.1 + the per-extension age band
from §3's "<N>th Ext. Room(s) in Roof <age>" line.
"""
m = re.search(r"Room\(s\) in Roof:\s+(\d+(?:\.\d+)?)", dim_body)
if m is None:
return None
floor_area = float(m.group(1))
if floor_area <= 0:
return None
if not rir_body.strip() or "Room in roof type" not in rir_body:
# §4 lodged an RR area but §8.1 has no construction details
# for this BP — surface as a partial RR so the cascade can
# still attribute the floor area to TFA. Empty surfaces
# tuple is the sentinel the mapper consumes.
return RoomInRoof(
floor_area_m2=floor_area,
construction_age_band=age_band,
assessment="",
surfaces=[],
)
lines = [l.strip() for l in rir_body.splitlines() if l.strip()]
assessment_idx = next(
(i for i, l in enumerate(lines) if l == "Assessment"), None
)
assessment = (
lines[assessment_idx + 1]
if assessment_idx is not None and assessment_idx + 1 < len(lines)
else ""
)
surfaces: List[RoomInRoofSurface] = []
for name in self._RIR_SURFACE_NAMES:
try:
idx = lines.index(name)
except ValueError:
continue
surfaces.append(self._parse_rir_surface_row(name, lines, idx))
return RoomInRoof(
floor_area_m2=floor_area,
construction_age_band=age_band,
assessment=assessment,
surfaces=surfaces,
)
# A whole-token number: optional leading minus, digits, optional
# decimal part. Used to recognise the numeric cells of an RIR row.
_RIR_NUMERIC_RE = re.compile(r"^-?\d+(?:\.\d+)?$")
# Elmhurst insulation cell formats: "100 mm", "125 mm", ... and the
# bucket-cap "400+ mm" (Table 17 max tabulated row). Optional trailing
# "+" allows the bucket-cap to parse through to the cascade with the
# same numeric value.
_RIR_INSULATION_THICKNESS_RE = re.compile(r"^\d+\+?\s*mm$")
def _parse_rir_surface_row(
self, name: str, lines: List[str], idx: int
) -> RoomInRoofSurface:
"""One RR surface row spans the name line followed by ~6-9 tokens
depending on which optional cells the surveyor filled. The token
order is stable: length, height, [insulation], [ins_type],
[gable_type], default_u, u_known, u_value. Numeric cells (length,
height, default_u, u_value) are the anchor; everything else is
slotted into the appropriate textual field.

Args:
    name: the surface name found at ``lines[idx]``.
    lines: stripped lines of the bp's §8.1 body.
    idx: index of the name line within ``lines``.

Returns:
    A ``RoomInRoofSurface``. Cells that are missing or don't match
    their expected shape keep the defaults set below: 0.0 for
    length/height/u_value, False for u_value_known, None for
    default_u / insulation_type / gable_type, "" for insulation.
"""
# Walk forward until either we exhaust the cell budget or hit
# the next RIR row's name marker — the layout dump puts each
# numeric / textual cell on its own line and we can't tell
# the LAST cell of THIS row from the FIRST cell of the next
# without that signal.
tokens: List[str] = []
# Cell budget: at most the nine lines following the name line.
scan_end = min(idx + 10, len(lines))
for j in range(idx + 1, scan_end):
if self._is_next_rir_row(lines[j]):
break
tokens.append(lines[j])
# First two numerics = length, height
length = float(tokens[0]) if tokens and self._RIR_NUMERIC_RE.match(tokens[0]) else 0.0
height = float(tokens[1]) if len(tokens) > 1 and self._RIR_NUMERIC_RE.match(tokens[1]) else 0.0
# Last numeric is u_value; preceding "Yes"/"No" is u_value_known;
# the numeric before that is default_u. These are read by position
# from the reversed tail (everything after length/height).
u_value = 0.0
u_value_known = False
default_u: Optional[float] = None
# The known/default_u tail is fairly stable; collect the trailing
# tokens and slot by position. The "known" token is "No" or "Yes".
rev = list(reversed(tokens[2:]))
# rev[0] = u_value, rev[1] = u_value_known, rev[2] = default_u
if len(rev) >= 1 and self._RIR_NUMERIC_RE.match(rev[0]):
u_value = float(rev[0])
if len(rev) >= 2 and rev[1] in ("Yes", "No"):
u_value_known = rev[1] == "Yes"
if len(rev) >= 3 and self._RIR_NUMERIC_RE.match(rev[2]):
default_u = float(rev[2])
# Middle textual cells: insulation, insulation_type, gable_type.
# Drop the leading length/height (already consumed) and the
# trailing 3 tokens (default_u, known, u_value). With fewer than
# five tokens there is no middle section at all.
middle = tokens[2:-3] if len(tokens) >= 5 else []
insulation = ""
insulation_type: Optional[str] = None
gable_type: Optional[str] = None
for t in middle:
if self._RIR_INSULATION_THICKNESS_RE.match(t) or t in ("As Built", "None", "Unknown"):
# "Unknown" is the third spec-valid thickness token
# (RdSAP 10 §3.10.1 PDF p.24: "default U-values apply
# when the roof room insulation is 'as built' or
# 'unknown'"). Mapper routes "Unknown" to
# insulation_thickness_mm=None so the cascade falls
# back to Table 18 col 4 default.
# Only the first matching cell is kept.
if not insulation:
insulation = t
elif t in ("Mineral or EPS", "PUR", "PIR", "PUR or PIR"):
# Summary §8.1 lodges the rigid-foam column as the
# disjunction "PUR or PIR" when the assessor doesn't
# distinguish between the two; the mapper canonicalises
# all three forms to SAP10 "rigid_foam" (cascade Table
# 17 col (b)).
insulation_type = t
elif t in (
"Party", "Sheltered", "Exposed",
"Connected", "Connected to heated space",
):
gable_type = t
return RoomInRoofSurface(
name=name,
length_m=length,
height_m=height,
insulation=insulation,
insulation_type=insulation_type,
gable_type=gable_type,
default_u_value=default_u,
u_value_known=u_value_known,
u_value=u_value,
)
def _is_next_rir_row(self, line: str) -> bool:
return line in self._RIR_SURFACE_NAMES
def _extract_extensions(self) -> List[ExtensionPart]:
    """Collect non-Main building parts.

    Cross-references the §4, §7, §8, §8.1 and §9 per-bp subsections by
    extension name. An "As Main" flag within a section body inherits
    the main bp's data for that section; otherwise the section body is
    parsed in isolation.

    Returns:
        One ``ExtensionPart`` per extension header found in §4, in
        document order. Sections that have no chunk for an extension
        are parsed from an empty line list.
    """
    # Gather per-section chunks once.
    dim_section = self._between("4.0 Dimensions:", "5.0 Conservatory:")
    wall_section = self._between("7.0 Walls:", "8.0 Roofs:")
    roof_section = self._between("8.0 Roofs:", "8.1 Rooms in Roof:")
    rir_section = self._between("8.1 Rooms in Roof:", "9.0 Floors:")
    floor_section = self._between("9.0 Floors:", "10.0 Doors:")
    dim_type = self._str_val("Dimension type")
    dim_chunks = dict(self._split_section_by_bp(dim_section))
    wall_chunks = dict(self._split_section_by_bp(wall_section))
    roof_chunks = dict(self._split_section_by_bp(roof_section))
    rir_chunks = dict(self._split_section_by_bp(rir_section)) if rir_section.strip() else {}
    floor_chunks = dict(self._split_section_by_bp(floor_section))
    # Per-extension RR age bands from §3: "1st Ext. Room(s) in Roof I 1996-2002".
    ext_rir_age_re = re.compile(
        r"(\d+(?:st|nd|rd|th))\s+Ext\.\s+Room\(s\) in Roof\s+([A-M] [^\n]+)",
        re.MULTILINE,
    )
    ext_rir_age_bands: dict[str, str] = {
        f"{m.group(1)} Extension": m.group(2).strip()
        for m in ext_rir_age_re.finditer(self._text)
    }
    main_walls = self._extract_walls()
    main_roof = self._extract_roof()
    main_floor = self._extract_floor()
    # Per-bp age-band lookup. Section 3 contains lines like
    # "1st Extension B 1900-1929" — the band sits after the name.
    age_band_re = re.compile(
        r"^(\d+(?:st|nd|rd|th) Extension)\s+([A-M] [^\n]+)$",
        re.MULTILINE,
    )
    age_bands = {m.group(1): m.group(2).strip() for m in age_band_re.finditer(self._text)}
    # Collect names in document order from the dimensions section
    # (excluding Main Property).
    names = [
        name for name, _ in self._split_section_by_bp(dim_section)
        if name != "Main Property"
    ]
    extensions: List[ExtensionPart] = []
    for name in names:
        dim_body = dim_chunks.get(name, "")
        wall_body = wall_chunks.get(name, "")
        roof_body = roof_chunks.get(name, "")
        floor_body = floor_chunks.get(name, "")
        wall_lines = [l.strip() for l in wall_body.splitlines() if l.strip()]
        roof_lines = [l.strip() for l in roof_body.splitlines() if l.strip()]
        floor_lines = [l.strip() for l in floor_body.splitlines() if l.strip()]
        if self._local_bool(wall_lines, "As Main Wall"):
            # Alternative walls live in the extension's own chunk
            # even when the main wall fields are inherited; merge
            # them into the inherited WallDetails so the bp carries
            # them through to its SapBuildingPart.
            walls = WallDetails(
                wall_type=main_walls.wall_type,
                insulation=main_walls.insulation,
                thickness_unknown=main_walls.thickness_unknown,
                u_value_known=main_walls.u_value_known,
                party_wall_type=main_walls.party_wall_type,
                thickness_mm=main_walls.thickness_mm,
                insulation_thickness_mm=main_walls.insulation_thickness_mm,
                alternative_walls=self._alternative_walls_from_lines(wall_lines),
                # Inherit the curtain-wall age with the rest of the
                # main wall: without it an extension that is "As Main
                # Wall" of a curtain-walled main bp would carry the CW
                # wall type but lose the age lodged for it.
                curtain_wall_age=main_walls.curtain_wall_age,
            )
        else:
            walls = self._wall_details_from_lines(wall_lines)
        roof = main_roof if self._local_bool(roof_lines, "As Main") else self._roof_details_from_lines(roof_lines)
        floor = main_floor if self._local_bool(floor_lines, "As Main") else self._floor_details_from_lines(floor_lines)
        rir = self._room_in_roof_from_bodies(
            dim_body=dim_body,
            rir_body=rir_chunks.get(name, ""),
            age_band=ext_rir_age_bands.get(name),
        )
        extensions.append(
            ExtensionPart(
                name=name,
                construction_age_band=age_bands.get(name, ""),
                dimensions=BuildingPartDimensions(
                    dimension_type=dim_type,
                    floors=self._floors_from_dimensions_body(dim_body),
                ),
                walls=walls,
                roof=roof,
                floor=floor,
                room_in_roof=rir,
            )
        )
    return extensions
def _extract_windows(self) -> List[Window]:
"""Parse the windows table into ``Window`` records.

Primary path: take the token block between the "Permanent Shutters"
column header and "Draught Proofing", one stripped non-blank line per
token, and walk it with a cursor — three leading floats (width,
height, area) open a window, followed by glazing-type tokens, the
frame factor, an optional glazing gap, and eight fixed trailing
cells. When the header pattern is not found, delegates to
`_extract_windows_from_layout`.

NOTE(review): once a row's W/H/Area triple has parsed, the trailing
cells are indexed and converted without bounds or format checks, so a
truncated or malformed row would presumably raise IndexError /
ValueError out of this method — confirm whether that is intended.
"""
# Textract-style pages keep "Permanent\s+Shutters" adjacent in
# reading order and the windows table flows as one column-block
# the existing token-walker can step through. PDF-derived pages
# (Summary PDFs preprocessed from `pdftotext -layout`) break the
# header across lines, so this regex misses entirely and the
# `_extract_windows_from_layout` fallback below picks them up
# by anchoring on the W/H/Area data line.
m = re.search(
r"Permanent\s+Shutters\n(.*?)Draught Proofing",
self._text,
re.DOTALL,
)
if not m:
return self._extract_windows_from_layout()
tokens = [t.strip() for t in m.group(1).splitlines() if t.strip()]
windows: List[Window] = []
i = 0
# Only start a row while at least 13 tokens remain from the cursor.
while i + 12 < len(tokens):
try:
width_m = float(tokens[i])
height_m = float(tokens[i + 1])
area_m2 = float(tokens[i + 2])
except (ValueError, IndexError):
# Not the start of a row: slide the cursor one token and retry.
i += 1
continue
i += 3
# Collect glazing type tokens until frame_factor (0 < v ≤ 1.0)
glazing_parts: List[str] = []
while i < len(tokens):
try:
v = float(tokens[i])
if 0.0 < v <= 1.0:
break
glazing_parts.append(tokens[i])
except ValueError:
glazing_parts.append(tokens[i])
i += 1
# If last glazing token is a single word (no spaces, not numeric) it's the frame_type
frame_type: Optional[str] = None
if glazing_parts and " " not in glazing_parts[-1] and not glazing_parts[-1].replace(".", "").isdigit():
frame_type = glazing_parts.pop()
glazing_type = " ".join(glazing_parts).strip()
# Ran off the end without finding a frame factor: stop parsing.
if i >= len(tokens):
break
frame_factor = float(tokens[i]); i += 1
# Consume glazing_gap if present ("mm" token, possibly multi-token e.g. "16 mm or more")
glazing_gap: Optional[str] = None
if i < len(tokens) and "mm" in tokens[i]:
gap_parts = [tokens[i]]; i += 1
while i < len(tokens) and tokens[i].lower() in {"or", "more"}:
gap_parts.append(tokens[i]); i += 1
glazing_gap = " ".join(gap_parts)
# Eight fixed trailing cells, consumed strictly in this order.
building_part = tokens[i]; i += 1
location = tokens[i]; i += 1
orientation = tokens[i]; i += 1
data_source = tokens[i]; i += 1
u_value = float(tokens[i]); i += 1
g_value = float(tokens[i]); i += 1
draught_proofed = tokens[i].lower() == "yes"; i += 1
permanent_shutters = tokens[i]; i += 1
windows.append(
Window(
width_m=width_m,
height_m=height_m,
area_m2=area_m2,
glazing_type=glazing_type,
frame_factor=frame_factor,
building_part=building_part,
location=location,
orientation=orientation,
data_source=data_source,
u_value=u_value,
g_value=g_value,
draught_proofed=draught_proofed,
permanent_shutters=permanent_shutters,
frame_type=frame_type,
glazing_gap=glazing_gap,
)
)
return windows
# Anchors used by the layout-style window parser. The W/H/Area anchor
# is sometimes followed by a joined glazing-type phrase on the same
# line (e.g. '1.22 1.76 2.15 Double pre 2002'); the optional 4th
# capture surfaces that text so the parser can use it instead of a
# separately-laid-out prefix line.
_WIDTH_HEIGHT_AREA_RE = re.compile(
r"^(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)(?:\s+(\S.*?))?$"
)
# Second per-window anchor: a line holding only "Manufacturer" or
# "Default" followed by one decimal number.
_MANUFACTURER_RE = re.compile(r"^(Manufacturer|Default)\s+(\d+\.\d+)$")
# Exact orientation cell values: the four cardinal points spelled out,
# the four intercardinals abbreviated.
_ORIENTATION_TOKENS = frozenset({
"North", "South", "East", "West", "NE", "NW", "SE", "SW",
})
_BP_INLINE_TOKENS = frozenset({"Main"}) # "Extension" only appears as suffix
# A room-in-roof window (rooflight) lodges its §11 "Location" cell as
# "Roof of Room in Roof", which the layout preprocessor wraps onto two
# tokens ("Roof of Room" in the prefix block, "in Roof" in the suffix).
# Detected so the window routes to a roof window (worksheet (27a))
# and the tokens don't leak into the glazing-type phrase.
_ROOF_OF_ROOM_LOCATION_TOKENS = frozenset({"Roof of Room", "in Roof"})
# The Elmhurst Summary PDF lodges each window's glazing-type as a
# capitalised phrase like "Double between 2002" / "Double with unknown"
# / "Single" / "Triple" / "Secondary". The first token of that phrase
# marks the start of a new window's prefix block in the layout dump,
# which is the only stable signal partitioning one window's suffix
# from the next window's prefix.
_GLAZING_TYPE_PREFIX_WORDS = frozenset({
"Single", "Double", "Triple", "Secondary",
})
def _extract_windows_from_layout(self) -> List[Window]:
"""Fallback window parser for Summary PDFs preprocessed from
`pdftotext -layout`. Each window has two stable anchors:
a "W H Area" line and a "Manufacturer <U_value>" line a few
lines further down. Everything between holds frame_type,
frame_factor, and a variable mix of glazing_gap, building_part,
location, and orientation (depending on which fields the
surveyor lodged); everything around the window holds glazing-
type/building-part/orientation prefix/suffix tokens split by
the layout preprocessor.
"""
m = re.search(
r"11\.0 Windows:(.*?)(Draught Proofing|12\.0 Ventilation)",
self._text, re.DOTALL,
)
if not m:
return []
lines = m.group(1).splitlines()
# Locate all (data_line, manufacturer_line) pairs in document
# order. Each pair is one window.
data_anchors: List[tuple[int, re.Match[str]]] = []
for i, line in enumerate(lines):
anchor = self._WIDTH_HEIGHT_AREA_RE.match(line.strip())
if anchor is not None:
data_anchors.append((i, anchor))
windows: List[Window] = []
for k, (data_idx, anchor) in enumerate(data_anchors):
manuf_idx = self._find_manufacturer_after(lines, data_idx)
if manuf_idx is None:
continue
prev_manuf_idx = (
self._find_manufacturer_after(lines, data_anchors[k - 1][0])
if k > 0 else None
)
next_data_idx = (
data_anchors[k + 1][0] if k + 1 < len(data_anchors) else len(lines)
)
# Partition the cross-window gap between this window's suffix
# and the next window's prefix on the first glazing-type-start
# token (Single/Double/Triple/Secondary). The same boundary
# is used symmetrically — current window's `after_end` = next
# window's `before_start` — so prefix tokens of W_{k+1} never
# get attributed as suffix of W_k (which was the bug producing
# orientation='East-South' for windows where 'South' actually
# belonged to the next row).
before_start = (
self._partition_after_manuf(lines, prev_manuf_idx, data_idx)
if prev_manuf_idx is not None else 0
)
after_end = self._partition_after_manuf(lines, manuf_idx, next_data_idx)
try:
window = self._parse_window_from_anchors(
lines=lines,
data_idx=data_idx,
manuf_idx=manuf_idx,
anchor=anchor,
before_start=before_start,
after_end=after_end,
)
except (ValueError, IndexError):
continue
if window is not None:
windows.append(window)
return windows
def _find_manufacturer_after(self, lines: List[str], data_idx: int) -> Optional[int]:
for j in range(data_idx + 1, min(data_idx + 12, len(lines))):
if self._MANUFACTURER_RE.match(lines[j].strip()):
return j
return None
# Joined-cell line: one or more whitespace-separated words followed by a
# frame factor written as a single digit, a dot and one or more digits
# (e.g. "Wood 0.70"). Group 1 is the frame type, group 2 the factor.
_FRAME_TYPE_AND_FACTOR_RE = re.compile(r"^(\S+(?:\s+\S+)*?)\s+(\d\.\d+)$")
# Line holding nothing but the frame factor (e.g. "0.70"); group 1 is
# the factor text.
_FRAME_FACTOR_ONLY_RE = re.compile(r"^(\d\.\d+)$")
def _parse_frame_type_and_factor(
self, lines: List[str], data_idx: int
) -> tuple[str, Optional[float], int]:
"""Return `(frame_type, frame_factor, middle_start_idx)` from
the lines immediately after the data anchor. Layouts vary:
(a) "PVC" on data+1, "0.70" on data+2 — the original 000474
shape;
(b) "Wood 0.70" on data+1 — joined-cell variant from 000487
and 000516 first-row windows;
(c) "0.70" alone on data+1 (no frame_type word at all) —
seen in 000487's subsequent windows where the
preprocessor dropped the frame-type column. frame_type
is recovered downstream from glazing-type defaults or
left empty."""
first = lines[data_idx + 1].strip()
combined = self._FRAME_TYPE_AND_FACTOR_RE.match(first)
if combined is not None:
return combined.group(1), float(combined.group(2)), data_idx + 2
factor_only = self._FRAME_FACTOR_ONLY_RE.match(first)
if factor_only is not None:
return "", float(factor_only.group(1)), data_idx + 2
if data_idx + 2 >= len(lines):
return first, None, data_idx + 2
frame_type = first
try:
frame_factor = float(lines[data_idx + 2].strip())
except ValueError:
return frame_type, None, data_idx + 3
return frame_type, frame_factor, data_idx + 3
def _partition_after_manuf(
self, lines: List[str], manuf_idx: int, next_data_idx: int
) -> int:
"""Return the exclusive upper bound for this window's suffix
block (and the inclusive lower bound for the next window's prefix
block). After the manufacturer line come 3 fixed tokens (g_value,
draught, shutters); the variable suffix lines start at manuf+4
and run until either (a) the next window's glazing-type-start
token (e.g. 'Double between 2002', 'Single', 'Triple ...') or
(b) the second orientation token in the gap, whichever comes
first. Branch (b) covers layouts where the glazing-type is
joined to the data line (no separate prefix line exists), so
the only signal of window-transition is the orientation tokens
rotating: orient_suffix(k) → orient_prefix(k+1). Falls through
to `next_data_idx` when neither marker is present."""
scan_start = manuf_idx + 4
seen_orient = False
for j in range(scan_start, next_data_idx):
stripped = lines[j].strip()
first_word = stripped.split(" ", 1)[0]
if first_word in self._GLAZING_TYPE_PREFIX_WORDS:
return j
if stripped in self._ORIENTATION_TOKENS:
if seen_orient:
return j
seen_orient = True
return next_data_idx
def _parse_window_from_anchors(
    self,
    *,
    lines: List[str],
    data_idx: int,
    manuf_idx: int,
    anchor: re.Match[str],
    before_start: int,
    after_end: int,
) -> Optional[Window]:
    """Assemble one `Window` from the lines around its data anchor.

    `anchor` is the match on the data line (`lines[data_idx]`): groups
    1-3 are width, height and area, and an optional group 4 carries a
    glazing-type phrase joined onto the same line. `manuf_idx` is the
    index of the manufacturer line; `before_start` / `after_end` bound
    the prefix block (`before_start .. data_idx`) and the suffix block
    (`manuf_idx + 4 .. after_end`).

    Returns None when the frame factor is missing or outside (0, 1],
    when the manufacturer line does not match, when fewer than three
    lines follow it, or when the g-value is not a float. A non-numeric
    width/height/area or U-value raises ValueError to the caller.
    """

    def stripped_span(lo: int, hi: int) -> List[str]:
        return [lines[pos].strip() for pos in range(lo, hi)]

    def first_token(tokens: List[str], accept) -> Optional[str]:
        for tok in tokens:
            if accept(tok):
                return tok
        return None

    def mentions_wall(tok: str) -> bool:
        return "wall" in tok.lower()

    width, height, area = (float(anchor.group(n)) for n in (1, 2, 3))

    # A glazing-type phrase can trail the W H Area triplet on the data
    # line itself (e.g. "1.22 1.76 2.15 Double pre 2002"); it is handed
    # to the composer as `inline_glazing_type`.
    inline_glazing_type = None
    if anchor.lastindex and anchor.lastindex >= 4:
        inline_glazing_type = anchor.group(4)

    # frame_type / frame_factor come straight after the data line,
    # either on separate lines or joined ("Wood 0.70").
    if len(lines) <= data_idx + 1:
        return None
    frame_type, frame_factor, middle_start = self._parse_frame_type_and_factor(
        lines, data_idx
    )
    if frame_factor is None:
        return None
    if not (0.0 < frame_factor <= 1.0):
        return None

    # Variable-order tokens between the frame factor and Manufacturer,
    # plus the slice that precedes the data line.
    middle = stripped_span(middle_start, manuf_idx)
    pre_data = stripped_span(before_start, data_idx)

    glazing_gap = first_token(middle, lambda tok: "mm" in tok.lower())
    # Wall location: most rows lodge it in `middle`; alt-wall rows put
    # "Alternative wall" ahead of the data line, so both slices are
    # searched before defaulting to "External wall".
    location = (
        first_token(middle, mentions_wall)
        or first_token(pre_data, mentions_wall)
        or "External wall"
    )
    bp_inline = first_token(middle, lambda tok: tok in self._BP_INLINE_TOKENS)
    orient_inline = first_token(
        middle, lambda tok: tok in self._ORIENTATION_TOKENS
    )

    # The manufacturer line supplies data_source and u_value.
    manuf_match = self._MANUFACTURER_RE.match(lines[manuf_idx].strip())
    if manuf_match is None:
        return None
    data_source = manuf_match.group(1)
    u_value = float(manuf_match.group(2))

    # The three lines after it: g_value, draught proofing, shutters.
    if manuf_idx + 3 >= len(lines):
        return None
    try:
        g_value = float(lines[manuf_idx + 1].strip())
    except ValueError:
        return None
    draught_proofed = lines[manuf_idx + 2].strip().lower() == "yes"
    permanent_shutters = lines[manuf_idx + 3].strip()

    # Non-empty prefix / suffix fragments carrying the split
    # glazing-type, building-part and orientation strings.
    before = [tok for tok in pre_data if tok]
    after = [tok for tok in stripped_span(manuf_idx + 4, after_end) if tok]

    # Room-in-roof windows lodge "Roof of Room in Roof" wrapped across
    # the prefix/suffix blocks. Remove those tokens so they stay out of
    # the glazing-type phrase and replace the wall-keyed location.
    if any(
        tok in self._ROOF_OF_ROOM_LOCATION_TOKENS for tok in before + after
    ):
        location = "Roof of Room"
        before = [
            tok for tok in before
            if tok not in self._ROOF_OF_ROOM_LOCATION_TOKENS
        ]
        after = [
            tok for tok in after
            if tok not in self._ROOF_OF_ROOM_LOCATION_TOKENS
        ]

    glazing_type, building_part, orientation = self._compose_window_descriptors(
        before=before,
        after=after,
        bp_inline=bp_inline,
        orient_inline=orient_inline,
        inline_glazing_type=inline_glazing_type,
    )
    return Window(
        width_m=width,
        height_m=height,
        area_m2=area,
        glazing_type=glazing_type,
        frame_factor=frame_factor,
        building_part=building_part,
        location=location,
        orientation=orientation,
        data_source=data_source,
        u_value=u_value,
        g_value=g_value,
        draught_proofed=draught_proofed,
        permanent_shutters=permanent_shutters,
        frame_type=frame_type,
        glazing_gap=glazing_gap,
    )
def _compose_window_descriptors(
self,
*,
before: List[str],
after: List[str],
bp_inline: Optional[str],
orient_inline: Optional[str],
inline_glazing_type: Optional[str] = None,
) -> tuple[str, str, str]:
"""Re-join the glazing-type / building-part / orientation tokens
split by the layout preprocessor. Each is at most 2 fragments
(one before the data line, one after); inline tokens in the
between-segment win over prefix/suffix fragments."""
# before holds (in document order, possibly): glazing_prefix,
# bp_prefix, orient_prefix — bp/orient may be missing.
# after holds: glazing_suffix, bp_suffix, orient_suffix — same.
prefix = list(before[-3:]) # last 3 lines preceding data
suffix = list(after[:3])
def pop_if_orientation(tokens: List[str]) -> Optional[str]:
for t in tokens:
if t in self._ORIENTATION_TOKENS:
tokens.remove(t)
return t
return None
def pop_if_bp_fragment(tokens: List[str]) -> Optional[str]:
# Prefix fragments like "1st" / "2nd" — match digit-prefixed
# ordinals; suffix fragments are always "Extension".
for t in tokens:
if re.match(r"^\d+(?:st|nd|rd|th)$", t) or t == "Extension":
tokens.remove(t)
return t
return None
orient_prefix_token = pop_if_orientation(prefix)
orient_suffix_token = pop_if_orientation(suffix)
bp_prefix_frag = pop_if_bp_fragment(prefix)
bp_suffix_frag = pop_if_bp_fragment(suffix)
# Glazing type: an inline glazing-type captured from the data
# line (layout-joined variant) wins; otherwise join the remaining
# prefix + suffix fragments.
if inline_glazing_type is not None:
glazing_type = inline_glazing_type
else:
# The glazing-type phrase always starts with a glazing-start
# word (Single/Double/Triple/Secondary). The FIRST window in
# a building part has `before_start = 0`, so its prefix block
# reaches back into the wrapped windows-table header; the
# third header line's tail tokenises to "value value Proofed
# Shutters" (the "U value / g value / Draught Proofed /
# Permanent Shutters" column titles) and is neither an
# orientation nor a bp fragment, so it survives the pops.
# Drop any prefix fragments preceding the glazing-start word
# so they don't leak into the glazing type.
glazing_start = next(
(
idx
for idx, frag in enumerate(prefix)
if frag.split(" ", 1)[0] in self._GLAZING_TYPE_PREFIX_WORDS
),
None,
)
glazing_prefix = (
prefix[glazing_start:] if glazing_start is not None else prefix
)
glazing_type = " ".join([*glazing_prefix, *suffix]).strip()
# Building part: inline token wins; otherwise join prefix + suffix.
if bp_inline is not None:
building_part = bp_inline
else:
building_part = " ".join(
t for t in (bp_prefix_frag, bp_suffix_frag) if t
).strip()
# Orientation: inline token wins for the primary direction;
# combine with the opposite-direction fragment when present.
primary = orient_inline or orient_prefix_token or ""
secondary_candidates = [
t for t in (orient_prefix_token, orient_suffix_token) if t and t != primary
]
if primary and secondary_candidates:
orientation = f"{primary}-{secondary_candidates[0]}"
else:
orientation = primary
return glazing_type, building_part, orientation
def _extract_ventilation(self) -> VentilationAndCooling:
    """Build the `VentilationAndCooling` record.

    The §12.1 mechanical-ventilation fields and the §12.2 AP4 result
    are read from their own section slices via `_section_lines` +
    `_local_val`; the remaining counts and flags are read through the
    label-only `_int_val` / `_bool_val` / `_str_val` helpers.
    """

    def digits_or_none(raw: Optional[str]) -> Optional[int]:
        if raw and raw.isdigit():
            return int(raw)
        return None

    # SAP 10.2 §2 (17a) "Air permeability value, AP4". Restricted to
    # §12.2..§13.0 so window / door U-values cannot shadow the float.
    # Absent when `pressure_test_method != "Pulse"`.
    pressure_block = self._section_lines(
        "12.2 Air Pressure Test", "13.0 Lighting"
    )
    ap4_text = self._local_val(pressure_block, "Pressure Test Result (AP4)")
    ap4_value: Optional[float] = None
    if ap4_text:
        try:
            ap4_value = float(ap4_text.split()[0])
        except (ValueError, IndexError):
            ap4_value = None

    # §12.1 "Mechanical Ventilation Type" — restricted to the §12.1 body
    # so "Type" labels in §14 / §15 cannot shadow it. Internal
    # whitespace is collapsed to single spaces.
    mv_block = self._section_lines(
        "12.1 Mechanical Ventilation", "12.2 Air Pressure Test"
    )
    mv_type_text = self._local_val(mv_block, "Mechanical Ventilation Type")
    if mv_type_text:
        mv_type = " ".join(mv_type_text.split())
    else:
        mv_type = None

    # SAP 10.2 §2.6.4 + Table 4f line (230a) — MEV PCDB lookup inputs:
    # PCDF index, wet-rooms count, ducting type, approved installation.
    pcdf_reference = digits_or_none(
        self._local_val(mv_block, "MV PCDF Reference Number")
    )
    wet_rooms = digits_or_none(self._local_val(mv_block, "Wet Rooms"))
    duct_type = self._local_val(mv_block, "Duct Type") or None
    approved_text = self._local_val(mv_block, "Approved Installation")
    if approved_text is None:
        approved = None
    else:
        approved = approved_text.strip().lower() == "yes"

    return VentilationAndCooling(
        open_chimneys_count=self._int_val("No. of open chimneys"),
        open_flues_count=self._int_val("No. of open flues"),
        open_chimneys_closed_fire_count=self._int_val(
            "No. of open chimneys/open flues attached to closed fire"
        ),
        solid_fuel_boiler_flues_count=self._int_val(
            "No. of flues attached to solid fuel boiler"
        ),
        other_heater_flues_count=self._int_val(
            "No. of open flues attached to other heater"
        ),
        blocked_chimneys_count=self._int_val("No. of blocked chimneys"),
        extract_fans_count=self._int_val("No. of intermittent extract fans"),
        passive_vents_count=self._int_val("No. of passive vents"),
        flueless_gas_fires_count=self._int_val("No. of flueless gas fires"),
        fixed_space_cooling=self._bool_val("Fixed Space Cooling"),
        draught_lobby=self._str_val("Draught Lobby"),
        mechanical_ventilation=self._bool_val("Mechanical Ventilation"),
        pressure_test_method=self._str_val("Test Method"),
        air_permeability_ap4_m3_h_m2=ap4_value,
        mechanical_ventilation_type=mv_type,
        mechanical_ventilation_pcdf_reference=pcdf_reference,
        wet_rooms_count=wet_rooms,
        duct_type=duct_type,
        approved_installation=approved,
    )
def _extract_lighting(self) -> Lighting:
    """Build the `Lighting` record from the bulb-count labels.

    When "Number of LED and CFL Known" is true the low-energy count is
    fixed at 0 and the "Total number of Low Energy" label is not read.
    """
    counts_known = self._bool_val("Number of LED and CFL Known")
    total = self._int_val("Total number of bulbs")
    led = self._int_val("Number of LED lights")
    cfl = self._int_val("Number of CFL lights")
    incandescent = self._int_val("Total number of incandescents")
    if counts_known:
        low_energy = 0
    else:
        low_energy = self._int_val("Total number of Low Energy")
    return Lighting(
        total_bulbs=total,
        led_cfl_count_known=counts_known,
        led_count=led,
        cfl_count=cfl,
        incandescent_count=incandescent,
        low_energy_count=low_energy,
    )
def _extract_main_heating(self) -> MainHeating:
    """Build the `MainHeating` record from the §14.0 Main Heating1
    block, attaching the secondary-heating SAP code, the optional Main
    Heating2 record and the optional community-heating record.

    `percentage_of_heat` is 0 when the label is absent or blank;
    `main_heating_sap_code` and `secondary_heating_sap_code` are None
    when absent, non-numeric or 0. A non-numeric "Percentage of Heat"
    value raises ValueError.
    """
    # Community-heated dwellings (e.g. SAP code 301 "Community heating
    # scheme" per SAP10.2 Table 4a category 6) and "no system" certs
    # (SAP code 699 "Electric heaters assumed where no system lodged")
    # lodge §14.0 Main Heating1 directly followed by §14.1 Community
    # Heating/Heat Network rather than §14.1 Main Heating2 — there is
    # no second main system on a community-heated dwelling. Close the
    # §14.0 block at whichever §14.1 form appears first so every
    # Summary shape surfaces the SAP code.
    lines = self._section_lines_first_end(
        "14.0 Main Heating1",
        ("14.1 Main Heating2", "14.1 Community Heating"),
    )
    # "Percentage of Heat" can be lodged with or without a space before
    # the '%' ("100 %" / "100%"). Strip the '%' before int() so both
    # forms parse — the same handling `_extract_main_heating_2` applies —
    # and treat a blank value as 0 rather than raising IndexError.
    pct_raw = self._local_val(lines, "Percentage of Heat")
    pct_text = pct_raw.strip().rstrip("%").strip() if pct_raw else ""
    pct = int(pct_text.split()[0]) if pct_text else 0
    # §14.0 "Main Heating SAP Code" identifies Main 1 by SAP 10.2
    # Table 4a code (e.g. 224 = "Air source heat pump, 2013 or
    # later"). PCDB-boiler certs leave this empty / lodge "0" — the
    # PCDB index in `PCDF boiler Reference` is the identifier in
    # that case. Treat 0 (or absent) as None so the mapper can
    # distinguish "no SAP code lodged" from a real Table 4a code.
    sap_code_raw = self._local_val(lines, "Main Heating SAP Code")
    main_heating_sap_code: Optional[int] = None
    if sap_code_raw is not None:
        sap_tokens = sap_code_raw.split()
        head = sap_tokens[0] if sap_tokens else ""
        if head.isdigit():
            v = int(head)
            main_heating_sap_code = v if v > 0 else None
    # The "Secondary Heating SapCode" key is lodged inside §14.1 Main
    # Heating2 — Elmhurst uses the Main-2 block to also carry the
    # cert's secondary heating system (when one exists). Look for it
    # in that section; absence (or "0") means no secondary lodged.
    secondary_lines = self._section_lines(
        "14.1 Main Heating2", "14.1 Community Heating"
    )
    secondary_raw = self._local_val(secondary_lines, "Secondary Heating SapCode")
    secondary_code = (
        int(secondary_raw)
        if secondary_raw is not None and secondary_raw.isdigit()
        and int(secondary_raw) > 0
        else None
    )
    main_heating_2 = self._extract_main_heating_2()
    community_heating = self._extract_community_heating()
    return MainHeating(
        heat_emitter=self._local_str(lines, "Heat Emitter"),
        fuel_type=self._local_str(lines, "Fuel Type"),
        flue_type=self._local_str(lines, "Flue Type"),
        fan_assisted_flue=self._local_bool(lines, "Fan Assisted Flue"),
        design_flow_temperature=self._local_str(lines, "Design flow temperature"),
        heating_controls_ees=self._local_str(lines, "Main Heating Controls EES"),
        heating_controls_sap=self._local_str(lines, "Main Heating Controls Sap"),
        percentage_of_heat=pct,
        pcdf_boiler_reference=self._local_val(lines, "PCDF boiler Reference"),
        heat_pump_age=self._local_val(lines, "Heat pump age"),
        main_heating_sap_code=main_heating_sap_code,
        main_heating_ees=self._local_str(lines, "Main Heating EES Code"),
        secondary_heating_sap_code=secondary_code,
        main_heating_2=main_heating_2,
        community_heating=community_heating,
    )
def _extract_main_heating_2(self) -> Optional[MainHeating2]:
    """Parse the §14.1 Main Heating2 block into a `MainHeating2`.

    Main 2 counts as present when the block lodges a non-zero
    `PCDF boiler Reference` (first token numeric and > 0) or a
    non-zero `Main Heating SAP Code`. When neither holds — block
    absent, or only placeholder zeros as PCDB-only certs lodge for
    "no Main 2" — the method returns None.
    """
    block = self._section_lines(
        "14.1 Main Heating2", "14.1 Community Heating",
    )

    # PCDB reference: only the first whitespace-separated token counts.
    pcdf_raw = self._local_val(block, "PCDF boiler Reference")
    pcdf_tokens = pcdf_raw.split() if pcdf_raw else []
    pcdf_head = pcdf_tokens[0] if pcdf_tokens else ""
    has_pcdb_ref = pcdf_head.isdigit() and int(pcdf_head) > 0

    # SAP code: a leading numeric token greater than zero, else None.
    sap_raw = self._local_val(block, "Main Heating SAP Code")
    main_heating_sap_code: Optional[int] = None
    if sap_raw is not None:
        sap_tokens = sap_raw.split()
        if sap_tokens and sap_tokens[0].isdigit():
            parsed = int(sap_tokens[0])
            if parsed > 0:
                main_heating_sap_code = parsed

    if main_heating_sap_code is None and not has_pcdb_ref:
        return None

    # "Percentage of Heat" is lodged as "0 %" or "0%"; dropping the
    # trailing '%' first lets both forms reach int().
    pct_raw = self._local_val(block, "Percentage of Heat")
    pct_text = pct_raw.rstrip("%").strip() if pct_raw else ""
    pct = int(pct_text.split()[0]) if pct_text else 0

    return MainHeating2(
        pcdf_boiler_reference=pcdf_raw,
        fuel_type=self._local_str(block, "Fuel Type"),
        flue_type=self._local_str(block, "Flue Type"),
        fan_assisted_flue=self._local_bool(block, "Fan Assisted Flue"),
        percentage_of_heat=pct,
        main_heating_sap_code=main_heating_sap_code,
        heat_emitter=self._local_str(block, "Heat Emitter"),
        heating_controls_sap=self._local_str(block, "Main Heating Controls Sap"),
    )
def _extract_community_heating(self) -> Optional[CommunityHeating]:
    """Parse the §14.1 Community Heating/Heat Network block, which is
    lodged in place of §14.1 Main Heating2 when the §14.0 SAP code
    names a heat-network row (Table 4a 301/302/304).

    Returns None when the block yields no lines or when its
    "Community Heat Source" is empty (the convention on individually
    heated dwellings). Otherwise returns a `CommunityHeating` carrying
    the heat source and fuel type that the mapper's
    `_resolve_community_heating_fuel_code` uses to resolve the
    Table 12 heat-network fuel code.
    """
    block = self._section_lines(
        "14.1 Community Heating/Heat Network", "14.2 Meters",
    )
    source = self._local_str(block, "Community Heat Source")
    if not (block and source):
        return None

    heating_type = self._local_str(block, "Heating Type")
    pcdf_reference = self._local_val(block, "PCDF Boiler Reference")
    fuel_type = self._local_str(block, "Community Fuel Type")
    controls_ees = self._local_str(block, "Heating Controls EES")
    controls_sap = self._local_str(block, "Heating Controls SAP")
    chp_factor = self._local_val(block, "CHP Fuel Factor")
    return CommunityHeating(
        heating_type=heating_type,
        pcdf_boiler_reference=pcdf_reference,
        community_heat_source=source,
        community_fuel_type=fuel_type,
        heating_controls_ees=controls_ees,
        heating_controls_sap=controls_sap,
        chp_fuel_factor=chp_factor,
    )
def _extract_meters(self) -> Meters:
    """Build the `Meters` record from the electricity / gas meter
    labels."""
    meter_type = self._str_val("Electricity meter type")
    has_main_gas = self._bool_val("Main gas")
    electricity_smart = self._bool_val("Electricity Smart Meter Present")
    gas_smart = self._bool_val("Gas Smart Meter Present")
    return Meters(
        electricity_meter_type=meter_type,
        main_gas=has_main_gas,
        electricity_smart_meter=electricity_smart,
        gas_smart_meter=gas_smart,
    )
def _extract_water_heating(self) -> WaterHeating:
    """Build the `WaterHeating` record.

    Cylinder fields come from the §15.1..§15.2 slice because labels
    such as "Insulation Thickness" also occur in §7 Walls / §8 Roofs.
    The insulation thickness is the leading all-digit token of its
    value, else None. The thermostat flag is taken from §15.1 when
    lodged there; otherwise a "Cylinder thermostat (Already installed)"
    line anywhere in the document sets it to True, and failing that it
    stays None.
    """
    cylinder_block = self._section_lines(
        "15.1 Hot Water Cylinder", "15.2 Community Hot Water",
    )
    size_label = self._local_val(cylinder_block, "Cylinder Size")
    insulation_label = self._local_val(cylinder_block, "Insulated")

    thickness_raw = self._local_val(cylinder_block, "Insulation Thickness")
    thickness_mm: Optional[int] = None
    if thickness_raw:
        leading = thickness_raw.split()[0]
        if leading.isdigit():
            thickness_mm = int(leading)

    thermostat_raw = self._local_val(cylinder_block, "Cylinder Thermostat")
    thermostat: Optional[bool]
    if thermostat_raw is None:
        thermostat = None
    else:
        thermostat = thermostat_raw.strip().lower() == "yes"

    # Fallback: the §16 "Recommendations" block lists existing fittings
    # as `<feature> (Already installed)`. Some certs lodge the cylinder
    # thermostat only there (S0380.140 corpus probe, property 001431);
    # without this the cascade would default to "no thermostat" and
    # apply the SAP 10.2 Table 2b ×1.3 multiplier.
    if (
        thermostat is None
        and "Cylinder thermostat (Already installed)" in self._lines
    ):
        thermostat = True

    return WaterHeating(
        water_heating_code=self._str_val("Water Heating Code"),
        water_heating_sap_code=self._int_val("Water Heating SapCode"),
        water_heating_fuel_type=self._str_val("Water Heating Fuel Type"),
        hot_water_cylinder_present=self._bool_val("Hot Water Cylinder Present"),
        cylinder_size_label=size_label,
        cylinder_insulation_label=insulation_label,
        cylinder_insulation_thickness_mm=thickness_mm,
        cylinder_thermostat=thermostat,
    )
def _extract_baths_and_showers(self) -> BathsAndShowers:
    """Build the `BathsAndShowers` record, including the shower roster.

    The roster is read from the section slice that follows the
    "Connected" line: consecutive triples of (shower number, outlet
    type, connected) are consumed while the first line of a triple is
    all digits and a complete triple remains. The lookup is
    section-bounded because a document-wide search for "Connected"
    collides with the §3 building-parts elevation flags on
    multi-extension certs (e.g. cert 000565).
    """
    n_baths = self._int_val("Total Number of Baths")
    n_connected = self._int_val("Number of Baths Connected")
    section = self._section_lines(
        "1x.0 Baths and Showers", "18.0 Flue Gas Heat Recovery System",
    )

    showers: List[Shower] = []
    if "Connected" in section:
        cursor = section.index("Connected") + 1
        while cursor + 2 < len(section) and section[cursor].isdigit():
            number_text, outlet, connected = section[cursor:cursor + 3]
            showers.append(
                Shower(
                    shower_number=int(number_text),
                    outlet_type=outlet,
                    connected=connected,
                )
            )
            cursor += 3

    return BathsAndShowers(
        number_of_baths=n_baths,
        number_of_baths_connected=n_connected,
        showers=showers,
    )
def _rating_val(self, label: str) -> int:
v = self._next_val(label)
try:
return int(v.split()[-1]) if v else 0
except (ValueError, IndexError):
return 0
def _extract_renewables(self) -> Renewables:
    """Build the `Renewables` record (solar hot water, WWHRS, FGHRS,
    PV, wind and hydro fields).

    The FGHRS "Present" flag and the solar collector geometry are read
    from their own section slices; `pv_percent_roof_area` is None
    unless "Proportion of roof area" parses to a value above 0.
    """
    fghrs_block = self._section_lines(
        "18.0 Flue Gas Heat Recovery System", "19.0 Photovoltaic Panel"
    )
    fghrs_present = self._local_bool(fghrs_block, "Present")
    terrain_type = self._str_val("Terrain Type")

    hydro_text = self._next_val("Electricity generated [kWh/year]")
    if hydro_text:
        hydro_kwh = float(hydro_text)
    else:
        hydro_kwh = 0.0

    # RdSAP 10 §11.1 b): §19.0 may lodge a "% of roof area" row instead
    # of detailed kWp / orientation / pitch. `_int_val` gives 0 when
    # the label is absent, so 0 is collapsed to None below to keep
    # "no PV" distinct from "PV via % roof area".
    roof_share = self._int_val("Proportion of roof area")

    # Solar HW collector geometry — Summary §16.0, lodged only when the
    # solar block says the details are known (cert 000565: West / 30° /
    # Modest). Otherwise None, and the cascade falls back to the
    # RdSAP 10 §10.11 Table 29 defaults (South / 30° / Modest).
    solar_block = self._section_lines(
        "16.0 Solar water heating",
        "17.0 Waste Water Heat Recovery System",
    )
    collector_orientation = self._local_val(
        solar_block, "Collector orientation",
    )
    collector_pitch = _parse_solar_pitch_deg(
        self._local_val(solar_block, "Collector elevation")
    )
    collector_overshading = self._local_val(solar_block, "Overshading")

    return Renewables(
        solar_water_heating=self._bool_val("Solar Water Heating"),
        wwhrs_present=self._bool_val("Is WWHRS present in the property?"),
        flue_gas_heat_recovery_present=fghrs_present,
        photovoltaic_panel=self._str_val("Photovoltaic Panel"),
        export_capable_meter=self._bool_val("Export capable meter"),
        wind_turbine_present=self._bool_val("Wind turbine present?"),
        wind_turbines_terrain_type=terrain_type,
        hydro_electricity_generated_kwh=hydro_kwh,
        pv_arrays=self._extract_pv_arrays(),
        pv_percent_roof_area=roof_share if roof_share > 0 else None,
        solar_hw_collector_orientation=collector_orientation,
        solar_hw_collector_pitch_deg=collector_pitch,
        solar_hw_overshading=collector_overshading,
    )
def _extract_pv_arrays(self) -> List[ElmhurstPvArray]:
    """Parse the Elmhurst Summary §19.0 PV Panel section into one
    `ElmhurstPvArray` per lodged array; [] when the section is absent.

    Single-array layout (e.g. cert 0380):
        Photovoltaic panel details
        PV Cells kW Peak Orientation
        Elevation
        Overshading
        3.00
        South-East
        45°
        None Or Little
    Multi-array certs (e.g. cert 0350) simply repeat the four value
    lines per array.

    Values after the "Photovoltaic panel details" anchor are gathered —
    skipping blank and header lines — until a stop token or the next
    §-header, then consumed in groups of four (kW Peak, Orientation,
    Elevation, Overshading). A trailing incomplete group is dropped; a
    group whose kW Peak is not a float or whose elevation has no
    leading digits is skipped.
    """
    anchor_idx = next(
        (
            pos
            for pos, text in enumerate(self._lines)
            if text == "Photovoltaic panel details"
        ),
        None,
    )
    if anchor_idx is None:
        return []

    # Column titles that follow the anchor ("PV Cells kW Peak
    # Orientation", "Elevation", "Overshading"); any line containing
    # one of these is treated as header text.
    header_tokens = {"pv cells", "kw peak", "orientation", "elevation", "overshading"}
    # Lines belonging to what comes after the PV values.
    stop_tokens = {
        "batteries", "capacity known", "capacity",
        "connected to the dwelling's meter", "diverter present",
        "export capable meter",
    }
    # A §-header such as "20.0 Wind Turbine": digits, dot, one digit,
    # whitespace, word — so a bare kWp like "1.50" does not close the
    # block.
    section_header_re = re.compile(r"^\d{1,2}\.\d\s+\w")

    values: List[str] = []
    for raw_line in self._lines[anchor_idx + 1:]:
        text = raw_line.strip()
        if not text:
            continue
        lowered = text.lower()
        if lowered in stop_tokens or section_header_re.match(text):
            break
        if any(title in lowered for title in header_tokens):
            continue
        values.append(text)

    arrays: List[ElmhurstPvArray] = []
    for start in range(0, len(values) - 3, 4):
        kwp_text, orientation, elevation_text, overshading = values[start:start + 4]
        try:
            kwp = float(kwp_text)
        except ValueError:
            continue
        # Elevation is lodged as "45°"; keep only the leading digits.
        elevation_match = re.match(r"^(\d+)", elevation_text)
        if elevation_match is None:
            continue
        arrays.append(ElmhurstPvArray(
            peak_power_kw=kwp,
            orientation=orientation,
            elevation_deg=int(elevation_match.group(1)),
            overshading=overshading,
        ))
    return arrays
def extract(self) -> ElmhurstSiteNotes:
    """Run every section extractor and assemble the
    `ElmhurstSiteNotes` record.

    `co2_emissions_current_t` is the first whitespace-separated token
    of the "Emissions (t/year)" value as a float, or 0.0 when that
    value is missing/empty.
    """
    emissions_text = self._next_val("Emissions (t/year)")
    if emissions_text:
        co2_tonnes = float(emissions_text.split()[0])
    else:
        co2_tonnes = 0.0

    # Header details and headline ratings.
    surveyor_info = self._extract_surveyor_info()
    property_details = self._extract_property_details()
    current_sap = self._rating_val("Current SAP rating")
    potential_sap = self._rating_val("Potential SAP rating")
    current_ei = self._rating_val("Current EI rating")
    potential_ei = self._rating_val("Potential EI rating")

    return ElmhurstSiteNotes(
        surveyor_info=surveyor_info,
        property_details=property_details,
        current_sap_rating=current_sap,
        potential_sap_rating=potential_sap,
        current_ei_rating=current_ei,
        potential_ei_rating=potential_ei,
        co2_emissions_current_t=co2_tonnes,
        property_type=self._str_val("1.0 Property type"),
        attachment=self._extract_attachment(),
        number_of_storeys=self._int_val("Storeys"),
        habitable_rooms=self._int_val("Habitable Rooms"),
        heated_habitable_rooms=self._int_val("Heated Habitable Rooms"),
        construction_age_band=self._str_val("Main Property"),
        dimensions=self._extract_dimensions(),
        has_conservatory=self._bool_val("Is there a conservatory?"),
        walls=self._extract_walls(),
        roof=self._extract_roof(),
        floor=self._extract_floor(),
        door_count=self._int_val("Total Number of Doors"),
        insulated_door_count=self._int_val("Number of Insulated Doors"),
        insulated_door_u_value=self._extract_door_u_value(),
        windows=self._extract_windows(),
        draught_proofing_percent=self._int_val("Draught Proofing"),
        ventilation=self._extract_ventilation(),
        lighting=self._extract_lighting(),
        main_heating=self._extract_main_heating(),
        meters=self._extract_meters(),
        water_heating=self._extract_water_heating(),
        baths_and_showers=self._extract_baths_and_showers(),
        renewables=self._extract_renewables(),
        extensions=self._extract_extensions(),
        room_in_roof=self._extract_room_in_roof_from_text(),
    )
def _extract_room_in_roof_from_text(self) -> Optional[RoomInRoof]:
    """Convenience wrapper around `_extract_room_in_roof`.

    Slices the text between "4.0 Dimensions:" and "5.0 Conservatory:",
    splits it with `_split_section_by_bp`, and passes the body of the
    first chunk (or the whole slice when the split yields nothing)
    together with the full document text `self._text`."""
    dimensions_text = self._between("4.0 Dimensions:", "5.0 Conservatory:")
    chunks = self._split_section_by_bp(dimensions_text)
    if chunks:
        main_body = chunks[0][1]
    else:
        main_body = dimensions_text
    return self._extract_room_in_roof(main_body, self._text)