mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
The §11 Windows table in the Summary PDF doesn't lay out identically across the cohort. Three new quirks added to the layout-style parser so the remaining 5 certs can be debugged with windows actually extracted: 1. `Wood 0.70` combined frame_type+frame_factor line — previously the parser expected them on separate lines (data+1 / data+2) and rejected the window when the joined form appeared. 2. Trailing glazing-type on the data line — `1.22 1.76 2.15 Double pre 2002` is the joined-cell variant in 000516; the W/H/Area anchor now captures the trailing phrase as an optional 4th group and feeds it through as `inline_glazing_type`, bypassing the separate-line glazing-prefix scan. 3. Cross-window gap with no glazing marker — `_partition_after_manuf` now falls back to "second orientation token in gap" when no glazing-type-prefix word appears. Covers the 000516 layout where each window has prefix+suffix orient tokens (no inline orient) and the glazing-type is joined-to-data. The 5 remaining Summary PDFs are copied into `backend/documents_parser/tests/fixtures/` ready for per-cert mapper work. Mirror pin tests deferred — each cert still has its own diff to close (handover in NEXT_AGENT_PROMPT.md documents the per-cert state, e.g. 000477 needs secondary-heating extraction, 000516 needs roof-window separation). Current cohort SAP deltas vs the U985 worksheet PDFs (target 1e-4): 000474 0.0000 ✓ 000477 +6.3655 secondary heating + lighting 000480 +8.2695 diagnosis pending 000487 +8.1433 extractor still drops windows 000490 +5.6551 diagnosis pending 000516 +5.9812 roof-window separation Wider regression stays green (754 pass). Pyright net-zero on touched files. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
890 lines
38 KiB
Python
890 lines
38 KiB
Python
import re
|
|
from datetime import date, datetime
|
|
from typing import List, Optional
|
|
|
|
from datatypes.epc.surveys.elmhurst_site_notes import (
|
|
BathsAndShowers,
|
|
BuildingPartDimensions,
|
|
ElmhurstSiteNotes,
|
|
ExtensionPart,
|
|
FloorDetails,
|
|
FloorDimension,
|
|
Lighting,
|
|
MainHeating,
|
|
Meters,
|
|
PropertyDetails,
|
|
Renewables,
|
|
RoofDetails,
|
|
Shower,
|
|
SurveyorInfo,
|
|
VentilationAndCooling,
|
|
WallDetails,
|
|
WaterHeating,
|
|
Window,
|
|
)
|
|
|
|
|
|
class ElmhurstSiteNotesExtractor:
|
|
def __init__(self, pages: List[str]) -> None:
|
|
self._text = "\n".join(pages)
|
|
self._lines = [l.strip() for l in self._text.splitlines() if l.strip()]
|
|
|
|
# --- generic helpers ---
|
|
|
|
def _next_val(self, label: str) -> Optional[str]:
|
|
lc = label.rstrip(":") + ":"
|
|
lb = label.rstrip(":")
|
|
for i, line in enumerate(self._lines):
|
|
if line.startswith(lc) and len(line) > len(lc):
|
|
return line[len(lc):].strip() or None
|
|
if line == lc or line == lb:
|
|
for j in range(i + 1, min(i + 4, len(self._lines))):
|
|
v = self._lines[j]
|
|
if v.endswith(":") or v.startswith("©"):
|
|
return None
|
|
if v:
|
|
return v
|
|
return None
|
|
return None
|
|
|
|
def _str_val(self, label: str) -> str:
|
|
v = self._next_val(label)
|
|
return " ".join(v.split()) if v else ""
|
|
|
|
def _opt_str(self, label: str) -> Optional[str]:
|
|
v = self._next_val(label)
|
|
return " ".join(v.split()) if v else None
|
|
|
|
def _bool_val(self, label: str) -> bool:
|
|
v = self._next_val(label)
|
|
return v is not None and v.lower() == "yes"
|
|
|
|
def _int_val(self, label: str) -> int:
|
|
v = self._next_val(label)
|
|
try:
|
|
return int(v.split()[0]) if v else 0
|
|
except (ValueError, IndexError):
|
|
return 0
|
|
|
|
def _date_val(self, label: str) -> date:
|
|
v = self._next_val(label)
|
|
if not v:
|
|
raise ValueError(f"Missing date for label: {label}")
|
|
return datetime.strptime(v.strip(), "%d/%m/%Y").date()
|
|
|
|
def _between(self, start: str, end: str) -> str:
|
|
try:
|
|
s = self._text.index(start) + len(start)
|
|
e = self._text.index(end, s)
|
|
return self._text[s:e]
|
|
except ValueError:
|
|
return ""
|
|
|
|
# Multi-bp helpers: Summary PDFs subdivide §4/§7/§8/§9 with explicit
|
|
# "Main Property" / "1st Extension" / "2nd Extension" headers. The
|
|
# existing single-bp fixture also carries "Main Property" as a header
|
|
# before the body. This helper splits a section into per-bp chunks.
|
|
_BP_HEADER_RE = re.compile(
|
|
r"^(Main Property|\d+(?:st|nd|rd|th) Extension)\s*$",
|
|
re.MULTILINE,
|
|
)
|
|
|
|
def _split_section_by_bp(self, section_text: str) -> List[tuple[str, str]]:
|
|
"""Split a section's text into per-bp subsections.
|
|
|
|
Returns ``[(bp_name, body), ...]`` in document order. Body is
|
|
the text between this bp's header and the next bp's header
|
|
(exclusive). Returns ``[("Main Property", section_text)]`` when
|
|
no headers are found (defensive fallback for malformed PDFs).
|
|
"""
|
|
matches = list(self._BP_HEADER_RE.finditer(section_text))
|
|
if not matches:
|
|
return [("Main Property", section_text)]
|
|
result: List[tuple[str, str]] = []
|
|
for i, m in enumerate(matches):
|
|
name = m.group(1)
|
|
body_start = m.end()
|
|
body_end = (
|
|
matches[i + 1].start() if i + 1 < len(matches) else len(section_text)
|
|
)
|
|
result.append((name, section_text[body_start:body_end]))
|
|
return result
|
|
|
|
def _section_lines(self, start: str, end: str) -> List[str]:
|
|
text = self._between(start, end)
|
|
return [l.strip() for l in text.splitlines() if l.strip()]
|
|
|
|
def _local_val(self, lines: List[str], label: str) -> Optional[str]:
|
|
lb = label.rstrip(":")
|
|
lc = lb + ":"
|
|
for i, line in enumerate(lines):
|
|
if line.startswith(lc) and len(line) > len(lc):
|
|
return line[len(lc):].strip() or None
|
|
if line == lc or line == lb:
|
|
for j in range(i + 1, min(i + 4, len(lines))):
|
|
v = lines[j]
|
|
if v.endswith(":") or v.startswith("©"):
|
|
return None
|
|
if v:
|
|
return v
|
|
return None
|
|
return None
|
|
|
|
def _local_str(self, lines: List[str], label: str) -> str:
|
|
v = self._local_val(lines, label)
|
|
return " ".join(v.split()) if v else ""
|
|
|
|
def _local_bool(self, lines: List[str], label: str) -> bool:
|
|
v = self._local_val(lines, label)
|
|
return v is not None and v.lower() == "yes"
|
|
|
|
# --- section extractors ---
|
|
|
|
def _extract_surveyor_info(self) -> SurveyorInfo:
|
|
return SurveyorInfo(
|
|
surveyor_code=self._str_val("Surveyor"),
|
|
name=self._str_val("Name"),
|
|
title=self._str_val("Title"),
|
|
tel_number=self._str_val("Tel Number"),
|
|
survey_reference=self._str_val("Survey Reference"),
|
|
my_reference=self._opt_str("My Reference"),
|
|
)
|
|
|
|
def _extract_property_details(self) -> PropertyDetails:
|
|
epc_m = re.search(
|
|
r"Check for the existence of\nan EPC:\n(Yes|No)", self._text
|
|
)
|
|
epc_exists = epc_m.group(1).lower() == "yes" if epc_m else False
|
|
|
|
return PropertyDetails(
|
|
rdsap_version=self._str_val("RdSAP version"),
|
|
reference_number=self._str_val("Reference Number"),
|
|
lodgement_required=self._bool_val("Lodgement Required"),
|
|
regs_region=self._str_val("Regs Region"),
|
|
epc_language=self._str_val("EPC Language"),
|
|
postcode=self._str_val("Postcode"),
|
|
region=self._str_val("Region"),
|
|
street=self._str_val("Street"),
|
|
town=self._str_val("Town"),
|
|
tenure=self._str_val("Property Tenure"),
|
|
transaction_type=self._str_val("Transaction Type"),
|
|
inspection_date=self._date_val("Inspection Date"),
|
|
process_date=self._date_val("Process date"),
|
|
epc_exists=epc_exists,
|
|
uprn=self._opt_str("UPRN"),
|
|
house_name=self._opt_str("House Name"),
|
|
house_number=self._opt_str("House No"),
|
|
locality=self._opt_str("Locality"),
|
|
county=self._opt_str("County"),
|
|
)
|
|
|
|
def _extract_attachment(self) -> str:
|
|
m = re.search(r"1\.0 Property type:\n[^\n]+\n([^\n]+)", self._text)
|
|
return " ".join(m.group(1).strip().split()) if m else ""
|
|
|
|
def _floors_from_dimensions_body(self, body: str) -> List[FloorDimension]:
|
|
"""Parse FloorDimension entries from a single bp's §4 body."""
|
|
matches = re.findall(
|
|
r"([A-Za-z ]+Floor):\n([\d.]+)\n([\d.]+)\n([\d.]+)\n([\d.]+)",
|
|
body,
|
|
)
|
|
return [
|
|
FloorDimension(
|
|
name=name.strip(),
|
|
area_m2=float(area),
|
|
room_height_m=float(height),
|
|
heat_loss_perimeter_m=float(hlp),
|
|
party_wall_length_m=float(pwl),
|
|
)
|
|
for name, area, height, hlp, pwl in matches
|
|
]
|
|
|
|
def _extract_dimensions(self) -> BuildingPartDimensions:
|
|
"""Main-property dimensions only. Extensions are picked up by
|
|
`_extract_extensions`."""
|
|
dim_type = self._str_val("Dimension type")
|
|
section = self._between("4.0 Dimensions:", "5.0 Conservatory:")
|
|
bp_chunks = self._split_section_by_bp(section)
|
|
main_body = bp_chunks[0][1] if bp_chunks else section
|
|
return BuildingPartDimensions(
|
|
dimension_type=dim_type,
|
|
floors=self._floors_from_dimensions_body(main_body),
|
|
)
|
|
|
|
def _wall_details_from_lines(self, lines: List[str]) -> WallDetails:
|
|
thickness_raw = self._local_val(lines, "Wall Thickness")
|
|
thickness_mm = (
|
|
int(thickness_raw.split()[0]) if thickness_raw else None
|
|
)
|
|
return WallDetails(
|
|
wall_type=self._local_str(lines, "Type"),
|
|
insulation=self._local_str(lines, "Insulation"),
|
|
thickness_unknown=self._local_bool(lines, "Wall Thickness Unknown"),
|
|
u_value_known=self._local_bool(lines, "U-value Known"),
|
|
party_wall_type=self._local_str(lines, "Party Wall Type"),
|
|
thickness_mm=thickness_mm,
|
|
)
|
|
|
|
def _extract_walls(self) -> WallDetails:
|
|
section = self._between("7.0 Walls:", "8.0 Roofs:")
|
|
bp_chunks = self._split_section_by_bp(section)
|
|
main_body = bp_chunks[0][1] if bp_chunks else section
|
|
lines = [l.strip() for l in main_body.splitlines() if l.strip()]
|
|
return self._wall_details_from_lines(lines)
|
|
|
|
def _roof_details_from_lines(self, lines: List[str]) -> RoofDetails:
|
|
thickness_raw = self._local_val(lines, "Insulation Thickness")
|
|
thickness_mm = (
|
|
int(thickness_raw.split()[0]) if thickness_raw and thickness_raw.split()[0].isdigit() else None
|
|
)
|
|
return RoofDetails(
|
|
roof_type=self._local_str(lines, "Type"),
|
|
insulation=self._local_str(lines, "Insulation"),
|
|
u_value_known=self._local_bool(lines, "U-value Known"),
|
|
insulation_thickness_mm=thickness_mm,
|
|
)
|
|
|
|
def _extract_roof(self) -> RoofDetails:
|
|
section = self._between("8.0 Roofs:", "8.1 Rooms in Roof:")
|
|
bp_chunks = self._split_section_by_bp(section)
|
|
main_body = bp_chunks[0][1] if bp_chunks else section
|
|
lines = [l.strip() for l in main_body.splitlines() if l.strip()]
|
|
return self._roof_details_from_lines(lines)
|
|
|
|
def _floor_details_from_lines(self, lines: List[str]) -> FloorDetails:
|
|
u_val_raw = self._local_val(lines, "Default U-value")
|
|
default_u = float(u_val_raw) if u_val_raw else None
|
|
return FloorDetails(
|
|
location=self._local_str(lines, "Location"),
|
|
floor_type=self._local_str(lines, "Type"),
|
|
insulation=self._local_str(lines, "Insulation"),
|
|
u_value_known=self._local_bool(lines, "U-value Known"),
|
|
default_u_value=default_u,
|
|
)
|
|
|
|
def _extract_floor(self) -> FloorDetails:
|
|
section = self._between("9.0 Floors:", "10.0 Doors:")
|
|
bp_chunks = self._split_section_by_bp(section)
|
|
main_body = bp_chunks[0][1] if bp_chunks else section
|
|
lines = [l.strip() for l in main_body.splitlines() if l.strip()]
|
|
return self._floor_details_from_lines(lines)
|
|
|
|
def _extract_extensions(self) -> List[ExtensionPart]:
|
|
"""Collect non-Main building parts. Cross-references the §4, §7,
|
|
§8, §9 per-bp subsections by extension name. "As Main: Yes"
|
|
within a section body inherits the main bp's data for that
|
|
section; otherwise the section body is parsed in isolation."""
|
|
# Gather per-section chunks once.
|
|
dim_section = self._between("4.0 Dimensions:", "5.0 Conservatory:")
|
|
wall_section = self._between("7.0 Walls:", "8.0 Roofs:")
|
|
roof_section = self._between("8.0 Roofs:", "8.1 Rooms in Roof:")
|
|
floor_section = self._between("9.0 Floors:", "10.0 Doors:")
|
|
dim_type = self._str_val("Dimension type")
|
|
|
|
dim_chunks = dict(self._split_section_by_bp(dim_section))
|
|
wall_chunks = dict(self._split_section_by_bp(wall_section))
|
|
roof_chunks = dict(self._split_section_by_bp(roof_section))
|
|
floor_chunks = dict(self._split_section_by_bp(floor_section))
|
|
|
|
main_walls = self._extract_walls()
|
|
main_roof = self._extract_roof()
|
|
main_floor = self._extract_floor()
|
|
|
|
# Per-bp age-band lookup. Section 3 contains lines like
|
|
# "1st Extension B 1900-1929" — the band sits after the name.
|
|
age_band_re = re.compile(
|
|
r"^(\d+(?:st|nd|rd|th) Extension)\s+([A-M] [^\n]+)$",
|
|
re.MULTILINE,
|
|
)
|
|
age_bands = {m.group(1): m.group(2).strip() for m in age_band_re.finditer(self._text)}
|
|
|
|
# Collect names in document order from the dimensions section
|
|
# (excluding Main Property).
|
|
names = [
|
|
name for name, _ in self._split_section_by_bp(dim_section)
|
|
if name != "Main Property"
|
|
]
|
|
|
|
extensions: List[ExtensionPart] = []
|
|
for name in names:
|
|
dim_body = dim_chunks.get(name, "")
|
|
wall_body = wall_chunks.get(name, "")
|
|
roof_body = roof_chunks.get(name, "")
|
|
floor_body = floor_chunks.get(name, "")
|
|
|
|
wall_lines = [l.strip() for l in wall_body.splitlines() if l.strip()]
|
|
roof_lines = [l.strip() for l in roof_body.splitlines() if l.strip()]
|
|
floor_lines = [l.strip() for l in floor_body.splitlines() if l.strip()]
|
|
|
|
walls = main_walls if self._local_bool(wall_lines, "As Main Wall") else self._wall_details_from_lines(wall_lines)
|
|
roof = main_roof if self._local_bool(roof_lines, "As Main") else self._roof_details_from_lines(roof_lines)
|
|
floor = main_floor if self._local_bool(floor_lines, "As Main") else self._floor_details_from_lines(floor_lines)
|
|
|
|
extensions.append(
|
|
ExtensionPart(
|
|
name=name,
|
|
construction_age_band=age_bands.get(name, ""),
|
|
dimensions=BuildingPartDimensions(
|
|
dimension_type=dim_type,
|
|
floors=self._floors_from_dimensions_body(dim_body),
|
|
),
|
|
walls=walls,
|
|
roof=roof,
|
|
floor=floor,
|
|
)
|
|
)
|
|
return extensions
|
|
|
|
def _extract_windows(self) -> List[Window]:
|
|
# Textract-style pages keep "Permanent\s+Shutters" adjacent in
|
|
# reading order and the windows table flows as one column-block
|
|
# the existing token-walker can step through. PDF-derived pages
|
|
# (Summary PDFs preprocessed from `pdftotext -layout`) break the
|
|
# header across lines, so this regex misses entirely and the
|
|
# `_extract_windows_from_layout` fallback below picks them up
|
|
# by anchoring on the W/H/Area data line.
|
|
m = re.search(
|
|
r"Permanent\s+Shutters\n(.*?)Draught Proofing",
|
|
self._text,
|
|
re.DOTALL,
|
|
)
|
|
if not m:
|
|
return self._extract_windows_from_layout()
|
|
tokens = [t.strip() for t in m.group(1).splitlines() if t.strip()]
|
|
windows: List[Window] = []
|
|
i = 0
|
|
while i + 12 < len(tokens):
|
|
try:
|
|
width_m = float(tokens[i])
|
|
height_m = float(tokens[i + 1])
|
|
area_m2 = float(tokens[i + 2])
|
|
except (ValueError, IndexError):
|
|
i += 1
|
|
continue
|
|
i += 3
|
|
# Collect glazing type tokens until frame_factor (0 < v ≤ 1.0)
|
|
glazing_parts: List[str] = []
|
|
while i < len(tokens):
|
|
try:
|
|
v = float(tokens[i])
|
|
if 0.0 < v <= 1.0:
|
|
break
|
|
glazing_parts.append(tokens[i])
|
|
except ValueError:
|
|
glazing_parts.append(tokens[i])
|
|
i += 1
|
|
# If last glazing token is a single word (no spaces, not numeric) it's the frame_type
|
|
frame_type: Optional[str] = None
|
|
if glazing_parts and " " not in glazing_parts[-1] and not glazing_parts[-1].replace(".", "").isdigit():
|
|
frame_type = glazing_parts.pop()
|
|
glazing_type = " ".join(glazing_parts).strip()
|
|
if i >= len(tokens):
|
|
break
|
|
frame_factor = float(tokens[i]); i += 1
|
|
# Consume glazing_gap if present ("mm" token, possibly multi-token e.g. "16 mm or more")
|
|
glazing_gap: Optional[str] = None
|
|
if i < len(tokens) and "mm" in tokens[i]:
|
|
gap_parts = [tokens[i]]; i += 1
|
|
while i < len(tokens) and tokens[i].lower() in {"or", "more"}:
|
|
gap_parts.append(tokens[i]); i += 1
|
|
glazing_gap = " ".join(gap_parts)
|
|
building_part = tokens[i]; i += 1
|
|
location = tokens[i]; i += 1
|
|
orientation = tokens[i]; i += 1
|
|
data_source = tokens[i]; i += 1
|
|
u_value = float(tokens[i]); i += 1
|
|
g_value = float(tokens[i]); i += 1
|
|
draught_proofed = tokens[i].lower() == "yes"; i += 1
|
|
permanent_shutters = tokens[i]; i += 1
|
|
windows.append(
|
|
Window(
|
|
width_m=width_m,
|
|
height_m=height_m,
|
|
area_m2=area_m2,
|
|
glazing_type=glazing_type,
|
|
frame_factor=frame_factor,
|
|
building_part=building_part,
|
|
location=location,
|
|
orientation=orientation,
|
|
data_source=data_source,
|
|
u_value=u_value,
|
|
g_value=g_value,
|
|
draught_proofed=draught_proofed,
|
|
permanent_shutters=permanent_shutters,
|
|
frame_type=frame_type,
|
|
glazing_gap=glazing_gap,
|
|
)
|
|
)
|
|
return windows
|
|
|
|
# Anchors used by the layout-style window parser. The W/H/Area anchor
|
|
# is sometimes followed by a joined glazing-type phrase on the same
|
|
# line (e.g. '1.22 1.76 2.15 Double pre 2002'); the optional 4th
|
|
# capture surfaces that text so the parser can use it instead of a
|
|
# separately-laid-out prefix line.
|
|
_WIDTH_HEIGHT_AREA_RE = re.compile(
|
|
r"^(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)(?:\s+(\S.*?))?$"
|
|
)
|
|
_MANUFACTURER_RE = re.compile(r"^(Manufacturer|Default)\s+(\d+\.\d+)$")
|
|
_ORIENTATION_TOKENS = frozenset({
|
|
"North", "South", "East", "West", "NE", "NW", "SE", "SW",
|
|
})
|
|
_BP_INLINE_TOKENS = frozenset({"Main"}) # "Extension" only appears as suffix
|
|
# The Elmhurst Summary PDF lodges each window's glazing-type as a
|
|
# capitalised phrase like "Double between 2002" / "Double with unknown"
|
|
# / "Single" / "Triple" / "Secondary". The first token of that phrase
|
|
# marks the start of a new window's prefix block in the layout dump,
|
|
# which is the only stable signal partitioning one window's suffix
|
|
# from the next window's prefix.
|
|
_GLAZING_TYPE_PREFIX_WORDS = frozenset({
|
|
"Single", "Double", "Triple", "Secondary",
|
|
})
|
|
|
|
def _extract_windows_from_layout(self) -> List[Window]:
|
|
"""Fallback window parser for Summary PDFs preprocessed from
|
|
`pdftotext -layout`. Each window has two stable anchors:
|
|
a "W H Area" line and a "Manufacturer <U_value>" line a few
|
|
lines further down. Everything between holds frame_type,
|
|
frame_factor, and a variable mix of glazing_gap, building_part,
|
|
location, and orientation (depending on which fields the
|
|
surveyor lodged); everything around the window holds glazing-
|
|
type/building-part/orientation prefix/suffix tokens split by
|
|
the layout preprocessor.
|
|
"""
|
|
m = re.search(
|
|
r"11\.0 Windows:(.*?)(Draught Proofing|12\.0 Ventilation)",
|
|
self._text, re.DOTALL,
|
|
)
|
|
if not m:
|
|
return []
|
|
lines = m.group(1).splitlines()
|
|
|
|
# Locate all (data_line, manufacturer_line) pairs in document
|
|
# order. Each pair is one window.
|
|
data_anchors: List[tuple[int, re.Match[str]]] = []
|
|
for i, line in enumerate(lines):
|
|
anchor = self._WIDTH_HEIGHT_AREA_RE.match(line.strip())
|
|
if anchor is not None:
|
|
data_anchors.append((i, anchor))
|
|
|
|
windows: List[Window] = []
|
|
for k, (data_idx, anchor) in enumerate(data_anchors):
|
|
manuf_idx = self._find_manufacturer_after(lines, data_idx)
|
|
if manuf_idx is None:
|
|
continue
|
|
prev_manuf_idx = (
|
|
self._find_manufacturer_after(lines, data_anchors[k - 1][0])
|
|
if k > 0 else None
|
|
)
|
|
next_data_idx = (
|
|
data_anchors[k + 1][0] if k + 1 < len(data_anchors) else len(lines)
|
|
)
|
|
# Partition the cross-window gap between this window's suffix
|
|
# and the next window's prefix on the first glazing-type-start
|
|
# token (Single/Double/Triple/Secondary). The same boundary
|
|
# is used symmetrically — current window's `after_end` = next
|
|
# window's `before_start` — so prefix tokens of W_{k+1} never
|
|
# get attributed as suffix of W_k (which was the bug producing
|
|
# orientation='East-South' for windows where 'South' actually
|
|
# belonged to the next row).
|
|
before_start = (
|
|
self._partition_after_manuf(lines, prev_manuf_idx, data_idx)
|
|
if prev_manuf_idx is not None else 0
|
|
)
|
|
after_end = self._partition_after_manuf(lines, manuf_idx, next_data_idx)
|
|
try:
|
|
window = self._parse_window_from_anchors(
|
|
lines=lines,
|
|
data_idx=data_idx,
|
|
manuf_idx=manuf_idx,
|
|
anchor=anchor,
|
|
before_start=before_start,
|
|
after_end=after_end,
|
|
)
|
|
except (ValueError, IndexError):
|
|
continue
|
|
if window is not None:
|
|
windows.append(window)
|
|
return windows
|
|
|
|
def _find_manufacturer_after(self, lines: List[str], data_idx: int) -> Optional[int]:
|
|
for j in range(data_idx + 1, min(data_idx + 12, len(lines))):
|
|
if self._MANUFACTURER_RE.match(lines[j].strip()):
|
|
return j
|
|
return None
|
|
|
|
_FRAME_TYPE_AND_FACTOR_RE = re.compile(r"^(\S+(?:\s+\S+)*?)\s+(\d\.\d+)$")
|
|
|
|
def _parse_frame_type_and_factor(
|
|
self, lines: List[str], data_idx: int
|
|
) -> tuple[str, Optional[float], int]:
|
|
"""Return `(frame_type, frame_factor, middle_start_idx)` from
|
|
the lines immediately after the data anchor. Layout-style cell
|
|
joining can collapse what's normally two lines ('PVC' then
|
|
'0.70') into one ('Wood 0.70'); both shapes need to feed the
|
|
same downstream slice."""
|
|
combined = self._FRAME_TYPE_AND_FACTOR_RE.match(lines[data_idx + 1].strip())
|
|
if combined is not None:
|
|
return combined.group(1), float(combined.group(2)), data_idx + 2
|
|
if data_idx + 2 >= len(lines):
|
|
return lines[data_idx + 1].strip(), None, data_idx + 2
|
|
frame_type = lines[data_idx + 1].strip()
|
|
try:
|
|
frame_factor = float(lines[data_idx + 2].strip())
|
|
except ValueError:
|
|
return frame_type, None, data_idx + 3
|
|
return frame_type, frame_factor, data_idx + 3
|
|
|
|
def _partition_after_manuf(
|
|
self, lines: List[str], manuf_idx: int, next_data_idx: int
|
|
) -> int:
|
|
"""Return the exclusive upper bound for this window's suffix
|
|
block (and the inclusive lower bound for the next window's prefix
|
|
block). After the manufacturer line come 3 fixed tokens (g_value,
|
|
draught, shutters); the variable suffix lines start at manuf+4
|
|
and run until either (a) the next window's glazing-type-start
|
|
token (e.g. 'Double between 2002', 'Single', 'Triple ...') or
|
|
(b) the second orientation token in the gap, whichever comes
|
|
first. Branch (b) covers layouts where the glazing-type is
|
|
joined to the data line (no separate prefix line exists), so
|
|
the only signal of window-transition is the orientation tokens
|
|
rotating: orient_suffix(k) → orient_prefix(k+1). Falls through
|
|
to `next_data_idx` when neither marker is present."""
|
|
scan_start = manuf_idx + 4
|
|
seen_orient = False
|
|
for j in range(scan_start, next_data_idx):
|
|
stripped = lines[j].strip()
|
|
first_word = stripped.split(" ", 1)[0]
|
|
if first_word in self._GLAZING_TYPE_PREFIX_WORDS:
|
|
return j
|
|
if stripped in self._ORIENTATION_TOKENS:
|
|
if seen_orient:
|
|
return j
|
|
seen_orient = True
|
|
return next_data_idx
|
|
|
|
def _parse_window_from_anchors(
|
|
self,
|
|
*,
|
|
lines: List[str],
|
|
data_idx: int,
|
|
manuf_idx: int,
|
|
anchor: re.Match[str],
|
|
before_start: int,
|
|
after_end: int,
|
|
) -> Optional[Window]:
|
|
width = float(anchor.group(1))
|
|
height = float(anchor.group(2))
|
|
area = float(anchor.group(3))
|
|
# Layout-style cell joining sometimes leaves the glazing-type
|
|
# phrase trailing the W H Area triplet on the same line (e.g.
|
|
# "1.22 1.76 2.15 Double pre 2002"); when present we pass it
|
|
# through as `inline_glazing_type` and the composer skips the
|
|
# would-be glazing-prefix scan.
|
|
inline_glazing_type = anchor.group(4) if anchor.lastindex and anchor.lastindex >= 4 else None
|
|
|
|
# frame_type and frame_factor immediately follow the data line.
|
|
# Layout-style cell joining sometimes collapses them onto a
|
|
# single "Wood 0.70" line; treat both shapes uniformly so the
|
|
# downstream `middle` slice still starts at the first variable
|
|
# field (glazing_gap / bp / location / orient).
|
|
if data_idx + 1 >= len(lines):
|
|
return None
|
|
frame_type, frame_factor, middle_start = self._parse_frame_type_and_factor(
|
|
lines, data_idx
|
|
)
|
|
if frame_factor is None or not 0.0 < frame_factor <= 1.0:
|
|
return None
|
|
|
|
# Variable-order tokens between frame_factor and Manufacturer.
|
|
middle = [lines[j].strip() for j in range(middle_start, manuf_idx)]
|
|
glazing_gap = next((t for t in middle if "mm" in t.lower()), None)
|
|
location = next((t for t in middle if "wall" in t.lower()), "External wall")
|
|
bp_inline = next((t for t in middle if t in self._BP_INLINE_TOKENS), None)
|
|
orient_inline = next(
|
|
(t for t in middle if t in self._ORIENTATION_TOKENS), None
|
|
)
|
|
|
|
# Manufacturer line carries data_source + u_value.
|
|
manuf_match = self._MANUFACTURER_RE.match(lines[manuf_idx].strip())
|
|
if manuf_match is None:
|
|
return None
|
|
data_source = manuf_match.group(1)
|
|
u_value = float(manuf_match.group(2))
|
|
|
|
# Post-manufacturer: g_value, draught, shutters.
|
|
if manuf_idx + 3 >= len(lines):
|
|
return None
|
|
try:
|
|
g_value = float(lines[manuf_idx + 1].strip())
|
|
except ValueError:
|
|
return None
|
|
draught_proofed = lines[manuf_idx + 2].strip().lower() == "yes"
|
|
permanent_shutters = lines[manuf_idx + 3].strip()
|
|
|
|
# Prefix / suffix tokens (variable count) carry the
|
|
# glazing-type, building-part, and orientation strings split by
|
|
# the layout preprocessor.
|
|
before = [lines[j].strip() for j in range(before_start, data_idx) if lines[j].strip()]
|
|
after = [lines[j].strip() for j in range(manuf_idx + 4, after_end) if lines[j].strip()]
|
|
|
|
glazing_type, building_part, orientation = self._compose_window_descriptors(
|
|
before=before,
|
|
after=after,
|
|
bp_inline=bp_inline,
|
|
orient_inline=orient_inline,
|
|
inline_glazing_type=inline_glazing_type,
|
|
)
|
|
|
|
return Window(
|
|
width_m=width,
|
|
height_m=height,
|
|
area_m2=area,
|
|
glazing_type=glazing_type,
|
|
frame_factor=frame_factor,
|
|
building_part=building_part,
|
|
location=location,
|
|
orientation=orientation,
|
|
data_source=data_source,
|
|
u_value=u_value,
|
|
g_value=g_value,
|
|
draught_proofed=draught_proofed,
|
|
permanent_shutters=permanent_shutters,
|
|
frame_type=frame_type,
|
|
glazing_gap=glazing_gap,
|
|
)
|
|
|
|
def _compose_window_descriptors(
|
|
self,
|
|
*,
|
|
before: List[str],
|
|
after: List[str],
|
|
bp_inline: Optional[str],
|
|
orient_inline: Optional[str],
|
|
inline_glazing_type: Optional[str] = None,
|
|
) -> tuple[str, str, str]:
|
|
"""Re-join the glazing-type / building-part / orientation tokens
|
|
split by the layout preprocessor. Each is at most 2 fragments
|
|
(one before the data line, one after); inline tokens in the
|
|
between-segment win over prefix/suffix fragments."""
|
|
# before holds (in document order, possibly): glazing_prefix,
|
|
# bp_prefix, orient_prefix — bp/orient may be missing.
|
|
# after holds: glazing_suffix, bp_suffix, orient_suffix — same.
|
|
prefix = list(before[-3:]) # last 3 lines preceding data
|
|
suffix = list(after[:3])
|
|
|
|
def pop_if_orientation(tokens: List[str]) -> Optional[str]:
|
|
for t in tokens:
|
|
if t in self._ORIENTATION_TOKENS:
|
|
tokens.remove(t)
|
|
return t
|
|
return None
|
|
|
|
def pop_if_bp_fragment(tokens: List[str]) -> Optional[str]:
|
|
# Prefix fragments like "1st" / "2nd" — match digit-prefixed
|
|
# ordinals; suffix fragments are always "Extension".
|
|
for t in tokens:
|
|
if re.match(r"^\d+(?:st|nd|rd|th)$", t) or t == "Extension":
|
|
tokens.remove(t)
|
|
return t
|
|
return None
|
|
|
|
orient_prefix_token = pop_if_orientation(prefix)
|
|
orient_suffix_token = pop_if_orientation(suffix)
|
|
bp_prefix_frag = pop_if_bp_fragment(prefix)
|
|
bp_suffix_frag = pop_if_bp_fragment(suffix)
|
|
|
|
# Glazing type: an inline glazing-type captured from the data
|
|
# line (layout-joined variant) wins; otherwise join the remaining
|
|
# prefix + suffix fragments.
|
|
if inline_glazing_type is not None:
|
|
glazing_type = inline_glazing_type
|
|
else:
|
|
glazing_type = " ".join([*prefix, *suffix]).strip()
|
|
|
|
# Building part: inline token wins; otherwise join prefix + suffix.
|
|
if bp_inline is not None:
|
|
building_part = bp_inline
|
|
else:
|
|
building_part = " ".join(
|
|
t for t in (bp_prefix_frag, bp_suffix_frag) if t
|
|
).strip()
|
|
|
|
# Orientation: inline token wins for the primary direction;
|
|
# combine with the opposite-direction fragment when present.
|
|
primary = orient_inline or orient_prefix_token or ""
|
|
secondary_candidates = [
|
|
t for t in (orient_prefix_token, orient_suffix_token) if t and t != primary
|
|
]
|
|
if primary and secondary_candidates:
|
|
orientation = f"{primary}-{secondary_candidates[0]}"
|
|
else:
|
|
orientation = primary
|
|
|
|
return glazing_type, building_part, orientation
|
|
|
|
def _extract_ventilation(self) -> VentilationAndCooling:
|
|
return VentilationAndCooling(
|
|
open_chimneys_count=self._int_val("No. of open chimneys"),
|
|
open_flues_count=self._int_val("No. of open flues"),
|
|
open_chimneys_closed_fire_count=self._int_val(
|
|
"No. of open chimneys/open flues attached to closed fire"
|
|
),
|
|
solid_fuel_boiler_flues_count=self._int_val(
|
|
"No. of flues attached to solid fuel boiler"
|
|
),
|
|
other_heater_flues_count=self._int_val(
|
|
"No. of open flues attached to other heater"
|
|
),
|
|
blocked_chimneys_count=self._int_val("No. of blocked chimneys"),
|
|
extract_fans_count=self._int_val("No. of intermittent extract fans"),
|
|
passive_vents_count=self._int_val("No. of passive vents"),
|
|
flueless_gas_fires_count=self._int_val("No. of flueless gas fires"),
|
|
fixed_space_cooling=self._bool_val("Fixed Space Cooling"),
|
|
draught_lobby=self._str_val("Draught Lobby"),
|
|
mechanical_ventilation=self._bool_val("Mechanical Ventilation"),
|
|
pressure_test_method=self._str_val("Test Method"),
|
|
)
|
|
|
|
def _extract_lighting(self) -> Lighting:
|
|
led_cfl_count_known = self._bool_val("Number of LED and CFL Known")
|
|
return Lighting(
|
|
total_bulbs=self._int_val("Total number of bulbs"),
|
|
led_cfl_count_known=led_cfl_count_known,
|
|
led_count=self._int_val("Number of LED lights"),
|
|
cfl_count=self._int_val("Number of CFL lights"),
|
|
incandescent_count=self._int_val("Total number of incandescents"),
|
|
low_energy_count=(
|
|
0 if led_cfl_count_known
|
|
else self._int_val("Total number of Low Energy")
|
|
),
|
|
)
|
|
|
|
def _extract_main_heating(self) -> MainHeating:
|
|
lines = self._section_lines("14.0 Main Heating1", "14.1 Main Heating2")
|
|
pct_raw = self._local_val(lines, "Percentage of Heat")
|
|
pct = int(pct_raw.split()[0]) if pct_raw else 0
|
|
return MainHeating(
|
|
heat_emitter=self._local_str(lines, "Heat Emitter"),
|
|
fuel_type=self._local_str(lines, "Fuel Type"),
|
|
flue_type=self._local_str(lines, "Flue Type"),
|
|
fan_assisted_flue=self._local_bool(lines, "Fan Assisted Flue"),
|
|
design_flow_temperature=self._local_str(lines, "Design flow temperature"),
|
|
heating_controls_ees=self._local_str(lines, "Main Heating Controls EES"),
|
|
heating_controls_sap=self._local_str(lines, "Main Heating Controls Sap"),
|
|
percentage_of_heat=pct,
|
|
pcdf_boiler_reference=self._local_val(lines, "PCDF boiler Reference"),
|
|
heat_pump_age=self._local_val(lines, "Heat pump age"),
|
|
)
|
|
|
|
def _extract_meters(self) -> Meters:
|
|
return Meters(
|
|
electricity_meter_type=self._str_val("Electricity meter type"),
|
|
main_gas=self._bool_val("Main gas"),
|
|
electricity_smart_meter=self._bool_val("Electricity Smart Meter Present"),
|
|
gas_smart_meter=self._bool_val("Gas Smart Meter Present"),
|
|
)
|
|
|
|
def _extract_water_heating(self) -> WaterHeating:
|
|
return WaterHeating(
|
|
water_heating_code=self._str_val("Water Heating Code"),
|
|
water_heating_sap_code=self._int_val("Water Heating SapCode"),
|
|
water_heating_fuel_type=self._str_val("Water Heating Fuel Type"),
|
|
hot_water_cylinder_present=self._bool_val("Hot Water Cylinder Present"),
|
|
)
|
|
|
|
def _extract_baths_and_showers(self) -> BathsAndShowers:
|
|
n_baths = self._int_val("Total Number of Baths")
|
|
n_connected = self._int_val("Number of Baths Connected")
|
|
try:
|
|
idx = self._lines.index("Connected")
|
|
except ValueError:
|
|
return BathsAndShowers(
|
|
number_of_baths=n_baths,
|
|
number_of_baths_connected=n_connected,
|
|
showers=[],
|
|
)
|
|
showers: List[Shower] = []
|
|
j = idx + 1
|
|
while j + 2 <= len(self._lines) - 1:
|
|
num_line = self._lines[j]
|
|
if not num_line.isdigit():
|
|
break
|
|
showers.append(
|
|
Shower(
|
|
shower_number=int(num_line),
|
|
outlet_type=self._lines[j + 1],
|
|
connected=self._lines[j + 2],
|
|
)
|
|
)
|
|
j += 3
|
|
return BathsAndShowers(
|
|
number_of_baths=n_baths,
|
|
number_of_baths_connected=n_connected,
|
|
showers=showers,
|
|
)
|
|
|
|
def _rating_val(self, label: str) -> int:
|
|
v = self._next_val(label)
|
|
try:
|
|
return int(v.split()[-1]) if v else 0
|
|
except (ValueError, IndexError):
|
|
return 0
|
|
|
|
def _extract_renewables(self) -> Renewables:
|
|
fghrs_lines = self._section_lines(
|
|
"18.0 Flue Gas Heat Recovery System", "19.0 Photovoltaic Panel"
|
|
)
|
|
fghrs = self._local_bool(fghrs_lines, "Present")
|
|
|
|
terrain = self._str_val("Terrain Type")
|
|
hydro_raw = self._next_val("Electricity generated [kWh/year]")
|
|
hydro = float(hydro_raw) if hydro_raw else 0.0
|
|
|
|
return Renewables(
|
|
solar_water_heating=self._bool_val("Solar Water Heating"),
|
|
wwhrs_present=self._bool_val("Is WWHRS present in the property?"),
|
|
flue_gas_heat_recovery_present=fghrs,
|
|
photovoltaic_panel=self._str_val("Photovoltaic Panel"),
|
|
export_capable_meter=self._bool_val("Export capable meter"),
|
|
wind_turbine_present=self._bool_val("Wind turbine present?"),
|
|
wind_turbines_terrain_type=terrain,
|
|
hydro_electricity_generated_kwh=hydro,
|
|
)
|
|
|
|
def extract(self) -> ElmhurstSiteNotes:
|
|
emissions_raw = self._next_val("Emissions (t/year)")
|
|
co2 = float(emissions_raw.split()[0]) if emissions_raw else 0.0
|
|
|
|
return ElmhurstSiteNotes(
|
|
surveyor_info=self._extract_surveyor_info(),
|
|
property_details=self._extract_property_details(),
|
|
current_sap_rating=self._rating_val("Current SAP rating"),
|
|
potential_sap_rating=self._rating_val("Potential SAP rating"),
|
|
current_ei_rating=self._rating_val("Current EI rating"),
|
|
potential_ei_rating=self._rating_val("Potential EI rating"),
|
|
co2_emissions_current_t=co2,
|
|
property_type=self._str_val("1.0 Property type"),
|
|
attachment=self._extract_attachment(),
|
|
number_of_storeys=self._int_val("Storeys"),
|
|
habitable_rooms=self._int_val("Habitable Rooms"),
|
|
heated_habitable_rooms=self._int_val("Heated Habitable Rooms"),
|
|
construction_age_band=self._str_val("Main Property"),
|
|
dimensions=self._extract_dimensions(),
|
|
has_conservatory=self._bool_val("Is there a conservatory?"),
|
|
walls=self._extract_walls(),
|
|
roof=self._extract_roof(),
|
|
floor=self._extract_floor(),
|
|
door_count=self._int_val("Total Number of Doors"),
|
|
insulated_door_count=self._int_val("Number of Insulated Doors"),
|
|
windows=self._extract_windows(),
|
|
draught_proofing_percent=self._int_val("Draught Proofing"),
|
|
ventilation=self._extract_ventilation(),
|
|
lighting=self._extract_lighting(),
|
|
main_heating=self._extract_main_heating(),
|
|
meters=self._extract_meters(),
|
|
water_heating=self._extract_water_heating(),
|
|
baths_and_showers=self._extract_baths_and_showers(),
|
|
renewables=self._extract_renewables(),
|
|
extensions=self._extract_extensions(),
|
|
)
|