mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
PR feedback (dancafc): `_parse_thickness_mm` handles a None input and returns Optional[int], so its call-return locals — and the Optional[str] raws they read from `_local_val` — read clearer when annotated. Annotates `thickness_raw`/`ins_thickness_raw: Optional[str]` and `thickness_mm`/`insulation_thickness_mm: Optional[int]` at all four call sites (_wall_details_from_lines, _alternative_walls_from_lines, _roof_details_from_lines, _floor_details_from_lines), plus the adjacent `u_val_raw`/`default_u` Optional pair in _floor_details_from_lines for consistency. Matches the project convention of typehinting call-return locals. No behaviour change; pyright clean, 569 parser tests pass. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1694 lines
77 KiB
Python
1694 lines
77 KiB
Python
import re
|
||
from datetime import date, datetime
|
||
from typing import List, Optional
|
||
|
||
from datatypes.epc.surveys.elmhurst_site_notes import (
|
||
AlternativeWall,
|
||
BathsAndShowers,
|
||
BuildingPartDimensions,
|
||
CommunityHeating,
|
||
ElmhurstSiteNotes,
|
||
ExtensionPart,
|
||
FloorDetails,
|
||
FloorDimension,
|
||
Lighting,
|
||
MainHeating,
|
||
MainHeating2,
|
||
Meters,
|
||
PropertyDetails,
|
||
Renewables,
|
||
RoofDetails,
|
||
RoomInRoof,
|
||
RoomInRoofSurface,
|
||
Shower,
|
||
SurveyorInfo,
|
||
VentilationAndCooling,
|
||
ElmhurstPvArray,
|
||
WallDetails,
|
||
WaterHeating,
|
||
Window,
|
||
)
|
||
|
||
|
||
def _parse_solar_pitch_deg(raw: Optional[str]) -> Optional[int]:
|
||
"""Parse the §16.0 "Collector elevation" lodgement (e.g. "30°", "60°",
|
||
or a bare integer). Returns None when absent or unparseable."""
|
||
if not raw:
|
||
return None
|
||
m = re.search(r"(\d+)", raw)
|
||
return int(m.group(1)) if m else None
|
||
|
||
|
||
class ElmhurstSiteNotesExtractor:
|
||
def __init__(self, pages: List[str]) -> None:
|
||
self._text = "\n".join(pages)
|
||
self._lines = [l.strip() for l in self._text.splitlines() if l.strip()]
|
||
|
||
# --- generic helpers ---
|
||
|
||
def _next_val(self, label: str) -> Optional[str]:
|
||
lc = label.rstrip(":") + ":"
|
||
lb = label.rstrip(":")
|
||
for i, line in enumerate(self._lines):
|
||
if line.startswith(lc) and len(line) > len(lc):
|
||
return line[len(lc):].strip() or None
|
||
if line == lc or line == lb:
|
||
for j in range(i + 1, min(i + 4, len(self._lines))):
|
||
v = self._lines[j]
|
||
if v.endswith(":") or v.startswith("©"):
|
||
return None
|
||
if v:
|
||
return v
|
||
return None
|
||
return None
|
||
|
||
def _str_val(self, label: str) -> str:
|
||
v = self._next_val(label)
|
||
return " ".join(v.split()) if v else ""
|
||
|
||
def _opt_str(self, label: str) -> Optional[str]:
|
||
v = self._next_val(label)
|
||
return " ".join(v.split()) if v else None
|
||
|
||
def _bool_val(self, label: str) -> bool:
|
||
v = self._next_val(label)
|
||
return v is not None and v.lower() == "yes"
|
||
|
||
def _int_val(self, label: str) -> int:
|
||
v = self._next_val(label)
|
||
try:
|
||
return int(v.split()[0]) if v else 0
|
||
except (ValueError, IndexError):
|
||
return 0
|
||
|
||
def _date_val(self, label: str) -> date:
|
||
v = self._next_val(label)
|
||
if not v:
|
||
raise ValueError(f"Missing date for label: {label}")
|
||
return datetime.strptime(v.strip(), "%d/%m/%Y").date()
|
||
|
||
def _between(self, start: str, end: str) -> str:
|
||
try:
|
||
s = self._text.index(start) + len(start)
|
||
e = self._text.index(end, s)
|
||
return self._text[s:e]
|
||
except ValueError:
|
||
return ""
|
||
|
||
# Multi-bp helpers: Summary PDFs subdivide §4/§7/§8/§9 with explicit
|
||
# "Main Property" / "1st Extension" / "2nd Extension" headers. The
|
||
# existing single-bp fixture also carries "Main Property" as a header
|
||
# before the body. This helper splits a section into per-bp chunks.
|
||
_BP_HEADER_RE = re.compile(
|
||
r"^(Main Property|\d+(?:st|nd|rd|th) Extension)\s*$",
|
||
re.MULTILINE,
|
||
)
|
||
|
||
def _split_section_by_bp(self, section_text: str) -> List[tuple[str, str]]:
|
||
"""Split a section's text into per-bp subsections.
|
||
|
||
Returns ``[(bp_name, body), ...]`` in document order. Body is
|
||
the text between this bp's header and the next bp's header
|
||
(exclusive). Returns ``[("Main Property", section_text)]`` when
|
||
no headers are found (defensive fallback for malformed PDFs).
|
||
"""
|
||
matches = list(self._BP_HEADER_RE.finditer(section_text))
|
||
if not matches:
|
||
return [("Main Property", section_text)]
|
||
result: List[tuple[str, str]] = []
|
||
for i, m in enumerate(matches):
|
||
name = m.group(1)
|
||
body_start = m.end()
|
||
body_end = (
|
||
matches[i + 1].start() if i + 1 < len(matches) else len(section_text)
|
||
)
|
||
result.append((name, section_text[body_start:body_end]))
|
||
return result
|
||
|
||
def _section_lines(self, start: str, end: str) -> List[str]:
|
||
text = self._between(start, end)
|
||
return [l.strip() for l in text.splitlines() if l.strip()]
|
||
|
||
def _section_lines_first_end(
|
||
self, start: str, ends: tuple[str, ...],
|
||
) -> List[str]:
|
||
"""Like `_section_lines` but accepts multiple end-marker candidates
|
||
and uses whichever appears first after `start`. Defends against
|
||
Summary-shape variants where the next-section heading differs
|
||
(e.g. §14.0 Main Heating1 closes at "14.1 Main Heating2" on
|
||
boiler/HP certs but at "14.1 Community Heating" on community-
|
||
heated certs)."""
|
||
try:
|
||
s = self._text.index(start) + len(start)
|
||
except ValueError:
|
||
return []
|
||
earliest: int | None = None
|
||
for end in ends:
|
||
try:
|
||
idx = self._text.index(end, s)
|
||
except ValueError:
|
||
continue
|
||
if earliest is None or idx < earliest:
|
||
earliest = idx
|
||
if earliest is None:
|
||
return []
|
||
text = self._text[s:earliest]
|
||
return [l.strip() for l in text.splitlines() if l.strip()]
|
||
|
||
def _local_val(self, lines: List[str], label: str) -> Optional[str]:
|
||
lb = label.rstrip(":")
|
||
lc = lb + ":"
|
||
for i, line in enumerate(lines):
|
||
if line.startswith(lc) and len(line) > len(lc):
|
||
return line[len(lc):].strip() or None
|
||
if line == lc or line == lb:
|
||
for j in range(i + 1, min(i + 4, len(lines))):
|
||
v = lines[j]
|
||
if v.endswith(":") or v.startswith("©"):
|
||
return None
|
||
if v:
|
||
return v
|
||
return None
|
||
return None
|
||
|
||
def _local_str(self, lines: List[str], label: str) -> str:
|
||
v = self._local_val(lines, label)
|
||
return " ".join(v.split()) if v else ""
|
||
|
||
def _local_bool(self, lines: List[str], label: str) -> bool:
|
||
v = self._local_val(lines, label)
|
||
return v is not None and v.lower() == "yes"
|
||
|
||
# --- section extractors ---
|
||
|
||
def _extract_surveyor_info(self) -> SurveyorInfo:
    """Build the surveyor block from document-wide label lookups.

    "My Reference" is optional and surfaces as None when absent; every
    other field falls back to "".
    """
    surveyor_code = self._str_val("Surveyor")
    name = self._str_val("Name")
    title = self._str_val("Title")
    tel_number = self._str_val("Tel Number")
    survey_reference = self._str_val("Survey Reference")
    my_reference = self._opt_str("My Reference")
    return SurveyorInfo(
        surveyor_code=surveyor_code,
        name=name,
        title=title,
        tel_number=tel_number,
        survey_reference=survey_reference,
        my_reference=my_reference,
    )
|
||
|
||
def _extract_property_details(self) -> PropertyDetails:
    """Build the property-details block from document-wide label lookups.

    Raises:
        ValueError: When "Inspection Date" or "Process date" is missing
            or not in dd/mm/yyyy form (propagated from `_date_val`).
    """
    # The EPC-existence label wraps over two lines in the text, so it is
    # matched with a regex rather than the generic label lookup.
    epc_match = re.search(
        r"Check for the existence of\nan EPC:\n(Yes|No)", self._text
    )
    epc_exists = epc_match is not None and epc_match.group(1).lower() == "yes"

    return PropertyDetails(
        rdsap_version=self._str_val("RdSAP version"),
        reference_number=self._str_val("Reference Number"),
        lodgement_required=self._bool_val("Lodgement Required"),
        regs_region=self._str_val("Regs Region"),
        epc_language=self._str_val("EPC Language"),
        postcode=self._str_val("Postcode"),
        region=self._str_val("Region"),
        street=self._str_val("Street"),
        town=self._str_val("Town"),
        tenure=self._str_val("Property Tenure"),
        transaction_type=self._str_val("Transaction Type"),
        inspection_date=self._date_val("Inspection Date"),
        process_date=self._date_val("Process date"),
        epc_exists=epc_exists,
        # Optional address/identity fields: None when not lodged.
        uprn=self._opt_str("UPRN"),
        house_name=self._opt_str("House Name"),
        house_number=self._opt_str("House No"),
        locality=self._opt_str("Locality"),
        county=self._opt_str("County"),
    )
|
||
|
||
def _extract_attachment(self) -> str:
|
||
"""Extract the Summary's "attachment" line — the §1.0 built-form
|
||
descriptor (e.g. "M Mid-Terrace", "D Detached") that sits
|
||
between the property-type value and the §2.0 section header
|
||
for HOUSES.
|
||
|
||
Flats DON'T lodge an attachment line in the Elmhurst Summary;
|
||
the §2.0 Number of Storeys header follows immediately after
|
||
the "F Flat" property-type value. Detect that case and return
|
||
"" so the mapper's `built_form` doesn't capture section-
|
||
header noise.
|
||
"""
|
||
m = re.search(r"1\.0 Property type:\n[^\n]+\n([^\n]+)", self._text)
|
||
if not m:
|
||
return ""
|
||
candidate = " ".join(m.group(1).strip().split())
|
||
if re.match(r"^\d+\.\d+\s", candidate) or "Number of Storeys" in candidate:
|
||
return ""
|
||
return candidate
|
||
|
||
def _floors_from_dimensions_body(self, body: str) -> List[FloorDimension]:
|
||
"""Parse FloorDimension entries from a single bp's §4 body."""
|
||
matches = re.findall(
|
||
r"([A-Za-z ]+Floor):\n([\d.]+)\n([\d.]+)\n([\d.]+)\n([\d.]+)",
|
||
body,
|
||
)
|
||
return [
|
||
FloorDimension(
|
||
name=name.strip(),
|
||
area_m2=float(area),
|
||
room_height_m=float(height),
|
||
heat_loss_perimeter_m=float(hlp),
|
||
party_wall_length_m=float(pwl),
|
||
)
|
||
for name, area, height, hlp, pwl in matches
|
||
]
|
||
|
||
def _extract_dimensions(self) -> BuildingPartDimensions:
    """Return the main property's §4 dimensions.

    Only the first building-part chunk of the section is parsed;
    extensions are picked up by `_extract_extensions`.
    """
    section = self._between("4.0 Dimensions:", "5.0 Conservatory:")
    parts = self._split_section_by_bp(section)
    main_body = parts[0][1] if parts else section
    floors = self._floors_from_dimensions_body(main_body)
    return BuildingPartDimensions(
        dimension_type=self._str_val("Dimension type"),
        floors=floors,
    )
|
||
|
||
def _wall_details_from_lines(self, lines: List[str]) -> WallDetails:
    """Build WallDetails from one building part's §7 lines.

    Args:
        lines: Trimmed, non-blank lines of a single bp's §7 Walls chunk.

    Returns:
        The parsed wall details. Both thickness fields are None when the
        line is absent or carries no leading number.
    """
    # Parse with the shared helper rather than a bare int(): a cell such
    # as "400+ mm", or a following label line picked up as the value,
    # would otherwise raise ValueError and abort the whole extraction.
    thickness_raw: Optional[str] = self._local_val(lines, "Wall Thickness")
    thickness_mm: Optional[int] = self._parse_thickness_mm(thickness_raw)
    # Composite / retrofit insulation thickness - Summary §7.0 writes the
    # value on the line pair "Insulation Thickness" / "100 mm" when a
    # composite filled-cavity-plus-external (or equivalent) wall is
    # lodged. The label is looked up inside the §7 lines only, so it does
    # not collide with the §8 Roofs / §9 Floors blocks. None when the PDF
    # omits the line (no retrofit lodged).
    ins_thickness_raw: Optional[str] = self._local_val(lines, "Insulation Thickness")
    insulation_thickness_mm: Optional[int] = self._parse_thickness_mm(ins_thickness_raw)
    return WallDetails(
        wall_type=self._local_str(lines, "Type"),
        insulation=self._local_str(lines, "Insulation"),
        thickness_unknown=self._local_bool(lines, "Wall Thickness Unknown"),
        u_value_known=self._local_bool(lines, "U-value Known"),
        party_wall_type=self._local_str(lines, "Party Wall Type"),
        thickness_mm=thickness_mm,
        insulation_thickness_mm=insulation_thickness_mm,
        alternative_walls=self._alternative_walls_from_lines(lines),
        # Summary §7 lodges the per-BP "Curtain Wall Age" line only when
        # `Type: CW Curtain Wall`. Per RdSAP 10 §5.18 (PDF p.48) this
        # drives the curtain-wall U-value (Post 2023 -> 1.4; Pre 2023 ->
        # 2.0) independent of the dwelling-wide age band. Use
        # `_local_val` (Optional[str]) so absent lines surface as None,
        # not the empty-string sentinel `_local_str` returns.
        curtain_wall_age=self._local_val(lines, "Curtain Wall Age"),
    )
|
||
|
||
def _alternative_walls_from_lines(self, lines: List[str]) -> List[AlternativeWall]:
|
||
"""Parse up to two §7 "Alternative Wall N" sub-area lodgements.
|
||
The Elmhurst Summary PDF lays them out as a contiguous block of
|
||
prefixed labels ("Alternative Wall 1 Area", "Alternative Wall 1
|
||
Type", …); we read each numbered slot independently and drop
|
||
slots whose Area is missing/zero."""
|
||
result: List[AlternativeWall] = []
|
||
for n in (1, 2):
|
||
area_raw = self._local_val(lines, f"Alternative Wall {n} Area")
|
||
if not area_raw:
|
||
continue
|
||
try:
|
||
area = float(area_raw.split()[0])
|
||
except (ValueError, IndexError):
|
||
continue
|
||
if area <= 0:
|
||
continue
|
||
thickness_raw: Optional[str] = self._local_val(
|
||
lines, f"Alternative Wall {n} Thickness"
|
||
)
|
||
thickness_mm: Optional[int] = self._parse_thickness_mm(thickness_raw)
|
||
result.append(AlternativeWall(
|
||
area_m2=area,
|
||
wall_type=self._local_str(lines, f"Alternative Wall {n} Type"),
|
||
insulation=self._local_str(lines, f"Alternative Wall {n} Insulation"),
|
||
thickness_unknown=self._local_bool(
|
||
lines, f"Alternative Wall {n} Thickness Unknown"
|
||
),
|
||
thickness_mm=thickness_mm,
|
||
u_value_known=self._local_bool(
|
||
lines, f"Alternative Wall {n} U-value Known"
|
||
),
|
||
# RdSAP10 §5.8 + Table 14: dry-lined uninsulated wall adds
|
||
# R = 0.17 m²K/W to base U. Cohort fixture: cert 7700
|
||
# Alt 1 "CavityWallPlasterOnDabs" lodges Dry-lining: Yes →
|
||
# U = 1/(1/1.5 + 0.17) ≈ 1.20.
|
||
dry_lined=self._local_bool(
|
||
lines, f"Alternative Wall {n} Dry-lining"
|
||
),
|
||
))
|
||
return result
|
||
|
||
def _extract_walls(self) -> WallDetails:
    """Return the main property's §7 wall details.

    Only the first building-part chunk between "7.0 Walls:" and
    "8.0 Roofs:" is parsed.
    """
    section = self._between("7.0 Walls:", "8.0 Roofs:")
    parts = self._split_section_by_bp(section)
    main_body = parts[0][1] if parts else section
    trimmed = (raw.strip() for raw in main_body.splitlines())
    return self._wall_details_from_lines([line for line in trimmed if line])
|
||
|
||
@staticmethod
|
||
def _parse_thickness_mm(raw: Optional[str]) -> Optional[int]:
|
||
"""Parse an Elmhurst "Insulation Thickness" cell ("100 mm",
|
||
"400+ mm") to integer mm. The bucket-cap "400+ mm" (Table 17/18
|
||
max tabulated row) carries a trailing "+" that a bare
|
||
`.split()[0].isdigit()` test rejects — strip to the leading
|
||
digits so the cap parses through to the cascade with its numeric
|
||
value (simulated case 5: roof "400+ mm" was silently dropped →
|
||
u_roof fell back to the age-J default 0.16 instead of the
|
||
300mm+ value 0.11). Returns None when the cell is absent or
|
||
carries no leading number ("As Built", "N None")."""
|
||
if not raw:
|
||
return None
|
||
match = re.match(r"\d+", raw.strip())
|
||
return int(match.group()) if match else None
|
||
|
||
def _roof_details_from_lines(self, lines: List[str]) -> RoofDetails:
    """Build RoofDetails from one building part's §8 lines.

    Args:
        lines: Trimmed, non-blank lines of a single bp's §8 Roofs chunk.
    """
    insulation = self._local_str(lines, "Insulation")
    thickness_raw: Optional[str] = self._local_val(lines, "Insulation Thickness")
    thickness_mm: Optional[int] = self._parse_thickness_mm(thickness_raw)
    # The Summary PDF omits the "Insulation Thickness" line entirely when
    # no retrofit insulation is lodged (e.g. "Insulation: N None" on
    # 000516). Treat that case as 0 mm so the cascade picks Table 16 row
    # 0 (U=2.30) rather than the age-band default - the surveyor
    # explicitly recorded "None".
    insulation_code = insulation.partition(" ")[0]
    if thickness_mm is None and insulation_code == "N":
        thickness_mm = 0
    return RoofDetails(
        roof_type=self._local_str(lines, "Type"),
        insulation=insulation,
        u_value_known=self._local_bool(lines, "U-value Known"),
        insulation_thickness_mm=thickness_mm,
    )
|
||
|
||
def _extract_roof(self) -> RoofDetails:
    """Return the main property's §8 roof details.

    Only the first building-part chunk between "8.0 Roofs:" and
    "8.1 Rooms in Roof:" is parsed.
    """
    section = self._between("8.0 Roofs:", "8.1 Rooms in Roof:")
    parts = self._split_section_by_bp(section)
    main_body = parts[0][1] if parts else section
    trimmed = (raw.strip() for raw in main_body.splitlines())
    return self._roof_details_from_lines([line for line in trimmed if line])
|
||
|
||
def _floor_details_from_lines(self, lines: List[str]) -> FloorDetails:
    """Build FloorDetails from one building part's §9 lines.

    Args:
        lines: Trimmed, non-blank lines of a single bp's §9 Floors chunk.

    Raises:
        ValueError: When a "Default U-value" is lodged but is not a
            plain number.
    """
    u_val_raw: Optional[str] = self._local_val(lines, "Default U-value")
    default_u: Optional[float] = None
    if u_val_raw:
        default_u = float(u_val_raw)
    # RdSAP 10 §5.13 Table 20 - retro-fitted upper floors lodge an
    # "Insulation Thickness: NNN mm" cell so the cascade can route via
    # the per-thickness column. Mirror of the §8 roof extractor at
    # `_roof_details_from_lines`.
    thickness_raw: Optional[str] = self._local_val(lines, "Insulation Thickness")
    thickness_mm: Optional[int] = self._parse_thickness_mm(thickness_raw)
    return FloorDetails(
        location=self._local_str(lines, "Location"),
        floor_type=self._local_str(lines, "Type"),
        insulation=self._local_str(lines, "Insulation"),
        u_value_known=self._local_bool(lines, "U-value Known"),
        default_u_value=default_u,
        insulation_thickness_mm=thickness_mm,
    )
|
||
|
||
def _extract_floor(self) -> FloorDetails:
    """Return the main property's §9 floor details.

    Only the first building-part chunk between "9.0 Floors:" and
    "10.0 Doors:" is parsed.
    """
    section = self._between("9.0 Floors:", "10.0 Doors:")
    parts = self._split_section_by_bp(section)
    main_body = parts[0][1] if parts else section
    trimmed = (raw.strip() for raw in main_body.splitlines())
    return self._floor_details_from_lines([line for line in trimmed if line])
|
||
|
||
def _extract_door_u_value(self) -> Optional[float]:
|
||
"""Read the §10 Doors block's "Average U-value" lodging.
|
||
Scoped to the §10..§11 slice so the global "U-value" labels in
|
||
Walls/Roofs/Floors can't shadow the door reading. None when the
|
||
PDF omits the line (e.g. all doors recorded as uninsulated)."""
|
||
lines = self._section_lines("10.0 Doors:", "11.0 Windows:")
|
||
raw = self._local_val(lines, "Average U-value")
|
||
if not raw:
|
||
return None
|
||
try:
|
||
return float(raw.split()[0])
|
||
except (ValueError, IndexError):
|
||
return None
|
||
|
||
# RIR surface row: `<name> <length> <height> [<insulation> [<ins_type>]
|
||
# [<gable_type>] <default_u> <known> <u>]`. The middle slot
|
||
# widths vary by surface kind; we match the four leading numerics
|
||
# robustly (length, height, default_u, u_value) and slot the
|
||
# remaining textual fields by position. The layout preprocessor
|
||
# collapses multi-space-separated cells into single newlines, so
|
||
# each row in the dump occupies multiple lines per cell.
|
||
_RIR_SURFACE_NAMES: tuple[str, ...] = (
|
||
"Flat Ceiling 1", "Flat Ceiling 2",
|
||
"Stud Wall 1", "Stud Wall 2",
|
||
"Slope 1", "Slope 2",
|
||
"Gable Wall 1", "Gable Wall 2",
|
||
"Common Wall 1", "Common Wall 2",
|
||
)
|
||
|
||
def _extract_room_in_roof(
|
||
self, main_dim_body: str, age_band_text: str
|
||
) -> Optional[RoomInRoof]:
|
||
"""Parse the §8.1 Rooms in Roof block for the Main bp."""
|
||
section = self._between("8.1 Rooms in Roof:", "9.0 Floors:")
|
||
bp_chunks = self._split_section_by_bp(section) if section.strip() else []
|
||
main_body = bp_chunks[0][1] if bp_chunks else ""
|
||
# Age band from §3: "Main Prop. Room(s) in Roof H 1991-1995"
|
||
age_m = re.search(
|
||
r"Main Prop\. Room\(s\) in Roof\s+([A-M] [^\n]+)", age_band_text
|
||
)
|
||
age_band = age_m.group(1).strip() if age_m else None
|
||
return self._room_in_roof_from_bodies(
|
||
dim_body=main_dim_body,
|
||
rir_body=main_body,
|
||
age_band=age_band,
|
||
)
|
||
|
||
def _room_in_roof_from_bodies(
|
||
self,
|
||
dim_body: str,
|
||
rir_body: str,
|
||
age_band: Optional[str],
|
||
) -> Optional[RoomInRoof]:
|
||
"""Parse a single-BP Room(s) in Roof from the §4 dimension body
|
||
(floor area) and §8.1 construction body (assessment + surfaces).
|
||
Used for both Main and each extension — extensions get their
|
||
own per-BP slice of §4 and §8.1 + the per-extension age band
|
||
from §3's "<N>th Ext. Room(s) in Roof <age>" line.
|
||
"""
|
||
m = re.search(r"Room\(s\) in Roof:\s+(\d+(?:\.\d+)?)", dim_body)
|
||
if m is None:
|
||
return None
|
||
floor_area = float(m.group(1))
|
||
if floor_area <= 0:
|
||
return None
|
||
if not rir_body.strip() or "Room in roof type" not in rir_body:
|
||
# §4 lodged an RR area but §8.1 has no construction details
|
||
# for this BP — surface as a partial RR so the cascade can
|
||
# still attribute the floor area to TFA. Empty surfaces
|
||
# tuple is the sentinel the mapper consumes.
|
||
return RoomInRoof(
|
||
floor_area_m2=floor_area,
|
||
construction_age_band=age_band,
|
||
assessment="",
|
||
surfaces=[],
|
||
)
|
||
lines = [l.strip() for l in rir_body.splitlines() if l.strip()]
|
||
assessment_idx = next(
|
||
(i for i, l in enumerate(lines) if l == "Assessment"), None
|
||
)
|
||
assessment = (
|
||
lines[assessment_idx + 1]
|
||
if assessment_idx is not None and assessment_idx + 1 < len(lines)
|
||
else ""
|
||
)
|
||
surfaces: List[RoomInRoofSurface] = []
|
||
for name in self._RIR_SURFACE_NAMES:
|
||
try:
|
||
idx = lines.index(name)
|
||
except ValueError:
|
||
continue
|
||
surfaces.append(self._parse_rir_surface_row(name, lines, idx))
|
||
return RoomInRoof(
|
||
floor_area_m2=floor_area,
|
||
construction_age_band=age_band,
|
||
assessment=assessment,
|
||
surfaces=surfaces,
|
||
)
|
||
|
||
_RIR_NUMERIC_RE = re.compile(r"^-?\d+(?:\.\d+)?$")
|
||
# Elmhurst insulation cell formats: "100 mm", "125 mm", ... and the
|
||
# bucket-cap "400+ mm" (Table 17 max tabulated row). Optional trailing
|
||
# "+" allows the bucket-cap to parse through to the cascade with the
|
||
# same numeric value.
|
||
_RIR_INSULATION_THICKNESS_RE = re.compile(r"^\d+\+?\s*mm$")
|
||
|
||
def _parse_rir_surface_row(
    self, name: str, lines: List[str], idx: int
) -> RoomInRoofSurface:
    """Parse one room-in-roof surface row starting at ``lines[idx]``.

    A row is the name line followed by roughly 6-9 cells, one per line,
    depending on which optional cells the surveyor filled. The cell order
    is stable: length, height, [insulation], [ins_type], [gable_type],
    default_u, u_known, u_value. The numeric cells anchor the parse; the
    textual cells in between are recognised by their content.

    Args:
        name: The surface name found at ``lines[idx]``.
        lines: Trimmed, non-blank lines of the bp's §8.1 body.
        idx: Index of the name line within ``lines``.

    Returns:
        The parsed surface. Numeric fields that are missing or
        non-numeric fall back to 0.0 (length, height, u_value) or None
        (default_u_value).
    """
    is_number = self._RIR_NUMERIC_RE.match

    # Gather this row's cells: at most nine lines after the name,
    # stopping early at the next row's name marker. Each cell sits on
    # its own line, so that marker is the only signal separating the
    # last cell of this row from the first cell of the next.
    tokens: List[str] = []
    for j in range(idx + 1, min(idx + 10, len(lines))):
        cell = lines[j]
        if self._is_next_rir_row(cell):
            break
        tokens.append(cell)

    # Leading pair: length, then height.
    length = 0.0
    if tokens and is_number(tokens[0]):
        length = float(tokens[0])
    height = 0.0
    if len(tokens) > 1 and is_number(tokens[1]):
        height = float(tokens[1])

    # Trailing triple, read from the end of what follows length/height:
    # last = u_value, before it the "Yes"/"No" known flag, before that
    # default_u.
    tail = tokens[2:]
    u_value = 0.0
    if tail and is_number(tail[-1]):
        u_value = float(tail[-1])
    u_value_known = len(tail) >= 2 and tail[-2] == "Yes"
    default_u: Optional[float] = None
    if len(tail) >= 3 and is_number(tail[-3]):
        default_u = float(tail[-3])

    # Middle textual cells: whatever lies between length/height and the
    # trailing three cells (default_u, known, u_value).
    middle = tokens[2:-3] if len(tokens) >= 5 else []
    insulation = ""
    insulation_type: Optional[str] = None
    gable_type: Optional[str] = None
    for cell in middle:
        is_insulation_cell = bool(
            self._RIR_INSULATION_THICKNESS_RE.match(cell)
        ) or cell in ("As Built", "None", "Unknown")
        if is_insulation_cell:
            # "Unknown" is the third spec-valid thickness token (RdSAP 10
            # §3.10.1 PDF p.24: "default U-values apply when the roof
            # room insulation is 'as built' or 'unknown'"). Mapper routes
            # "Unknown" to insulation_thickness_mm=None so the cascade
            # falls back to Table 18 col 4 default.
            # Only the first insulation cell is kept.
            if not insulation:
                insulation = cell
        elif cell in ("Mineral or EPS", "PUR", "PIR", "PUR or PIR"):
            # Summary §8.1 lodges the rigid-foam column as the
            # disjunction "PUR or PIR" when the assessor doesn't
            # distinguish between the two; the mapper canonicalises all
            # three forms to SAP10 "rigid_foam" (cascade Table 17 col
            # (b)).
            insulation_type = cell
        elif cell in (
            "Party", "Sheltered", "Exposed",
            "Connected", "Connected to heated space",
        ):
            gable_type = cell

    return RoomInRoofSurface(
        name=name,
        length_m=length,
        height_m=height,
        insulation=insulation,
        insulation_type=insulation_type,
        gable_type=gable_type,
        default_u_value=default_u,
        u_value_known=u_value_known,
        u_value=u_value,
    )
|
||
|
||
def _is_next_rir_row(self, line: str) -> bool:
|
||
return line in self._RIR_SURFACE_NAMES
|
||
|
||
def _extract_extensions(self) -> List[ExtensionPart]:
    """Collect the non-Main building parts.

    Cross-references the §4, §7, §8, §8.1 and §9 per-bp subsections by
    extension name. An "As Main" flag within a section body inherits the
    main bp's data for that section; otherwise the section body is parsed
    in isolation.

    Returns:
        One ExtensionPart per extension header found in §4, in document
        order. Empty when §4 lodges no extension.
    """
    # Gather per-section text once.
    dim_section = self._between("4.0 Dimensions:", "5.0 Conservatory:")
    wall_section = self._between("7.0 Walls:", "8.0 Roofs:")
    roof_section = self._between("8.0 Roofs:", "8.1 Rooms in Roof:")
    rir_section = self._between("8.1 Rooms in Roof:", "9.0 Floors:")
    floor_section = self._between("9.0 Floors:", "10.0 Doors:")
    dim_type = self._str_val("Dimension type")

    # §4 is split once: the ordered list supplies the extension names,
    # the dict the per-bp bodies.
    dim_parts = self._split_section_by_bp(dim_section)
    dim_chunks = dict(dim_parts)
    wall_chunks = dict(self._split_section_by_bp(wall_section))
    roof_chunks = dict(self._split_section_by_bp(roof_section))
    rir_chunks = dict(self._split_section_by_bp(rir_section)) if rir_section.strip() else {}
    floor_chunks = dict(self._split_section_by_bp(floor_section))

    # Per-extension RR age bands from §3: "1st Ext. Room(s) in Roof I 1996-2002".
    ext_rir_age_re = re.compile(
        r"(\d+(?:st|nd|rd|th))\s+Ext\.\s+Room\(s\) in Roof\s+([A-M] [^\n]+)",
        re.MULTILINE,
    )
    ext_rir_age_bands: dict[str, str] = {
        f"{m.group(1)} Extension": m.group(2).strip()
        for m in ext_rir_age_re.finditer(self._text)
    }

    main_walls = self._extract_walls()
    main_roof = self._extract_roof()
    main_floor = self._extract_floor()

    # Per-bp age-band lookup. Section 3 contains lines like
    # "1st Extension B 1900-1929" - the band sits after the name.
    age_band_re = re.compile(
        r"^(\d+(?:st|nd|rd|th) Extension)\s+([A-M] [^\n]+)$",
        re.MULTILINE,
    )
    age_bands = {m.group(1): m.group(2).strip() for m in age_band_re.finditer(self._text)}

    # Extension names in document order, from the dimensions section
    # (excluding Main Property).
    names = [name for name, _ in dim_parts if name != "Main Property"]

    extensions: List[ExtensionPart] = []
    for name in names:
        dim_body = dim_chunks.get(name, "")
        wall_body = wall_chunks.get(name, "")
        roof_body = roof_chunks.get(name, "")
        floor_body = floor_chunks.get(name, "")

        wall_lines = [line.strip() for line in wall_body.splitlines() if line.strip()]
        roof_lines = [line.strip() for line in roof_body.splitlines() if line.strip()]
        floor_lines = [line.strip() for line in floor_body.splitlines() if line.strip()]

        if self._local_bool(wall_lines, "As Main Wall"):
            # Alternative walls live in the extension's own chunk even
            # when the main wall fields are inherited; merge them into
            # the inherited WallDetails so the bp carries them through
            # to its SapBuildingPart.
            #
            # "As Main Wall: Yes" inherits the EXTERNAL wall construction
            # only - the PARTY WALL TYPE is lodged separately in the
            # extension's §7 block and may differ (cert 001431: Main
            # "CU Cavity masonry unfilled" U=0.5, 1st Extension
            # "U Unable to determine" -> RdSAP default U=0.25). Read the
            # extension's own party wall type when present; fall back to
            # the main's only when absent.
            ext_party_wall_type = (
                self._local_str(wall_lines, "Party Wall Type")
                or main_walls.party_wall_type
            )
            # The curtain-wall age belongs to the external wall
            # construction being inherited. Without it an extension that
            # inherits a curtain wall would carry the wall type but not
            # the age recorded for the main wall. Prefer the extension's
            # own line when its §7 block lodges one.
            ext_curtain_wall_age = (
                self._local_val(wall_lines, "Curtain Wall Age")
                or main_walls.curtain_wall_age
            )
            walls = WallDetails(
                wall_type=main_walls.wall_type,
                insulation=main_walls.insulation,
                thickness_unknown=main_walls.thickness_unknown,
                u_value_known=main_walls.u_value_known,
                party_wall_type=ext_party_wall_type,
                thickness_mm=main_walls.thickness_mm,
                insulation_thickness_mm=main_walls.insulation_thickness_mm,
                alternative_walls=self._alternative_walls_from_lines(wall_lines),
                curtain_wall_age=ext_curtain_wall_age,
            )
        else:
            walls = self._wall_details_from_lines(wall_lines)

        if self._local_bool(roof_lines, "As Main"):
            roof = main_roof
        else:
            roof = self._roof_details_from_lines(roof_lines)
        if self._local_bool(floor_lines, "As Main"):
            floor = main_floor
        else:
            floor = self._floor_details_from_lines(floor_lines)

        rir = self._room_in_roof_from_bodies(
            dim_body=dim_body,
            rir_body=rir_chunks.get(name, ""),
            age_band=ext_rir_age_bands.get(name),
        )
        extensions.append(
            ExtensionPart(
                name=name,
                construction_age_band=age_bands.get(name, ""),
                dimensions=BuildingPartDimensions(
                    dimension_type=dim_type,
                    floors=self._floors_from_dimensions_body(dim_body),
                ),
                walls=walls,
                roof=roof,
                floor=floor,
                room_in_roof=rir,
            )
        )
    return extensions
|
||
|
||
def _extract_windows(self) -> List[Window]:
|
||
# Textract-style pages keep "Permanent\s+Shutters" adjacent in
|
||
# reading order and the windows table flows as one column-block
|
||
# the existing token-walker can step through. PDF-derived pages
|
||
# (Summary PDFs preprocessed from `pdftotext -layout`) break the
|
||
# header across lines, so this regex misses entirely and the
|
||
# `_extract_windows_from_layout` fallback below picks them up
|
||
# by anchoring on the W/H/Area data line.
|
||
m = re.search(
|
||
r"Permanent\s+Shutters\n(.*?)Draught Proofing",
|
||
self._text,
|
||
re.DOTALL,
|
||
)
|
||
if not m:
|
||
return self._extract_windows_from_layout()
|
||
tokens = [t.strip() for t in m.group(1).splitlines() if t.strip()]
|
||
windows: List[Window] = []
|
||
i = 0
|
||
while i + 12 < len(tokens):
|
||
try:
|
||
width_m = float(tokens[i])
|
||
height_m = float(tokens[i + 1])
|
||
area_m2 = float(tokens[i + 2])
|
||
except (ValueError, IndexError):
|
||
i += 1
|
||
continue
|
||
i += 3
|
||
# Collect glazing type tokens until frame_factor (0 < v ≤ 1.0)
|
||
glazing_parts: List[str] = []
|
||
while i < len(tokens):
|
||
try:
|
||
v = float(tokens[i])
|
||
if 0.0 < v <= 1.0:
|
||
break
|
||
glazing_parts.append(tokens[i])
|
||
except ValueError:
|
||
glazing_parts.append(tokens[i])
|
||
i += 1
|
||
# If last glazing token is a single word (no spaces, not numeric) it's the frame_type
|
||
frame_type: Optional[str] = None
|
||
if glazing_parts and " " not in glazing_parts[-1] and not glazing_parts[-1].replace(".", "").isdigit():
|
||
frame_type = glazing_parts.pop()
|
||
glazing_type = " ".join(glazing_parts).strip()
|
||
if i >= len(tokens):
|
||
break
|
||
frame_factor = float(tokens[i]); i += 1
|
||
# Consume glazing_gap if present ("mm" token, possibly multi-token e.g. "16 mm or more")
|
||
glazing_gap: Optional[str] = None
|
||
if i < len(tokens) and "mm" in tokens[i]:
|
||
gap_parts = [tokens[i]]; i += 1
|
||
while i < len(tokens) and tokens[i].lower() in {"or", "more"}:
|
||
gap_parts.append(tokens[i]); i += 1
|
||
glazing_gap = " ".join(gap_parts)
|
||
building_part = tokens[i]; i += 1
|
||
location = tokens[i]; i += 1
|
||
orientation = tokens[i]; i += 1
|
||
data_source = tokens[i]; i += 1
|
||
u_value = float(tokens[i]); i += 1
|
||
g_value = float(tokens[i]); i += 1
|
||
draught_proofed = tokens[i].lower() == "yes"; i += 1
|
||
permanent_shutters = tokens[i]; i += 1
|
||
windows.append(
|
||
Window(
|
||
width_m=width_m,
|
||
height_m=height_m,
|
||
area_m2=area_m2,
|
||
glazing_type=glazing_type,
|
||
frame_factor=frame_factor,
|
||
building_part=building_part,
|
||
location=location,
|
||
orientation=orientation,
|
||
data_source=data_source,
|
||
u_value=u_value,
|
||
g_value=g_value,
|
||
draught_proofed=draught_proofed,
|
||
permanent_shutters=permanent_shutters,
|
||
frame_type=frame_type,
|
||
glazing_gap=glazing_gap,
|
||
)
|
||
)
|
||
return windows
|
||
|
||
# Anchors used by the layout-style window parser. The W/H/Area anchor
|
||
# is sometimes followed by a joined glazing-type phrase on the same
|
||
# line (e.g. '1.22 1.76 2.15 Double pre 2002'); the optional 4th
|
||
# capture surfaces that text so the parser can use it instead of a
|
||
# separately-laid-out prefix line.
|
||
_WIDTH_HEIGHT_AREA_RE = re.compile(
|
||
r"^(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)(?:\s+(\S.*?))?$"
|
||
)
|
||
_MANUFACTURER_RE = re.compile(r"^(Manufacturer|Default)\s+(\d+\.\d+)$")
|
||
_ORIENTATION_TOKENS = frozenset({
|
||
"North", "South", "East", "West", "NE", "NW", "SE", "SW",
|
||
})
|
||
_BP_INLINE_TOKENS = frozenset({"Main"}) # "Extension" only appears as suffix
|
||
# A room-in-roof window (rooflight) lodges its §11 "Location" cell as
|
||
# "Roof of Room in Roof", which the layout preprocessor wraps onto two
|
||
# tokens ("Roof of Room" in the prefix block, "in Roof" in the suffix).
|
||
# Detected so the window routes to a roof window (worksheet (27a))
|
||
# and the tokens don't leak into the glazing-type phrase.
|
||
_ROOF_OF_ROOM_LOCATION_TOKENS = frozenset({"Roof of Room", "in Roof"})
|
||
# The Elmhurst Summary PDF lodges each window's glazing-type as a
|
||
# capitalised phrase like "Double between 2002" / "Double with unknown"
|
||
# / "Single" / "Triple" / "Secondary". The first token of that phrase
|
||
# marks the start of a new window's prefix block in the layout dump,
|
||
# which is the only stable signal partitioning one window's suffix
|
||
# from the next window's prefix.
|
||
_GLAZING_TYPE_PREFIX_WORDS = frozenset({
|
||
"Single", "Double", "Triple", "Secondary",
|
||
})
|
||
|
||
def _extract_windows_from_layout(self) -> List[Window]:
|
||
"""Fallback window parser for Summary PDFs preprocessed from
|
||
`pdftotext -layout`. Each window has two stable anchors:
|
||
a "W H Area" line and a "Manufacturer <U_value>" line a few
|
||
lines further down. Everything between holds frame_type,
|
||
frame_factor, and a variable mix of glazing_gap, building_part,
|
||
location, and orientation (depending on which fields the
|
||
surveyor lodged); everything around the window holds glazing-
|
||
type/building-part/orientation prefix/suffix tokens split by
|
||
the layout preprocessor.
|
||
"""
|
||
m = re.search(
|
||
r"11\.0 Windows:(.*?)(Draught Proofing|12\.0 Ventilation)",
|
||
self._text, re.DOTALL,
|
||
)
|
||
if not m:
|
||
return []
|
||
lines = m.group(1).splitlines()
|
||
|
||
# Locate all (data_line, manufacturer_line) pairs in document
|
||
# order. Each pair is one window.
|
||
data_anchors: List[tuple[int, re.Match[str]]] = []
|
||
for i, line in enumerate(lines):
|
||
anchor = self._WIDTH_HEIGHT_AREA_RE.match(line.strip())
|
||
if anchor is not None:
|
||
data_anchors.append((i, anchor))
|
||
|
||
windows: List[Window] = []
|
||
for k, (data_idx, anchor) in enumerate(data_anchors):
|
||
manuf_idx = self._find_manufacturer_after(lines, data_idx)
|
||
if manuf_idx is None:
|
||
continue
|
||
prev_manuf_idx = (
|
||
self._find_manufacturer_after(lines, data_anchors[k - 1][0])
|
||
if k > 0 else None
|
||
)
|
||
next_data_idx = (
|
||
data_anchors[k + 1][0] if k + 1 < len(data_anchors) else len(lines)
|
||
)
|
||
# Partition the cross-window gap between this window's suffix
|
||
# and the next window's prefix on the first glazing-type-start
|
||
# token (Single/Double/Triple/Secondary). The same boundary
|
||
# is used symmetrically — current window's `after_end` = next
|
||
# window's `before_start` — so prefix tokens of W_{k+1} never
|
||
# get attributed as suffix of W_k (which was the bug producing
|
||
# orientation='East-South' for windows where 'South' actually
|
||
# belonged to the next row).
|
||
before_start = (
|
||
self._partition_after_manuf(lines, prev_manuf_idx, data_idx)
|
||
if prev_manuf_idx is not None else 0
|
||
)
|
||
after_end = self._partition_after_manuf(lines, manuf_idx, next_data_idx)
|
||
try:
|
||
window = self._parse_window_from_anchors(
|
||
lines=lines,
|
||
data_idx=data_idx,
|
||
manuf_idx=manuf_idx,
|
||
anchor=anchor,
|
||
before_start=before_start,
|
||
after_end=after_end,
|
||
)
|
||
except (ValueError, IndexError):
|
||
continue
|
||
if window is not None:
|
||
windows.append(window)
|
||
return windows
|
||
|
||
def _find_manufacturer_after(self, lines: List[str], data_idx: int) -> Optional[int]:
|
||
for j in range(data_idx + 1, min(data_idx + 12, len(lines))):
|
||
if self._MANUFACTURER_RE.match(lines[j].strip()):
|
||
return j
|
||
return None
|
||
|
||
_FRAME_TYPE_AND_FACTOR_RE = re.compile(r"^(\S+(?:\s+\S+)*?)\s+(\d\.\d+)$")
|
||
_FRAME_FACTOR_ONLY_RE = re.compile(r"^(\d\.\d+)$")
|
||
|
||
def _parse_frame_type_and_factor(
|
||
self, lines: List[str], data_idx: int
|
||
) -> tuple[str, Optional[float], int]:
|
||
"""Return `(frame_type, frame_factor, middle_start_idx)` from
|
||
the lines immediately after the data anchor. Layouts vary:
|
||
(a) "PVC" on data+1, "0.70" on data+2 — the original 000474
|
||
shape;
|
||
(b) "Wood 0.70" on data+1 — joined-cell variant from 000487
|
||
and 000516 first-row windows;
|
||
(c) "0.70" alone on data+1 (no frame_type word at all) —
|
||
seen in 000487's subsequent windows where the
|
||
preprocessor dropped the frame-type column. frame_type
|
||
is recovered downstream from glazing-type defaults or
|
||
left empty."""
|
||
first = lines[data_idx + 1].strip()
|
||
combined = self._FRAME_TYPE_AND_FACTOR_RE.match(first)
|
||
if combined is not None:
|
||
return combined.group(1), float(combined.group(2)), data_idx + 2
|
||
factor_only = self._FRAME_FACTOR_ONLY_RE.match(first)
|
||
if factor_only is not None:
|
||
return "", float(factor_only.group(1)), data_idx + 2
|
||
if data_idx + 2 >= len(lines):
|
||
return first, None, data_idx + 2
|
||
frame_type = first
|
||
try:
|
||
frame_factor = float(lines[data_idx + 2].strip())
|
||
except ValueError:
|
||
return frame_type, None, data_idx + 3
|
||
return frame_type, frame_factor, data_idx + 3
|
||
|
||
def _partition_after_manuf(
|
||
self, lines: List[str], manuf_idx: int, next_data_idx: int
|
||
) -> int:
|
||
"""Return the exclusive upper bound for this window's suffix
|
||
block (and the inclusive lower bound for the next window's prefix
|
||
block). After the manufacturer line come 3 fixed tokens (g_value,
|
||
draught, shutters); the variable suffix lines start at manuf+4
|
||
and run until either (a) the next window's glazing-type-start
|
||
token (e.g. 'Double between 2002', 'Single', 'Triple ...') or
|
||
(b) the second orientation token in the gap, whichever comes
|
||
first. Branch (b) covers layouts where the glazing-type is
|
||
joined to the data line (no separate prefix line exists), so
|
||
the only signal of window-transition is the orientation tokens
|
||
rotating: orient_suffix(k) → orient_prefix(k+1). Falls through
|
||
to `next_data_idx` when neither marker is present."""
|
||
scan_start = manuf_idx + 4
|
||
seen_orient = False
|
||
for j in range(scan_start, next_data_idx):
|
||
stripped = lines[j].strip()
|
||
first_word = stripped.split(" ", 1)[0]
|
||
if first_word in self._GLAZING_TYPE_PREFIX_WORDS:
|
||
return j
|
||
if stripped in self._ORIENTATION_TOKENS:
|
||
if seen_orient:
|
||
return j
|
||
seen_orient = True
|
||
return next_data_idx
|
||
|
||
def _parse_window_from_anchors(
|
||
self,
|
||
*,
|
||
lines: List[str],
|
||
data_idx: int,
|
||
manuf_idx: int,
|
||
anchor: re.Match[str],
|
||
before_start: int,
|
||
after_end: int,
|
||
) -> Optional[Window]:
|
||
width = float(anchor.group(1))
|
||
height = float(anchor.group(2))
|
||
area = float(anchor.group(3))
|
||
# Layout-style cell joining sometimes leaves the glazing-type
|
||
# phrase trailing the W H Area triplet on the same line (e.g.
|
||
# "1.22 1.76 2.15 Double pre 2002"); when present we pass it
|
||
# through as `inline_glazing_type` and the composer skips the
|
||
# would-be glazing-prefix scan.
|
||
inline_glazing_type = anchor.group(4) if anchor.lastindex and anchor.lastindex >= 4 else None
|
||
|
||
# frame_type and frame_factor immediately follow the data line.
|
||
# Layout-style cell joining sometimes collapses them onto a
|
||
# single "Wood 0.70" line; treat both shapes uniformly so the
|
||
# downstream `middle` slice still starts at the first variable
|
||
# field (glazing_gap / bp / location / orient).
|
||
if data_idx + 1 >= len(lines):
|
||
return None
|
||
frame_type, frame_factor, middle_start = self._parse_frame_type_and_factor(
|
||
lines, data_idx
|
||
)
|
||
if frame_factor is None or not 0.0 < frame_factor <= 1.0:
|
||
return None
|
||
|
||
# Variable-order tokens between frame_factor and Manufacturer.
|
||
middle = [lines[j].strip() for j in range(middle_start, manuf_idx)]
|
||
glazing_gap = next((t for t in middle if "mm" in t.lower()), None)
|
||
# Wall-location lodging. Most rows put "External wall" in
|
||
# `middle`; alt-wall rows (cert 2636 window-4 / cert 9418 alt-
|
||
# wall window) put "Alternative wall" in the PRE-data slice
|
||
# (between the previous window's end and W×H×A). Search both
|
||
# slices so either layout resolves to the correct location.
|
||
pre_data = [lines[j].strip() for j in range(before_start, data_idx)]
|
||
location = (
|
||
next((t for t in middle if "wall" in t.lower()), None)
|
||
or next((t for t in pre_data if "wall" in t.lower()), None)
|
||
or "External wall"
|
||
)
|
||
bp_inline = next((t for t in middle if t in self._BP_INLINE_TOKENS), None)
|
||
orient_inline = next(
|
||
(t for t in middle if t in self._ORIENTATION_TOKENS), None
|
||
)
|
||
|
||
# Manufacturer line carries data_source + u_value.
|
||
manuf_match = self._MANUFACTURER_RE.match(lines[manuf_idx].strip())
|
||
if manuf_match is None:
|
||
return None
|
||
data_source = manuf_match.group(1)
|
||
u_value = float(manuf_match.group(2))
|
||
|
||
# Post-manufacturer: g_value, draught, shutters.
|
||
if manuf_idx + 3 >= len(lines):
|
||
return None
|
||
try:
|
||
g_value = float(lines[manuf_idx + 1].strip())
|
||
except ValueError:
|
||
return None
|
||
draught_proofed = lines[manuf_idx + 2].strip().lower() == "yes"
|
||
permanent_shutters = lines[manuf_idx + 3].strip()
|
||
|
||
# Prefix / suffix tokens (variable count) carry the
|
||
# glazing-type, building-part, and orientation strings split by
|
||
# the layout preprocessor.
|
||
before = [lines[j].strip() for j in range(before_start, data_idx) if lines[j].strip()]
|
||
after = [lines[j].strip() for j in range(manuf_idx + 4, after_end) if lines[j].strip()]
|
||
|
||
# Room-in-roof windows lodge their location as "Roof of Room in
|
||
# Roof" (wrapped across the prefix/suffix blocks). Detect it, pull
|
||
# those tokens out so they don't contaminate the glazing-type
|
||
# phrase, and override the wall-keyed `location` with the roof-of-
|
||
# room marker the roof-window classifier keys on.
|
||
if any(
|
||
t in self._ROOF_OF_ROOM_LOCATION_TOKENS for t in (*before, *after)
|
||
):
|
||
location = "Roof of Room"
|
||
before = [t for t in before if t not in self._ROOF_OF_ROOM_LOCATION_TOKENS]
|
||
after = [t for t in after if t not in self._ROOF_OF_ROOM_LOCATION_TOKENS]
|
||
|
||
glazing_type, building_part, orientation = self._compose_window_descriptors(
|
||
before=before,
|
||
after=after,
|
||
bp_inline=bp_inline,
|
||
orient_inline=orient_inline,
|
||
inline_glazing_type=inline_glazing_type,
|
||
)
|
||
|
||
return Window(
|
||
width_m=width,
|
||
height_m=height,
|
||
area_m2=area,
|
||
glazing_type=glazing_type,
|
||
frame_factor=frame_factor,
|
||
building_part=building_part,
|
||
location=location,
|
||
orientation=orientation,
|
||
data_source=data_source,
|
||
u_value=u_value,
|
||
g_value=g_value,
|
||
draught_proofed=draught_proofed,
|
||
permanent_shutters=permanent_shutters,
|
||
frame_type=frame_type,
|
||
glazing_gap=glazing_gap,
|
||
)
|
||
|
||
def _compose_window_descriptors(
|
||
self,
|
||
*,
|
||
before: List[str],
|
||
after: List[str],
|
||
bp_inline: Optional[str],
|
||
orient_inline: Optional[str],
|
||
inline_glazing_type: Optional[str] = None,
|
||
) -> tuple[str, str, str]:
|
||
"""Re-join the glazing-type / building-part / orientation tokens
|
||
split by the layout preprocessor. Each is at most 2 fragments
|
||
(one before the data line, one after); inline tokens in the
|
||
between-segment win over prefix/suffix fragments."""
|
||
# before holds (in document order, possibly): glazing_prefix,
|
||
# bp_prefix, orient_prefix — bp/orient may be missing.
|
||
# after holds: glazing_suffix, bp_suffix, orient_suffix — same.
|
||
prefix = list(before[-3:]) # last 3 lines preceding data
|
||
suffix = list(after[:3])
|
||
|
||
def pop_if_orientation(tokens: List[str]) -> Optional[str]:
|
||
for t in tokens:
|
||
if t in self._ORIENTATION_TOKENS:
|
||
tokens.remove(t)
|
||
return t
|
||
return None
|
||
|
||
def pop_if_bp_fragment(tokens: List[str]) -> Optional[str]:
|
||
# Prefix fragments like "1st" / "2nd" — match digit-prefixed
|
||
# ordinals; suffix fragments are always "Extension".
|
||
for t in tokens:
|
||
if re.match(r"^\d+(?:st|nd|rd|th)$", t) or t == "Extension":
|
||
tokens.remove(t)
|
||
return t
|
||
return None
|
||
|
||
orient_prefix_token = pop_if_orientation(prefix)
|
||
orient_suffix_token = pop_if_orientation(suffix)
|
||
bp_prefix_frag = pop_if_bp_fragment(prefix)
|
||
bp_suffix_frag = pop_if_bp_fragment(suffix)
|
||
|
||
# Glazing type: an inline glazing-type captured from the data
|
||
# line (layout-joined variant) wins; otherwise join the remaining
|
||
# prefix + suffix fragments.
|
||
if inline_glazing_type is not None:
|
||
glazing_type = inline_glazing_type
|
||
else:
|
||
# The glazing-type phrase always starts with a glazing-start
|
||
# word (Single/Double/Triple/Secondary). The FIRST window in
|
||
# a building part has `before_start = 0`, so its prefix block
|
||
# reaches back into the wrapped windows-table header; the
|
||
# third header line's tail tokenises to "value value Proofed
|
||
# Shutters" (the "U value / g value / Draught Proofed /
|
||
# Permanent Shutters" column titles) and is neither an
|
||
# orientation nor a bp fragment, so it survives the pops.
|
||
# Drop any prefix fragments preceding the glazing-start word
|
||
# so they don't leak into the glazing type.
|
||
glazing_start = next(
|
||
(
|
||
idx
|
||
for idx, frag in enumerate(prefix)
|
||
if frag.split(" ", 1)[0] in self._GLAZING_TYPE_PREFIX_WORDS
|
||
),
|
||
None,
|
||
)
|
||
glazing_prefix = (
|
||
prefix[glazing_start:] if glazing_start is not None else prefix
|
||
)
|
||
glazing_type = " ".join([*glazing_prefix, *suffix]).strip()
|
||
|
||
# Building part: inline token wins; otherwise join prefix + suffix.
|
||
if bp_inline is not None:
|
||
building_part = bp_inline
|
||
else:
|
||
building_part = " ".join(
|
||
t for t in (bp_prefix_frag, bp_suffix_frag) if t
|
||
).strip()
|
||
|
||
# Orientation: inline token wins for the primary direction;
|
||
# combine with the opposite-direction fragment when present.
|
||
primary = orient_inline or orient_prefix_token or ""
|
||
secondary_candidates = [
|
||
t for t in (orient_prefix_token, orient_suffix_token) if t and t != primary
|
||
]
|
||
if primary and secondary_candidates:
|
||
orientation = f"{primary}-{secondary_candidates[0]}"
|
||
else:
|
||
orientation = primary
|
||
|
||
return glazing_type, building_part, orientation
|
||
|
||
def _extract_ventilation(self) -> VentilationAndCooling:
    """Assemble the ventilation and cooling record.

    Pressure-test and mechanical-ventilation fields are read from their
    own section slices (§12.2 and §12.1) so same-named labels elsewhere
    cannot shadow them; the remaining counts and flags use the global
    label lookups.
    """

    def whole_number(raw: Optional[str]) -> Optional[int]:
        # Digits-only strings become ints; anything else is "not lodged".
        return int(raw) if raw and raw.isdigit() else None

    # SAP 10.2 §2 (17a) "Air permeability value, AP4". Scoped to
    # §12.2..§13.0 so the per-window U-values + door U-values can't
    # shadow the float read. Absent when `pressure_test_method !=
    # "Pulse"` (the modal cohort lodgement).
    pressure_lines = self._section_lines(
        "12.2 Air Pressure Test", "13.0 Lighting"
    )
    ap4_raw = self._local_val(pressure_lines, "Pressure Test Result (AP4)")
    ap4_tokens = ap4_raw.split() if ap4_raw else []
    try:
        ap4_value: Optional[float] = float(ap4_tokens[0]) if ap4_tokens else None
    except ValueError:
        ap4_value = None

    # Summary §12.1 "Mechanical Ventilation Type" is scoped to the
    # §12.1 body so the global "Type" labels in §14 / §15 can't shadow
    # it. Internal whitespace is collapsed to single spaces.
    mv_lines = self._section_lines(
        "12.1 Mechanical Ventilation", "12.2 Air Pressure Test"
    )
    mv_type_raw = self._local_val(mv_lines, "Mechanical Ventilation Type")
    if mv_type_raw:
        mv_type: Optional[str] = " ".join(mv_type_raw.split())
    else:
        mv_type = None

    # SAP 10.2 §2.6.4 + Table 4f line (230a): MEV PCDB lookup inputs.
    # Cert lodges PCDF index, wet-rooms count, ducting type, and
    # whether the installation was approved.
    pcdf_reference = whole_number(
        self._local_val(mv_lines, "MV PCDF Reference Number")
    )
    wet_rooms = whole_number(self._local_val(mv_lines, "Wet Rooms"))
    duct_type = self._local_val(mv_lines, "Duct Type") or None
    approved_raw = self._local_val(mv_lines, "Approved Installation")
    if approved_raw is None:
        approved: Optional[bool] = None
    else:
        approved = approved_raw.strip().lower() == "yes"

    return VentilationAndCooling(
        open_chimneys_count=self._int_val("No. of open chimneys"),
        open_flues_count=self._int_val("No. of open flues"),
        open_chimneys_closed_fire_count=self._int_val(
            "No. of open chimneys/open flues attached to closed fire"
        ),
        solid_fuel_boiler_flues_count=self._int_val(
            "No. of flues attached to solid fuel boiler"
        ),
        other_heater_flues_count=self._int_val(
            "No. of open flues attached to other heater"
        ),
        blocked_chimneys_count=self._int_val("No. of blocked chimneys"),
        extract_fans_count=self._int_val("No. of intermittent extract fans"),
        passive_vents_count=self._int_val("No. of passive vents"),
        flueless_gas_fires_count=self._int_val("No. of flueless gas fires"),
        fixed_space_cooling=self._bool_val("Fixed Space Cooling"),
        draught_lobby=self._str_val("Draught Lobby"),
        mechanical_ventilation=self._bool_val("Mechanical Ventilation"),
        pressure_test_method=self._str_val("Test Method"),
        air_permeability_ap4_m3_h_m2=ap4_value,
        mechanical_ventilation_type=mv_type,
        mechanical_ventilation_pcdf_reference=pcdf_reference,
        wet_rooms_count=wet_rooms,
        duct_type=duct_type,
        approved_installation=approved,
    )
|
||
|
||
def _extract_lighting(self) -> Lighting:
    """Assemble the lighting record from the lodged bulb counts.

    When the LED/CFL counts are flagged as known, the low-energy count
    is set to 0 and the "Total number of Low Energy" label is not read.
    """
    counts_known = self._bool_val("Number of LED and CFL Known")
    total_bulbs = self._int_val("Total number of bulbs")
    led_count = self._int_val("Number of LED lights")
    cfl_count = self._int_val("Number of CFL lights")
    incandescent_count = self._int_val("Total number of incandescents")
    if counts_known:
        low_energy_count = 0
    else:
        low_energy_count = self._int_val("Total number of Low Energy")
    return Lighting(
        total_bulbs=total_bulbs,
        led_cfl_count_known=counts_known,
        led_count=led_count,
        cfl_count=cfl_count,
        incandescent_count=incandescent_count,
        low_energy_count=low_energy_count,
    )
|
||
|
||
def _extract_main_heating(self) -> MainHeating:
    """Assemble the §14.0 Main Heating1 record.

    Also attaches the secondary heating SAP code (lodged inside §14.1
    Main Heating2), the optional `MainHeating2` block and the optional
    `CommunityHeating` block.

    "Percentage of Heat" is parsed the same way as in
    `_extract_main_heating_2`: a trailing '%' is stripped before the
    integer conversion, so both "100 %" and "100%" parse. Absent or
    empty values give 0.

    Raises:
        ValueError: if "Percentage of Heat" is lodged but its first
            token is not an integer.
    """
    # Community-heated dwellings (e.g. SAP code 301 "Community heating
    # scheme" per SAP10.2 Table 4a category 6) and "no system" certs
    # (SAP code 699 "Electric heaters assumed where no system lodged")
    # lodge §14.0 Main Heating1 directly followed by §14.1 Community
    # Heating/Heat Network rather than §14.1 Main Heating2: there is no
    # second main system on a community-heated dwelling. Close the
    # §14.0 block at whichever §14.1 form appears first so every
    # Summary shape surfaces the SAP code.
    lines = self._section_lines_first_end(
        "14.0 Main Heating1",
        ("14.1 Main Heating2", "14.1 Community Heating"),
    )

    # Strip the '%' before int() rather than relying on split() alone,
    # so the no-space form ("100%") does not raise.
    pct_raw = self._local_val(lines, "Percentage of Heat")
    pct_text = pct_raw.rstrip("%").strip() if pct_raw else ""
    pct = int(pct_text.split()[0]) if pct_text else 0

    # §14.0 "Main Heating SAP Code" identifies Main 1 by SAP 10.2
    # Table 4a code (e.g. 224 = "Air source heat pump, 2013 or later").
    # PCDB-boiler certs leave this empty / lodge "0"; the PCDB index in
    # `PCDF boiler Reference` is the identifier in that case. Treat 0
    # (or absent) as None so the mapper can distinguish "no SAP code
    # lodged" from a real Table 4a code.
    sap_code_raw = self._local_val(lines, "Main Heating SAP Code")
    main_heating_sap_code: Optional[int] = None
    if sap_code_raw is not None:
        sap_tokens = sap_code_raw.split()
        head = sap_tokens[0] if sap_tokens else ""
        if head.isdigit():
            v = int(head)
            main_heating_sap_code = v if v > 0 else None

    # The "Secondary Heating SapCode" key is lodged inside §14.1 Main
    # Heating2: Elmhurst uses the Main-2 block to also carry the cert's
    # secondary heating system (when one exists). Look for it in that
    # section; absence (or "0") means no secondary lodged.
    secondary_lines = self._section_lines(
        "14.1 Main Heating2", "14.1 Community Heating"
    )
    secondary_raw = self._local_val(secondary_lines, "Secondary Heating SapCode")
    secondary_code = (
        int(secondary_raw)
        if secondary_raw is not None and secondary_raw.isdigit()
        and int(secondary_raw) > 0
        else None
    )
    main_heating_2 = self._extract_main_heating_2()
    community_heating = self._extract_community_heating()
    return MainHeating(
        heat_emitter=self._local_str(lines, "Heat Emitter"),
        fuel_type=self._local_str(lines, "Fuel Type"),
        flue_type=self._local_str(lines, "Flue Type"),
        fan_assisted_flue=self._local_bool(lines, "Fan Assisted Flue"),
        design_flow_temperature=self._local_str(lines, "Design flow temperature"),
        heating_controls_ees=self._local_str(lines, "Main Heating Controls EES"),
        heating_controls_sap=self._local_str(lines, "Main Heating Controls Sap"),
        percentage_of_heat=pct,
        pcdf_boiler_reference=self._local_val(lines, "PCDF boiler Reference"),
        heat_pump_age=self._local_val(lines, "Heat pump age"),
        main_heating_sap_code=main_heating_sap_code,
        main_heating_ees=self._local_str(lines, "Main Heating EES Code"),
        secondary_heating_sap_code=secondary_code,
        main_heating_2=main_heating_2,
        community_heating=community_heating,
    )
|
||
|
||
def _extract_main_heating_2(self) -> Optional[MainHeating2]:
    """Build the §14.1 Main Heating2 record, if one is really lodged.

    Returns None when the block is either absent or lodges only
    placeholder zeros (the PCDB-only convention for "no Main 2").

    Identifier signal: Main 2 is "present" when the §14.1 block lodges
    either a non-zero PCDB boiler reference (e.g. cert 000565 Main 2
    PCDB 15100 Vaillant Ecotec plus 415) OR a non-zero SAP code.
    PCDB-only certs lodge `PCDF boiler Reference = 0` +
    `Main Heating SAP Code = 0` for an absent Main 2 (per the two JSON
    fixtures at `elmhurst_site_notes_{1,2}_text.json`).
    """
    lines = self._section_lines(
        "14.1 Main Heating2", "14.1 Community Heating",
    )

    # Non-zero PCDB reference? Only the first token is examined.
    pcdf_raw = self._local_val(lines, "PCDF boiler Reference")
    pcdf_tokens = pcdf_raw.split() if pcdf_raw else []
    pcdf_head = pcdf_tokens[0] if pcdf_tokens else ""
    has_pcdb_ref = pcdf_head.isdigit() and int(pcdf_head) > 0

    # Non-zero SAP code? A lodged 0 is treated as "no code".
    sap_code_raw = self._local_val(lines, "Main Heating SAP Code")
    sap_tokens = sap_code_raw.split() if sap_code_raw is not None else []
    main_heating_sap_code: Optional[int] = None
    if sap_tokens and sap_tokens[0].isdigit():
        main_heating_sap_code = int(sap_tokens[0]) or None

    if main_heating_sap_code is None and not has_pcdb_ref:
        return None

    # §14.1's "Percentage of Heat" lodges either "0 %" (with space) or
    # "0%" (no space). Strip the '%' before int() rather than relying
    # on split() so both forms parse.
    pct_raw = self._local_val(lines, "Percentage of Heat")
    pct_text = pct_raw.rstrip("%").strip() if pct_raw else ""
    pct = int(pct_text.split()[0]) if pct_text else 0

    return MainHeating2(
        pcdf_boiler_reference=pcdf_raw,
        fuel_type=self._local_str(lines, "Fuel Type"),
        flue_type=self._local_str(lines, "Flue Type"),
        fan_assisted_flue=self._local_bool(lines, "Fan Assisted Flue"),
        percentage_of_heat=pct,
        main_heating_sap_code=main_heating_sap_code,
        heat_emitter=self._local_str(lines, "Heat Emitter"),
        heating_controls_sap=self._local_str(lines, "Main Heating Controls Sap"),
    )
|
||
|
||
def _extract_community_heating(self) -> Optional[CommunityHeating]:
    """Build the §14.1 Community Heating/Heat Network record.

    The block is lodged in place of §14.1 Main Heating2 when the §14.0
    Main Heating SAP code names a heat-network row (Table 4a
    301/302/304). It carries the Community Heat Source (Boilers / CHP /
    Heat pump) + Community Fuel Type (Mains Gas / Electricity / Mineral
    oil or biodiesel / Coal); together these resolve the Table 12
    heat-network fuel code that bills the cascade. See
    `_resolve_community_heating_fuel_code` in the mapper.

    Returns:
        None when the section slice is empty or no Community Heat
        Source is lodged; otherwise the populated record.
    """
    lines = self._section_lines(
        "14.1 Community Heating/Heat Network", "14.2 Meters",
    )
    heat_source = self._local_str(lines, "Community Heat Source")

    # Two shapes mean "no community heating present": the section
    # marker is missing (the slice comes back empty), or the block is
    # there but Community Heat Source is left empty, as lodgement
    # convention does on individually-heated dwellings.
    block_lodged = bool(lines) and bool(heat_source)
    if not block_lodged:
        return None

    return CommunityHeating(
        heating_type=self._local_str(lines, "Heating Type"),
        pcdf_boiler_reference=self._local_val(lines, "PCDF Boiler Reference"),
        community_heat_source=heat_source,
        community_fuel_type=self._local_str(lines, "Community Fuel Type"),
        heating_controls_ees=self._local_str(lines, "Heating Controls EES"),
        heating_controls_sap=self._local_str(lines, "Heating Controls SAP"),
        chp_fuel_factor=self._local_val(lines, "CHP Fuel Factor"),
    )
|
||
|
||
def _extract_meters(self) -> Meters:
    """Assemble the meters record from the four lodged meter labels."""
    meter_type = self._str_val("Electricity meter type")
    has_main_gas = self._bool_val("Main gas")
    smart_electricity = self._bool_val("Electricity Smart Meter Present")
    smart_gas = self._bool_val("Gas Smart Meter Present")
    return Meters(
        electricity_meter_type=meter_type,
        main_gas=has_main_gas,
        electricity_smart_meter=smart_electricity,
        gas_smart_meter=smart_gas,
    )
|
||
|
||
def _extract_water_heating(self) -> WaterHeating:
    """Assemble the water-heating record, including §15.1 cylinder
    details.

    Cylinder fields are read from the §15.1..§15.2 slice only. The
    cylinder thermostat flag falls back to the "(Already installed)"
    recommendation line when §15.1 does not lodge it.
    """
    # §15.1 lodgings: Summary writes these only when a cylinder is
    # present. The §15.1 block uses labels ("Cylinder Size",
    # "Insulated", "Insulation Thickness") that collide with global
    # occurrences elsewhere ("Insulation Thickness" also appears in §7
    # Walls / §8 Roofs); scope the lookups via `_local_val` against the
    # §15.1..§15.2 slice to disambiguate.
    cylinder_lines = self._section_lines(
        "15.1 Hot Water Cylinder", "15.2 Community Hot Water",
    )
    size_label = self._local_val(cylinder_lines, "Cylinder Size")
    insulation_label = self._local_val(cylinder_lines, "Insulated")

    # Thickness: only a digits-only leading token counts.
    thickness_raw = self._local_val(cylinder_lines, "Insulation Thickness")
    thickness_mm: Optional[int] = None
    if thickness_raw:
        leading = thickness_raw.split()[0]
        if leading.isdigit():
            thickness_mm = int(leading)

    thermostat_raw = self._local_val(cylinder_lines, "Cylinder Thermostat")
    thermostat: Optional[bool]
    if thermostat_raw is not None:
        thermostat = thermostat_raw.strip().lower() == "yes"
    elif "Cylinder thermostat (Already installed)" in self._lines:
        # Fallback: Elmhurst Summary §16 "Recommendations" block
        # carries existing fittings as `<feature> (Already installed)`
        # lines. When §15.1 doesn't lodge "Cylinder Thermostat"
        # directly, treat the "Cylinder thermostat (Already installed)"
        # recommendation line as confirmation that the thermostat is
        # present (per S0380.140 corpus probe: all 41 variants on
        # property 001431 lodge this in §16 but none in §15.1, so the
        # §15.1-only lookup returned None and the cascade defaulted
        # `has_cylinder_thermostat = False`, mis-applying SAP 10.2
        # Table 2b's ×1.3 "no thermostat" multiplier).
        thermostat = True
    else:
        thermostat = None

    return WaterHeating(
        water_heating_code=self._str_val("Water Heating Code"),
        water_heating_sap_code=self._int_val("Water Heating SapCode"),
        water_heating_fuel_type=self._str_val("Water Heating Fuel Type"),
        hot_water_cylinder_present=self._bool_val("Hot Water Cylinder Present"),
        cylinder_size_label=size_label,
        cylinder_insulation_label=insulation_label,
        cylinder_insulation_thickness_mm=thickness_mm,
        cylinder_thermostat=thermostat,
    )
|
||
|
||
def _extract_baths_and_showers(self) -> BathsAndShowers:
    """Read the bath counts and the shower roster from the Summary text.

    The two bath counts come from document-wide label lookups. The
    shower rows are searched for only inside the slice delimited by the
    `1x.0 Baths and Showers` and `18.0 Flue Gas Heat Recovery System`
    anchors: a document-wide search for "Connected" collides with the
    §3 building-parts elevation flags ("Connected" / "Exposed" /
    "Sheltered") and loses the roster on multi-extension certs (cert
    000565 lodges 4 extensions and an electric shower).

    Returns:
        A `BathsAndShowers` record. `showers` is empty when the section
        holds no "Connected" line.
    """
    bath_total = self._int_val("Total Number of Baths")
    bath_connected = self._int_val("Number of Baths Connected")
    rows = self._section_lines(
        "1x.0 Baths and Showers", "18.0 Flue Gas Heat Recovery System",
    )

    roster: List[Shower] = []
    if "Connected" in rows:
        first_row = rows.index("Connected") + 1
        # Lines after the first "Connected" entry are consumed as
        # (number, outlet type, connected) triples. Only complete triples
        # are visited, and the walk ends at the first triple whose number
        # line is not purely digits.
        for pos in range(first_row, len(rows) - 2, 3):
            number_text = rows[pos]
            if not number_text.isdigit():
                break
            roster.append(
                Shower(
                    shower_number=int(number_text),
                    outlet_type=rows[pos + 1],
                    connected=rows[pos + 2],
                )
            )

    return BathsAndShowers(
        number_of_baths=bath_total,
        number_of_baths_connected=bath_connected,
        showers=roster,
    )
|
||
|
||
def _rating_val(self, label: str) -> int:
|
||
v = self._next_val(label)
|
||
try:
|
||
return int(v.split()[-1]) if v else 0
|
||
except (ValueError, IndexError):
|
||
return 0
|
||
|
||
def _extract_renewables(self) -> Renewables:
    """Collect the renewables fields of the Summary into a `Renewables`.

    Covers flue gas heat recovery, wind turbine terrain, hydro
    generation, the PV fields (detailed arrays, diverter, "% of roof
    area") and the solar hot-water collector geometry.
    """
    # "Present" is read from the slice between the §18.0 and §19.0
    # anchors rather than from the whole document.
    fghrs_block = self._section_lines(
        "18.0 Flue Gas Heat Recovery System", "19.0 Photovoltaic Panel"
    )
    fghrs_present = self._local_bool(fghrs_block, "Present")

    terrain_type = self._str_val("Terrain Type")

    hydro_text = self._next_val("Electricity generated [kWh/year]")
    if hydro_text:
        hydro_kwh = float(hydro_text)
    else:
        hydro_kwh = 0.0

    # RdSAP 10 §11.1 b): the Summary §19.0 may lodge a "% of roof area"
    # row when the surveyor doesn't capture detailed kWp / orientation /
    # pitch. `_int_val` returns 0 when the label is absent (the cert
    # lodges detailed pv_arrays instead); that 0 is collapsed to None in
    # the constructor call below so downstream can distinguish "no PV"
    # from "PV via % roof area path".
    roof_share = self._int_val("Proportion of roof area")

    # Solar HW collector geometry — Summary §16.0. Only populated when
    # the cert lodges "Are details known? Yes" in the solar block (cert
    # 000565 lodges West / 30° / Modest). When absent the values are
    # None and the cascade falls back to the RdSAP 10 §10.11 Table 29
    # defaults (South / 30° / Modest).
    solar_block = self._section_lines(
        "16.0 Solar water heating",
        "17.0 Waste Water Heat Recovery System",
    )
    collector_orientation = self._local_val(
        solar_block, "Collector orientation",
    )
    collector_pitch = _parse_solar_pitch_deg(
        self._local_val(solar_block, "Collector elevation")
    )
    collector_overshading = self._local_val(solar_block, "Overshading")

    return Renewables(
        solar_water_heating=self._bool_val("Solar Water Heating"),
        wwhrs_present=self._bool_val("Is WWHRS present in the property?"),
        flue_gas_heat_recovery_present=fghrs_present,
        photovoltaic_panel=self._str_val("Photovoltaic Panel"),
        export_capable_meter=self._bool_val("Export capable meter"),
        wind_turbine_present=self._bool_val("Wind turbine present?"),
        wind_turbines_terrain_type=terrain_type,
        hydro_electricity_generated_kwh=hydro_kwh,
        pv_arrays=self._extract_pv_arrays(),
        pv_diverter_present=self._bool_val("Diverter present"),
        pv_percent_roof_area=roof_share if roof_share > 0 else None,
        solar_hw_collector_orientation=collector_orientation,
        solar_hw_collector_pitch_deg=collector_pitch,
        solar_hw_overshading=collector_overshading,
    )
|
||
|
||
def _extract_pv_arrays(self) -> List[ElmhurstPvArray]:
    """Parse the Elmhurst Summary §19.0 PV Panel section.

    Returns one `ElmhurstPvArray` per lodged array, or [] when the
    "Photovoltaic panel details" anchor line is absent.

    Single-array layout (e.g. cert 0380):
        Photovoltaic panel details
        PV Cells kW Peak Orientation
        Elevation
        Overshading

        3.00
        South-East
        45°
        None Or Little

    Multi-array layout (e.g. cert 0350 lodges 2 arrays) repeats the
    value group:
        ...
        1.50
        South-East
        45°
        None Or Little
        1.50
        North-West
        45°
        None Or Little

    Each array is four consecutive values in (kW Peak, Orientation,
    Elevation, Overshading) order. Header lines after the anchor are
    skipped, and reading stops at the next §-header or at a post-PV
    token (Batteries / Capacity / Export capable meter / etc.).
    """
    try:
        anchor_pos = self._lines.index("Photovoltaic panel details")
    except ValueError:
        return []

    # Any line containing one of these words is treated as part of the
    # column header ("PV Cells kW Peak Orientation", "Elevation",
    # "Overshading") and skipped.
    header_words = {"pv cells", "kw peak", "orientation", "elevation", "overshading"}
    # Lines that, matched whole (case-insensitively), mark the end of the
    # array values.
    end_markers = {
        "batteries", "capacity known", "capacity",
        "connected to the dwelling's meter", "diverter present",
        "export capable meter",
    }
    # Next §-header (e.g. "20.0 Wind Turbine"): the pattern requires
    # "<digits>.<digit><whitespace><word>", so bare kWp values such as
    # "1.50" do not close the block.
    section_header = re.compile(r"^\d{1,2}\.\d\s+\w")

    cells: List[str] = []
    for raw_line in self._lines[anchor_pos + 1:]:
        text = raw_line.strip()
        if not text:
            continue
        folded = text.lower()
        if folded in end_markers or section_header.match(text):
            break
        if not any(word in folded for word in header_words):
            cells.append(text)

    # Consume complete groups of four; a trailing partial group is dropped.
    found: List[ElmhurstPvArray] = []
    complete = len(cells) - len(cells) % 4
    for base in range(0, complete, 4):
        peak_text, orientation, elevation_text, overshading = cells[base:base + 4]
        try:
            peak_kw = float(peak_text)
        except ValueError:
            continue
        # Elevation is lodged as e.g. "45°": keep only the leading digits.
        degrees = re.match(r"^(\d+)", elevation_text)
        if degrees is None:
            continue
        found.append(ElmhurstPvArray(
            peak_power_kw=peak_kw,
            orientation=orientation,
            elevation_deg=int(degrees.group(1)),
            overshading=overshading,
        ))
    return found
|
||
|
||
def extract(self) -> ElmhurstSiteNotes:
    """Build the complete `ElmhurstSiteNotes` record for this document.

    Scalar fields are read with the label-lookup helpers (`_str_val`,
    `_int_val`, `_bool_val`, `_rating_val`); structured fields are
    delegated to the corresponding `_extract_*` methods.
    """
    # Only the first whitespace-separated token of the emissions value
    # is parsed; an empty/missing value yields 0.0.
    emissions_text = self._next_val("Emissions (t/year)")
    if emissions_text:
        co2_tonnes = float(emissions_text.split()[0])
    else:
        co2_tonnes = 0.0

    return ElmhurstSiteNotes(
        surveyor_info=self._extract_surveyor_info(),
        property_details=self._extract_property_details(),
        current_sap_rating=self._rating_val("Current SAP rating"),
        potential_sap_rating=self._rating_val("Potential SAP rating"),
        current_ei_rating=self._rating_val("Current EI rating"),
        potential_ei_rating=self._rating_val("Potential EI rating"),
        co2_emissions_current_t=co2_tonnes,
        property_type=self._str_val("1.0 Property type"),
        attachment=self._extract_attachment(),
        number_of_storeys=self._int_val("Storeys"),
        habitable_rooms=self._int_val("Habitable Rooms"),
        heated_habitable_rooms=self._int_val("Heated Habitable Rooms"),
        construction_age_band=self._str_val("Main Property"),
        dimensions=self._extract_dimensions(),
        has_conservatory=self._bool_val("Is there a conservatory?"),
        walls=self._extract_walls(),
        roof=self._extract_roof(),
        floor=self._extract_floor(),
        door_count=self._int_val("Total Number of Doors"),
        insulated_door_count=self._int_val("Number of Insulated Doors"),
        insulated_door_u_value=self._extract_door_u_value(),
        windows=self._extract_windows(),
        draught_proofing_percent=self._int_val("Draught Proofing"),
        ventilation=self._extract_ventilation(),
        lighting=self._extract_lighting(),
        main_heating=self._extract_main_heating(),
        meters=self._extract_meters(),
        water_heating=self._extract_water_heating(),
        baths_and_showers=self._extract_baths_and_showers(),
        renewables=self._extract_renewables(),
        extensions=self._extract_extensions(),
        room_in_roof=self._extract_room_in_roof_from_text(),
    )
|
||
|
||
def _extract_room_in_roof_from_text(self) -> Optional[RoomInRoof]:
    """Convenience wrapper around `_extract_room_in_roof`.

    Slices the text between the "4.0 Dimensions:" and
    "5.0 Conservatory:" anchors once, narrows it to the first
    building-part chunk when `_split_section_by_bp` yields any, and
    hands that body plus the full document text to
    `_extract_room_in_roof`, so that method doesn't need to re-slice
    the document.
    """
    dimensions_text = self._between("4.0 Dimensions:", "5.0 Conservatory:")
    parts = self._split_section_by_bp(dimensions_text)
    if parts:
        # The body is the second element of the first chunk.
        body = parts[0][1]
    else:
        # No per-part split available: use the whole §4 slice.
        body = dimensions_text
    return self._extract_room_in_roof(body, self._text)
|