Model/backend/documents_parser/elmhurst_extractor.py
Khalim Conn-Kowlessar c144d444e2 Slice S0380.26: RdSAP10 §5.8 dry-lining adjustment on alt walls — closes cert 7700 -0.44 → +5e-5
Per RdSAP10 §5.8 final note + Table 14 page 41:

  "For drylining including laths and plaster use Rinsulation = 0.17 m²K/W."

Applied additively to the base U-value of an otherwise-uninsulated wall:

  U_adjusted = 1 / (1/U_base + 0.17)  — rounded to 2 d.p. half-up.

Closed form for the cohort fixture (cavity-as-built age C, U_base=1.5):

  1 / (1/1.5 + 0.17) = 1.19522... → 1.20 ✓ matches worksheet
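
A minimal sketch of the adjustment above, assuming the half-up rounding is applied to the decimal representation of the result. The name `dry_lined_u` is hypothetical and is not the repository's `u_wall`:

```python
from decimal import Decimal, ROUND_HALF_UP

R_DRY_LINING = 0.17  # m²K/W, the RdSAP10 §5.8 figure quoted above


def dry_lined_u(u_base: float) -> float:
    """Add the dry-lining resistance in series, then round to 2 d.p. half-up."""
    u_adjusted = 1.0 / (1.0 / u_base + R_DRY_LINING)
    # Built-in round() rounds halves to even, so quantize with ROUND_HALF_UP.
    rounded = Decimal(repr(u_adjusted)).quantize(
        Decimal("0.01"), rounding=ROUND_HALF_UP
    )
    return float(rounded)


print(dry_lined_u(1.5))  # 1.2
```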

Cert 7700-3362-0922-7022-3563 (Summary_000905.pdf / dr87-0001-000905.pdf)
is an End-Terrace house age C lodging:
  - Main wall: CavityWallDensePlasterDenseBlock, Filled Cavity, U=0.70
  - Alt wall 1: 14.44 m² Cavity As-Built, Dry-lining: Yes (worksheet
    `CavityWallPlasterOnDabsDenseBlock`, U=1.20)

Pre-slice the Elmhurst alt-wall mapper hard-coded `wall_dry_lined="N"`
and the cascade ignored the field everywhere — alt-wall U routed to the
cavity-as-built default (1.50), giving fabric (33) 148.72 W/K vs
worksheet 144.38 (Δ +4.33 W/K = ~+0.44 SAP). Worksheet "SAP value" line
lodges unrounded SAP 63.4425.
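
The fabric gap follows from the alt-wall area and the two U-values quoted above. A quick check, with variable names chosen here purely for illustration:

```python
# Alt wall 1 on cert 7700: area and the two candidate U-values from the text.
area_m2 = 14.44
u_default = 1.50    # cavity-as-built default used pre-slice
u_worksheet = 1.20  # dry-lined value on the worksheet

delta_w_per_k = area_m2 * (u_default - u_worksheet)
print(f"{delta_w_per_k:.2f} W/K")  # 4.33 W/K
```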

Implementation:
  1. `AlternativeWall.dry_lined: bool = False` on the Elmhurst surveys
     dataclass.
  2. Elmhurst extractor reads "Alternative Wall N Dry-lining: Yes/No"
     into the new field.
  3. `_map_elmhurst_alternative_wall` propagates `wall_dry_lined="Y"`
     instead of the hard-coded "N".
  4. `u_wall` gains a `dry_lined: bool = False` kwarg and a single
     §5.8 adjustment site at the as-built bucket (bucket=0). Insulated
     buckets already absorb the dry-lining R via Table 14.
  5. `_alt_wall_w_per_k` passes `dry_lined=alt_wall.wall_dry_lined == "Y"`.
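
Steps 1 to 3 can be sketched roughly as follows. This is an illustration under assumptions, not the repository code: `AltWallSketch`, `read_label`, `parse_alt_wall` and `to_wall_dry_lined` are hypothetical names, and the single-line "label: value" layout of the sample input is assumed.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class AltWallSketch:
    """Hypothetical stand-in for the survey dataclass in step 1."""
    area_m2: float
    dry_lined: bool = False


def read_label(lines: List[str], label: str) -> Optional[str]:
    """Return the value after 'label:' on the same line, if present."""
    prefix = label + ":"
    for line in lines:
        if line.startswith(prefix) and len(line) > len(prefix):
            return line[len(prefix):].strip() or None
    return None


def parse_alt_wall(lines: List[str], n: int) -> AltWallSketch:
    """Step 2: read the numbered slot's area and Dry-lining flag."""
    area_raw = read_label(lines, f"Alternative Wall {n} Area") or "0"
    flag = read_label(lines, f"Alternative Wall {n} Dry-lining")
    return AltWallSketch(
        area_m2=float(area_raw.split()[0]),
        dry_lined=flag is not None and flag.lower() == "yes",
    )


def to_wall_dry_lined(wall: AltWallSketch) -> str:
    """Step 3: propagate the flag as 'Y'/'N' instead of a hard-coded 'N'."""
    return "Y" if wall.dry_lined else "N"


lines = ["Alternative Wall 1 Area: 14.44", "Alternative Wall 1 Dry-lining: Yes"]
wall = parse_alt_wall(lines, 1)
print(wall.dry_lined, to_wall_dry_lined(wall))  # True Y
```

A missing "Dry-lining" line falls back to `False`, matching the default on the dataclass field in step 1.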

Scope is the alt-wall path only — main BPs in the corpus all lodge
`wall_dry_lined="N"` (or the Summary PDF omits the field for the main
wall), so the main-wall call site is untouched. Conservative regression
posture per the user's strict cohort-pin convention.

Cohort-2 outcome (38 certs, Summary path):
  exact (<1e-4): 22 → **23**  (+1: cert 7700 -0.44 → +4.87e-05)
  0.07..0.5:      1 → **0**   (-1: cert 7700 closes out)
  0.5..1:         1 → 1       (cert 9796 unchanged — MIT precision floor)
  RAISES:         0 → 0

Cohort-1 ASHP cohort untouched: all certs lodge wall_dry_lined="N", so
the alt-wall call site short-circuits to the original cascade. Verified
no regressions across the 22 previously-exact cohort-2 certs either.

Pyright net-zero on all 8 touched files (183 → 183).

Tests: 704 → 708 pass (+4 new: u_wall §5.8 adjustment fires
correctly; cavity-as-built unchanged without flag; insulated bucket
unaffected by flag; heat_transmission alt-wall delta = 14.44 × 0.30
W/K; cert 7700 full chain hits worksheet 63.4425 at < 1e-4),
10 expected fails unchanged.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-28 10:56:11 +00:00

1326 lines
58 KiB
Python

import re
from datetime import date, datetime
from typing import List, Optional

from datatypes.epc.surveys.elmhurst_site_notes import (
    AlternativeWall,
    BathsAndShowers,
    BuildingPartDimensions,
    ElmhurstSiteNotes,
    ExtensionPart,
    FloorDetails,
    FloorDimension,
    Lighting,
    MainHeating,
    Meters,
    PropertyDetails,
    Renewables,
    RoofDetails,
    RoomInRoof,
    RoomInRoofSurface,
    Shower,
    SurveyorInfo,
    VentilationAndCooling,
    ElmhurstPvArray,
    WallDetails,
    WaterHeating,
    Window,
)


class ElmhurstSiteNotesExtractor:
    def __init__(self, pages: List[str]) -> None:
        self._text = "\n".join(pages)
        self._lines = [l.strip() for l in self._text.splitlines() if l.strip()]

    # --- generic helpers ---

    def _next_val(self, label: str) -> Optional[str]:
        lc = label.rstrip(":") + ":"
        lb = label.rstrip(":")
        for i, line in enumerate(self._lines):
            if line.startswith(lc) and len(line) > len(lc):
                return line[len(lc):].strip() or None
            if line == lc or line == lb:
                for j in range(i + 1, min(i + 4, len(self._lines))):
                    v = self._lines[j]
                    if v.endswith(":") or v.startswith("©"):
                        return None
                    if v:
                        return v
                return None
        return None

    def _str_val(self, label: str) -> str:
        v = self._next_val(label)
        return " ".join(v.split()) if v else ""

    def _opt_str(self, label: str) -> Optional[str]:
        v = self._next_val(label)
        return " ".join(v.split()) if v else None

    def _bool_val(self, label: str) -> bool:
        v = self._next_val(label)
        return v is not None and v.lower() == "yes"

    def _int_val(self, label: str) -> int:
        v = self._next_val(label)
        try:
            return int(v.split()[0]) if v else 0
        except (ValueError, IndexError):
            return 0

    def _date_val(self, label: str) -> date:
        v = self._next_val(label)
        if not v:
            raise ValueError(f"Missing date for label: {label}")
        return datetime.strptime(v.strip(), "%d/%m/%Y").date()

    def _between(self, start: str, end: str) -> str:
        try:
            s = self._text.index(start) + len(start)
            e = self._text.index(end, s)
            return self._text[s:e]
        except ValueError:
            return ""

    # Multi-bp helpers: Summary PDFs subdivide §4/§7/§8/§9 with explicit
    # "Main Property" / "1st Extension" / "2nd Extension" headers. The
    # existing single-bp fixture also carries "Main Property" as a header
    # before the body. This helper splits a section into per-bp chunks.
    _BP_HEADER_RE = re.compile(
        r"^(Main Property|\d+(?:st|nd|rd|th) Extension)\s*$",
        re.MULTILINE,
    )

    def _split_section_by_bp(self, section_text: str) -> List[tuple[str, str]]:
        """Split a section's text into per-bp subsections.

        Returns ``[(bp_name, body), ...]`` in document order. Body is
        the text between this bp's header and the next bp's header
        (exclusive). Returns ``[("Main Property", section_text)]`` when
        no headers are found (defensive fallback for malformed PDFs).
        """
        matches = list(self._BP_HEADER_RE.finditer(section_text))
        if not matches:
            return [("Main Property", section_text)]
        result: List[tuple[str, str]] = []
        for i, m in enumerate(matches):
            name = m.group(1)
            body_start = m.end()
            body_end = (
                matches[i + 1].start() if i + 1 < len(matches) else len(section_text)
            )
            result.append((name, section_text[body_start:body_end]))
        return result

    def _section_lines(self, start: str, end: str) -> List[str]:
        text = self._between(start, end)
        return [l.strip() for l in text.splitlines() if l.strip()]

    def _local_val(self, lines: List[str], label: str) -> Optional[str]:
        lb = label.rstrip(":")
        lc = lb + ":"
        for i, line in enumerate(lines):
            if line.startswith(lc) and len(line) > len(lc):
                return line[len(lc):].strip() or None
            if line == lc or line == lb:
                for j in range(i + 1, min(i + 4, len(lines))):
                    v = lines[j]
                    if v.endswith(":") or v.startswith("©"):
                        return None
                    if v:
                        return v
                return None
        return None

    def _local_str(self, lines: List[str], label: str) -> str:
        v = self._local_val(lines, label)
        return " ".join(v.split()) if v else ""

    def _local_bool(self, lines: List[str], label: str) -> bool:
        v = self._local_val(lines, label)
        return v is not None and v.lower() == "yes"

    # --- section extractors ---

    def _extract_surveyor_info(self) -> SurveyorInfo:
        return SurveyorInfo(
            surveyor_code=self._str_val("Surveyor"),
            name=self._str_val("Name"),
            title=self._str_val("Title"),
            tel_number=self._str_val("Tel Number"),
            survey_reference=self._str_val("Survey Reference"),
            my_reference=self._opt_str("My Reference"),
        )

    def _extract_property_details(self) -> PropertyDetails:
        epc_m = re.search(
            r"Check for the existence of\nan EPC:\n(Yes|No)", self._text
        )
        epc_exists = epc_m.group(1).lower() == "yes" if epc_m else False
        return PropertyDetails(
            rdsap_version=self._str_val("RdSAP version"),
            reference_number=self._str_val("Reference Number"),
            lodgement_required=self._bool_val("Lodgement Required"),
            regs_region=self._str_val("Regs Region"),
            epc_language=self._str_val("EPC Language"),
            postcode=self._str_val("Postcode"),
            region=self._str_val("Region"),
            street=self._str_val("Street"),
            town=self._str_val("Town"),
            tenure=self._str_val("Property Tenure"),
            transaction_type=self._str_val("Transaction Type"),
            inspection_date=self._date_val("Inspection Date"),
            process_date=self._date_val("Process date"),
            epc_exists=epc_exists,
            uprn=self._opt_str("UPRN"),
            house_name=self._opt_str("House Name"),
            house_number=self._opt_str("House No"),
            locality=self._opt_str("Locality"),
            county=self._opt_str("County"),
        )

    def _extract_attachment(self) -> str:
        """Extract the Summary's "attachment" line — the §1.0 built-form
        descriptor (e.g. "M Mid-Terrace", "D Detached") that sits
        between the property-type value and the §2.0 section header
        for HOUSES.

        Flats DON'T lodge an attachment line in the Elmhurst Summary;
        the §2.0 Number of Storeys header follows immediately after
        the "F Flat" property-type value. Detect that case and return
        "" so the mapper's `built_form` doesn't capture section-
        header noise.
        """
        m = re.search(r"1\.0 Property type:\n[^\n]+\n([^\n]+)", self._text)
        if not m:
            return ""
        candidate = " ".join(m.group(1).strip().split())
        if re.match(r"^\d+\.\d+\s", candidate) or "Number of Storeys" in candidate:
            return ""
        return candidate

    def _floors_from_dimensions_body(self, body: str) -> List[FloorDimension]:
        """Parse FloorDimension entries from a single bp's §4 body."""
        matches = re.findall(
            r"([A-Za-z ]+Floor):\n([\d.]+)\n([\d.]+)\n([\d.]+)\n([\d.]+)",
            body,
        )
        return [
            FloorDimension(
                name=name.strip(),
                area_m2=float(area),
                room_height_m=float(height),
                heat_loss_perimeter_m=float(hlp),
                party_wall_length_m=float(pwl),
            )
            for name, area, height, hlp, pwl in matches
        ]

    def _extract_dimensions(self) -> BuildingPartDimensions:
        """Main-property dimensions only. Extensions are picked up by
        `_extract_extensions`."""
        dim_type = self._str_val("Dimension type")
        section = self._between("4.0 Dimensions:", "5.0 Conservatory:")
        bp_chunks = self._split_section_by_bp(section)
        main_body = bp_chunks[0][1] if bp_chunks else section
        return BuildingPartDimensions(
            dimension_type=dim_type,
            floors=self._floors_from_dimensions_body(main_body),
        )

    def _wall_details_from_lines(self, lines: List[str]) -> WallDetails:
        thickness_raw = self._local_val(lines, "Wall Thickness")
        # Guard against a non-numeric lodging (same check as the
        # insulation and alt-wall thickness parsers) so int() cannot raise.
        thickness_mm = (
            int(thickness_raw.split()[0])
            if thickness_raw and thickness_raw.split()[0].isdigit()
            else None
        )
        # Composite / retrofit insulation thickness — Summary §7.0
        # writes the value on the line pair "Insulation Thickness" /
        # "100 mm" when a composite filled-cavity-plus-external (or
        # equivalent) wall is lodged. The "Insulation Thickness" label
        # is local-scoped inside the §7 block so it does not collide
        # with the §8 Roofs / §9 Floors blocks. None when the PDF
        # omits the line (no retrofit lodged).
        ins_thickness_raw = self._local_val(lines, "Insulation Thickness")
        insulation_thickness_mm = (
            int(ins_thickness_raw.split()[0])
            if ins_thickness_raw and ins_thickness_raw.split()[0].isdigit()
            else None
        )
        return WallDetails(
            wall_type=self._local_str(lines, "Type"),
            insulation=self._local_str(lines, "Insulation"),
            thickness_unknown=self._local_bool(lines, "Wall Thickness Unknown"),
            u_value_known=self._local_bool(lines, "U-value Known"),
            party_wall_type=self._local_str(lines, "Party Wall Type"),
            thickness_mm=thickness_mm,
            insulation_thickness_mm=insulation_thickness_mm,
            alternative_walls=self._alternative_walls_from_lines(lines),
        )

    def _alternative_walls_from_lines(self, lines: List[str]) -> List[AlternativeWall]:
        """Parse up to two §7 "Alternative Wall N" sub-area lodgements.

        The Elmhurst Summary PDF lays them out as a contiguous block of
        prefixed labels ("Alternative Wall 1 Area", "Alternative Wall 1
        Type", …); we read each numbered slot independently and drop
        slots whose Area is missing/zero."""
        result: List[AlternativeWall] = []
        for n in (1, 2):
            area_raw = self._local_val(lines, f"Alternative Wall {n} Area")
            if not area_raw:
                continue
            try:
                area = float(area_raw.split()[0])
            except (ValueError, IndexError):
                continue
            if area <= 0:
                continue
            thickness_raw = self._local_val(lines, f"Alternative Wall {n} Thickness")
            thickness_mm = (
                int(thickness_raw.split()[0])
                if thickness_raw and thickness_raw.split()[0].isdigit()
                else None
            )
            result.append(AlternativeWall(
                area_m2=area,
                wall_type=self._local_str(lines, f"Alternative Wall {n} Type"),
                insulation=self._local_str(lines, f"Alternative Wall {n} Insulation"),
                thickness_unknown=self._local_bool(
                    lines, f"Alternative Wall {n} Thickness Unknown"
                ),
                thickness_mm=thickness_mm,
                u_value_known=self._local_bool(
                    lines, f"Alternative Wall {n} U-value Known"
                ),
                # RdSAP10 §5.8 + Table 14: dry-lined uninsulated wall adds
                # R = 0.17 m²K/W to base U. Cohort fixture: cert 7700
                # Alt 1 "CavityWallPlasterOnDabs" lodges Dry-lining: Yes →
                # U = 1/(1/1.5 + 0.17) ≈ 1.20.
                dry_lined=self._local_bool(
                    lines, f"Alternative Wall {n} Dry-lining"
                ),
            ))
        return result

    def _extract_walls(self) -> WallDetails:
        section = self._between("7.0 Walls:", "8.0 Roofs:")
        bp_chunks = self._split_section_by_bp(section)
        main_body = bp_chunks[0][1] if bp_chunks else section
        lines = [l.strip() for l in main_body.splitlines() if l.strip()]
        return self._wall_details_from_lines(lines)

    def _roof_details_from_lines(self, lines: List[str]) -> RoofDetails:
        thickness_raw = self._local_val(lines, "Insulation Thickness")
        thickness_mm = (
            int(thickness_raw.split()[0])
            if thickness_raw and thickness_raw.split()[0].isdigit()
            else None
        )
        insulation = self._local_str(lines, "Insulation")
        # The Summary PDF omits the "Insulation Thickness" line entirely
        # when no retrofit insulation is lodged (e.g. "Insulation: N None"
        # on 000516). Treat that case as 0 mm so the cascade picks Table
        # 16 row 0 (U=2.30) rather than the age-band default — the
        # surveyor explicitly recorded "None".
        if thickness_mm is None and insulation.split(" ", 1)[0] == "N":
            thickness_mm = 0
        return RoofDetails(
            roof_type=self._local_str(lines, "Type"),
            insulation=insulation,
            u_value_known=self._local_bool(lines, "U-value Known"),
            insulation_thickness_mm=thickness_mm,
        )

    def _extract_roof(self) -> RoofDetails:
        section = self._between("8.0 Roofs:", "8.1 Rooms in Roof:")
        bp_chunks = self._split_section_by_bp(section)
        main_body = bp_chunks[0][1] if bp_chunks else section
        lines = [l.strip() for l in main_body.splitlines() if l.strip()]
        return self._roof_details_from_lines(lines)

    def _floor_details_from_lines(self, lines: List[str]) -> FloorDetails:
        u_val_raw = self._local_val(lines, "Default U-value")
        default_u = float(u_val_raw) if u_val_raw else None
        return FloorDetails(
            location=self._local_str(lines, "Location"),
            floor_type=self._local_str(lines, "Type"),
            insulation=self._local_str(lines, "Insulation"),
            u_value_known=self._local_bool(lines, "U-value Known"),
            default_u_value=default_u,
        )

    def _extract_floor(self) -> FloorDetails:
        section = self._between("9.0 Floors:", "10.0 Doors:")
        bp_chunks = self._split_section_by_bp(section)
        main_body = bp_chunks[0][1] if bp_chunks else section
        lines = [l.strip() for l in main_body.splitlines() if l.strip()]
        return self._floor_details_from_lines(lines)

    def _extract_door_u_value(self) -> Optional[float]:
        """Read the §10 Doors block's "Average U-value" lodging.

        Scoped to the §10..§11 slice so the global "U-value" labels in
        Walls/Roofs/Floors can't shadow the door reading. None when the
        PDF omits the line (e.g. all doors recorded as uninsulated)."""
        lines = self._section_lines("10.0 Doors:", "11.0 Windows:")
        raw = self._local_val(lines, "Average U-value")
        if not raw:
            return None
        try:
            return float(raw.split()[0])
        except (ValueError, IndexError):
            return None

    # RIR surface row: `<name> <length> <height> [<insulation> [<ins_type>]
    # [<gable_type>] <default_u> <known> <u>]`. The middle slot
    # widths vary by surface kind; we match the four leading numerics
    # robustly (length, height, default_u, u_value) and slot the
    # remaining textual fields by position. The layout preprocessor
    # collapses multi-space-separated cells into single newlines, so
    # each row in the dump occupies multiple lines per cell.
    _RIR_SURFACE_NAMES: tuple[str, ...] = (
        "Flat Ceiling 1", "Flat Ceiling 2",
        "Stud Wall 1", "Stud Wall 2",
        "Slope 1", "Slope 2",
        "Gable Wall 1", "Gable Wall 2",
        "Common Wall 1", "Common Wall 2",
    )

    def _extract_room_in_roof(
        self, main_dim_body: str, age_band_text: str
    ) -> Optional[RoomInRoof]:
        """Parse the §8.1 Rooms in Roof section for the Main bp. Returns
        None when no RR is lodged (single-storey or simple loft houses).

        `main_dim_body` is the Main-property §4 chunk used to pull the
        RR floor area; `age_band_text` is the §3 raw text holding the
        "Main Prop. Room(s) in Roof <band>" line."""
        # RR floor area lives in §4 Dimensions immediately above the
        # storey floor entries: "Room(s) in Roof: 15.06".
        m = re.search(r"Room\(s\) in Roof:\s+(\d+(?:\.\d+)?)", main_dim_body)
        if m is None:
            return None
        floor_area = float(m.group(1))
        if floor_area <= 0:
            return None
        section = self._between("8.1 Rooms in Roof:", "9.0 Floors:")
        if not section.strip() or "Room in roof type" not in section:
            return None
        bp_chunks = self._split_section_by_bp(section)
        main_body = bp_chunks[0][1] if bp_chunks else section
        lines = [l.strip() for l in main_body.splitlines() if l.strip()]
        assessment_idx = next(
            (i for i, l in enumerate(lines) if l == "Assessment"), None
        )
        assessment = (
            lines[assessment_idx + 1]
            if assessment_idx is not None and assessment_idx + 1 < len(lines)
            else ""
        )
        surfaces: List[RoomInRoofSurface] = []
        for name in self._RIR_SURFACE_NAMES:
            try:
                idx = lines.index(name)
            except ValueError:
                continue
            surfaces.append(self._parse_rir_surface_row(name, lines, idx))
        # Age band from §3: "Main Prop. Room(s) in Roof B 1900-1929"
        age_m = re.search(
            r"Main Prop\. Room\(s\) in Roof\s+([A-M] [^\n]+)", age_band_text
        )
        age_band = age_m.group(1).strip() if age_m else None
        return RoomInRoof(
            floor_area_m2=floor_area,
            construction_age_band=age_band,
            assessment=assessment,
            surfaces=surfaces,
        )

    _RIR_NUMERIC_RE = re.compile(r"^-?\d+(?:\.\d+)?$")
    _RIR_INSULATION_THICKNESS_RE = re.compile(r"^\d+\s*mm$")

    def _parse_rir_surface_row(
        self, name: str, lines: List[str], idx: int
    ) -> RoomInRoofSurface:
        """One RR surface row spans the name line followed by ~6-9 tokens
        depending on which optional cells the surveyor filled. The token
        order is stable: length, height, [insulation], [ins_type],
        [gable_type], default_u, u_known, u_value. Numeric cells (length,
        height, default_u, u_value) are the anchor; everything else is
        slotted into the appropriate textual field."""
        # Walk forward until either we exhaust the cell budget or hit
        # the next RIR row's name marker — the layout dump puts each
        # numeric / textual cell on its own line and we can't tell
        # the LAST cell of THIS row from the FIRST cell of the next
        # without that signal.
        tokens: List[str] = []
        scan_end = min(idx + 10, len(lines))
        for j in range(idx + 1, scan_end):
            if self._is_next_rir_row(lines[j]):
                break
            tokens.append(lines[j])
        # First two numerics = length, height
        length = float(tokens[0]) if tokens and self._RIR_NUMERIC_RE.match(tokens[0]) else 0.0
        height = float(tokens[1]) if len(tokens) > 1 and self._RIR_NUMERIC_RE.match(tokens[1]) else 0.0
        # Last numeric is u_value; preceding "Yes"/"No" is u_value_known;
        # the numeric before that is default_u.
        # Walk from the end backwards looking for the u_value, then known
        # flag, then default_u.
        u_value = 0.0
        u_value_known = False
        default_u: Optional[float] = None
        # The known/default_u tail is fairly stable; collect the trailing
        # tokens and slot by position. The "known" token is "No" or "Yes".
        rev = list(reversed(tokens[2:]))
        # rev[0] = u_value, rev[1] = u_value_known, rev[2] = default_u
        if len(rev) >= 1 and self._RIR_NUMERIC_RE.match(rev[0]):
            u_value = float(rev[0])
        if len(rev) >= 2 and rev[1] in ("Yes", "No"):
            u_value_known = rev[1] == "Yes"
        if len(rev) >= 3 and self._RIR_NUMERIC_RE.match(rev[2]):
            default_u = float(rev[2])
        # Middle textual cells: insulation, insulation_type, gable_type.
        # Drop the leading length/height (already consumed) and the
        # trailing 3 tokens (default_u, known, u_value).
        middle = tokens[2:-3] if len(tokens) >= 5 else []
        insulation = ""
        insulation_type: Optional[str] = None
        gable_type: Optional[str] = None
        for t in middle:
            if self._RIR_INSULATION_THICKNESS_RE.match(t) or t in ("As Built", "None"):
                if not insulation:
                    insulation = t
            elif t in ("Mineral or EPS", "PUR", "PIR"):
                insulation_type = t
            elif t in ("Party", "Sheltered", "Connected to heated space"):
                gable_type = t
        return RoomInRoofSurface(
            name=name,
            length_m=length,
            height_m=height,
            insulation=insulation,
            insulation_type=insulation_type,
            gable_type=gable_type,
            default_u_value=default_u,
            u_value_known=u_value_known,
            u_value=u_value,
        )

    def _is_next_rir_row(self, line: str) -> bool:
        return line in self._RIR_SURFACE_NAMES

    def _extract_extensions(self) -> List[ExtensionPart]:
        """Collect non-Main building parts. Cross-references the §4, §7,
        §8, §9 per-bp subsections by extension name. "As Main: Yes"
        within a section body inherits the main bp's data for that
        section; otherwise the section body is parsed in isolation."""
        # Gather per-section chunks once.
        dim_section = self._between("4.0 Dimensions:", "5.0 Conservatory:")
        wall_section = self._between("7.0 Walls:", "8.0 Roofs:")
        roof_section = self._between("8.0 Roofs:", "8.1 Rooms in Roof:")
        floor_section = self._between("9.0 Floors:", "10.0 Doors:")
        dim_type = self._str_val("Dimension type")
        dim_chunks = dict(self._split_section_by_bp(dim_section))
        wall_chunks = dict(self._split_section_by_bp(wall_section))
        roof_chunks = dict(self._split_section_by_bp(roof_section))
        floor_chunks = dict(self._split_section_by_bp(floor_section))
        main_walls = self._extract_walls()
        main_roof = self._extract_roof()
        main_floor = self._extract_floor()
        # Per-bp age-band lookup. Section 3 contains lines like
        # "1st Extension B 1900-1929" — the band sits after the name.
        age_band_re = re.compile(
            r"^(\d+(?:st|nd|rd|th) Extension)\s+([A-M] [^\n]+)$",
            re.MULTILINE,
        )
        age_bands = {m.group(1): m.group(2).strip() for m in age_band_re.finditer(self._text)}
        # Collect names in document order from the dimensions section
        # (excluding Main Property).
        names = [
            name for name, _ in self._split_section_by_bp(dim_section)
            if name != "Main Property"
        ]
        extensions: List[ExtensionPart] = []
        for name in names:
            dim_body = dim_chunks.get(name, "")
            wall_body = wall_chunks.get(name, "")
            roof_body = roof_chunks.get(name, "")
            floor_body = floor_chunks.get(name, "")
            wall_lines = [l.strip() for l in wall_body.splitlines() if l.strip()]
            roof_lines = [l.strip() for l in roof_body.splitlines() if l.strip()]
            floor_lines = [l.strip() for l in floor_body.splitlines() if l.strip()]
            if self._local_bool(wall_lines, "As Main Wall"):
                # Alternative walls live in the extension's own chunk
                # even when the main wall fields are inherited; merge
                # them into the inherited WallDetails so the bp carries
                # them through to its SapBuildingPart.
                walls = WallDetails(
                    wall_type=main_walls.wall_type,
                    insulation=main_walls.insulation,
                    thickness_unknown=main_walls.thickness_unknown,
                    u_value_known=main_walls.u_value_known,
                    party_wall_type=main_walls.party_wall_type,
                    thickness_mm=main_walls.thickness_mm,
                    insulation_thickness_mm=main_walls.insulation_thickness_mm,
                    alternative_walls=self._alternative_walls_from_lines(wall_lines),
                )
            else:
                walls = self._wall_details_from_lines(wall_lines)
            roof = main_roof if self._local_bool(roof_lines, "As Main") else self._roof_details_from_lines(roof_lines)
            floor = main_floor if self._local_bool(floor_lines, "As Main") else self._floor_details_from_lines(floor_lines)
            extensions.append(
                ExtensionPart(
                    name=name,
                    construction_age_band=age_bands.get(name, ""),
                    dimensions=BuildingPartDimensions(
                        dimension_type=dim_type,
                        floors=self._floors_from_dimensions_body(dim_body),
                    ),
                    walls=walls,
                    roof=roof,
                    floor=floor,
                )
            )
        return extensions

    def _extract_windows(self) -> List[Window]:
        # Textract-style pages keep "Permanent\s+Shutters" adjacent in
        # reading order and the windows table flows as one column-block
        # the existing token-walker can step through. PDF-derived pages
        # (Summary PDFs preprocessed from `pdftotext -layout`) break the
        # header across lines, so this regex misses entirely and the
        # `_extract_windows_from_layout` fallback below picks them up
        # by anchoring on the W/H/Area data line.
        m = re.search(
            r"Permanent\s+Shutters\n(.*?)Draught Proofing",
            self._text,
            re.DOTALL,
        )
        if not m:
            return self._extract_windows_from_layout()
        tokens = [t.strip() for t in m.group(1).splitlines() if t.strip()]
        windows: List[Window] = []
        i = 0
        while i + 12 < len(tokens):
            try:
                width_m = float(tokens[i])
                height_m = float(tokens[i + 1])
                area_m2 = float(tokens[i + 2])
            except (ValueError, IndexError):
                i += 1
                continue
            i += 3
            # Collect glazing type tokens until frame_factor (0 < v ≤ 1.0)
            glazing_parts: List[str] = []
            while i < len(tokens):
                try:
                    v = float(tokens[i])
                    if 0.0 < v <= 1.0:
                        break
                    glazing_parts.append(tokens[i])
                except ValueError:
                    glazing_parts.append(tokens[i])
                i += 1
            # If last glazing token is a single word (no spaces, not numeric) it's the frame_type
            frame_type: Optional[str] = None
            if glazing_parts and " " not in glazing_parts[-1] and not glazing_parts[-1].replace(".", "").isdigit():
                frame_type = glazing_parts.pop()
            glazing_type = " ".join(glazing_parts).strip()
            if i >= len(tokens):
                break
            frame_factor = float(tokens[i])
            i += 1
            # Consume glazing_gap if present ("mm" token, possibly multi-token e.g. "16 mm or more")
            glazing_gap: Optional[str] = None
            if i < len(tokens) and "mm" in tokens[i]:
                gap_parts = [tokens[i]]
                i += 1
                while i < len(tokens) and tokens[i].lower() in {"or", "more"}:
                    gap_parts.append(tokens[i])
                    i += 1
                glazing_gap = " ".join(gap_parts)
            building_part = tokens[i]
            i += 1
            location = tokens[i]
            i += 1
            orientation = tokens[i]
            i += 1
            data_source = tokens[i]
            i += 1
            u_value = float(tokens[i])
            i += 1
            g_value = float(tokens[i])
            i += 1
            draught_proofed = tokens[i].lower() == "yes"
            i += 1
            permanent_shutters = tokens[i]
            i += 1
            windows.append(
                Window(
                    width_m=width_m,
                    height_m=height_m,
                    area_m2=area_m2,
                    glazing_type=glazing_type,
                    frame_factor=frame_factor,
                    building_part=building_part,
                    location=location,
                    orientation=orientation,
                    data_source=data_source,
                    u_value=u_value,
                    g_value=g_value,
                    draught_proofed=draught_proofed,
                    permanent_shutters=permanent_shutters,
                    frame_type=frame_type,
                    glazing_gap=glazing_gap,
                )
            )
        return windows

    # Anchors used by the layout-style window parser. The W/H/Area anchor
    # is sometimes followed by a joined glazing-type phrase on the same
    # line (e.g. '1.22 1.76 2.15 Double pre 2002'); the optional 4th
    # capture surfaces that text so the parser can use it instead of a
    # separately-laid-out prefix line.
    _WIDTH_HEIGHT_AREA_RE = re.compile(
        r"^(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)(?:\s+(\S.*?))?$"
    )
    _MANUFACTURER_RE = re.compile(r"^(Manufacturer|Default)\s+(\d+\.\d+)$")
    _ORIENTATION_TOKENS = frozenset({
        "North", "South", "East", "West", "NE", "NW", "SE", "SW",
    })
    _BP_INLINE_TOKENS = frozenset({"Main"})  # "Extension" only appears as suffix
    # The Elmhurst Summary PDF lodges each window's glazing-type as a
    # capitalised phrase like "Double between 2002" / "Double with unknown"
    # / "Single" / "Triple" / "Secondary". The first token of that phrase
    # marks the start of a new window's prefix block in the layout dump,
    # which is the only stable signal partitioning one window's suffix
    # from the next window's prefix.
    _GLAZING_TYPE_PREFIX_WORDS = frozenset({
        "Single", "Double", "Triple", "Secondary",
    })

    def _extract_windows_from_layout(self) -> List[Window]:
        """Fallback window parser for Summary PDFs preprocessed from
        `pdftotext -layout`. Each window has two stable anchors:
        a "W H Area" line and a "Manufacturer <U_value>" line a few
        lines further down. Everything between holds frame_type,
        frame_factor, and a variable mix of glazing_gap, building_part,
        location, and orientation (depending on which fields the
        surveyor lodged); everything around the window holds glazing-
        type/building-part/orientation prefix/suffix tokens split by
        the layout preprocessor.
        """
        m = re.search(
            r"11\.0 Windows:(.*?)(Draught Proofing|12\.0 Ventilation)",
            self._text, re.DOTALL,
        )
        if not m:
            return []
        lines = m.group(1).splitlines()
        # Locate all (data_line, manufacturer_line) pairs in document
        # order. Each pair is one window.
        data_anchors: List[tuple[int, re.Match[str]]] = []
        for i, line in enumerate(lines):
            anchor = self._WIDTH_HEIGHT_AREA_RE.match(line.strip())
            if anchor is not None:
                data_anchors.append((i, anchor))
        windows: List[Window] = []
        for k, (data_idx, anchor) in enumerate(data_anchors):
            manuf_idx = self._find_manufacturer_after(lines, data_idx)
            if manuf_idx is None:
                continue
            prev_manuf_idx = (
                self._find_manufacturer_after(lines, data_anchors[k - 1][0])
                if k > 0 else None
            )
            next_data_idx = (
                data_anchors[k + 1][0] if k + 1 < len(data_anchors) else len(lines)
            )
            # Partition the cross-window gap between this window's suffix
            # and the next window's prefix on the first glazing-type-start
            # token (Single/Double/Triple/Secondary). The same boundary
            # is used symmetrically — current window's `after_end` = next
            # window's `before_start` — so prefix tokens of W_{k+1} never
            # get attributed as suffix of W_k (which was the bug producing
            # orientation='East-South' for windows where 'South' actually
            # belonged to the next row).
            before_start = (
                self._partition_after_manuf(lines, prev_manuf_idx, data_idx)
                if prev_manuf_idx is not None else 0
            )
            after_end = self._partition_after_manuf(lines, manuf_idx, next_data_idx)
            try:
                window = self._parse_window_from_anchors(
                    lines=lines,
                    data_idx=data_idx,
                    manuf_idx=manuf_idx,
                    anchor=anchor,
                    before_start=before_start,
                    after_end=after_end,
                )
            except (ValueError, IndexError):
                continue
            if window is not None:
                windows.append(window)
        return windows

    def _find_manufacturer_after(self, lines: List[str], data_idx: int) -> Optional[int]:
        for j in range(data_idx + 1, min(data_idx + 12, len(lines))):
            if self._MANUFACTURER_RE.match(lines[j].strip()):
                return j
        return None

    _FRAME_TYPE_AND_FACTOR_RE = re.compile(r"^(\S+(?:\s+\S+)*?)\s+(\d\.\d+)$")
    _FRAME_FACTOR_ONLY_RE = re.compile(r"^(\d\.\d+)$")

    def _parse_frame_type_and_factor(
        self, lines: List[str], data_idx: int
    ) -> tuple[str, Optional[float], int]:
        """Return `(frame_type, frame_factor, middle_start_idx)` from
        the lines immediately after the data anchor. Layouts vary:

        (a) "PVC" on data+1, "0.70" on data+2 — the original 000474
            shape;
        (b) "Wood 0.70" on data+1 — joined-cell variant from 000487
            and 000516 first-row windows;
        (c) "0.70" alone on data+1 (no frame_type word at all) —
            seen in 000487's subsequent windows where the
            preprocessor dropped the frame-type column. frame_type
            is recovered downstream from glazing-type defaults or
            left empty."""
        first = lines[data_idx + 1].strip()
        combined = self._FRAME_TYPE_AND_FACTOR_RE.match(first)
        if combined is not None:
            return combined.group(1), float(combined.group(2)), data_idx + 2
        factor_only = self._FRAME_FACTOR_ONLY_RE.match(first)
        if factor_only is not None:
            return "", float(factor_only.group(1)), data_idx + 2
        if data_idx + 2 >= len(lines):
            return first, None, data_idx + 2
        frame_type = first
        try:
            frame_factor = float(lines[data_idx + 2].strip())
        except ValueError:
            return frame_type, None, data_idx + 3
        return frame_type, frame_factor, data_idx + 3

    def _partition_after_manuf(
        self, lines: List[str], manuf_idx: int, next_data_idx: int
    ) -> int:
        """Return the exclusive upper bound for this window's suffix
        block (and the inclusive lower bound for the next window's prefix
        block). After the manufacturer line come 3 fixed tokens (g_value,
        draught, shutters); the variable suffix lines start at manuf+4
        and run until either (a) the next window's glazing-type-start
        token (e.g. 'Double between 2002', 'Single', 'Triple ...') or
        (b) the second orientation token in the gap, whichever comes
        first. Branch (b) covers layouts where the glazing-type is
        joined to the data line (no separate prefix line exists), so
        the only signal of window-transition is the orientation tokens
        rotating: orient_suffix(k) → orient_prefix(k+1). Falls through
        to `next_data_idx` when neither marker is present."""
        scan_start = manuf_idx + 4
        seen_orient = False
        for j in range(scan_start, next_data_idx):
            stripped = lines[j].strip()
            first_word = stripped.split(" ", 1)[0]
            if first_word in self._GLAZING_TYPE_PREFIX_WORDS:
                return j
            if stripped in self._ORIENTATION_TOKENS:
                if seen_orient:
                    return j
                seen_orient = True
        return next_data_idx
    def _parse_window_from_anchors(
        self,
        *,
        lines: List[str],
        data_idx: int,
        manuf_idx: int,
        anchor: re.Match[str],
        before_start: int,
        after_end: int,
    ) -> Optional[Window]:
        width = float(anchor.group(1))
        height = float(anchor.group(2))
        area = float(anchor.group(3))
        # Layout-style cell joining sometimes leaves the glazing-type
        # phrase trailing the W H Area triplet on the same line (e.g.
        # "1.22 1.76 2.15 Double pre 2002"); when present we pass it
        # through as `inline_glazing_type` and the composer skips the
        # would-be glazing-prefix scan.
        inline_glazing_type = (
            anchor.group(4) if anchor.lastindex and anchor.lastindex >= 4 else None
        )
        # frame_type and frame_factor immediately follow the data line.
        # Layout-style cell joining sometimes collapses them onto a
        # single "Wood 0.70" line; treat both shapes uniformly so the
        # downstream `middle` slice still starts at the first variable
        # field (glazing_gap / bp / location / orient).
        if data_idx + 1 >= len(lines):
            return None
        frame_type, frame_factor, middle_start = self._parse_frame_type_and_factor(
            lines, data_idx
        )
        if frame_factor is None or not 0.0 < frame_factor <= 1.0:
            return None
        # Variable-order tokens between frame_factor and Manufacturer.
        middle = [lines[j].strip() for j in range(middle_start, manuf_idx)]
        glazing_gap = next((t for t in middle if "mm" in t.lower()), None)
        # Wall-location lodging. Most rows put "External wall" in
        # `middle`; alt-wall rows (cert 2636 window-4 / cert 9418 alt-
        # wall window) put "Alternative wall" in the PRE-data slice
        # (between the previous window's end and W×H×A). Search both
        # slices so either layout resolves to the correct location.
        pre_data = [lines[j].strip() for j in range(before_start, data_idx)]
        location = (
            next((t for t in middle if "wall" in t.lower()), None)
            or next((t for t in pre_data if "wall" in t.lower()), None)
            or "External wall"
        )
        bp_inline = next((t for t in middle if t in self._BP_INLINE_TOKENS), None)
        orient_inline = next(
            (t for t in middle if t in self._ORIENTATION_TOKENS), None
        )
        # Manufacturer line carries data_source + u_value.
        manuf_match = self._MANUFACTURER_RE.match(lines[manuf_idx].strip())
        if manuf_match is None:
            return None
        data_source = manuf_match.group(1)
        u_value = float(manuf_match.group(2))
        # Post-manufacturer: g_value, draught, shutters.
        if manuf_idx + 3 >= len(lines):
            return None
        try:
            g_value = float(lines[manuf_idx + 1].strip())
        except ValueError:
            return None
        draught_proofed = lines[manuf_idx + 2].strip().lower() == "yes"
        permanent_shutters = lines[manuf_idx + 3].strip()
        # Prefix / suffix tokens (variable count) carry the
        # glazing-type, building-part, and orientation strings split by
        # the layout preprocessor.
        before = [
            lines[j].strip()
            for j in range(before_start, data_idx)
            if lines[j].strip()
        ]
        after = [
            lines[j].strip()
            for j in range(manuf_idx + 4, after_end)
            if lines[j].strip()
        ]
        glazing_type, building_part, orientation = self._compose_window_descriptors(
            before=before,
            after=after,
            bp_inline=bp_inline,
            orient_inline=orient_inline,
            inline_glazing_type=inline_glazing_type,
        )
        return Window(
            width_m=width,
            height_m=height,
            area_m2=area,
            glazing_type=glazing_type,
            frame_factor=frame_factor,
            building_part=building_part,
            location=location,
            orientation=orientation,
            data_source=data_source,
            u_value=u_value,
            g_value=g_value,
            draught_proofed=draught_proofed,
            permanent_shutters=permanent_shutters,
            frame_type=frame_type,
            glazing_gap=glazing_gap,
        )
    def _compose_window_descriptors(
        self,
        *,
        before: List[str],
        after: List[str],
        bp_inline: Optional[str],
        orient_inline: Optional[str],
        inline_glazing_type: Optional[str] = None,
    ) -> tuple[str, str, str]:
        """Re-join the glazing-type / building-part / orientation tokens
        split by the layout preprocessor. Each is at most 2 fragments
        (one before the data line, one after); inline tokens found
        between the data line and the Manufacturer line win over
        prefix/suffix fragments."""
        # before holds (in document order, possibly): glazing_prefix,
        # bp_prefix, orient_prefix — bp/orient may be missing.
        # after holds: glazing_suffix, bp_suffix, orient_suffix — same.
        prefix = list(before[-3:])  # last 3 lines preceding data
        suffix = list(after[:3])

        def pop_if_orientation(tokens: List[str]) -> Optional[str]:
            # Remove and return the first orientation token, if any.
            for t in tokens:
                if t in self._ORIENTATION_TOKENS:
                    tokens.remove(t)
                    return t
            return None

        def pop_if_bp_fragment(tokens: List[str]) -> Optional[str]:
            # Prefix fragments like "1st" / "2nd" — match digit-prefixed
            # ordinals; suffix fragments are always "Extension".
            for t in tokens:
                if re.match(r"^\d+(?:st|nd|rd|th)$", t) or t == "Extension":
                    tokens.remove(t)
                    return t
            return None

        orient_prefix_token = pop_if_orientation(prefix)
        orient_suffix_token = pop_if_orientation(suffix)
        bp_prefix_frag = pop_if_bp_fragment(prefix)
        bp_suffix_frag = pop_if_bp_fragment(suffix)
        # Glazing type: an inline glazing-type captured from the data
        # line (layout-joined variant) wins; otherwise join the remaining
        # prefix + suffix fragments.
        if inline_glazing_type is not None:
            glazing_type = inline_glazing_type
        else:
            glazing_type = " ".join([*prefix, *suffix]).strip()
        # Building part: inline token wins; otherwise join prefix + suffix.
        if bp_inline is not None:
            building_part = bp_inline
        else:
            building_part = " ".join(
                t for t in (bp_prefix_frag, bp_suffix_frag) if t
            ).strip()
        # Orientation: inline token wins for the primary direction;
        # combine with the first remaining prefix/suffix fragment that
        # differs from it, when present (e.g. "South" + "East").
        primary = orient_inline or orient_prefix_token or ""
        secondary_candidates = [
            t for t in (orient_prefix_token, orient_suffix_token) if t and t != primary
        ]
        if primary and secondary_candidates:
            orientation = f"{primary}-{secondary_candidates[0]}"
        else:
            orientation = primary
        return glazing_type, building_part, orientation
    def _extract_ventilation(self) -> VentilationAndCooling:
        return VentilationAndCooling(
            open_chimneys_count=self._int_val("No. of open chimneys"),
            open_flues_count=self._int_val("No. of open flues"),
            open_chimneys_closed_fire_count=self._int_val(
                "No. of open chimneys/open flues attached to closed fire"
            ),
            solid_fuel_boiler_flues_count=self._int_val(
                "No. of flues attached to solid fuel boiler"
            ),
            other_heater_flues_count=self._int_val(
                "No. of open flues attached to other heater"
            ),
            blocked_chimneys_count=self._int_val("No. of blocked chimneys"),
            extract_fans_count=self._int_val("No. of intermittent extract fans"),
            passive_vents_count=self._int_val("No. of passive vents"),
            flueless_gas_fires_count=self._int_val("No. of flueless gas fires"),
            fixed_space_cooling=self._bool_val("Fixed Space Cooling"),
            draught_lobby=self._str_val("Draught Lobby"),
            mechanical_ventilation=self._bool_val("Mechanical Ventilation"),
            pressure_test_method=self._str_val("Test Method"),
        )
    def _extract_lighting(self) -> Lighting:
        led_cfl_count_known = self._bool_val("Number of LED and CFL Known")
        return Lighting(
            total_bulbs=self._int_val("Total number of bulbs"),
            led_cfl_count_known=led_cfl_count_known,
            led_count=self._int_val("Number of LED lights"),
            cfl_count=self._int_val("Number of CFL lights"),
            incandescent_count=self._int_val("Total number of incandescents"),
            low_energy_count=(
                0 if led_cfl_count_known
                else self._int_val("Total number of Low Energy")
            ),
        )
    def _extract_main_heating(self) -> MainHeating:
        lines = self._section_lines("14.0 Main Heating1", "14.1 Main Heating2")
        pct_raw = self._local_val(lines, "Percentage of Heat")
        pct = int(pct_raw.split()[0]) if pct_raw else 0
        # The "Secondary Heating SapCode" key is lodged inside §14.1 Main
        # Heating2 — Elmhurst uses the Main-2 block to also carry the
        # cert's secondary heating system (when one exists). Look for it
        # in that section; absence (or "0") means no secondary lodged.
        secondary_lines = self._section_lines(
            "14.1 Main Heating2", "14.1 Community Heating"
        )
        secondary_raw = self._local_val(secondary_lines, "Secondary Heating SapCode")
        secondary_code = (
            int(secondary_raw)
            if secondary_raw is not None and secondary_raw.isdigit()
            and int(secondary_raw) > 0
            else None
        )
        return MainHeating(
            heat_emitter=self._local_str(lines, "Heat Emitter"),
            fuel_type=self._local_str(lines, "Fuel Type"),
            flue_type=self._local_str(lines, "Flue Type"),
            fan_assisted_flue=self._local_bool(lines, "Fan Assisted Flue"),
            design_flow_temperature=self._local_str(lines, "Design flow temperature"),
            heating_controls_ees=self._local_str(lines, "Main Heating Controls EES"),
            heating_controls_sap=self._local_str(lines, "Main Heating Controls Sap"),
            percentage_of_heat=pct,
            pcdf_boiler_reference=self._local_val(lines, "PCDF boiler Reference"),
            heat_pump_age=self._local_val(lines, "Heat pump age"),
            secondary_heating_sap_code=secondary_code,
        )
    def _extract_meters(self) -> Meters:
        return Meters(
            electricity_meter_type=self._str_val("Electricity meter type"),
            main_gas=self._bool_val("Main gas"),
            electricity_smart_meter=self._bool_val("Electricity Smart Meter Present"),
            gas_smart_meter=self._bool_val("Gas Smart Meter Present"),
        )
    def _extract_water_heating(self) -> WaterHeating:
        # §15.1 lodgings — Summary writes these only when a cylinder
        # is present. The §15.1 block uses labels ("Cylinder Size",
        # "Insulated", "Insulation Thickness") that collide with
        # global occurrences elsewhere ("Insulation Thickness" also
        # appears in §7 Walls / §8 Roofs); scope the lookups via
        # `_local_val` against the §15.1..§15.2 slice to disambiguate.
        cylinder_lines = self._section_lines(
            "15.1 Hot Water Cylinder", "15.2 Community Hot Water",
        )
        cylinder_size_label = self._local_val(
            cylinder_lines, "Cylinder Size",
        )
        cylinder_insulation_label = self._local_val(
            cylinder_lines, "Insulated",
        )
        cylinder_ins_thickness_raw = self._local_val(
            cylinder_lines, "Insulation Thickness",
        )
        cylinder_insulation_thickness_mm: Optional[int] = None
        if cylinder_ins_thickness_raw:
            first = cylinder_ins_thickness_raw.split()[0]
            if first.isdigit():
                cylinder_insulation_thickness_mm = int(first)
        cylinder_thermostat_raw = self._local_val(
            cylinder_lines, "Cylinder Thermostat",
        )
        cylinder_thermostat: Optional[bool] = (
            cylinder_thermostat_raw.strip().lower() == "yes"
            if cylinder_thermostat_raw is not None
            else None
        )
        return WaterHeating(
            water_heating_code=self._str_val("Water Heating Code"),
            water_heating_sap_code=self._int_val("Water Heating SapCode"),
            water_heating_fuel_type=self._str_val("Water Heating Fuel Type"),
            hot_water_cylinder_present=self._bool_val("Hot Water Cylinder Present"),
            cylinder_size_label=cylinder_size_label,
            cylinder_insulation_label=cylinder_insulation_label,
            cylinder_insulation_thickness_mm=cylinder_insulation_thickness_mm,
            cylinder_thermostat=cylinder_thermostat,
        )
    def _extract_baths_and_showers(self) -> BathsAndShowers:
        n_baths = self._int_val("Total Number of Baths")
        n_connected = self._int_val("Number of Baths Connected")
        # The shower table's last header cell is "Connected"; rows follow
        # as (number, outlet type, connected) triplets on separate lines.
        try:
            idx = self._lines.index("Connected")
        except ValueError:
            return BathsAndShowers(
                number_of_baths=n_baths,
                number_of_baths_connected=n_connected,
                showers=[],
            )
        showers: List[Shower] = []
        j = idx + 1
        # Read whole triplets only; stop at the first non-numeric row id.
        while j + 2 < len(self._lines):
            num_line = self._lines[j]
            if not num_line.isdigit():
                break
            showers.append(
                Shower(
                    shower_number=int(num_line),
                    outlet_type=self._lines[j + 1],
                    connected=self._lines[j + 2],
                )
            )
            j += 3
        return BathsAndShowers(
            number_of_baths=n_baths,
            number_of_baths_connected=n_connected,
            showers=showers,
        )
    def _rating_val(self, label: str) -> int:
        v = self._next_val(label)
        try:
            return int(v.split()[-1]) if v else 0
        except (ValueError, IndexError):
            return 0
    def _extract_renewables(self) -> Renewables:
        fghrs_lines = self._section_lines(
            "18.0 Flue Gas Heat Recovery System", "19.0 Photovoltaic Panel"
        )
        fghrs = self._local_bool(fghrs_lines, "Present")
        terrain = self._str_val("Terrain Type")
        hydro_raw = self._next_val("Electricity generated [kWh/year]")
        hydro = float(hydro_raw) if hydro_raw else 0.0
        # RdSAP 10 §11.1 b): the Summary §19.0 may lodge a "% of roof
        # area" row when the surveyor doesn't capture detailed kWp /
        # orientation / pitch. `_int_val` returns 0 when the label is
        # absent (cert lodges detailed pv_arrays instead) — collapse to
        # None so downstream can distinguish "no PV" from "PV via %
        # roof area path".
        pv_pct = self._int_val("Proportion of roof area")
        return Renewables(
            solar_water_heating=self._bool_val("Solar Water Heating"),
            wwhrs_present=self._bool_val("Is WWHRS present in the property?"),
            flue_gas_heat_recovery_present=fghrs,
            photovoltaic_panel=self._str_val("Photovoltaic Panel"),
            export_capable_meter=self._bool_val("Export capable meter"),
            wind_turbine_present=self._bool_val("Wind turbine present?"),
            wind_turbines_terrain_type=terrain,
            hydro_electricity_generated_kwh=hydro,
            pv_arrays=self._extract_pv_arrays(),
            pv_percent_roof_area=pv_pct if pv_pct > 0 else None,
        )
    def _extract_pv_arrays(self) -> List[ElmhurstPvArray]:
        """Parse the Elmhurst Summary §19.0 PV Panel section. Returns
        one `ElmhurstPvArray` per lodged array, or [] when absent.

        The Summary's PV block looks like (single-array, e.g. cert 0380):

            Photovoltaic panel details
            PV Cells kW Peak Orientation
            Elevation
            Overshading
            3.00
            South-East
            45°
            None Or Little

        Multi-array (e.g. cert 0350 lodges 2 arrays):

            ...
            1.50
            South-East
            45°
            None Or Little
            1.50
            North-West
            45°
            None Or Little

        — each array is 4 values in (kW Peak, Orientation, Elevation,
        Overshading) order. Anchor on "Photovoltaic panel details",
        skip header lines, then read values in 4-tuples until the
        section breaks at the next §header or end-of-array tokens
        (Batteries / Export / Capacity / etc.).
        """
        anchor = "Photovoltaic panel details"
        try:
            idx = next(i for i, line in enumerate(self._lines) if line == anchor)
        except StopIteration:
            return []
        # The header lines after the anchor are: "PV Cells kW Peak
        # Orientation", "Elevation", "Overshading". Subsequent lines
        # carry values for one OR MORE arrays. Stop at the next
        # §-header (a "20.0" or "21.0") or post-PV section tokens
        # ("Batteries", "Connected to", "Diverter", "Capacity", etc.).
        header_tokens = {"pv cells", "kw peak", "orientation", "elevation", "overshading"}
        stop_tokens = {
            "batteries", "capacity known", "capacity",
            "connected to the dwelling's meter", "diverter present",
            "export capable meter",
        }
        values: List[str] = []
        for line in self._lines[idx + 1:]:
            stripped = line.strip()
            if not stripped:
                continue
            lower = stripped.lower()
            if lower in stop_tokens:
                break
            # Next §-header (e.g. "20.0 Wind Turbine") closes the block —
            # match "<digits>.<digit><whitespace><word>" so kWp values
            # like "1.50" don't trip the close.
            if re.match(r"^\d{1,2}\.\d\s+\w", stripped):
                break
            if any(h in lower for h in header_tokens):
                continue
            values.append(stripped)
        # Walk values in 4-tuples; an incomplete trailing tuple is dropped.
        arrays: List[ElmhurstPvArray] = []
        for i in range(0, len(values) - 3, 4):
            try:
                kwp = float(values[i])
            except ValueError:
                continue
            orientation = values[i + 1]
            # Elevation lodged as "45°" — strip trailing degree symbol.
            m = re.match(r"^(\d+)", values[i + 2])
            if m is None:
                continue
            elevation = int(m.group(1))
            overshading = values[i + 3]
            arrays.append(ElmhurstPvArray(
                peak_power_kw=kwp,
                orientation=orientation,
                elevation_deg=elevation,
                overshading=overshading,
            ))
        return arrays
    def extract(self) -> ElmhurstSiteNotes:
        emissions_raw = self._next_val("Emissions (t/year)")
        co2 = float(emissions_raw.split()[0]) if emissions_raw else 0.0
        return ElmhurstSiteNotes(
            surveyor_info=self._extract_surveyor_info(),
            property_details=self._extract_property_details(),
            current_sap_rating=self._rating_val("Current SAP rating"),
            potential_sap_rating=self._rating_val("Potential SAP rating"),
            current_ei_rating=self._rating_val("Current EI rating"),
            potential_ei_rating=self._rating_val("Potential EI rating"),
            co2_emissions_current_t=co2,
            property_type=self._str_val("1.0 Property type"),
            attachment=self._extract_attachment(),
            number_of_storeys=self._int_val("Storeys"),
            habitable_rooms=self._int_val("Habitable Rooms"),
            heated_habitable_rooms=self._int_val("Heated Habitable Rooms"),
            construction_age_band=self._str_val("Main Property"),
            dimensions=self._extract_dimensions(),
            has_conservatory=self._bool_val("Is there a conservatory?"),
            walls=self._extract_walls(),
            roof=self._extract_roof(),
            floor=self._extract_floor(),
            door_count=self._int_val("Total Number of Doors"),
            insulated_door_count=self._int_val("Number of Insulated Doors"),
            insulated_door_u_value=self._extract_door_u_value(),
            windows=self._extract_windows(),
            draught_proofing_percent=self._int_val("Draught Proofing"),
            ventilation=self._extract_ventilation(),
            lighting=self._extract_lighting(),
            main_heating=self._extract_main_heating(),
            meters=self._extract_meters(),
            water_heating=self._extract_water_heating(),
            baths_and_showers=self._extract_baths_and_showers(),
            renewables=self._extract_renewables(),
            extensions=self._extract_extensions(),
            room_in_roof=self._extract_room_in_roof_from_text(),
        )
    def _extract_room_in_roof_from_text(self) -> Optional[RoomInRoof]:
        """Convenience wrapper: pulls the Main §4 body + the §3 age-band
        text once so `_extract_room_in_roof` doesn't need to re-slice
        the document."""
        dim_section = self._between("4.0 Dimensions:", "5.0 Conservatory:")
        bp_chunks = self._split_section_by_bp(dim_section)
        main_body = bp_chunks[0][1] if bp_chunks else dim_section
        return self._extract_room_in_roof(main_body, self._text)