import re
from datetime import date, datetime
from typing import List, Optional

from datatypes.epc.surveys.elmhurst_site_notes import (
    AlternativeWall,
    BathsAndShowers,
    BuildingPartDimensions,
    CommunityHeating,
    ElmhurstSiteNotes,
    ExtensionPart,
    FloorDetails,
    FloorDimension,
    Lighting,
    MainHeating,
    MainHeating2,
    Meters,
    PropertyDetails,
    Renewables,
    RoofDetails,
    RoomInRoof,
    RoomInRoofSurface,
    Shower,
    SurveyorInfo,
    VentilationAndCooling,
    ElmhurstPvArray,
    WallDetails,
    WaterHeating,
    Window,
)


def _parse_solar_pitch_deg(raw: Optional[str]) -> Optional[int]:
    """Parse the §16.0 "Collector elevation" lodgement (e.g. "30°", "60°",
    or a bare integer).

    Returns the first run of digits found in ``raw`` as an int, or None
    when ``raw`` is absent/empty or contains no digits.
    """
    if not raw:
        return None
    m = re.search(r"(\d+)", raw)
    return int(m.group(1)) if m else None


class ElmhurstSiteNotesExtractor:
    """Label/section based field extractor over the text pages of an
    Elmhurst Summary document.

    The pages are joined with newlines into ``self._text``; a stripped,
    blank-free view of the same text is kept in ``self._lines`` for the
    label look-ups.
    """

    def __init__(self, pages: List[str]) -> None:
        self._text = "\n".join(pages)
        self._lines = self._nonblank_lines(self._text)

    # --- generic helpers ---

    @staticmethod
    def _nonblank_lines(text: str) -> List[str]:
        """Split ``text`` into lines, strip each, and drop the empty ones."""
        return [ln.strip() for ln in text.splitlines() if ln.strip()]

    def _next_val(self, label: str) -> Optional[str]:
        """Document-wide label look-up; see `_local_val` for the matching
        rules (same algorithm, applied to every non-blank line)."""
        return self._local_val(self._lines, label)

    def _str_val(self, label: str) -> str:
        """Value for ``label`` with internal whitespace collapsed; "" when
        the label is absent."""
        v = self._next_val(label)
        return " ".join(v.split()) if v else ""

    def _opt_str(self, label: str) -> Optional[str]:
        """Like `_str_val` but returns None instead of "" when absent."""
        v = self._next_val(label)
        return " ".join(v.split()) if v else None

    def _bool_val(self, label: str) -> bool:
        """True only when the value for ``label`` is "yes" (any case)."""
        v = self._next_val(label)
        return v is not None and v.lower() == "yes"

    def _int_val(self, label: str) -> int:
        """First whitespace-separated token of the value as an int; 0 when
        the label is absent or the token is not an integer."""
        v = self._next_val(label)
        try:
            return int(v.split()[0]) if v else 0
        except (ValueError, IndexError):
            return 0

    def _date_val(self, label: str) -> date:
        """Parse a mandatory ``dd/mm/YYYY`` date.

        Raises:
            ValueError: when the label is missing, or (from `strptime`)
                when the value is not in ``%d/%m/%Y`` format.
        """
        v = self._next_val(label)
        if not v:
            raise ValueError(f"Missing date for label: {label}")
        return datetime.strptime(v.strip(), "%d/%m/%Y").date()

    def _between(self, start: str, end: str) -> str:
        """Text strictly between the first ``start`` marker and the first
        ``end`` marker after it; "" when either marker is missing."""
        try:
            s = self._text.index(start) + len(start)
            e = self._text.index(end, s)
            return self._text[s:e]
        except ValueError:
            return ""

    # Multi-bp helpers: Summary PDFs subdivide §4/§7/§8/§9 with explicit
    # "Main Property" / "1st Extension" / "2nd Extension" headers. The
    # existing single-bp fixture also carries "Main Property" as a header
    # before the body. This helper splits a section into per-bp chunks.
    _BP_HEADER_RE = re.compile(
        r"^(Main Property|\d+(?:st|nd|rd|th) Extension)\s*$",
        re.MULTILINE,
    )

    def _split_section_by_bp(self, section_text: str) -> List[tuple[str, str]]:
        """Split a section's text into per-bp subsections.

        Returns ``[(bp_name, body), ...]`` in document order. Body is the
        text between this bp's header and the next bp's header
        (exclusive). Returns ``[("Main Property", section_text)]`` when no
        headers are found (defensive fallback for malformed PDFs).
        """
        matches = list(self._BP_HEADER_RE.finditer(section_text))
        if not matches:
            return [("Main Property", section_text)]
        result: List[tuple[str, str]] = []
        for i, m in enumerate(matches):
            name = m.group(1)
            body_start = m.end()
            body_end = (
                matches[i + 1].start()
                if i + 1 < len(matches)
                else len(section_text)
            )
            result.append((name, section_text[body_start:body_end]))
        return result

    def _section_lines(self, start: str, end: str) -> List[str]:
        """Non-blank, stripped lines of the `_between(start, end)` slice."""
        return self._nonblank_lines(self._between(start, end))

    def _section_lines_first_end(
        self,
        start: str,
        ends: tuple[str, ...],
    ) -> List[str]:
        """Like `_section_lines` but accepts multiple end-marker candidates
        and uses whichever appears first after `start`. Defends against
        Summary-shape variants where the next-section heading differs
        (e.g. §14.0 Main Heating1 closes at "14.1 Main Heating2" on
        boiler/HP certs but at "14.1 Community Heating" on community-
        heated certs).

        Returns [] when ``start`` or every candidate in ``ends`` is missing.
        """
        try:
            s = self._text.index(start) + len(start)
        except ValueError:
            return []
        earliest: int | None = None
        for end in ends:
            try:
                idx = self._text.index(end, s)
            except ValueError:
                continue
            if earliest is None or idx < earliest:
                earliest = idx
        if earliest is None:
            return []
        return self._nonblank_lines(self._text[s:earliest])

    def _local_val(self, lines: List[str], label: str) -> Optional[str]:
        """Find the value lodged for ``label`` within ``lines``.

        Two layouts are recognised, scanning top to bottom:

        * ``"Label: value"`` on one line - the text after the colon;
        * ``"Label:"`` / ``"Label"`` alone on a line - the next non-empty
          line (at most three lines ahead), unless that line ends with
          ":" (another label) or starts with "©" (page footer), in which
          case the value is treated as absent.

        Only the first matching label line is considered. Returns None
        when the label or its value is absent.
        """
        lb = label.rstrip(":")
        lc = lb + ":"
        for i, line in enumerate(lines):
            if line.startswith(lc) and len(line) > len(lc):
                return line[len(lc):].strip() or None
            if line == lc or line == lb:
                for j in range(i + 1, min(i + 4, len(lines))):
                    v = lines[j]
                    if v.endswith(":") or v.startswith("©"):
                        return None
                    if v:
                        return v
                return None
        return None

    def _local_str(self, lines: List[str], label: str) -> str:
        """Section-scoped counterpart of `_str_val` ("" when absent)."""
        v = self._local_val(lines, label)
        return " ".join(v.split()) if v else ""

    def _local_bool(self, lines: List[str], label: str) -> bool:
        """Section-scoped counterpart of `_bool_val`."""
        v = self._local_val(lines, label)
        return v is not None and v.lower() == "yes"

    # --- section extractors ---

    def _extract_surveyor_info(self) -> SurveyorInfo:
        """Build `SurveyorInfo` from the document-wide surveyor labels."""
        return SurveyorInfo(
            surveyor_code=self._str_val("Surveyor"),
            name=self._str_val("Name"),
            title=self._str_val("Title"),
            tel_number=self._str_val("Tel Number"),
            survey_reference=self._str_val("Survey Reference"),
            my_reference=self._opt_str("My Reference"),
        )

    def _extract_property_details(self) -> PropertyDetails:
        """Build `PropertyDetails` from the document-wide property labels.

        Raises:
            ValueError: via `_date_val` when "Inspection Date" or
                "Process date" is missing or malformed.
        """
        # The "existence of an EPC" label wraps across two lines in the
        # dump, so it cannot be read through the single-line label helpers.
        epc_m = re.search(
            r"Check for the existence of\nan EPC:\n(Yes|No)", self._text
        )
        epc_exists = epc_m.group(1).lower() == "yes" if epc_m else False
        return PropertyDetails(
            rdsap_version=self._str_val("RdSAP version"),
            reference_number=self._str_val("Reference Number"),
            lodgement_required=self._bool_val("Lodgement Required"),
            regs_region=self._str_val("Regs Region"),
            epc_language=self._str_val("EPC Language"),
            postcode=self._str_val("Postcode"),
            region=self._str_val("Region"),
            street=self._str_val("Street"),
            town=self._str_val("Town"),
            tenure=self._str_val("Property Tenure"),
            transaction_type=self._str_val("Transaction Type"),
            inspection_date=self._date_val("Inspection Date"),
            process_date=self._date_val("Process date"),
            epc_exists=epc_exists,
            uprn=self._opt_str("UPRN"),
            house_name=self._opt_str("House Name"),
            house_number=self._opt_str("House No"),
            locality=self._opt_str("Locality"),
            county=self._opt_str("County"),
        )

    def _extract_attachment(self) -> str:
        """Extract the Summary's "attachment" line — the §1.0 built-form
        descriptor (e.g. "M Mid-Terrace", "D Detached") that sits between
        the property-type value and the §2.0 section header for HOUSES.

        Flats DON'T lodge an attachment line in the Elmhurst Summary; the
        §2.0 Number of Storeys header follows immediately after the
        "F Flat" property-type value. Detect that case and return "" so
        the mapper's `built_form` doesn't capture section-header noise.
        """
        m = re.search(r"1\.0 Property type:\n[^\n]+\n([^\n]+)", self._text)
        if not m:
            return ""
        candidate = " ".join(m.group(1).strip().split())
        if re.match(r"^\d+\.\d+\s", candidate) or "Number of Storeys" in candidate:
            return ""
        return candidate

    def _floors_from_dimensions_body(self, body: str) -> List[FloorDimension]:
        """Parse FloorDimension entries from a single bp's §4 body.

        Each entry is a "... Floor:" label followed by four numeric lines
        (area, room height, heat-loss perimeter, party-wall length).
        """
        matches = re.findall(
            r"([A-Za-z ]+Floor):\n([\d.]+)\n([\d.]+)\n([\d.]+)\n([\d.]+)",
            body,
        )
        return [
            FloorDimension(
                name=name.strip(),
                area_m2=float(area),
                room_height_m=float(height),
                heat_loss_perimeter_m=float(hlp),
                party_wall_length_m=float(pwl),
            )
            for name, area, height, hlp, pwl in matches
        ]

    def _extract_dimensions(self) -> BuildingPartDimensions:
        """Main-property dimensions only. Extensions are picked up by
        `_extract_extensions`."""
        dim_type = self._str_val("Dimension type")
        section = self._between("4.0 Dimensions:", "5.0 Conservatory:")
        bp_chunks = self._split_section_by_bp(section)
        main_body = bp_chunks[0][1] if bp_chunks else section
        return BuildingPartDimensions(
            dimension_type=dim_type,
            floors=self._floors_from_dimensions_body(main_body),
        )

    def _wall_details_from_lines(self, lines: List[str]) -> WallDetails:
        """Build `WallDetails` from one bp's §7 lines."""
        # Parsed with the shared thickness parser so a non-numeric cell
        # yields None instead of raising from a bare `int(...)`.
        thickness_mm = self._parse_thickness_mm(
            self._local_val(lines, "Wall Thickness")
        )
        # Composite / retrofit insulation thickness — Summary §7.0
        # writes the value on the line pair "Insulation Thickness" /
        # "100 mm" when a composite filled-cavity-plus-external (or
        # equivalent) wall is lodged. The "Insulation Thickness" label
        # is local-scoped inside the §7 block so it does not collide
        # with the §8 Roofs / §9 Floors blocks. None when the PDF
        # omits the line (no retrofit lodged).
        ins_thickness_raw = self._local_val(lines, "Insulation Thickness")
        insulation_thickness_mm = self._parse_thickness_mm(ins_thickness_raw)
        return WallDetails(
            wall_type=self._local_str(lines, "Type"),
            insulation=self._local_str(lines, "Insulation"),
            thickness_unknown=self._local_bool(lines, "Wall Thickness Unknown"),
            u_value_known=self._local_bool(lines, "U-value Known"),
            party_wall_type=self._local_str(lines, "Party Wall Type"),
            thickness_mm=thickness_mm,
            insulation_thickness_mm=insulation_thickness_mm,
            alternative_walls=self._alternative_walls_from_lines(lines),
            # Summary §7 lodges the per-BP "Curtain Wall Age" line only
            # when `Type: CW Curtain Wall`. Per RdSAP 10 §5.18 (PDF
            # p.48) this drives the curtain-wall U-value (Post 2023 →
            # 1.4; Pre 2023 → 2.0) independent of the dwelling-wide
            # age band. Use `_local_val` (Optional[str]) so absent
            # lines surface as None, not the empty-string sentinel
            # `_local_str` returns.
            curtain_wall_age=self._local_val(lines, "Curtain Wall Age"),
        )

    def _alternative_walls_from_lines(self, lines: List[str]) -> List[AlternativeWall]:
        """Parse up to two §7 "Alternative Wall N" sub-area lodgements.

        The Elmhurst Summary PDF lays them out as a contiguous block of
        prefixed labels ("Alternative Wall 1 Area", "Alternative Wall 1
        Type", …); we read each numbered slot independently and drop
        slots whose Area is missing/zero.
        """
        result: List[AlternativeWall] = []
        for n in (1, 2):
            area_raw = self._local_val(lines, f"Alternative Wall {n} Area")
            if not area_raw:
                continue
            try:
                area = float(area_raw.split()[0])
            except (ValueError, IndexError):
                continue
            if area <= 0:
                continue
            thickness_raw = self._local_val(lines, f"Alternative Wall {n} Thickness")
            thickness_mm = self._parse_thickness_mm(thickness_raw)
            result.append(AlternativeWall(
                area_m2=area,
                wall_type=self._local_str(lines, f"Alternative Wall {n} Type"),
                insulation=self._local_str(lines, f"Alternative Wall {n} Insulation"),
                thickness_unknown=self._local_bool(
                    lines, f"Alternative Wall {n} Thickness Unknown"
                ),
                thickness_mm=thickness_mm,
                u_value_known=self._local_bool(
                    lines, f"Alternative Wall {n} U-value Known"
                ),
                # RdSAP10 §5.8 + Table 14: dry-lined uninsulated wall adds
                # R = 0.17 m²K/W to base U. Cohort fixture: cert 7700
                # Alt 1 "CavityWallPlasterOnDabs" lodges Dry-lining: Yes →
                # U = 1/(1/1.5 + 0.17) ≈ 1.20.
                dry_lined=self._local_bool(
                    lines, f"Alternative Wall {n} Dry-lining"
                ),
            ))
        return result

    def _extract_walls(self) -> WallDetails:
        """§7 wall details for the first (main) building part."""
        section = self._between("7.0 Walls:", "8.0 Roofs:")
        bp_chunks = self._split_section_by_bp(section)
        main_body = bp_chunks[0][1] if bp_chunks else section
        return self._wall_details_from_lines(self._nonblank_lines(main_body))

    @staticmethod
    def _parse_thickness_mm(raw: Optional[str]) -> Optional[int]:
        """Parse an Elmhurst "Insulation Thickness" cell ("100 mm",
        "400+ mm") to integer mm.

        The bucket-cap "400+ mm" (Table 17/18 max tabulated row) carries a
        trailing "+" that a bare `.split()[0].isdigit()` test rejects —
        strip to the leading digits so the cap parses through to the
        cascade with its numeric value (simulated case 5: roof "400+ mm"
        was silently dropped → u_roof fell back to the age-J default 0.16
        instead of the 300mm+ value 0.11).

        Returns None when the cell is absent or carries no leading number
        ("As Built", "N None").
        """
        if not raw:
            return None
        match = re.match(r"\d+", raw.strip())
        return int(match.group()) if match else None

    def _roof_details_from_lines(self, lines: List[str]) -> RoofDetails:
        """Build `RoofDetails` from one bp's §8 lines."""
        thickness_raw = self._local_val(lines, "Insulation Thickness")
        thickness_mm = self._parse_thickness_mm(thickness_raw)
        insulation = self._local_str(lines, "Insulation")
        # The Summary PDF omits the "Insulation Thickness" line entirely
        # when no retrofit insulation is lodged (e.g. "Insulation: N None"
        # on 000516). Treat that case as 0 mm so the cascade picks Table
        # 16 row 0 (U=2.30) rather than the age-band default — the
        # surveyor explicitly recorded "None".
        if thickness_mm is None and insulation.split(" ", 1)[0] == "N":
            thickness_mm = 0
        return RoofDetails(
            roof_type=self._local_str(lines, "Type"),
            insulation=insulation,
            u_value_known=self._local_bool(lines, "U-value Known"),
            insulation_thickness_mm=thickness_mm,
        )

    def _extract_roof(self) -> RoofDetails:
        """§8 roof details for the first (main) building part."""
        section = self._between("8.0 Roofs:", "8.1 Rooms in Roof:")
        bp_chunks = self._split_section_by_bp(section)
        main_body = bp_chunks[0][1] if bp_chunks else section
        return self._roof_details_from_lines(self._nonblank_lines(main_body))

    def _floor_details_from_lines(self, lines: List[str]) -> FloorDetails:
        """Build `FloorDetails` from one bp's §9 lines."""
        u_val_raw = self._local_val(lines, "Default U-value")
        # Defensive parse (same shape as `_extract_door_u_value`): take the
        # leading token and fall back to None rather than raising when the
        # cell carries a unit suffix or non-numeric text.
        default_u: Optional[float] = None
        if u_val_raw:
            try:
                default_u = float(u_val_raw.split()[0])
            except (ValueError, IndexError):
                default_u = None
        # RdSAP 10 §5.13 Table 20 — retro-fitted upper floors lodge an
        # "Insulation Thickness: NNN mm" cell so the cascade can route
        # via the per-thickness column. Mirror of the §8 roof extractor
        # at `_roof_details_from_lines`.
        thickness_raw = self._local_val(lines, "Insulation Thickness")
        thickness_mm = self._parse_thickness_mm(thickness_raw)
        return FloorDetails(
            location=self._local_str(lines, "Location"),
            floor_type=self._local_str(lines, "Type"),
            insulation=self._local_str(lines, "Insulation"),
            u_value_known=self._local_bool(lines, "U-value Known"),
            default_u_value=default_u,
            insulation_thickness_mm=thickness_mm,
        )

    def _extract_floor(self) -> FloorDetails:
        """§9 floor details for the first (main) building part."""
        section = self._between("9.0 Floors:", "10.0 Doors:")
        bp_chunks = self._split_section_by_bp(section)
        main_body = bp_chunks[0][1] if bp_chunks else section
        return self._floor_details_from_lines(self._nonblank_lines(main_body))

    def _extract_door_u_value(self) -> Optional[float]:
        """Read the §10 Doors block's "Average U-value" lodging.

        Scoped to the §10..§11 slice so the global "U-value" labels in
        Walls/Roofs/Floors can't shadow the door reading. None when the
        PDF omits the line (e.g. all doors recorded as uninsulated) or
        the value is not numeric.
        """
        lines = self._section_lines("10.0 Doors:", "11.0 Windows:")
        raw = self._local_val(lines, "Average U-value")
        if not raw:
            return None
        try:
            return float(raw.split()[0])
        except (ValueError, IndexError):
            return None

    # RIR surface row: `NAME LENGTH HEIGHT [INSULATION] [INS_TYPE]
    # [GABLE_TYPE] DEFAULT_U U_KNOWN U_VALUE`. The middle slot
    # widths vary by surface kind; we match the four numerics
    # robustly (length, height, default_u, u_value) and slot the
    # remaining textual fields by position. The layout preprocessor
    # collapses multi-space-separated cells into single newlines, so
    # each row in the dump occupies several lines, one per cell.
    _RIR_SURFACE_NAMES: tuple[str, ...] = (
        "Flat Ceiling 1",
        "Flat Ceiling 2",
        "Stud Wall 1",
        "Stud Wall 2",
        "Slope 1",
        "Slope 2",
        "Gable Wall 1",
        "Gable Wall 2",
        "Common Wall 1",
        "Common Wall 2",
    )

    def _extract_room_in_roof(
        self, main_dim_body: str, age_band_text: str
    ) -> Optional[RoomInRoof]:
        """Parse the §8.1 Rooms in Roof block for the Main bp."""
        section = self._between("8.1 Rooms in Roof:", "9.0 Floors:")
        bp_chunks = self._split_section_by_bp(section) if section.strip() else []
        main_body = bp_chunks[0][1] if bp_chunks else ""
        # Age band from §3: "Main Prop. Room(s) in Roof H 1991-1995"
        age_m = re.search(
            r"Main Prop\. Room\(s\) in Roof\s+([A-M] [^\n]+)", age_band_text
        )
        age_band = age_m.group(1).strip() if age_m else None
        return self._room_in_roof_from_bodies(
            dim_body=main_dim_body,
            rir_body=main_body,
            age_band=age_band,
        )

    def _room_in_roof_from_bodies(
        self,
        dim_body: str,
        rir_body: str,
        age_band: Optional[str],
    ) -> Optional[RoomInRoof]:
        """Parse a single-BP Room(s) in Roof from the §4 dimension body
        (floor area) and §8.1 construction body (assessment + surfaces).

        Used for both Main and each extension — extensions get their own
        per-BP slice of §4 and §8.1 + the per-extension age band from
        §3's "Nth Ext. Room(s) in Roof BAND" line.

        Returns None when §4 lodges no positive Room(s) in Roof area.
        """
        m = re.search(r"Room\(s\) in Roof:\s+(\d+(?:\.\d+)?)", dim_body)
        if m is None:
            return None
        floor_area = float(m.group(1))
        if floor_area <= 0:
            return None
        if not rir_body.strip() or "Room in roof type" not in rir_body:
            # §4 lodged an RR area but §8.1 has no construction details
            # for this BP — surface as a partial RR so the cascade can
            # still attribute the floor area to TFA. Empty surfaces
            # list is the sentinel the mapper consumes.
            return RoomInRoof(
                floor_area_m2=floor_area,
                construction_age_band=age_band,
                assessment="",
                surfaces=[],
            )
        lines = self._nonblank_lines(rir_body)
        # The assessment value is the line directly after the bare
        # "Assessment" label.
        assessment_idx = next(
            (i for i, ln in enumerate(lines) if ln == "Assessment"), None
        )
        assessment = (
            lines[assessment_idx + 1]
            if assessment_idx is not None and assessment_idx + 1 < len(lines)
            else ""
        )
        surfaces: List[RoomInRoofSurface] = []
        for name in self._RIR_SURFACE_NAMES:
            try:
                idx = lines.index(name)
            except ValueError:
                continue
            surfaces.append(self._parse_rir_surface_row(name, lines, idx))
        return RoomInRoof(
            floor_area_m2=floor_area,
            construction_age_band=age_band,
            assessment=assessment,
            surfaces=surfaces,
        )

    _RIR_NUMERIC_RE = re.compile(r"^-?\d+(?:\.\d+)?$")
    # Elmhurst insulation cell formats: "100 mm", "125 mm", ... and the
    # bucket-cap "400+ mm" (Table 17 max tabulated row). Optional trailing
    # "+" allows the bucket-cap to parse through to the cascade with the
    # same numeric value.
    _RIR_INSULATION_THICKNESS_RE = re.compile(r"^\d+\+?\s*mm$")

    def _parse_rir_surface_row(
        self, name: str, lines: List[str], idx: int
    ) -> RoomInRoofSurface:
        """One RR surface row spans the name line followed by ~6-9 tokens
        depending on which optional cells the surveyor filled.

        The token order is stable: length, height, [insulation],
        [ins_type], [gable_type], default_u, u_known, u_value. Numeric
        cells (length, height, default_u, u_value) are the anchor;
        everything else is slotted into the appropriate textual field.
        ``idx`` is the index of the row's name line within ``lines``.
        """
        # Walk forward until either we exhaust the cell budget or hit
        # the next RIR row's name marker — the layout dump puts each
        # numeric / textual cell on its own line and we can't tell
        # the LAST cell of THIS row from the FIRST cell of the next
        # without that signal.
        tokens: List[str] = []
        scan_end = min(idx + 10, len(lines))
        for j in range(idx + 1, scan_end):
            if self._is_next_rir_row(lines[j]):
                break
            tokens.append(lines[j])

        # First two numerics = length, height
        length = (
            float(tokens[0])
            if tokens and self._RIR_NUMERIC_RE.match(tokens[0])
            else 0.0
        )
        height = (
            float(tokens[1])
            if len(tokens) > 1 and self._RIR_NUMERIC_RE.match(tokens[1])
            else 0.0
        )

        # Last numeric is u_value; preceding "Yes"/"No" is u_value_known;
        # the numeric before that is default_u.
        # Walk from the end backwards looking for the u_value, then known
        # flag, then default_u.
        u_value = 0.0
        u_value_known = False
        default_u: Optional[float] = None
        # The known/default_u tail is fairly stable; collect the trailing
        # tokens and slot by position. The "known" token is "No" or "Yes".
        rev = list(reversed(tokens[2:]))
        # rev[0] = u_value, rev[1] = u_value_known, rev[2] = default_u
        if len(rev) >= 1 and self._RIR_NUMERIC_RE.match(rev[0]):
            u_value = float(rev[0])
        if len(rev) >= 2 and rev[1] in ("Yes", "No"):
            u_value_known = rev[1] == "Yes"
        if len(rev) >= 3 and self._RIR_NUMERIC_RE.match(rev[2]):
            default_u = float(rev[2])

        # Middle textual cells: insulation, insulation_type, gable_type.
        # Drop the leading length/height (already consumed) and the
        # trailing 3 tokens (default_u, known, u_value).
        middle = tokens[2:-3] if len(tokens) >= 5 else []
        insulation = ""
        insulation_type: Optional[str] = None
        gable_type: Optional[str] = None
        for t in middle:
            if self._RIR_INSULATION_THICKNESS_RE.match(t) or t in ("As Built", "None", "Unknown"):
                # "Unknown" is the third spec-valid thickness token
                # (RdSAP 10 §3.10.1 PDF p.24: "default U-values apply
                # when the roof room insulation is 'as built' or
                # 'unknown'"). Mapper routes "Unknown" to
                # insulation_thickness_mm=None so the cascade falls
                # back to Table 18 col 4 default.
                if not insulation:
                    insulation = t
            elif t in ("Mineral or EPS", "PUR", "PIR", "PUR or PIR"):
                # Summary §8.1 lodges the rigid-foam column as the
                # disjunction "PUR or PIR" when the assessor doesn't
                # distinguish between the two; the mapper canonicalises
                # all three forms to SAP10 "rigid_foam" (cascade Table
                # 17 col (b)).
                insulation_type = t
            elif t in (
                "Party",
                "Sheltered",
                "Exposed",
                "Connected",
                "Connected to heated space",
            ):
                gable_type = t
        return RoomInRoofSurface(
            name=name,
            length_m=length,
            height_m=height,
            insulation=insulation,
            insulation_type=insulation_type,
            gable_type=gable_type,
            default_u_value=default_u,
            u_value_known=u_value_known,
            u_value=u_value,
        )

    def _is_next_rir_row(self, line: str) -> bool:
        """True when ``line`` is exactly one of the RIR surface names."""
        return line in self._RIR_SURFACE_NAMES

    def _extract_extensions(self) -> List[ExtensionPart]:
        """Collect non-Main building parts.

        Cross-references the §4, §7, §8, §9 per-bp subsections by
        extension name. "As Main Wall: Yes" (§7) / "As Main: Yes"
        (§8, §9) within a section body inherits the main bp's data for
        that section; otherwise the section body is parsed in isolation.
        """
        # Gather per-section chunks once.
        dim_section = self._between("4.0 Dimensions:", "5.0 Conservatory:")
        wall_section = self._between("7.0 Walls:", "8.0 Roofs:")
        roof_section = self._between("8.0 Roofs:", "8.1 Rooms in Roof:")
        rir_section = self._between("8.1 Rooms in Roof:", "9.0 Floors:")
        floor_section = self._between("9.0 Floors:", "10.0 Doors:")
        dim_type = self._str_val("Dimension type")

        dim_pairs = self._split_section_by_bp(dim_section)
        dim_chunks = dict(dim_pairs)
        wall_chunks = dict(self._split_section_by_bp(wall_section))
        roof_chunks = dict(self._split_section_by_bp(roof_section))
        rir_chunks = (
            dict(self._split_section_by_bp(rir_section))
            if rir_section.strip()
            else {}
        )
        floor_chunks = dict(self._split_section_by_bp(floor_section))

        # Per-extension RR age bands from §3: "1st Ext. Room(s) in Roof I 1996-2002".
        ext_rir_age_re = re.compile(
            r"(\d+(?:st|nd|rd|th))\s+Ext\.\s+Room\(s\) in Roof\s+([A-M] [^\n]+)",
            re.MULTILINE,
        )
        ext_rir_age_bands: dict[str, str] = {
            f"{m.group(1)} Extension": m.group(2).strip()
            for m in ext_rir_age_re.finditer(self._text)
        }

        main_walls = self._extract_walls()
        main_roof = self._extract_roof()
        main_floor = self._extract_floor()

        # Per-bp age-band lookup. Section 3 contains lines like
        # "1st Extension B 1900-1929" — the band sits after the name.
        age_band_re = re.compile(
            r"^(\d+(?:st|nd|rd|th) Extension)\s+([A-M] [^\n]+)$",
            re.MULTILINE,
        )
        age_bands = {
            m.group(1): m.group(2).strip()
            for m in age_band_re.finditer(self._text)
        }

        # Collect names in document order from the dimensions section
        # (excluding Main Property).
        names = [name for name, _ in dim_pairs if name != "Main Property"]

        extensions: List[ExtensionPart] = []
        for name in names:
            dim_body = dim_chunks.get(name, "")
            wall_lines = self._nonblank_lines(wall_chunks.get(name, ""))
            roof_lines = self._nonblank_lines(roof_chunks.get(name, ""))
            floor_lines = self._nonblank_lines(floor_chunks.get(name, ""))

            if self._local_bool(wall_lines, "As Main Wall"):
                # Alternative walls live in the extension's own chunk
                # even when the main wall fields are inherited; merge
                # them into the inherited WallDetails so the bp carries
                # them through to its SapBuildingPart.
                walls = WallDetails(
                    wall_type=main_walls.wall_type,
                    insulation=main_walls.insulation,
                    thickness_unknown=main_walls.thickness_unknown,
                    u_value_known=main_walls.u_value_known,
                    party_wall_type=main_walls.party_wall_type,
                    thickness_mm=main_walls.thickness_mm,
                    insulation_thickness_mm=main_walls.insulation_thickness_mm,
                    alternative_walls=self._alternative_walls_from_lines(wall_lines),
                    # Carry the curtain-wall age through as well: prefer a
                    # line lodged in the extension's own chunk, otherwise
                    # inherit the main wall's value so an inherited
                    # curtain wall does not lose its age.
                    curtain_wall_age=(
                        self._local_val(wall_lines, "Curtain Wall Age")
                        or main_walls.curtain_wall_age
                    ),
                )
            else:
                walls = self._wall_details_from_lines(wall_lines)

            roof = (
                main_roof
                if self._local_bool(roof_lines, "As Main")
                else self._roof_details_from_lines(roof_lines)
            )
            floor = (
                main_floor
                if self._local_bool(floor_lines, "As Main")
                else self._floor_details_from_lines(floor_lines)
            )
            rir = self._room_in_roof_from_bodies(
                dim_body=dim_body,
                rir_body=rir_chunks.get(name, ""),
                age_band=ext_rir_age_bands.get(name),
            )
            extensions.append(
                ExtensionPart(
                    name=name,
                    construction_age_band=age_bands.get(name, ""),
                    dimensions=BuildingPartDimensions(
                        dimension_type=dim_type,
                        floors=self._floors_from_dimensions_body(dim_body),
                    ),
                    walls=walls,
                    roof=roof,
                    floor=floor,
                    room_in_roof=rir,
                )
            )
        return extensions

    def _extract_windows(self) -> List[Window]:
        """Parse the §11 windows table.

        Uses the token-walker below when the "Permanent Shutters" header
        is found intact; otherwise defers to
        `_extract_windows_from_layout`.
        """
        # Textract-style pages keep "Permanent\s+Shutters" adjacent in
        # reading order and the windows table flows as one column-block
        # the existing token-walker can step through. PDF-derived pages
        # (Summary PDFs preprocessed from `pdftotext -layout`) break the
        # header across lines, so this regex misses entirely and the
        # `_extract_windows_from_layout` fallback below picks them up
        # by anchoring on the W/H/Area data line.
        m = re.search(
            r"Permanent\s+Shutters\n(.*?)Draught Proofing",
            self._text,
            re.DOTALL,
        )
        if not m:
            return self._extract_windows_from_layout()
        tokens = [t.strip() for t in m.group(1).splitlines() if t.strip()]
        windows: List[Window] = []
        i = 0
        # NOTE(review): the loop guard only guarantees the 13 tokens of a
        # minimal row; rows with extra glazing / frame-type / gap tokens
        # near the end of the table can still index past `tokens` below.
        while i + 12 < len(tokens):
            try:
                width_m = float(tokens[i])
                height_m = float(tokens[i + 1])
                area_m2 = float(tokens[i + 2])
            except (ValueError, IndexError):
                i += 1
                continue
            i += 3

            # Collect glazing type tokens until frame_factor (0 < v ≤ 1.0)
            glazing_parts: List[str] = []
            while i < len(tokens):
                try:
                    v = float(tokens[i])
                    if 0.0 < v <= 1.0:
                        break
                    glazing_parts.append(tokens[i])
                except ValueError:
                    glazing_parts.append(tokens[i])
                i += 1

            # If last glazing token is a single word (no spaces, not
            # numeric) it's the frame_type
            frame_type: Optional[str] = None
            if (
                glazing_parts
                and " " not in glazing_parts[-1]
                and not glazing_parts[-1].replace(".", "").isdigit()
            ):
                frame_type = glazing_parts.pop()
            glazing_type = " ".join(glazing_parts).strip()

            if i >= len(tokens):
                break
            frame_factor = float(tokens[i])
            i += 1

            # Consume glazing_gap if present ("mm" token, possibly
            # multi-token e.g. "16 mm or more")
            glazing_gap: Optional[str] = None
            if i < len(tokens) and "mm" in tokens[i]:
                gap_parts = [tokens[i]]
                i += 1
                while i < len(tokens) and tokens[i].lower() in {"or", "more"}:
                    gap_parts.append(tokens[i])
                    i += 1
                glazing_gap = " ".join(gap_parts)

            # Fixed tail of the row, one token per cell.
            building_part = tokens[i]
            i += 1
            location = tokens[i]
            i += 1
            orientation = tokens[i]
            i += 1
            data_source = tokens[i]
            i += 1
            u_value = float(tokens[i])
            i += 1
            g_value = float(tokens[i])
            i += 1
            draught_proofed = tokens[i].lower() == "yes"
            i += 1
            permanent_shutters = tokens[i]
            i += 1

            windows.append(
                Window(
                    width_m=width_m,
                    height_m=height_m,
                    area_m2=area_m2,
                    glazing_type=glazing_type,
                    frame_factor=frame_factor,
                    building_part=building_part,
                    location=location,
                    orientation=orientation,
                    data_source=data_source,
                    u_value=u_value,
                    g_value=g_value,
                    draught_proofed=draught_proofed,
                    permanent_shutters=permanent_shutters,
                    frame_type=frame_type,
                    glazing_gap=glazing_gap,
                )
            )
        return windows

    # Anchors used by the layout-style window parser. The W/H/Area anchor
    # is sometimes followed by a joined glazing-type phrase on the same
    # line (e.g. '1.22 1.76 2.15 Double pre 2002'); the optional 4th
    # capture surfaces that text so the parser can use it instead of a
    # separately-laid-out prefix line.
    _WIDTH_HEIGHT_AREA_RE = re.compile(
        r"^(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)(?:\s+(\S.*?))?$"
    )
    _MANUFACTURER_RE = re.compile(r"^(Manufacturer|Default)\s+(\d+\.\d+)$")
    _ORIENTATION_TOKENS = frozenset({
        "North", "South", "East", "West", "NE", "NW", "SE", "SW",
    })
    _BP_INLINE_TOKENS = frozenset({"Main"})  # "Extension" only appears as suffix
    # A room-in-roof window (rooflight) lodges its §11 "Location" cell as
    # "Roof of Room in Roof", which the layout preprocessor wraps onto two
    # tokens ("Roof of Room" in the prefix block, "in Roof" in the suffix).
    # Detected so the window routes to a roof window (worksheet (27a))
    # and the tokens don't leak into the glazing-type phrase.
    _ROOF_OF_ROOM_LOCATION_TOKENS = frozenset({"Roof of Room", "in Roof"})
    # The Elmhurst Summary PDF lodges each window's glazing-type as a
    # capitalised phrase like "Double between 2002" / "Double with unknown"
    # / "Single" / "Triple" / "Secondary". The first token of that phrase
    # marks the start of a new window's prefix block in the layout dump,
    # which is the only stable signal partitioning one window's suffix
    # from the next window's prefix.
    _GLAZING_TYPE_PREFIX_WORDS = frozenset({
        "Single", "Double", "Triple", "Secondary",
    })

    def _extract_windows_from_layout(self) -> List[Window]:
        """Fallback window parser for Summary PDFs preprocessed from
        `pdftotext -layout`.

        Each window has two stable anchors: a "W H Area" line and a
        "Manufacturer N.NN" (or "Default N.NN") line a few lines further
        down. Everything between holds frame_type, frame_factor, and a
        variable mix of glazing_gap, building_part, location, and
        orientation (depending on which fields the surveyor lodged);
        everything around the window holds glazing-type / building-part /
        orientation prefix/suffix tokens split by the layout preprocessor.

        Windows whose anchors cannot be paired, or whose parse raises
        ValueError/IndexError, are skipped.
        """
        m = re.search(
            r"11\.0 Windows:(.*?)(Draught Proofing|12\.0 Ventilation)",
            self._text,
            re.DOTALL,
        )
        if not m:
            return []
        lines = m.group(1).splitlines()

        # Locate all (data_line, manufacturer_line) pairs in document
        # order. Each pair is one window.
        data_anchors: List[tuple[int, re.Match[str]]] = []
        for i, line in enumerate(lines):
            anchor = self._WIDTH_HEIGHT_AREA_RE.match(line.strip())
            if anchor is not None:
                data_anchors.append((i, anchor))

        windows: List[Window] = []
        for k, (data_idx, anchor) in enumerate(data_anchors):
            manuf_idx = self._find_manufacturer_after(lines, data_idx)
            if manuf_idx is None:
                continue
            prev_manuf_idx = (
                self._find_manufacturer_after(lines, data_anchors[k - 1][0])
                if k > 0
                else None
            )
            next_data_idx = (
                data_anchors[k + 1][0]
                if k + 1 < len(data_anchors)
                else len(lines)
            )
            # Partition the cross-window gap between this window's suffix
            # and the next window's prefix on the first glazing-type-start
            # token (Single/Double/Triple/Secondary). The same boundary
            # is used symmetrically — current window's `after_end` = next
            # window's `before_start` — so prefix tokens of W_{k+1} never
            # get attributed as suffix of W_k (which was the bug producing
            # orientation='East-South' for windows where 'South' actually
            # belonged to the next row).
            before_start = (
                self._partition_after_manuf(lines, prev_manuf_idx, data_idx)
                if prev_manuf_idx is not None
                else 0
            )
            after_end = self._partition_after_manuf(lines, manuf_idx, next_data_idx)
            try:
                window = self._parse_window_from_anchors(
                    lines=lines,
                    data_idx=data_idx,
                    manuf_idx=manuf_idx,
                    anchor=anchor,
                    before_start=before_start,
                    after_end=after_end,
                )
            except (ValueError, IndexError):
                continue
            if window is not None:
                windows.append(window)
        return windows

    def _find_manufacturer_after(self, lines: List[str], data_idx: int) -> Optional[int]:
        """Index of the first "Manufacturer"/"Default" line within the 11
        lines following ``data_idx``; None when there is none."""
        for j in range(data_idx + 1, min(data_idx + 12, len(lines))):
            if self._MANUFACTURER_RE.match(lines[j].strip()):
                return j
        return None

    _FRAME_TYPE_AND_FACTOR_RE = re.compile(r"^(\S+(?:\s+\S+)*?)\s+(\d\.\d+)$")
    _FRAME_FACTOR_ONLY_RE = re.compile(r"^(\d\.\d+)$")

    def _parse_frame_type_and_factor(
        self, lines: List[str], data_idx: int
    ) -> tuple[str, Optional[float], int]:
        """Return `(frame_type, frame_factor, middle_start_idx)` from the
        lines immediately after the data anchor.

        Layouts vary:

        (a) "PVC" on data+1, "0.70" on data+2 — the original 000474 shape;
        (b) "Wood 0.70" on data+1 — joined-cell variant from 000487 and
            000516 first-row windows;
        (c) "0.70" alone on data+1 (no frame_type word at all) — seen in
            000487's subsequent windows where the preprocessor dropped
            the frame-type column. frame_type is recovered downstream
            from glazing-type defaults or left empty.

        ``frame_factor`` is None when no numeric factor could be read.
        The caller must ensure ``data_idx + 1`` is a valid index.
        """
        first = lines[data_idx + 1].strip()
        combined = self._FRAME_TYPE_AND_FACTOR_RE.match(first)
        if combined is not None:
            return combined.group(1), float(combined.group(2)), data_idx + 2
        factor_only = self._FRAME_FACTOR_ONLY_RE.match(first)
        if factor_only is not None:
            return "", float(factor_only.group(1)), data_idx + 2
        if data_idx + 2 >= len(lines):
            return first, None, data_idx + 2
        frame_type = first
        try:
            frame_factor = float(lines[data_idx + 2].strip())
        except ValueError:
            return frame_type, None, data_idx + 3
        return frame_type, frame_factor, data_idx + 3

    def _partition_after_manuf(
        self, lines: List[str], manuf_idx: int, next_data_idx: int
    ) -> int:
        """Return the exclusive upper bound for this window's suffix block
        (and the inclusive lower bound for the next window's prefix
        block).

        After the manufacturer line come 3 fixed tokens (g_value, draught,
        shutters); the variable suffix lines start at manuf+4 and run
        until either (a) the next window's glazing-type-start token (e.g.
        'Double between 2002', 'Single', 'Triple ...') or (b) the second
        orientation token in the gap, whichever comes first. Branch (b)
        covers layouts where the glazing-type is joined to the data line
        (no separate prefix line exists), so the only signal of
        window-transition is the orientation tokens rotating:
        orient_suffix(k) → orient_prefix(k+1). Falls through to
        `next_data_idx` when neither marker is present.
        """
        scan_start = manuf_idx + 4
        seen_orient = False
        for j in range(scan_start, next_data_idx):
            stripped = lines[j].strip()
            first_word = stripped.split(" ", 1)[0]
            if first_word in self._GLAZING_TYPE_PREFIX_WORDS:
                return j
            if stripped in self._ORIENTATION_TOKENS:
                if seen_orient:
                    return j
                seen_orient = True
        return next_data_idx

    def _parse_window_from_anchors(
        self,
        *,
        lines: List[str],
        data_idx: int,
        manuf_idx: int,
        anchor: re.Match[str],
        before_start: int,
        after_end: int,
    ) -> Optional[Window]:
        width = float(anchor.group(1))
        height = float(anchor.group(2))
        area = float(anchor.group(3))
        # Layout-style cell joining sometimes leaves the glazing-type
        # phrase trailing the W H Area triplet on the same line (e.g.
        # "1.22 1.76 2.15 Double pre 2002"); when present we pass it
        # through as `inline_glazing_type` and the composer skips the
        # would-be glazing-prefix scan.
        inline_glazing_type = anchor.group(4) if anchor.lastindex and anchor.lastindex >= 4 else None
        # frame_type and frame_factor immediately follow the data line.
        # Layout-style cell joining sometimes collapses them onto a
        # single "Wood 0.70" line; treat both shapes uniformly so the
        # downstream `middle` slice still starts at the first variable
        # field (glazing_gap / bp / location / orient).
        if data_idx + 1 >= len(lines):
            return None
        frame_type, frame_factor, middle_start = self._parse_frame_type_and_factor(
            lines, data_idx
        )
        if frame_factor is None or not 0.0 < frame_factor <= 1.0:
            return None
        # Variable-order tokens between frame_factor and Manufacturer.
        middle = [lines[j].strip() for j in range(middle_start, manuf_idx)]
        glazing_gap = next((t for t in middle if "mm" in t.lower()), None)
        # Wall-location lodging. Most rows put "External wall" in
        # `middle`; alt-wall rows (cert 2636 window-4 / cert 9418 alt-
        # wall window) put "Alternative wall" in the PRE-data slice
        # (between the previous window's end and W×H×A).
Search both # slices so either layout resolves to the correct location. pre_data = [lines[j].strip() for j in range(before_start, data_idx)] location = ( next((t for t in middle if "wall" in t.lower()), None) or next((t for t in pre_data if "wall" in t.lower()), None) or "External wall" ) bp_inline = next((t for t in middle if t in self._BP_INLINE_TOKENS), None) orient_inline = next( (t for t in middle if t in self._ORIENTATION_TOKENS), None ) # Manufacturer line carries data_source + u_value. manuf_match = self._MANUFACTURER_RE.match(lines[manuf_idx].strip()) if manuf_match is None: return None data_source = manuf_match.group(1) u_value = float(manuf_match.group(2)) # Post-manufacturer: g_value, draught, shutters. if manuf_idx + 3 >= len(lines): return None try: g_value = float(lines[manuf_idx + 1].strip()) except ValueError: return None draught_proofed = lines[manuf_idx + 2].strip().lower() == "yes" permanent_shutters = lines[manuf_idx + 3].strip() # Prefix / suffix tokens (variable count) carry the # glazing-type, building-part, and orientation strings split by # the layout preprocessor. before = [lines[j].strip() for j in range(before_start, data_idx) if lines[j].strip()] after = [lines[j].strip() for j in range(manuf_idx + 4, after_end) if lines[j].strip()] # Room-in-roof windows lodge their location as "Roof of Room in # Roof" (wrapped across the prefix/suffix blocks). Detect it, pull # those tokens out so they don't contaminate the glazing-type # phrase, and override the wall-keyed `location` with the roof-of- # room marker the roof-window classifier keys on. 
if any( t in self._ROOF_OF_ROOM_LOCATION_TOKENS for t in (*before, *after) ): location = "Roof of Room" before = [t for t in before if t not in self._ROOF_OF_ROOM_LOCATION_TOKENS] after = [t for t in after if t not in self._ROOF_OF_ROOM_LOCATION_TOKENS] glazing_type, building_part, orientation = self._compose_window_descriptors( before=before, after=after, bp_inline=bp_inline, orient_inline=orient_inline, inline_glazing_type=inline_glazing_type, ) return Window( width_m=width, height_m=height, area_m2=area, glazing_type=glazing_type, frame_factor=frame_factor, building_part=building_part, location=location, orientation=orientation, data_source=data_source, u_value=u_value, g_value=g_value, draught_proofed=draught_proofed, permanent_shutters=permanent_shutters, frame_type=frame_type, glazing_gap=glazing_gap, ) def _compose_window_descriptors( self, *, before: List[str], after: List[str], bp_inline: Optional[str], orient_inline: Optional[str], inline_glazing_type: Optional[str] = None, ) -> tuple[str, str, str]: """Re-join the glazing-type / building-part / orientation tokens split by the layout preprocessor. Each is at most 2 fragments (one before the data line, one after); inline tokens in the between-segment win over prefix/suffix fragments.""" # before holds (in document order, possibly): glazing_prefix, # bp_prefix, orient_prefix — bp/orient may be missing. # after holds: glazing_suffix, bp_suffix, orient_suffix — same. prefix = list(before[-3:]) # last 3 lines preceding data suffix = list(after[:3]) def pop_if_orientation(tokens: List[str]) -> Optional[str]: for t in tokens: if t in self._ORIENTATION_TOKENS: tokens.remove(t) return t return None def pop_if_bp_fragment(tokens: List[str]) -> Optional[str]: # Prefix fragments like "1st" / "2nd" — match digit-prefixed # ordinals; suffix fragments are always "Extension". 
for t in tokens: if re.match(r"^\d+(?:st|nd|rd|th)$", t) or t == "Extension": tokens.remove(t) return t return None orient_prefix_token = pop_if_orientation(prefix) orient_suffix_token = pop_if_orientation(suffix) bp_prefix_frag = pop_if_bp_fragment(prefix) bp_suffix_frag = pop_if_bp_fragment(suffix) # Glazing type: an inline glazing-type captured from the data # line (layout-joined variant) wins; otherwise join the remaining # prefix + suffix fragments. if inline_glazing_type is not None: glazing_type = inline_glazing_type else: # The glazing-type phrase always starts with a glazing-start # word (Single/Double/Triple/Secondary). The FIRST window in # a building part has `before_start = 0`, so its prefix block # reaches back into the wrapped windows-table header; the # third header line's tail tokenises to "value value Proofed # Shutters" (the "U value / g value / Draught Proofed / # Permanent Shutters" column titles) and is neither an # orientation nor a bp fragment, so it survives the pops. # Drop any prefix fragments preceding the glazing-start word # so they don't leak into the glazing type. glazing_start = next( ( idx for idx, frag in enumerate(prefix) if frag.split(" ", 1)[0] in self._GLAZING_TYPE_PREFIX_WORDS ), None, ) glazing_prefix = ( prefix[glazing_start:] if glazing_start is not None else prefix ) glazing_type = " ".join([*glazing_prefix, *suffix]).strip() # Building part: inline token wins; otherwise join prefix + suffix. if bp_inline is not None: building_part = bp_inline else: building_part = " ".join( t for t in (bp_prefix_frag, bp_suffix_frag) if t ).strip() # Orientation: inline token wins for the primary direction; # combine with the opposite-direction fragment when present. 
primary = orient_inline or orient_prefix_token or "" secondary_candidates = [ t for t in (orient_prefix_token, orient_suffix_token) if t and t != primary ] if primary and secondary_candidates: orientation = f"{primary}-{secondary_candidates[0]}" else: orientation = primary return glazing_type, building_part, orientation def _extract_ventilation(self) -> VentilationAndCooling: # SAP 10.2 §2 (17a) "Air permeability value, AP4". Scoped to # §12.2..§13.0 so the per-window U-values + door U-values can't # shadow the float read. Absent when `pressure_test_method != # "Pulse"` (the modal cohort lodgement). pressure_lines = self._section_lines( "12.2 Air Pressure Test", "13.0 Lighting" ) ap4_raw = self._local_val(pressure_lines, "Pressure Test Result (AP4)") air_permeability_ap4_m3_h_m2: Optional[float] = None if ap4_raw: try: air_permeability_ap4_m3_h_m2 = float(ap4_raw.split()[0]) except (ValueError, IndexError): air_permeability_ap4_m3_h_m2 = None # Summary §12.1 "Mechanical Ventilation Type" — scoped to §12.1 # body so the global "Type" labels in §14 / §15 can't shadow it. mv_lines = self._section_lines( "12.1 Mechanical Ventilation", "12.2 Air Pressure Test" ) mv_type_raw = self._local_val(mv_lines, "Mechanical Ventilation Type") mechanical_ventilation_type = ( " ".join(mv_type_raw.split()) if mv_type_raw else None ) # SAP 10.2 §2.6.4 + Table 4f line (230a) — MEV PCDB lookup # inputs. Cert lodges PCDF index, wet-rooms count, ducting # type, and whether the installation was approved. 
mev_pcdf_raw = self._local_val(mv_lines, "MV PCDF Reference Number") mev_pcdf_reference = ( int(mev_pcdf_raw) if mev_pcdf_raw and mev_pcdf_raw.isdigit() else None ) wet_rooms_raw = self._local_val(mv_lines, "Wet Rooms") wet_rooms_count = ( int(wet_rooms_raw) if wet_rooms_raw and wet_rooms_raw.isdigit() else None ) duct_type_raw = self._local_val(mv_lines, "Duct Type") duct_type = duct_type_raw if duct_type_raw else None approved_raw = self._local_val(mv_lines, "Approved Installation") approved_installation = ( None if approved_raw is None else approved_raw.strip().lower() == "yes" ) return VentilationAndCooling( open_chimneys_count=self._int_val("No. of open chimneys"), open_flues_count=self._int_val("No. of open flues"), open_chimneys_closed_fire_count=self._int_val( "No. of open chimneys/open flues attached to closed fire" ), solid_fuel_boiler_flues_count=self._int_val( "No. of flues attached to solid fuel boiler" ), other_heater_flues_count=self._int_val( "No. of open flues attached to other heater" ), blocked_chimneys_count=self._int_val("No. of blocked chimneys"), extract_fans_count=self._int_val("No. of intermittent extract fans"), passive_vents_count=self._int_val("No. of passive vents"), flueless_gas_fires_count=self._int_val("No. 
of flueless gas fires"), fixed_space_cooling=self._bool_val("Fixed Space Cooling"), draught_lobby=self._str_val("Draught Lobby"), mechanical_ventilation=self._bool_val("Mechanical Ventilation"), pressure_test_method=self._str_val("Test Method"), air_permeability_ap4_m3_h_m2=air_permeability_ap4_m3_h_m2, mechanical_ventilation_type=mechanical_ventilation_type, mechanical_ventilation_pcdf_reference=mev_pcdf_reference, wet_rooms_count=wet_rooms_count, duct_type=duct_type, approved_installation=approved_installation, ) def _extract_lighting(self) -> Lighting: led_cfl_count_known = self._bool_val("Number of LED and CFL Known") return Lighting( total_bulbs=self._int_val("Total number of bulbs"), led_cfl_count_known=led_cfl_count_known, led_count=self._int_val("Number of LED lights"), cfl_count=self._int_val("Number of CFL lights"), incandescent_count=self._int_val("Total number of incandescents"), low_energy_count=( 0 if led_cfl_count_known else self._int_val("Total number of Low Energy") ), ) def _extract_main_heating(self) -> MainHeating: # Community-heated dwellings (e.g. SAP code 301 "Community heating # scheme" per SAP10.2 Table 4a category 6) and "no system" certs # (SAP code 699 "Electric heaters assumed where no system lodged") # lodge §14.0 Main Heating1 directly followed by §14.1 Community # Heating/Heat Network rather than §14.1 Main Heating2 — there is # no second main system on a community-heated dwelling. Close the # §14.0 block at whichever §14.1 form appears first so every # Summary shape surfaces the SAP code. lines = self._section_lines_first_end( "14.0 Main Heating1", ("14.1 Main Heating2", "14.1 Community Heating"), ) pct_raw = self._local_val(lines, "Percentage of Heat") pct = int(pct_raw.split()[0]) if pct_raw else 0 # §14.0 "Main Heating SAP Code" identifies Main 1 by SAP 10.2 # Table 4a code (e.g. 224 = "Air source heat pump, 2013 or # later"). 
PCDB-boiler certs leave this empty / lodge "0" — the # PCDB index in `PCDF boiler Reference` is the identifier in # that case. Treat 0 (or absent) as None so the mapper can # distinguish "no SAP code lodged" from a real Table 4a code. sap_code_raw = self._local_val(lines, "Main Heating SAP Code") main_heating_sap_code: Optional[int] = None if sap_code_raw is not None: head = sap_code_raw.split()[0] if sap_code_raw.split() else "" if head.isdigit(): v = int(head) main_heating_sap_code = v if v > 0 else None # The "Secondary Heating SapCode" key is lodged inside §14.1 Main # Heating2 — Elmhurst uses the Main-2 block to also carry the # cert's secondary heating system (when one exists). Look for it # in that section; absence (or "0") means no secondary lodged. secondary_lines = self._section_lines( "14.1 Main Heating2", "14.1 Community Heating" ) secondary_raw = self._local_val(secondary_lines, "Secondary Heating SapCode") secondary_code = ( int(secondary_raw) if secondary_raw is not None and secondary_raw.isdigit() and int(secondary_raw) > 0 else None ) main_heating_2 = self._extract_main_heating_2() community_heating = self._extract_community_heating() return MainHeating( heat_emitter=self._local_str(lines, "Heat Emitter"), fuel_type=self._local_str(lines, "Fuel Type"), flue_type=self._local_str(lines, "Flue Type"), fan_assisted_flue=self._local_bool(lines, "Fan Assisted Flue"), design_flow_temperature=self._local_str(lines, "Design flow temperature"), heating_controls_ees=self._local_str(lines, "Main Heating Controls EES"), heating_controls_sap=self._local_str(lines, "Main Heating Controls Sap"), percentage_of_heat=pct, pcdf_boiler_reference=self._local_val(lines, "PCDF boiler Reference"), heat_pump_age=self._local_val(lines, "Heat pump age"), main_heating_sap_code=main_heating_sap_code, main_heating_ees=self._local_str(lines, "Main Heating EES Code"), secondary_heating_sap_code=secondary_code, main_heating_2=main_heating_2, community_heating=community_heating, ) 
def _extract_main_heating_2(self) -> Optional[MainHeating2]: """§14.1 Main Heating2 block — returns None when the block is either absent or lodges only placeholder zeros (the PCDB-only convention for "no Main 2"). Otherwise builds a populated `MainHeating2` from the lodged §14.1 fields. Identifier signal: Main 2 is "present" when the §14.1 block lodges either a non-zero PCDB boiler reference (e.g. cert 000565 Main 2 PCDB 15100 Vaillant Ecotec plus 415) OR a non-zero SAP code. PCDB-only certs lodge `PCDF boiler Reference = 0` + `Main Heating SAP Code = 0` for an absent Main 2 (per the two JSON fixtures at `elmhurst_site_notes_{1,2}_text.json`). """ lines = self._section_lines( "14.1 Main Heating2", "14.1 Community Heating", ) pcdf_raw = self._local_val(lines, "PCDF boiler Reference") pcdf_first = ( pcdf_raw.split()[0] if pcdf_raw and pcdf_raw.split() else "" ) has_pcdb_ref = pcdf_first.isdigit() and int(pcdf_first) > 0 sap_code_raw = self._local_val(lines, "Main Heating SAP Code") main_heating_sap_code: Optional[int] = None if sap_code_raw is not None: head = sap_code_raw.split()[0] if sap_code_raw.split() else "" if head.isdigit(): v = int(head) main_heating_sap_code = v if v > 0 else None if not has_pcdb_ref and main_heating_sap_code is None: return None # §14.1's "Percentage of Heat" lodges either "0 %" (with space) # or "0%" (no space). Strip the '%' before int() rather than # split() so both forms parse. pct_raw = self._local_val(lines, "Percentage of Heat") pct = ( int(pct_raw.rstrip("%").strip().split()[0]) if pct_raw and pct_raw.rstrip("%").strip() else 0 ) return MainHeating2( pcdf_boiler_reference=pcdf_raw, fuel_type=self._local_str(lines, "Fuel Type"), flue_type=self._local_str(lines, "Flue Type"), fan_assisted_flue=self._local_bool(lines, "Fan Assisted Flue"), percentage_of_heat=pct, main_heating_sap_code=main_heating_sap_code, ) def _extract_community_heating(self) -> Optional[CommunityHeating]: """§14.1 Community Heating/Heat Network block. 
Lodged in place of §14.1 Main Heating2 when the §14.0 Main Heating SAP code names a heat-network row (Table 4a 301/302/304). Returns None when no §14.1 Community Heating block is present on the cert. The block carries the Community Heat Source (Boilers / CHP / Heat pump) + Community Fuel Type (Mains Gas / Electricity / Mineral oil or biodiesel / Coal) — together these resolve the Table 12 heat-network fuel code that bills the cascade. See `_resolve_community_heating_fuel_code` in the mapper. """ lines = self._section_lines( "14.1 Community Heating/Heat Network", "14.2 Meters", ) # Absence of the §14.1 Community Heating block: no marker found # → `_section_lines` returns []. Lodgement convention also # leaves Community Heat Source empty on individually-heated # dwellings; treat both as "no community heating present". heat_source = self._local_str(lines, "Community Heat Source") if not lines or not heat_source: return None return CommunityHeating( heating_type=self._local_str(lines, "Heating Type"), pcdf_boiler_reference=self._local_val(lines, "PCDF Boiler Reference"), community_heat_source=heat_source, community_fuel_type=self._local_str(lines, "Community Fuel Type"), heating_controls_ees=self._local_str(lines, "Heating Controls EES"), heating_controls_sap=self._local_str(lines, "Heating Controls SAP"), chp_fuel_factor=self._local_val(lines, "CHP Fuel Factor"), ) def _extract_meters(self) -> Meters: return Meters( electricity_meter_type=self._str_val("Electricity meter type"), main_gas=self._bool_val("Main gas"), electricity_smart_meter=self._bool_val("Electricity Smart Meter Present"), gas_smart_meter=self._bool_val("Gas Smart Meter Present"), ) def _extract_water_heating(self) -> WaterHeating: # §15.1 lodgings — Summary writes these only when a cylinder # is present. 
The §15.1 block uses labels ("Cylinder Size", # "Insulated", "Insulation Thickness") that collide with # global occurrences elsewhere ("Insulation Thickness" also # appears in §7 Walls / §8 Roofs); scope the lookups via # `_local_val` against the §15.1..§15.2 slice to disambiguate. cylinder_lines = self._section_lines( "15.1 Hot Water Cylinder", "15.2 Community Hot Water", ) cylinder_size_label = self._local_val( cylinder_lines, "Cylinder Size", ) cylinder_insulation_label = self._local_val( cylinder_lines, "Insulated", ) cylinder_ins_thickness_raw = self._local_val( cylinder_lines, "Insulation Thickness", ) cylinder_insulation_thickness_mm: Optional[int] = None if cylinder_ins_thickness_raw: first = cylinder_ins_thickness_raw.split()[0] if first.isdigit(): cylinder_insulation_thickness_mm = int(first) cylinder_thermostat_raw = self._local_val( cylinder_lines, "Cylinder Thermostat", ) cylinder_thermostat: Optional[bool] = ( cylinder_thermostat_raw.strip().lower() == "yes" if cylinder_thermostat_raw is not None else None ) # Fallback: Elmhurst Summary §16 "Recommendations" block carries # existing fittings as ` (Already installed)` lines. # When §15.1 doesn't lodge "Cylinder Thermostat" directly, treat # the "Cylinder thermostat (Already installed)" recommendation # line as confirmation that the thermostat is present (per # S0380.140 corpus probe — all 41 variants on property 001431 # lodge this in §16 but none in §15.1, so the §15.1-only lookup # returned None and the cascade defaulted `has_cylinder_thermostat # = False`, mis-applying SAP 10.2 Table 2b's ×1.3 "no thermostat" # multiplier). 
if cylinder_thermostat is None: if "Cylinder thermostat (Already installed)" in self._lines: cylinder_thermostat = True return WaterHeating( water_heating_code=self._str_val("Water Heating Code"), water_heating_sap_code=self._int_val("Water Heating SapCode"), water_heating_fuel_type=self._str_val("Water Heating Fuel Type"), hot_water_cylinder_present=self._bool_val("Hot Water Cylinder Present"), cylinder_size_label=cylinder_size_label, cylinder_insulation_label=cylinder_insulation_label, cylinder_insulation_thickness_mm=cylinder_insulation_thickness_mm, cylinder_thermostat=cylinder_thermostat, ) def _extract_baths_and_showers(self) -> BathsAndShowers: n_baths = self._int_val("Total Number of Baths") n_connected = self._int_val("Number of Baths Connected") # Section-bounded "Connected" lookup. Global `_lines.index` collides # with §3 building-parts elevation flags ("Connected" / "Exposed" / # "Sheltered"), losing the shower roster on multi-extension certs # (cert 000565 lodges 4 extensions and an electric shower; pre-fix # the global match landed on a wall row and the digit-check broke). # `1x.0 Baths and Showers` and `18.0 Flue Gas Heat Recovery System` # are both unique single-occurrence anchors in the Elmhurst Summary # PDF schema. 
section = self._section_lines( "1x.0 Baths and Showers", "18.0 Flue Gas Heat Recovery System", ) try: idx = section.index("Connected") except ValueError: return BathsAndShowers( number_of_baths=n_baths, number_of_baths_connected=n_connected, showers=[], ) showers: List[Shower] = [] j = idx + 1 while j + 2 <= len(section) - 1: num_line = section[j] if not num_line.isdigit(): break showers.append( Shower( shower_number=int(num_line), outlet_type=section[j + 1], connected=section[j + 2], ) ) j += 3 return BathsAndShowers( number_of_baths=n_baths, number_of_baths_connected=n_connected, showers=showers, ) def _rating_val(self, label: str) -> int: v = self._next_val(label) try: return int(v.split()[-1]) if v else 0 except (ValueError, IndexError): return 0 def _extract_renewables(self) -> Renewables: fghrs_lines = self._section_lines( "18.0 Flue Gas Heat Recovery System", "19.0 Photovoltaic Panel" ) fghrs = self._local_bool(fghrs_lines, "Present") terrain = self._str_val("Terrain Type") hydro_raw = self._next_val("Electricity generated [kWh/year]") hydro = float(hydro_raw) if hydro_raw else 0.0 # RdSAP 10 §11.1 b): the Summary §19.0 may lodge a "% of roof # area" row when the surveyor doesn't capture detailed kWp / # orientation / pitch. `_int_val` returns 0 when the label is # absent (cert lodges detailed pv_arrays instead) — collapse to # None so downstream can distinguish "no PV" from "PV via % # roof area path". pv_pct = self._int_val("Proportion of roof area") # Solar HW collector geometry — Summary §16.0. Only populated # when the cert lodges "Are details known? Yes" in the solar # block. Cert 000565 lodges West / 30° / Modest. When absent # (cert says no, or no solar HW at all) → None and the cascade # falls back to RdSAP 10 §10.11 Table 29 defaults (South / 30° # / Modest). 
solar_lines = self._section_lines( "16.0 Solar water heating", "17.0 Waste Water Heat Recovery System", ) solar_orientation = self._local_val( solar_lines, "Collector orientation", ) solar_pitch_raw = self._local_val(solar_lines, "Collector elevation") solar_pitch = _parse_solar_pitch_deg(solar_pitch_raw) solar_overshading = self._local_val(solar_lines, "Overshading") return Renewables( solar_water_heating=self._bool_val("Solar Water Heating"), wwhrs_present=self._bool_val("Is WWHRS present in the property?"), flue_gas_heat_recovery_present=fghrs, photovoltaic_panel=self._str_val("Photovoltaic Panel"), export_capable_meter=self._bool_val("Export capable meter"), wind_turbine_present=self._bool_val("Wind turbine present?"), wind_turbines_terrain_type=terrain, hydro_electricity_generated_kwh=hydro, pv_arrays=self._extract_pv_arrays(), pv_percent_roof_area=pv_pct if pv_pct > 0 else None, solar_hw_collector_orientation=solar_orientation, solar_hw_collector_pitch_deg=solar_pitch, solar_hw_overshading=solar_overshading, ) def _extract_pv_arrays(self) -> List[ElmhurstPvArray]: """Parse the Elmhurst Summary §19.0 PV Panel section. Returns one `ElmhurstPvArray` per lodged array, or [] when absent. The Summary's PV block looks like (single-array, e.g. cert 0380): Photovoltaic panel details PV Cells kW Peak Orientation Elevation Overshading 3.00 South-East 45° None Or Little Multi-array (e.g. cert 0350 lodges 2 arrays): ... 1.50 South-East 45° None Or Little 1.50 North-West 45° None Or Little — each array is 4 values in (kW Peak, Orientation, Elevation, Overshading) order. Anchor on "Photovoltaic panel details", skip header lines, then read values in 4-tuples until the section breaks at the next §header or end-of-array tokens (Batteries / Export / Capacity / etc.). 
""" anchor = "Photovoltaic panel details" try: idx = next(i for i, l in enumerate(self._lines) if l == anchor) except StopIteration: return [] # The header lines after the anchor are: "PV Cells kW Peak # Orientation", "Elevation", "Overshading". Subsequent lines # carry values for one OR MORE arrays. Stop at the next # §-header (a "20.0" or "21.0") or post-PV section tokens # ("Batteries", "Connected to", "Diverter", "Capacity", etc.). header_tokens = {"pv cells", "kw peak", "orientation", "elevation", "overshading"} stop_tokens = { "batteries", "capacity known", "capacity", "connected to the dwelling's meter", "diverter present", "export capable meter", } values: List[str] = [] for line in self._lines[idx + 1:]: stripped = line.strip() if not stripped: continue lower = stripped.lower() if lower in stop_tokens: break # Next §-header (e.g. "20.0 Wind Turbine") closes the block — # match "." so kWp values # like "1.50" don't trip the close. if re.match(r"^\d{1,2}\.\d\s+\w", stripped): break if any(h in lower for h in header_tokens): continue values.append(stripped) # Walk values in 4-tuples; an incomplete trailing tuple is dropped. arrays: List[ElmhurstPvArray] = [] for i in range(0, len(values) - 3, 4): try: kwp = float(values[i]) except ValueError: continue orientation = values[i + 1] # Elevation lodged as "45°" — strip trailing degree symbol. 
m = re.match(r"^(\d+)", values[i + 2]) if m is None: continue elevation = int(m.group(1)) overshading = values[i + 3] arrays.append(ElmhurstPvArray( peak_power_kw=kwp, orientation=orientation, elevation_deg=elevation, overshading=overshading, )) return arrays def extract(self) -> ElmhurstSiteNotes: emissions_raw = self._next_val("Emissions (t/year)") co2 = float(emissions_raw.split()[0]) if emissions_raw else 0.0 return ElmhurstSiteNotes( surveyor_info=self._extract_surveyor_info(), property_details=self._extract_property_details(), current_sap_rating=self._rating_val("Current SAP rating"), potential_sap_rating=self._rating_val("Potential SAP rating"), current_ei_rating=self._rating_val("Current EI rating"), potential_ei_rating=self._rating_val("Potential EI rating"), co2_emissions_current_t=co2, property_type=self._str_val("1.0 Property type"), attachment=self._extract_attachment(), number_of_storeys=self._int_val("Storeys"), habitable_rooms=self._int_val("Habitable Rooms"), heated_habitable_rooms=self._int_val("Heated Habitable Rooms"), construction_age_band=self._str_val("Main Property"), dimensions=self._extract_dimensions(), has_conservatory=self._bool_val("Is there a conservatory?"), walls=self._extract_walls(), roof=self._extract_roof(), floor=self._extract_floor(), door_count=self._int_val("Total Number of Doors"), insulated_door_count=self._int_val("Number of Insulated Doors"), insulated_door_u_value=self._extract_door_u_value(), windows=self._extract_windows(), draught_proofing_percent=self._int_val("Draught Proofing"), ventilation=self._extract_ventilation(), lighting=self._extract_lighting(), main_heating=self._extract_main_heating(), meters=self._extract_meters(), water_heating=self._extract_water_heating(), baths_and_showers=self._extract_baths_and_showers(), renewables=self._extract_renewables(), extensions=self._extract_extensions(), room_in_roof=self._extract_room_in_roof_from_text(), ) def _extract_room_in_roof_from_text(self) -> 
Optional[RoomInRoof]: """Convenience wrapper: pulls the Main §4 body + the §3 age-band text once so `_extract_room_in_roof` doesn't need to re-slice the document.""" dim_section = self._between("4.0 Dimensions:", "5.0 Conservatory:") bp_chunks = self._split_section_by_bp(dim_section) main_body = bp_chunks[0][1] if bp_chunks else dim_section return self._extract_room_in_roof(main_body, self._text)