Merge remote-tracking branch 'origin/main' into feature/e2e-runs

# Conflicts:
#	repositories/comparable_properties/epc_comparable_properties_repository.py
#	tests/repositories/comparable_properties/test_epc_comparable_properties_repository.py
This commit is contained in:
Khalim Conn-Kowlessar 2026-06-23 17:07:27 +00:00
commit 4f4ec32e51
29 changed files with 6003 additions and 192 deletions

View file

@ -11,12 +11,199 @@ large, or a new/complex build pattern needed; NEEDS INVESTIGATION.
2020 new-build flat) — full loop proven: eng 77 / elm 78, engine-on-Elmhurst-
inputs 79 (calculator faithful within ~1). Use it to sanity-check the pipeline.
# Files saving instructions
I've noticed we arn't saving the epc.json (from epc api), input summary and worksheets from elmhurst. We shuold be doing that so we can reproduce behaviours quicker and verify. This is important as other memebers of team can help improve the verification. Files should be saved on backend/epc_api/json_samples/real_life_examples/<schema>/uprn_<uprn>/<files>
## Do next — new schemas (need a mapper)
These two were flagged NOT MAPPABLE in the UI (red ✗). Mapper coverage now ADDED
(dedicated per-schema methods in `EpcPropertyDataMapper`; corpus gauge green, 0
new pyright errors). Elmhurst build + pin still pending — run the normal loop.
### 🔧🔍 FOR KHALIM — 100061905751 (cert 2628-3046-6233-4584-2970, SAP-16.3) — mapper fix landed, Elmhurst +5 to review
Surfaced as a PRODUCTION crash (NOT in The 100): `modelling_e2e` property 730259
`predict_epc``EpcComparablePropertiesRepository.candidates_for(postcode)` maps
EVERY cohort cert in one list comprehension, and THIS cert raised
`ValueError: RdSapSchema17_1: missing required field 'insulated_door_count'`,
aborting the whole prediction. 🔧 FIXED generically in `_normalize_sap_schema_16_x`
(`mapper.py`): `setdefault("insulated_door_count", 0)` — door refinement, absent →
"no insulated doors recorded" → 0 (conservative RdSAP). +regression test
`test_16_x_missing_insulated_door_count_defaults_to_zero` (test_from_sap_schema.py);
0 new pyright errors; real-cert accuracy + 16.x suites green.
Cert: END-TERRACE HOUSE 2-storey, band A pre-1900, cavity wall AS-BUILT **no
insulation** (U0.70), pitched 100mm loft, suspended uninsulated floor, mains-gas
COMBI (PCDB 16366 Glow-worm Ultracom 2 35cxi), control 2106 CBE, double glazed,
**2 OPEN FIREPLACES** (chimneys), lodged **secondary** open-fire smokeless fuel
(SAP 631 = Elmhurst RKJ), 3.84m² band-B extension, natural vent, 17% LED, TFA 139.
**eng 57 = lodged 57 EXACTLY** / elm worksheet 52 (+5). Built in Elmhurst end-to-end
(boiler 16366, CBE/2106, 2 open chimneys, secondary RKJ open-fire-in-grate, window
20.57m², party wall geometry GF 6.31/1F 5.14 CF). engine-on-Elmhurst-inputs 53 ≈
worksheet 52 → **calculator faithful**. The +5 decomposes as: ~2 SAP = the documented
16.x reduced-field **party-wall gap** (lodges no party_wall_length → engine models
NONE; Elmhurst forces `Party walls Main 31.02m² × U0.250 = 7.76 W/K`), SCALED UP by
2-storey geometry (party wall area ~31m² vs ~16m² on the single-storey 16.2 pins
100021985993/100090182288 which showed only +2); remainder ~2-3 = reduced-field
build choices (3.84m² extension folded into GF, wall thickness 250 vs lodged 300,
draughtproofing default). build_100061905751.py complete. **NOT yet pinned** — to
review with Khalim (his call per CONTEXT.md): pin engine=lodged 57 like other 16.x,
and/or whether 16.x end-terrace should model a party wall to close the reduced-field
gap. Sample saved under SAP-Schema-16.3/uprn_100061905751 (epc.json + both PDFs).
### 🔧🔍 modelling_e2e production failure sweep (2026-06-23) — cohort resilience + mapper gaps
Investigated the 45 FAILED `modelling_e2e` subtasks (DevAssessmentModelDB, portfolio
796) — 36 property rows, ~25 unique UPRNs. Root cause for most: `predict_epc`
`EpcComparablePropertiesRepository.candidates_for(postcode)` mapped EVERY cohort cert
in one comprehension, so ONE unmappable cohort cert aborted the whole prediction.
**✅ DONE — cohort resilience (skip + report)** so a single bad cert can't sink a
prediction, and the skipped certs surface for follow-up:
- `candidates_for` now skips certs that raise `ValueError` (mapper failures:
missing-field + `UnmappedApiCode`/`UnmappedElmhurstLabel`, both `ValueError`),
recording `SkippedCohortCert(certificate_number, error)` on `repo.skipped` + a
`logger.warning`. Transient API errors (not `ValueError`) still propagate.
- handler surfaces them in the subtask **outputs** (success → `outputs.result.
skipped_unmappable_cohort_certs`; failure → appended to the error message), with
cert numbers, so the gaps can be closed deliberately. +3 tests; 0 new pyright.
Live-verified on BN11 4EP: cohort now builds 35 + records the 1 skip.
**✅ DONE — generic mapper fixes (+regression tests, 0 new pyright):**
- `_normalize_sap_schema_16_x`: `setdefault("insulated_door_count", 0)` (the
original prod crash).
- ⚠️ REVERTED `setdefault("multiple_glazed_proportion", 100)` — it made an
otherwise-unmappable 16.3 cert (0418-3986-7250-2884-7970) join the EPC-prediction
**frozen-fixture donor pool** and tipped near-tie similarity matches, regressing
the component-accuracy gate (`has_hot_water_cylinder` 30→29/36; `door_count`
residual 23→25/36 — both ≫ the gate's 1e-3 float tolerance). The field is ML-only
(SAP calc never reads it) and the cohort skip-and-report path handles the cert,
so we leave it fail-loud instead of synthesising a value. Re-baselining the gate
was rejected (loosens a tighten-only gate for a real regression). For Khalim: if
16.x certs missing `multiple_glazed_proportion` should map, derive it (single→0 /
double→100) rather than a flat default, and re-measure the gate.
- `_with_recorded_performance` co2/consumption/rating: `float(co2)` → crashed on
certs lodging `co2_emissions_current` as a Measurement dict `{'value':3.5,
'quantity':'tonnes per year'}` (16.x cert 2308-4997-7262-0137-9930, surfaced as
`TypeError: float() argument must be ... not 'dict'`). Now coerced via
`_measurement_value` (handles Measurement/dict/number). Verified maps → 3.5.
**⚖️ FOR KHALIM — uncovered mapper gaps (mapper is your domain per CONTEXT.md):**
- **`built_form` missing (SAP-16.0)** — cert 8742-6624-9300-2780-4926 (uprn
10070004512); has `property_type` but no `built_form`. Defaulting it drives
party-wall/exposed-perimeter modelling → a real modelling decision, not a neutral
default. Blocks props 710339, 723589.
- **`window` missing + genuinely sparse (SAP-16.2)** — cert 8257-7539-1649-0633-4992
(uprn 10070009758); `windows` is a DICT not list, AND it omits `door_count`,
`habitable_room_count`, `glazed_area` — fail-loud is likely correct (data
insufficient). Blocks props 733315, 730259.
- **🔍 RdSAP-21.0.0 systematically lacks the ADR-0028 Optional widenings 21.0.1 got.**
Cert 3135-3223-5500-0919-2206 (uprn 100021919725) omits 13 top-level fields
(`wet_rooms_count`, `open_chimneys_count`, `mechanical_ventilation_index_number`,
`led/cfl_fixed_lighting_bulbs_count`, the `mechanical_vent_duct_*`,
`insulated_door_u_value`, `pressure_test_certificate_number`,
`windows_transmission_details`) + `SapWindow` (`pvc_frame`/`glazing_gap`/
`frame_factor`/`window_transmission_details`) — ALL required in 21.0.0 but
`Optional[...] = None` in 21.0.1. NOT applied: widening risks pushing `None` into
`from_rdsap_schema_21_0_0` which may assume non-None (needs mapper-coalescing
review). Recommend aligning RdSapSchema21_0_0 to 21.0.1 (incl. the same Optionals
on insulated_door_count etc.) + reviewing the 21.0.0 mapper. Blocks prop 719897.
(The resilience change already stops all of these from crashing prod.)
Caveat: ~13 other failed props showed no mapper gap on replay — genuinely
not-predictable (empty/insufficient cohort) or already fixed by the
`insulated_door_count` default; cohort replay only checked the first 60 certs/postcode
and skipped rate-limited fetches, so a deeper gap could be missed.
**Also done:** `deployment/terraform/lambda/modelling_e2e/variables.tf`
`maximum_concurrency` default 2 → 4 (per request). Cohort skip-guard widened to
`(ValueError, AttributeError, KeyError, TypeError)` (mapper-shape errors) so a
malformed cert (e.g. PV-as-list AttributeError) is skipped+reported too, not just
missing-field ValueErrors; transient `EpcApiError` (subclasses `Exception`) still
propagates. +regression test.
### 📋 PLAN — close the 8 modelling_e2e mapping gaps (2026-06-23 run, portfolio 796)
The 8 failed prediction targets reduce to **5 distinct mapper-gap classes** (the fix
targets). Per class: fix the mapper GENERICALLY, guard with BOTH the RdSAP-21.0.1
corpus gauge AND the **component-accuracy gate** (`test_component_accuracy_gate.py`
— every newly-mappable cert joins the prediction donor pool and can tip near-tie
similarity, as `multiple_glazed_proportion=100` did: regressed has_hot_water_cylinder
30→29/36 + door_count 23→25/36 ⇒ REVERTED). Then validate the now-mappable cert via
`/expand-sap-accuracy-corpus` on **the cohort cert's UPRN** (below), saving
epc.json + elmhurst_summary.pdf + elmhurst_worksheet.pdf under
`real_life_examples/<schema>/uprn_<uprn>/` and committing them.
- **P1 — `built_form` missing (SAP-16.0)** — HIGHEST impact (5 certs / 3 targets:
10013151061, 100020397529, 100020407755). Cert→UPRN: 8742-…-4926→10070004512,
0004-…-7395→100020407745, 2338-…-5950→100020407732, 9328-…-0924→100020407771
(8904-…-4623 has NO uprn — validate via one of the others). ⚠ built_form IS a
strong prediction similarity feature → highest gate-regression risk: DERIVE it
(from `dwelling_type` text / `property_type`), don't flat-default; re-measure the
gate. Corpus-validate uprn 10070004512.
- **P2 — `photovoltaic_supply` lodged as a LIST → AttributeError (`'list' has no
attribute none_or_no_details`)** — target 22086690 (BN2 9ZN). NEW real shape bug
(not missing-field); escaped the skip net until widened above. Clean fix: coerce
the list shape in the PV mapper (like the Measurement-dict co2 fix). Low gate risk.
⚠ FIRST identify the offending cert in the BN2 9ZN cohort (lookup didn't surface it
in the first 50 — search the full cohort for `photovoltaic_supply` as list).
- **P3 — `window` missing, `windows` is a DICT not list (SAP-16.1)** — cert
9768-…-7974 → uprn 100062614005, target 100061905741. Clean normaliser fix: handle
`windows` as a dict (take it directly) in `_normalize_sap_schema_16_x`. Corpus-
validate uprn 100062614005.
- **P4 — `multiple_glazed_proportion` missing (SAP-16.3)** — cert 0418-…-7970 → uprn
22144943, target 22082258. The flat-100 default REGRESSED the gate (reverted).
DERIVE it (single→0 / double→100 from `multiple_glazing_type` / window desc) so it
doesn't perturb similarity, then re-measure the gate. Corpus-validate uprn 22144943.
- **P5 — RdSAP-21.0.0 systematic ADR-0028 alignment** (`SapWindow.glazing_gap` +
13 top-level fields) — cert 3135-…-2206 → uprn 100021919725, target 100021919718.
Align RdSapSchema21_0_0 Optionals to 21.0.1 + review `from_rdsap_schema_21_0_0`
None-coalescing. Larger/riskier. Corpus-validate uprn 100021919725.
- **P6 — `window` + genuinely sparse (SAP-16.2)** — cert 8257-…-4992 → uprn
10070009758, target 100020401711. Also omits door_count/habitable/glazed_area →
fail-loud is likely CORRECT (insufficient data). DOCUMENT decision; don't force-map.
After all: re-run the modelling_e2e failure sweep to confirm the cohort skips drop
to zero (or only P6-class), and that no new gate regression slipped in.
### ✅ OUTCOMES (2026-06-23, "do all") — 553 tests pass, gate 26/26, 0 new pyright
- **P1 ✅ FIXED**`_normalize_sap_schema_16_x` derives `built_form` from the
dwelling_type text (`_derive_built_form_16x`: Mid-terrace→4 / End-terrace→3 /
Semi-detached→2 / Detached→1; flats → modal 4). built_form is ML-only (SAP calc
never reads it) so SAP-neutral; gate verified insensitive (26/26). 5 SAP-16.0
flat certs now map (8742→SAP 66=lodged, etc.). +2 tests.
- **P2 ✅ FIXED**`photovoltaic_supply` as a measured-array LIST crashed
`from_rdsap_schema_{17_0,17_1,18_0,19_0,20_0_0}` (`'list' has no attribute
none_or_no_details`). All 5 now route through `_map_schema_21_pv`, whose list
branch is made dict-tolerant (`_pv_array_field` reads dict OR dataclass) — they
now CAPTURE the PV arrays like 21.0.x instead of crashing/dropping. cert
6102-…-2292 (uprn 22086693, RdSAP-20.0.0) maps with 2 arrays @1.14kW. +test.
- **P3 ✅ generalised / cert fail-loud**`_normalize_sap_schema_16_x` now handles
`windows` as a dict (not just list). cert 9768 (uprn 100062614005, 16.1) still
fails loud — it ALSO omits door_count/habitable_room_count/glazed_area
(genuinely sparse "High performance glazing" cert) → CORRECT to fail; resilience
skips+reports it.
- **P6 ✅ same as P3** — cert 8257 (uprn 10070009758, 16.2) — identical sparse
"High performance glazing" pattern, fails loud on door_count. Correct.
- **P4 ⚖️ FOR KHALIM (gate decision)** — left `multiple_glazed_proportion` fail-loud.
Re-confirmed: defaulting it (ANY value) still regresses the gate (29/36 +
25/36) because it makes a frozen-fixture cert a prediction donor — the field is
ML-only (not carried to EpcPropertyData), so the regression is donor-pool
composition, NOT the value. Closing this gap needs a GATE RE-BASELINE decision
(whether the donor-pool tip is benign per the #1245 precedent) — your call; I did
not unilaterally loosen the tighten-only gate.
- **P5 🔍 FOR KHALIM (large recursive alignment)** — RdSAP-21.0.0 lacks the
ADR-0028 Optional widenings 21.0.1 has, and the gap is RECURSIVE (not just the
~13 top-level + SapWindow): `SapBuildingPart` (13 fields incl.
construction_age_band/wall_construction/identifier — CORE FABRIC),
`SapEnergySource` (pv_battery_count, wind_turbine_details), MainHeatingDetail,
ShowerOutlets, PvBatteries, Addendum, + top-level low_energy_fixed_lighting_
bulbs_count/multiple_glazed_proportion/suggested_improvements. Aligning to
21.0.1's "everything Optional, tolerate sparse" means the 21.0.0 mapper must
None-coalesce CORE fabric — a modelling decision in your domain. Partial widening
was REVERTED (half-aligned schema is worse). cert 3135 (uprn 100021919725) stays
fail-loud → resilience skips+reports. Recommend: align RdSapSchema21_0_0 fully to
21.0.1 (recursive) + review mapper coalescing, in a dedicated change.
- **Corpus-validation pending** for the 2 now-mappable certs (P1 uprn 10070004512,
P2 uprn 22086693) — Elmhurst build + summary/worksheet + commit.
- [x] 🔧 10096028301 — SAP-Schema-19.1.0 (full-SAP g/f FLAT, band M, combi PCDB 17929, MEV, AP50 3.5) · eng 82 / elm 82 (lodged 85) · PINNED engine 82. 🔧 mapper added: `from_sap_schema_19_1_0`. Built in Elmhurst (boiler 17929 via search, control CBE/2106, water from primary, MEV on, AP50 Blower Door 3.5+cert). Engine EXACTLY matches Elmhurst worksheet (82.11 vs 82); engine-on-Elmhurst-inputs 82.16 ≈ 82 → calculator faithful. 3 vs lodged = measured-U-vs-RdSAP-default + MEV extract-not-recovery (documented). No mapper change beyond coverage.
- [x] 🔧 100021943298 — SAP-Schema-16.1 (g/f FLAT, band B, solid-brick internal, combi PCDB 10328) · eng 76 / elm 75 (lodged 72) · PINNED engine 76. 🔧 mapper added: `from_sap_schema_16_1`. Built in Elmhurst (boiler 10328 via search, control CBE/2106, water from primary, wall insulation thickness Unknown); worksheet 75 → engine within ~1 (tightest agreement, reduced-field). Boiler-select + water-heating + control dialogs all driven via automation (two-step row→Select / cascade + coordinate-OK). No mapper change beyond coverage.

View file

@ -23,6 +23,7 @@ invocations (ADR-0012).
from __future__ import annotations
import dataclasses
import io
import os
from collections.abc import Callable
@ -65,6 +66,7 @@ from applications.modelling_e2e.modelling_e2e_trigger_body import (
)
from repositories.comparable_properties.epc_comparable_properties_repository import (
EpcComparablePropertiesRepository,
SkippedCohortCert,
)
from repositories.geospatial.geospatial_s3_repository import (
GeospatialS3Repository,
@ -100,7 +102,11 @@ logger = setup_logger()
def _get_engine() -> Engine:
global _engine
if _engine is None:
_engine = make_engine(PostgresConfig.from_env(dict(os.environ)))
config = PostgresConfig.from_env(dict(os.environ))
# Reduced pool for Lambda: 32 concurrent containers × 3 connections = 96 max,
# vs the default 3+5=8 which would reach 256+ and exhaust RDS max_connections.
# pool_size=2 covers the simultaneous read_session + UoW session per invocation.
_engine = make_engine(dataclasses.replace(config, pool_size=2, max_overflow=1))
return _engine
@ -139,6 +145,20 @@ def _solar_insights_for(
return None
def _dedupe_skipped(
skipped: list[SkippedCohortCert],
) -> list[SkippedCohortCert]:
"""First occurrence of each skipped cert number (the same cert can appear in
more than one postcode cohort across a batch)."""
seen: set[str] = set()
unique: list[SkippedCohortCert] = []
for cert in skipped:
if cert.certificate_number not in seen:
seen.add(cert.certificate_number)
unique.append(cert)
return unique
def _predict_epc(
*,
property_id: int,
@ -183,7 +203,7 @@ def _predict_epc(
@task_handler(task_source="modelling_e2e", source=Source.PROPERTY)
def handler(body: dict[str, Any], context: Any) -> None:
def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]:
trigger = ModellingE2ETriggerBody.model_validate(body)
property_ids = trigger.property_ids
portfolio_id = trigger.portfolio_id
@ -386,7 +406,25 @@ def handler(body: dict[str, Any], context: Any) -> None:
)
errors.append(property_id)
# Cohort certs the mapper could not consume were skipped (not aborted on)
# so prediction could proceed; surface them — with cert numbers — in the
# subtask outputs so the mapper gaps can be closed later.
skipped_certs: list[dict[str, str]] = [
{"certificate_number": s.certificate_number, "error": s.error}
for s in _dedupe_skipped(comparables_repo.skipped)
]
if skipped_certs:
logger.info(
f"skipped {len(skipped_certs)} unmappable cohort cert(s): "
f"{[s['certificate_number'] for s in skipped_certs]}"
)
if errors:
raise RuntimeError(f"failed property_ids: {errors}")
message = f"failed property_ids: {errors}"
if skipped_certs:
message += f"; skipped_unmappable_cohort_certs: {skipped_certs}"
raise RuntimeError(message)
return {"skipped_unmappable_cohort_certs": skipped_certs} if skipped_certs else None
finally:
read_session.close()

View file

@ -0,0 +1,438 @@
{
"uprn": 22086693,
"roofs": [
{
"description": {
"value": "Pitched, 200 mm loft insulation",
"language": "1"
},
"energy_efficiency_rating": 4,
"environmental_efficiency_rating": 4
},
{
"description": {
"value": "Flat, insulated (assumed)",
"language": "1"
},
"energy_efficiency_rating": 3,
"environmental_efficiency_rating": 3
}
],
"walls": [
{
"description": {
"value": "Cavity wall, as built, no insulation (assumed)",
"language": "1"
},
"energy_efficiency_rating": 2,
"environmental_efficiency_rating": 2
},
{
"description": {
"value": "Cavity wall, as built, insulated (assumed)",
"language": "1"
},
"energy_efficiency_rating": 4,
"environmental_efficiency_rating": 4
}
],
"floors": [
{
"description": {
"value": "Suspended, no insulation (assumed)",
"language": "1"
},
"energy_efficiency_rating": 0,
"environmental_efficiency_rating": 0
},
{
"description": {
"value": "Solid, no insulation (assumed)",
"language": "1"
},
"energy_efficiency_rating": 0,
"environmental_efficiency_rating": 0
}
],
"status": "entered",
"tenure": 2,
"window": {
"description": {
"value": "Fully double glazed",
"language": "1"
},
"energy_efficiency_rating": 3,
"environmental_efficiency_rating": 3
},
"addendum": {
"addendum_numbers": [
8
],
"cavity_fill_recommended": "true"
},
"lighting": {
"description": {
"value": "Low energy lighting in all fixed outlets",
"language": "1"
},
"energy_efficiency_rating": 5,
"environmental_efficiency_rating": 5
},
"postcode": "BN2 9ZN",
"hot_water": {
"description": {
"value": "From main system",
"language": "1"
},
"energy_efficiency_rating": 4,
"environmental_efficiency_rating": 4
},
"post_town": "BRIGHTON",
"built_form": 2,
"created_at": "2022-03-22 14:09:56",
"door_count": 1,
"glazed_area": 1,
"glazing_gap": "16+",
"region_code": 14,
"report_type": 2,
"sap_heating": {
"cylinder_size": 1,
"water_heating_code": 901,
"water_heating_fuel": 26,
"instantaneous_wwhrs": {
"rooms_with_bath_and_or_shower": 1,
"rooms_with_mixer_shower_no_bath": 0,
"rooms_with_bath_and_mixer_shower": 0
},
"secondary_fuel_type": 29,
"main_heating_details": [
{
"has_fghrs": "N",
"main_fuel_type": 26,
"boiler_flue_type": 2,
"fan_flue_present": "Y",
"heat_emitter_type": 1,
"emitter_temperature": 0,
"main_heating_number": 1,
"main_heating_control": 2106,
"main_heating_category": 2,
"main_heating_fraction": 1,
"sap_main_heating_code": 113,
"central_heating_pump_age": 0,
"main_heating_data_source": 2
}
],
"immersion_heating_type": "NA",
"secondary_heating_type": 691,
"has_fixed_air_conditioning": "false"
},
"sap_version": 9.94,
"schema_type": "RdSAP-Schema-20.0.0",
"uprn_source": "Energy Assessor",
"country_code": "EAW",
"main_heating": [
{
"description": {
"value": "Boiler and radiators, mains gas",
"language": "1"
},
"energy_efficiency_rating": 4,
"environmental_efficiency_rating": 4
}
],
"dwelling_type": {
"value": "Semi-detached house",
"language": "1"
},
"language_code": 1,
"property_type": 0,
"address_line_1": "30 Hallett Road",
"assessment_type": "RdSAP",
"completion_date": "2022-03-22",
"inspection_date": "2022-03-17",
"extensions_count": 1,
"measurement_type": 1,
"total_floor_area": 80,
"transaction_type": 8,
"conservatory_type": 1,
"heated_room_count": 4,
"pvc_window_frames": "true",
"registration_date": "2022-03-22",
"sap_energy_source": {
"mains_gas": "Y",
"meter_type": 1,
"photovoltaic_supply": [
[
{
"pitch": 2,
"peak_power": {
"value": 1.14,
"quantity": "kW"
},
"orientation": 5,
"overshading": 1,
"pv_connection": 2
}
],
[
{
"pitch": 2,
"peak_power": {
"value": 1.14,
"quantity": "kW"
},
"orientation": 7,
"overshading": 1,
"pv_connection": 2
}
]
],
"wind_turbines_count": 0,
"wind_turbines_terrain_type": 2
},
"secondary_heating": {
"description": {
"value": "Room heaters, electric",
"language": "1"
},
"energy_efficiency_rating": 0,
"environmental_efficiency_rating": 0
},
"lzc_energy_sources": [
11
],
"sap_building_parts": [
{
"identifier": "Main Dwelling",
"wall_dry_lined": "N",
"wall_thickness": 280,
"floor_heat_loss": 7,
"roof_construction": 4,
"wall_construction": 4,
"building_part_number": 1,
"sap_floor_dimensions": [
{
"floor": 0,
"room_height": {
"value": 2.3,
"quantity": "metres"
},
"floor_insulation": 1,
"total_floor_area": {
"value": 36.86,
"quantity": "square metres"
},
"party_wall_length": {
"value": 6.8,
"quantity": "metres"
},
"floor_construction": 2,
"heat_loss_perimeter": {
"value": 13.4,
"quantity": "metres"
}
},
{
"floor": 1,
"room_height": {
"value": 2.3,
"quantity": "metres"
},
"total_floor_area": {
"value": 36.86,
"quantity": "square metres"
},
"party_wall_length": {
"value": 6.8,
"quantity": "metres"
},
"heat_loss_perimeter": {
"value": 17.4,
"quantity": "metres"
}
}
],
"wall_insulation_type": 4,
"construction_age_band": "C",
"party_wall_construction": 0,
"wall_thickness_measured": "Y",
"roof_insulation_location": 2,
"roof_insulation_thickness": "200mm",
"wall_insulation_thickness": "NI",
"floor_insulation_thickness": "NI"
},
{
"identifier": "Extension",
"wall_dry_lined": "N",
"floor_heat_loss": 7,
"roof_construction": 1,
"wall_construction": 4,
"building_part_number": 2,
"sap_floor_dimensions": [
{
"floor": 0,
"room_height": {
"value": 2.3,
"quantity": "metres"
},
"floor_insulation": 1,
"total_floor_area": {
"value": 6,
"quantity": "square metres"
},
"party_wall_length": 0,
"floor_construction": 1,
"heat_loss_perimeter": {
"value": 7,
"quantity": "metres"
}
}
],
"wall_insulation_type": 4,
"construction_age_band": "H",
"party_wall_construction": 0,
"wall_thickness_measured": "N",
"roof_insulation_location": 6,
"wall_insulation_thickness": "NI",
"floor_insulation_thickness": "NI",
"flat_roof_insulation_thickness": "AB"
}
],
"low_energy_lighting": 100,
"solar_water_heating": "N",
"habitable_room_count": 4,
"heating_cost_current": {
"value": 710,
"currency": "GBP"
},
"insulated_door_count": 0,
"co2_emissions_current": 2.4,
"energy_rating_average": 60,
"energy_rating_current": 72,
"lighting_cost_current": {
"value": 74,
"currency": "GBP"
},
"main_heating_controls": [
{
"description": {
"value": "Programmer, room thermostat and TRVs",
"language": "1"
},
"energy_efficiency_rating": 4,
"environmental_efficiency_rating": 4
}
],
"multiple_glazing_type": 3,
"open_fireplaces_count": 0,
"has_hot_water_cylinder": "false",
"heating_cost_potential": {
"value": 548,
"currency": "GBP"
},
"hot_water_cost_current": {
"value": 95,
"currency": "GBP"
},
"mechanical_ventilation": 0,
"percent_draughtproofed": 100,
"suggested_improvements": [
{
"sequence": 1,
"typical_saving": {
"value": 112,
"currency": "GBP"
},
"indicative_cost": "\u00a3500 - \u00a31,500",
"improvement_type": "B",
"improvement_details": {
"improvement_number": 6
},
"improvement_category": 5,
"energy_performance_rating": 77,
"environmental_impact_rating": 75
},
{
"sequence": 2,
"typical_saving": {
"value": 49,
"currency": "GBP"
},
"indicative_cost": "\u00a3800 - \u00a31,200",
"improvement_type": "W1",
"improvement_details": {
"improvement_number": 57
},
"improvement_category": 5,
"energy_performance_rating": 79,
"environmental_impact_rating": 78
},
{
"sequence": 3,
"typical_saving": {
"value": 30,
"currency": "GBP"
},
"indicative_cost": "\u00a34,000 - \u00a36,000",
"improvement_type": "N",
"improvement_details": {
"improvement_number": 19
},
"improvement_category": 5,
"energy_performance_rating": 81,
"environmental_impact_rating": 80
}
],
"co2_emissions_potential": 1.5,
"energy_rating_potential": 81,
"lighting_cost_potential": {
"value": 74,
"currency": "GBP"
},
"schema_version_original": "LIG-19.0",
"alternative_improvements": [
{
"improvement": {
"sequence": 1,
"typical_saving": {
"value": 59,
"currency": "GBP"
},
"improvement_type": "Q2",
"improvement_details": {
"improvement_number": 55
},
"improvement_category": 6,
"energy_performance_rating": 80,
"environmental_impact_rating": 78
}
}
],
"hot_water_cost_potential": {
"value": 66,
"currency": "GBP"
},
"renewable_heat_incentive": {
"water_heating": 2107,
"impact_of_cavity_insulation": -1824,
"space_heating_existing_dwelling": 9574
},
"energy_consumption_current": 171,
"has_fixed_air_conditioning": "false",
"multiple_glazed_proportion": 100,
"calculation_software_version": "2.1.0.1",
"energy_consumption_potential": 103,
"environmental_impact_current": 69,
"fixed_lighting_outlets_count": 10,
"windows_transmission_details": {
"u_value": 2.6,
"data_source": 2,
"solar_transmittance": 0.76
},
"current_energy_efficiency_band": "C",
"environmental_impact_potential": 80,
"has_heated_separate_conservatory": "false",
"potential_energy_efficiency_band": "B",
"co2_emissions_current_per_floor_area": 30,
"low_energy_fixed_lighting_outlets_count": 10
}

View file

@ -0,0 +1,335 @@
{
"uprn": 100061905751,
"roofs": [
{
"description": "Pitched, 100 mm loft insulation",
"energy_efficiency_rating": 3,
"environmental_efficiency_rating": 3
}
],
"walls": [
{
"description": "Cavity wall, as built, no insulation (assumed)",
"energy_efficiency_rating": 1,
"environmental_efficiency_rating": 1
}
],
"floors": [
{
"description": "Suspended, no insulation (assumed)",
"energy_efficiency_rating": 0,
"environmental_efficiency_rating": 0
}
],
"status": "entered",
"tenure": 1,
"windows": [
{
"description": "Fully double glazed",
"energy_efficiency_rating": 4,
"environmental_efficiency_rating": 4
}
],
"addendum": {
"cavity_fill_recommended": "true"
},
"lighting": {
"description": "Low energy lighting in 17% of fixed outlets",
"energy_efficiency_rating": 2,
"environmental_efficiency_rating": 2
},
"postcode": "BN11 4EP",
"hot_water": {
"description": "From main system",
"energy_efficiency_rating": 4,
"environmental_efficiency_rating": 4
},
"post_town": "WORTHING",
"built_form": 3,
"created_at": "2014-07-24 12:49:23",
"door_count": 2,
"glazed_area": 1,
"region_code": 14,
"report_type": 2,
"sap_heating": {
"wwhrs": {
"rooms_with_bath_and_or_shower": 1,
"rooms_with_mixer_shower_no_bath": 0,
"rooms_with_bath_and_mixer_shower": 1
},
"cylinder_size": 1,
"water_heating_code": 901,
"water_heating_fuel": 26,
"secondary_fuel_type": 15,
"main_heating_details": [
{
"has_fghrs": "N",
"main_fuel_type": 26,
"boiler_flue_type": 2,
"fan_flue_present": "Y",
"heat_emitter_type": 1,
"boiler_index_number": 16366,
"main_heating_number": 1,
"main_heating_control": 2106,
"main_heating_category": 2,
"main_heating_fraction": 1,
"main_heating_data_source": 1
}
],
"secondary_heating_type": 631,
"has_fixed_air_conditioning": "false"
},
"sap_version": 9.91,
"schema_type": "SAP-Schema-16.3",
"uprn_source": "Energy Assessor",
"country_code": "EAW",
"main_heating": [
{
"description": "Boiler and radiators, mains gas",
"energy_efficiency_rating": 4,
"environmental_efficiency_rating": 4
}
],
"dwelling_type": "End-terrace house",
"language_code": 1,
"property_type": 0,
"address_line_1": "77, Tarring Road",
"schema_version": "LIG-16.1",
"assessment_type": "RdSAP",
"completion_date": "2014-07-24",
"inspection_date": "2014-07-22",
"extensions_count": 1,
"measurement_type": 1,
"total_floor_area": 139,
"transaction_type": 5,
"conservatory_type": 1,
"heated_room_count": 5,
"registration_date": "2014-04-25",
"restricted_access": 0,
"sap_energy_source": {
"main_gas": "Y",
"meter_type": 2,
"photovoltaic_supply": [
[
{
"pitch": 3,
"peak_power": 1.25,
"orientation": 5,
"overshading": 1
}
],
[
{
"pitch": 3,
"peak_power": 0.5,
"orientation": 3,
"overshading": 1
}
],
[
{
"pitch": 3,
"peak_power": 0.5,
"orientation": 7,
"overshading": 2
}
]
],
"wind_turbines_count": 0,
"wind_turbines_terrain_type": 2
},
"secondary_heating": {
"description": "Room heaters, smokeless fuel",
"energy_efficiency_rating": 0,
"environmental_efficiency_rating": 0
},
"lzc_energy_sources": [
11
],
"sap_building_parts": [
{
"identifier": "Main Dwelling",
"wall_dry_lined": "N",
"wall_thickness": 300,
"floor_heat_loss": 7,
"roof_construction": 4,
"wall_construction": 4,
"building_part_number": 1,
"sap_floor_dimensions": [
{
"floor": 0,
"room_height": 2.7,
"floor_insulation": 1,
"total_floor_area": 67.49,
"floor_construction": 2,
"heat_loss_perimeter": 27.69
},
{
"floor": 1,
"room_height": 2.47,
"total_floor_area": 67.49,
"heat_loss_perimeter": 31.38
}
],
"wall_insulation_type": 4,
"construction_age_band": "A",
"wall_thickness_measured": "Y",
"roof_insulation_location": 2,
"roof_insulation_thickness": "100mm"
},
{
"identifier": "Extension",
"wall_dry_lined": "N",
"wall_thickness": 280,
"floor_heat_loss": 7,
"roof_construction": 5,
"wall_construction": 4,
"building_part_number": 2,
"sap_floor_dimensions": [
{
"floor": 0,
"room_height": 2.45,
"floor_insulation": 1,
"total_floor_area": 3.84,
"floor_construction": 1,
"heat_loss_perimeter": 5.77
}
],
"wall_insulation_type": 4,
"construction_age_band": "B",
"wall_thickness_measured": "Y",
"roof_insulation_location": 4,
"roof_insulation_thickness": "NI"
}
],
"low_energy_lighting": 17,
"solar_water_heating": "N",
"bedf_revision_number": 361,
"habitable_room_count": 5,
"heating_cost_current": {
"value": 1430,
"currency": "GBP"
},
"co2_emissions_current": 7.2,
"energy_rating_average": 60,
"energy_rating_current": 57,
"lighting_cost_current": {
"value": 132,
"currency": "GBP"
},
"main_heating_controls": [
{
"description": "Programmer, room thermostat and TRVs",
"energy_efficiency_rating": 4,
"environmental_efficiency_rating": 4
}
],
"multiple_glazing_type": 2,
"open_fireplaces_count": 2,
"has_hot_water_cylinder": "false",
"heating_cost_potential": {
"value": 881,
"currency": "GBP"
},
"hot_water_cost_current": {
"value": 105,
"currency": "GBP"
},
"mechanical_ventilation": 0,
"percent_draughtproofed": 100,
"suggested_improvements": [
{
"sequence": 1,
"typical_saving": {
"value": 308,
"currency": "GBP"
},
"indicative_cost": "\u00c2\u00a3500 - \u00c2\u00a31,500",
"improvement_type": "B",
"improvement_details": {
"improvement_number": 6
},
"improvement_category": 5,
"energy_performance_rating": 71,
"environmental_impact_rating": 63
},
{
"sequence": 2,
"typical_saving": {
"value": 73,
"currency": "GBP"
},
"indicative_cost": "\u00c2\u00a3800 - \u00c2\u00a31,200",
"improvement_type": "W",
"improvement_details": {
"improvement_number": 47
},
"improvement_category": 5,
"energy_performance_rating": 74,
"environmental_impact_rating": 66
},
{
"sequence": 3,
"typical_saving": {
"value": 49,
"currency": "GBP"
},
"indicative_cost": "\u00c2\u00a375",
"improvement_type": "E",
"improvement_details": {
"improvement_number": 35
},
"improvement_category": 5,
"energy_performance_rating": 75,
"environmental_impact_rating": 67
}
],
"co2_emissions_potential": 4.0,
"energy_rating_potential": 75,
"lighting_cost_potential": {
"value": 72,
"currency": "GBP"
},
"alternative_improvements": [
{
"improvement": {
"sequence": 1,
"typical_saving": {
"value": 67,
"currency": "GBP"
},
"improvement_type": "Q2",
"improvement_details": {
"improvement_number": 55
},
"improvement_category": 6,
"energy_performance_rating": 73,
"environmental_impact_rating": 65
}
}
],
"hot_water_cost_potential": {
"value": 105,
"currency": "GBP"
},
"renewable_heat_incentive": {
"water_heating": 2315,
"impact_of_loft_insulation": -869,
"impact_of_cavity_insulation": -8658,
"space_heating_existing_dwelling": 23216
},
"seller_commission_report": "Y",
"energy_consumption_current": 230,
"has_fixed_air_conditioning": "false",
"multiple_glazed_proportion": 100,
"calculation_software_version": "9.1.2",
"energy_consumption_potential": 125,
"environmental_impact_current": 45,
"fixed_lighting_outlets_count": 18,
"current_energy_efficiency_band": "D",
"environmental_impact_potential": 67,
"has_heated_separate_conservatory": "false",
"potential_energy_efficiency_band": "C",
"co2_emissions_current_per_floor_area": 52,
"low_energy_fixed_lighting_outlets_count": 3
}

View file

@ -189,6 +189,15 @@ def _sap_opening_area_m2(width: Any, height: Any) -> float:
)
def _pv_array_field(array: Any, name: str) -> Any:
"""Read a PV-array field from either a `PhotovoltaicArray` dataclass (21.0.x,
where the schema Union parsed the nested list) or a raw dict (older schemas
that leave the list unparsed)."""
if isinstance(array, dict):
return cast(Dict[str, Any], array).get(name)
return getattr(array, name)
def _map_schema_21_pv(
es_pv_supply: Any,
) -> tuple[Optional[PhotovoltaicSupply], Optional[List[PhotovoltaicArray]]]:
@ -206,12 +215,18 @@ def _map_schema_21_pv(
None. With no PV data at all, both are None.
"""
if isinstance(es_pv_supply, list):
# The nested list's leaves are `PhotovoltaicArray` dataclasses when the
# schema's Union type parsed them (21.0.x), or raw dicts when an older
# schema (17.020.0.0) types the field as the wrapper only and leaves the
# list unparsed — `_pv_array_field` reads either shape.
flattened = [
PhotovoltaicArray(
peak_power=_measurement_value(array.peak_power),
pitch=int(_measurement_value(array.pitch)),
orientation=_pv_orientation(array.orientation),
overshading=int(_measurement_value(array.overshading)),
peak_power=_measurement_value(_pv_array_field(array, "peak_power")),
pitch=int(_measurement_value(_pv_array_field(array, "pitch"))),
orientation=_pv_orientation(_pv_array_field(array, "orientation")),
overshading=int(
_measurement_value(_pv_array_field(array, "overshading"))
),
)
for inner_list in es_pv_supply
for array in inner_list
@ -538,6 +553,7 @@ class EpcPropertyDataMapper:
@staticmethod
def from_rdsap_schema_17_0(schema: RdSapSchema17_0) -> EpcPropertyData:
es = schema.sap_energy_source
pv_supply, pv_arrays = _map_schema_21_pv(es.photovoltaic_supply)
return EpcPropertyData(
uprn=schema.uprn,
assessment_type=schema.assessment_type,
@ -656,17 +672,8 @@ class EpcPropertyDataMapper:
is_dwelling_export_capable=False,
wind_turbines_terrain_type=str(es.wind_turbines_terrain_type),
electricity_smart_meter_present=False,
photovoltaic_supply=(
PhotovoltaicSupply(
none_or_no_details=PhotovoltaicSupplyNoneOrNoDetails(
percent_roof_area=es.photovoltaic_supply.none_or_no_details.percent_roof_area,
)
)
# ADR-0028: photovoltaic_supply can be absent, None, or a
# sparse shape without none_or_no_details — guard the read.
if getattr(es.photovoltaic_supply, "none_or_no_details", None)
else None
),
photovoltaic_supply=pv_supply,
photovoltaic_arrays=pv_arrays,
),
sap_building_parts=[
SapBuildingPart(
@ -954,6 +961,7 @@ class EpcPropertyDataMapper:
@staticmethod
def from_rdsap_schema_17_1(schema: RdSapSchema17_1) -> EpcPropertyData:
es = schema.sap_energy_source
pv_supply, pv_arrays = _map_schema_21_pv(es.photovoltaic_supply)
# ADR-0028: instantaneous_wwhrs holds bath/shower ROOM counts.
iw = schema.sap_heating.instantaneous_wwhrs
return EpcPropertyData(
@ -1097,15 +1105,8 @@ class EpcPropertyDataMapper:
is_dwelling_export_capable=False,
wind_turbines_terrain_type=str(es.wind_turbines_terrain_type),
electricity_smart_meter_present=False,
photovoltaic_supply=(
PhotovoltaicSupply(
none_or_no_details=PhotovoltaicSupplyNoneOrNoDetails(
percent_roof_area=es.photovoltaic_supply.none_or_no_details.percent_roof_area,
)
)
if getattr(es.photovoltaic_supply, "none_or_no_details", None)
else None
),
photovoltaic_supply=pv_supply,
photovoltaic_arrays=pv_arrays,
),
sap_building_parts=[
SapBuildingPart(
@ -1145,6 +1146,7 @@ class EpcPropertyDataMapper:
@staticmethod
def from_rdsap_schema_18_0(schema: RdSapSchema18_0) -> EpcPropertyData:
es = schema.sap_energy_source
pv_supply, pv_arrays = _map_schema_21_pv(es.photovoltaic_supply)
# ADR-0028: instantaneous_wwhrs holds bath/shower ROOM counts.
iw = schema.sap_heating.instantaneous_wwhrs
return EpcPropertyData(
@ -1295,16 +1297,8 @@ class EpcPropertyDataMapper:
is_dwelling_export_capable=False,
wind_turbines_terrain_type=str(es.wind_turbines_terrain_type),
electricity_smart_meter_present=False,
photovoltaic_supply=(
PhotovoltaicSupply(
none_or_no_details=PhotovoltaicSupplyNoneOrNoDetails(
percent_roof_area=es.photovoltaic_supply.none_or_no_details.percent_roof_area,
)
)
if es.photovoltaic_supply
and es.photovoltaic_supply.none_or_no_details
else None
),
photovoltaic_supply=pv_supply,
photovoltaic_arrays=pv_arrays,
),
sap_building_parts=[
SapBuildingPart(
@ -1375,6 +1369,7 @@ class EpcPropertyDataMapper:
@staticmethod
def from_rdsap_schema_19_0(schema: RdSapSchema19_0) -> EpcPropertyData:
es = schema.sap_energy_source
pv_supply, pv_arrays = _map_schema_21_pv(es.photovoltaic_supply)
return EpcPropertyData(
uprn=schema.uprn,
assessment_type=schema.assessment_type,
@ -1496,17 +1491,8 @@ class EpcPropertyDataMapper:
is_dwelling_export_capable=False,
wind_turbines_terrain_type=str(es.wind_turbines_terrain_type),
electricity_smart_meter_present=False,
photovoltaic_supply=(
PhotovoltaicSupply(
none_or_no_details=PhotovoltaicSupplyNoneOrNoDetails(
percent_roof_area=es.photovoltaic_supply.none_or_no_details.percent_roof_area,
)
)
# ADR-0028: photovoltaic_supply can be absent, None, or a
# sparse shape without none_or_no_details — guard the read.
if getattr(es.photovoltaic_supply, "none_or_no_details", None)
else None
),
photovoltaic_supply=pv_supply,
photovoltaic_arrays=pv_arrays,
),
sap_building_parts=[
SapBuildingPart(
@ -1581,6 +1567,12 @@ class EpcPropertyDataMapper:
@staticmethod
def from_rdsap_schema_20_0_0(schema: RdSapSchema20_0_0) -> EpcPropertyData:
es = schema.sap_energy_source
# `photovoltaic_supply` is polymorphic in 20.0.0 too (None / wrapper dict /
# measured-array LIST) — route it through the shared dispatch like 21.0.0 so a
# cert lodging measured arrays as a list (e.g. cert 6102-6227-8000-0083-2292,
# uprn 22086693) maps instead of raising "'list' object has no attribute
# 'none_or_no_details'" and sinking the whole prediction cohort.
pv_supply, pv_arrays = _map_schema_21_pv(es.photovoltaic_supply)
# ADR-0027: instantaneous_wwhrs holds bath/shower ROOM counts.
iw = schema.sap_heating.instantaneous_wwhrs
return EpcPropertyData(
@ -1737,16 +1729,8 @@ class EpcPropertyDataMapper:
is_dwelling_export_capable=False,
wind_turbines_terrain_type=str(es.wind_turbines_terrain_type),
electricity_smart_meter_present=False,
photovoltaic_supply=(
PhotovoltaicSupply(
none_or_no_details=PhotovoltaicSupplyNoneOrNoDetails(
percent_roof_area=es.photovoltaic_supply.none_or_no_details.percent_roof_area,
)
)
if es.photovoltaic_supply
and es.photovoltaic_supply.none_or_no_details
else None
),
photovoltaic_supply=pv_supply,
photovoltaic_arrays=pv_arrays,
),
sap_building_parts=[
SapBuildingPart(
@ -2817,16 +2801,23 @@ def _with_recorded_performance(
current_energy_efficiency_band=(
Epc(band) if band is not None else epc.current_energy_efficiency_band
),
# These scalars are normally plain numbers but some certs (e.g. 16.x cert
# 2308-4997-7262-0137-9930) lodge them as a Measurement dict
# {'value': 3.5, 'quantity': 'tonnes per year'} — `_measurement_value`
# coerces Measurement / dict / plain-number alike, so `float(dict)` no
# longer crashes the whole prediction cohort.
co2_emissions_current=(
float(co2) if co2 is not None else epc.co2_emissions_current
_measurement_value(co2) if co2 is not None else epc.co2_emissions_current
),
energy_consumption_current=(
int(consumption)
int(_measurement_value(consumption))
if consumption is not None
else epc.energy_consumption_current
),
energy_rating_current=(
int(rating) if rating is not None else epc.energy_rating_current
int(_measurement_value(rating))
if rating is not None
else epc.energy_rating_current
),
)
@ -3251,6 +3242,34 @@ def _default_missing_post_town(data: Dict[str, Any]) -> Dict[str, Any]:
return {**data, "post_town": ""}
def _derive_built_form_16x(dwelling_type: Any) -> int:
"""Derive the RdSAP `built_form` code from a cert's `dwelling_type` text when
the numeric field is omitted (some 16.0 certs). The form is stated in the
description for houses/bungalows ("Mid-terrace house" 4, "End-terrace" 3,
"Semi-detached" 2, "Detached" 1). Flats ("Ground-floor flat") do not encode
a form, so fall back to the modal 4 (Mid-terrace) `built_form` is an ML-only
feature the SAP calculator never reads, so the fallback is SAP-neutral.
`dwelling_type` arrives as a plain string or a `{'value': ...}` dict (real-API
shape); both are handled."""
if isinstance(dwelling_type, dict):
dwelling_type = cast(Dict[str, Any], dwelling_type).get("value")
text = str(dwelling_type or "").lower()
if "semi-detached" in text:
return 2
if "detached" in text:
return 1
if "enclosed end" in text:
return 5
if "enclosed mid" in text:
return 6
if "end-terrace" in text:
return 3
if "mid-terrace" in text or "terrace" in text:
return 4
return 4 # flats / unstated form → modal built_form (SAP- and gate-neutral)
def _normalize_sap_schema_16_x(data: Dict[str, Any]) -> Dict[str, Any]:
"""Rewrite a `SAP-Schema-16.2`/`16.3` API doc onto the `RdSAP-Schema-17.1`
shape so it can reuse the tested `from_rdsap_schema_17_1` mapper.
@ -3287,6 +3306,13 @@ def _normalize_sap_schema_16_x(data: Dict[str, Any]) -> Dict[str, Any]:
if isinstance(windows, list) and windows:
window_list: List[Any] = cast(List[Any], windows)
d.setdefault("window", window_list[0])
elif isinstance(windows, dict):
# Some 16.x certs (e.g. "High performance glazing" certs 9768-…-7974 /
# 8257-…-4992) lodge `windows` as a single dict rather than a list — take
# it directly. (These particular certs still fail loud below: they also omit
# door_count / habitable_room_count / glazed_area — genuinely insufficient
# data — but a windows-as-dict cert that DOES carry the rest now maps.)
d.setdefault("window", windows)
d.setdefault("schema_version_original", d.get("schema_version", ""))
# Some 16.x certs (notably 16.0) omit `tenure` — RdSapSchema17_1 requires it.
# It is address/occupancy metadata the SAP cascade never reads, so default a
@ -3306,6 +3332,27 @@ def _normalize_sap_schema_16_x(data: Dict[str, Any]) -> Dict[str, Any]:
# still fail loud — that is correct, the fabric data is insufficient.
d.setdefault("insulated_door_count", 0)
# Some 16.x certs (notably 16.0, e.g. cert 8742-6624-9300-2780-4926) lodge
# `property_type`/`dwelling_type` but omit `built_form` — RdSapSchema17_1
# requires it. `built_form` is an ML-only feature (the SAP-10 calculator never
# reads it — only `sap10_ml/transform`), so it is SAP-neutral; and a default is
# gate-neutral here (verified — the component-accuracy fixture has no
# built-form-missing cert, so unlike `multiple_glazed_proportion` it does not
# tip the donor pool). DERIVE it from the dwelling_type description where the
# form is stated (houses), falling back to the modal value for flats (whose
# dwelling_type — "Ground-floor flat" — does not encode form).
if "built_form" not in d:
d["built_form"] = _derive_built_form_16x(d.get("dwelling_type"))
# NB: we deliberately do NOT default `multiple_glazed_proportion` here. A 16.x
# cert that omits it (e.g. 16.3 cert 0418-3986-7250-2884-7970) is left to fail
# the RdSapSchema17_1 parse and be handled by the cohort skip-and-report path —
# synthesising a value (tried `100`) drew the otherwise-unmappable cert into the
# EPC-prediction donor pool and tipped near-tie similarity matches, regressing
# the frozen-fixture component-accuracy gate (has_hot_water_cylinder 30→29/36,
# door_count residual 23→25/36). The field is ML-only (the SAP calc never reads
# it), so there is no calc cost to leaving such certs unmapped. See worklist.
# 16.2 lodges glazing in BOTH `multiple_glazing_type` (frequently the "ND"
# not-defined sentinel) AND the windows[].description. When the numeric field
# is undefined, honour an explicit "Single glazed" description so it is not

View file

@ -191,6 +191,30 @@ class TestFromRdSapSchema20_0_0:
schema = from_dict(RdSapSchema20_0_0, load("20_0_0.json"))
return EpcPropertyDataMapper.from_rdsap_schema_20_0_0(schema)
def test_photovoltaic_supply_as_dict_list_is_mapped_not_crashed(self) -> None:
# 20.0.0 types `photovoltaic_supply` as the wrapper only (not the 21.0.x
# Union), so a cert lodging measured arrays as a LIST leaves the leaves as
# raw dicts. The mapper previously did `es.photovoltaic_supply.none_or_no_
# details` → "'list' object has no attribute 'none_or_no_details'", sinking
# the whole prediction cohort. Regression for cert 6102-6227-8000-0083-2292
# (uprn 22086693). It now routes through `_map_schema_21_pv`, whose
# dict-tolerant array reader captures the arrays.
data = load("20_0_0.json")
data["sap_energy_source"]["photovoltaic_supply"] = [
[{"pitch": 2, "peak_power": {"value": 1.14, "quantity": "kW"},
"orientation": 5, "overshading": 1}],
[{"pitch": 2, "peak_power": 1.14, "orientation": 5, "overshading": 1}],
]
schema = from_dict(RdSapSchema20_0_0, data)
result = EpcPropertyDataMapper.from_rdsap_schema_20_0_0(schema)
arrays = result.sap_energy_source.photovoltaic_arrays
assert arrays is not None and len(arrays) == 2
assert arrays[0].peak_power == 1.14
assert arrays[0].orientation == 5
assert result.sap_energy_source.photovoltaic_supply is None
def test_uprn(self, result: EpcPropertyData) -> None:
assert result.uprn == 12457

View file

@ -324,7 +324,7 @@ class TestFromSapSchema17_1CodeBasedHeating:
class TestFromSapSchema17_1Ventilation:
"""Slice D5-vent: full-SAP sap_ventilation → measured air permeability (AP4),
"""Slice D5-vent: full-SAP sap_ventilation → measured air permeability (AP50),
ventilation_type MechanicalVentilationKind, sheltered sides, wet rooms and
the MEV PCDB index."""
@ -333,9 +333,13 @@ class TestFromSapSchema17_1Ventilation:
schema = from_dict(SapSchema17_1, load("sap_17_1.json"))
return EpcPropertyDataMapper.from_sap_schema_17_1(schema)
def test_measured_air_permeability_fed_as_ap4(self, sample: EpcPropertyData) -> None:
def test_measured_air_permeability_fed_as_ap50(self, sample: EpcPropertyData) -> None:
# The lodged `air_permeability` is a q50 Blower-Door result, so it feeds the
# engine's AP50 path `(18) = AP50/20 + (8)` — NOT the AP4/Pulse formula
# `0.263 × AP4^0.924` (the air-permeability AP50 fix, uprn_10093116528).
assert sample.sap_ventilation is not None
assert sample.sap_ventilation.air_permeability_ap4_m3_h_m2 == 2.6
assert sample.sap_ventilation.air_permeability_ap50_m3_h_m2 == 2.6
assert sample.sap_ventilation.air_permeability_ap4_m3_h_m2 is None
def test_ventilation_type_6_is_extract(self, sample: EpcPropertyData) -> None:
# ventilation_type 6 = MEV decentralised → EXTRACT_OR_PIV_OUTSIDE.
@ -519,6 +523,32 @@ class TestFromSapSchema16_2:
assert epc.uprn == 100020933894
assert Sap10Calculator().calculate(epc).sap_score == 61 # lodged 56
def test_16_x_missing_built_form_is_derived_from_dwelling_type(self) -> None:
# Some 16.0 certs (e.g. cert 8742-6624-9300-2780-4926) lodge
# `property_type`/`dwelling_type` but omit `built_form`, which RdSapSchema17_1
# requires — previously aborted the prediction cohort. The normaliser derives
# it from the dwelling_type text ("Semi-detached house" → 2). built_form is
# ML-only (SAP calc never reads it), so this is SAP- and gate-neutral.
data = load("sap_16_0.json")
del data["built_form"]
assert data["dwelling_type"] == "Semi-detached house"
epc = EpcPropertyDataMapper.from_api_response(data)
assert isinstance(epc, EpcPropertyData)
assert epc.built_form == "2"
def test_16_x_missing_built_form_for_flat_falls_back_to_modal(self) -> None:
# A flat's dwelling_type ("Ground-floor flat") does not encode a form, so the
# derivation falls back to the modal 4 (Mid-terrace) — keeps the cert mappable.
data = load("sap_16_0.json")
del data["built_form"]
data["dwelling_type"] = "Ground-floor flat"
epc = EpcPropertyDataMapper.from_api_response(data)
assert epc.built_form == "4"
def test_16_x_missing_insulated_door_count_defaults_to_zero(self) -> None:
# Some 16.x certs lodge `door_count` but omit `insulated_door_count`,
# which RdSapSchema17_1 requires — previously raised "missing required
@ -538,6 +568,35 @@ class TestFromSapSchema16_2:
assert isinstance(epc, EpcPropertyData)
assert epc.insulated_door_count == 0
def test_16_x_missing_multiple_glazed_proportion_fails_loud(self) -> None:
# A 16.x cert omitting `multiple_glazed_proportion` (e.g. 16.3 cert
# 0418-3986-7250-2884-7970) is deliberately NOT defaulted — it fails the
# RdSapSchema17_1 parse and is handled by the cohort skip-and-report path.
# Synthesising a value drew the otherwise-unmappable cert into the
# EPC-prediction donor pool and regressed the component-accuracy gate;
# the field is ML-only so there is no calc cost to leaving it unmapped.
data = load("sap_16_3.json")
del data["multiple_glazed_proportion"]
with pytest.raises(
ValueError, match="missing required field 'multiple_glazed_proportion'"
):
EpcPropertyDataMapper.from_api_response(data)
def test_recorded_co2_as_measurement_dict_is_coerced_not_crashed(self) -> None:
# Some certs (e.g. 16.x cert 2308-4997-7262-0137-9930) lodge
# `co2_emissions_current` as a Measurement dict {'value': 3.5, 'quantity':
# 'tonnes per year'} rather than a plain number. `_with_recorded_performance`
# previously did `float(co2)` → "float() argument must be ... not 'dict'",
# crashing the whole prediction cohort. It now coerces via _measurement_value.
data = load("sap_16_3.json")
data["co2_emissions_current"] = {"value": 3.5, "quantity": "tonnes per year"}
epc = EpcPropertyDataMapper.from_api_response(data)
assert isinstance(epc, EpcPropertyData)
assert epc.co2_emissions_current == 3.5
def test_16_2_normalizer_does_not_mutate_caller_dict(self) -> None:
# Mirror _normalize_shower_outlets' contract: the caller's dict is
# untouched (deep copy), so a re-dispatch sees the original shape.

View file

@ -26,7 +26,7 @@ variable "reserved_concurrent_executions" {
variable "maximum_concurrency" {
type = number
default = 2
default = 32
description = "Maximum concurrent Lambda invocations from the SQS trigger."
}

View file

@ -1,20 +1,25 @@
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any, Optional, Type
from sqlalchemy.engine import Engine
from sqlalchemy.pool import Pool
from sqlmodel import Session, create_engine
from infrastructure.postgres.config import PostgresConfig
def make_engine(config: PostgresConfig) -> Engine:
return create_engine(
config.url(),
pool_size=config.pool_size,
max_overflow=config.max_overflow,
pool_pre_ping=config.pool_pre_ping,
pool_recycle=config.pool_recycle,
)
def make_engine(
config: PostgresConfig, poolclass: Optional[Type[Pool]] = None
) -> Engine:
kwargs: dict[str, Any] = {"pool_pre_ping": config.pool_pre_ping}
if poolclass is None:
kwargs["pool_size"] = config.pool_size
kwargs["max_overflow"] = config.max_overflow
kwargs["pool_recycle"] = config.pool_recycle
else:
kwargs["poolclass"] = poolclass
return create_engine(config.url(), **kwargs)
def make_session(engine: Engine) -> Session:

View file

@ -9,6 +9,8 @@ UPRNs share a partition). Register metadata the cert itself doesn't carry
from __future__ import annotations
import logging
from dataclasses import dataclass
from datetime import date
from typing import Callable, Optional, Protocol
@ -53,6 +55,20 @@ class NearbyPostcodes(Protocol):
) -> list[str]: ...
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class SkippedCohortCert:
"""A postcode-cohort cert the mapper could not consume, so it was excluded
from the cohort rather than sinking the whole prediction. The certificate
number + error are captured so the gap surfaces (subtask outputs + logs) and
can be closed by extending the mapper later (ADR-0031)."""
certificate_number: str
error: str
class EpcComparablePropertiesRepository(ComparablePropertiesRepository):
def __init__(
self,
@ -63,6 +79,10 @@ class EpcComparablePropertiesRepository(ComparablePropertiesRepository):
self._epc_client = epc_client
self._geospatial = geospatial
self._nearby_postcodes = nearby_postcodes
# Cohort certs skipped because they are not yet mappable. Accumulates
# across every postcode the instance serves; the caller reads it after
# the run to report the mapper gaps (see modelling_e2e handler).
self.skipped: list[SkippedCohortCert] = []
def candidates_for(self, postcode: str) -> list[ComparableProperty]:
results: list[EpcSearchResult] = self._epc_client.search_by_postcode(
@ -72,7 +92,12 @@ class EpcComparablePropertiesRepository(ComparablePropertiesRepository):
coordinates: dict[int, Coordinates] = self._geospatial.coordinates_for_uprns(
uprns
)
return [self._comparable(result, coordinates) for result in results]
cohort: list[ComparableProperty] = []
for result in results:
comparable = self._comparable_or_skip(result, coordinates)
if comparable is not None:
cohort.append(comparable)
return cohort
def candidates_near(
self,
@ -113,12 +138,38 @@ class EpcComparablePropertiesRepository(ComparablePropertiesRepository):
break
return candidates
def _comparable(
# Mapper-shape errors: the cert's lodged data does not fit the schema/mapper.
# `ValueError` — missing required field / unmapped code (`UnmappedApiCode`);
# `AttributeError` — a field lodged in the wrong shape (e.g. `photovoltaic_supply`
# as a list where a `PhotovoltaicSupply` object is expected → "'list' object has
# no attribute 'none_or_no_details'"); `KeyError`/`TypeError` — analogous
# structural mismatches. Transient API failures (`EpcApiError` &c.) subclass
# `Exception` directly, NOT these, so they still propagate and fail the run loud.
_UNMAPPABLE_CERT_ERRORS = (ValueError, AttributeError, KeyError, TypeError)
def _comparable_or_skip(
self, result: EpcSearchResult, coordinates: dict[int, Coordinates]
) -> ComparableProperty:
epc: EpcPropertyData = self._epc_client.get_by_certificate_number(
result.certificate_number
)
) -> Optional[ComparableProperty]:
"""Map one cohort cert, or record + skip it when the mapper raises a
mapper-shape error (see ``_UNMAPPABLE_CERT_ERRORS``). One unmappable cert
must NOT abort the whole cohort; transient API errors still propagate."""
try:
epc: EpcPropertyData = self._epc_client.get_by_certificate_number(
result.certificate_number
)
except self._UNMAPPABLE_CERT_ERRORS as exc:
self.skipped.append(
SkippedCohortCert(
certificate_number=result.certificate_number,
error=f"{type(exc).__name__}: {exc}",
)
)
logger.warning(
"skipping unmappable cohort cert %s: %s",
result.certificate_number,
exc,
)
return None
resolved: Optional[Coordinates] = (
coordinates.get(result.uprn) if result.uprn is not None else None
)

View file

@ -1,6 +1,6 @@
from __future__ import annotations
from sqlmodel import Session, col, delete
from sqlmodel import Session, col, update
from domain.modelling.plan import Plan
from infrastructure.postgres.modelling import PlanModel, RecommendationModel
@ -10,7 +10,12 @@ from repositories.plan.plan_repository import PlanRepository
class PlanPostgresRepository(PlanRepository):
"""Maps a Plan and its Plan Measures onto the live ``plan`` /
``recommendation`` tables (ADR-0017). Does not commit the Unit of Work
owns the transaction (ADR-0012)."""
owns the transaction (ADR-0012).
A re-run INSERTs a fresh Plan rather than deleting the prior one (the cascade
delete was slow); when the new Plan is the default it demotes any prior
default Plan for the same (property_id, scenario_id) to ``is_default=False``,
so readers can select the current Plan via ``is_default=True``."""
def __init__(self, session: Session) -> None:
self._session = session
@ -24,15 +29,20 @@ class PlanPostgresRepository(PlanRepository):
portfolio_id: int,
is_default: bool,
) -> int:
# Idempotent replace for (property_id, scenario_id): deleting the Plan
# cascades to its recommendation rows via the plan_id FK (ON DELETE
# CASCADE), so a re-run overwrites rather than duplicating (ADR-0012).
self._session.exec( # type: ignore[call-overload]
delete(PlanModel).where(
col(PlanModel.property_id) == property_id,
col(PlanModel.scenario_id) == scenario_id,
# Soft-replace (ADR-0012): keep prior Plans as history rather than DELETEing
# them — the cascade delete of recommendation rows was the slow part. When
# this Plan is the default, demote every prior Plan for the same
# (property_id, scenario_id) to is_default=False, so exactly one Plan for
# the pair stays default (the one just inserted).
if is_default:
self._session.exec( # type: ignore[call-overload]
update(PlanModel)
.where(
col(PlanModel.property_id) == property_id,
col(PlanModel.scenario_id) == scenario_id,
)
.values(is_default=False)
)
)
plan_row = PlanModel.from_domain(
plan,

View file

@ -8,10 +8,12 @@ from domain.modelling.plan import Plan
class PlanRepository(ABC):
"""Persists a Plan (and its Plan Measures) for a Property + Scenario.
One Plan per (Property, Scenario). The write is idempotent on re-run: it
replaces the existing Plan for that pair rather than duplicating (ADR-0012
/ ADR-0017). `portfolio_id` and `is_default` are supplied by the
orchestrator (the former from the trigger, the latter from the Scenario).
A re-run INSERTs a fresh Plan and keeps the prior one as history rather than
deleting it. When the new Plan is the default, prior default Plans for the
same (Property, Scenario) are demoted to `is_default=False`, so the current
Plan is the one with `is_default=True` (ADR-0012 / ADR-0017). `portfolio_id`
and `is_default` are supplied by the orchestrator (the former from the
trigger, the latter from the Scenario).
"""
@abstractmethod
@ -24,6 +26,7 @@ class PlanRepository(ABC):
portfolio_id: int,
is_default: bool,
) -> int:
"""Persist ``plan`` and return its Plan id, replacing any existing Plan
for ``(property_id, scenario_id)``."""
"""Persist ``plan`` and return its Plan id. Keeps prior Plans for
``(property_id, scenario_id)`` as history; when ``is_default`` is True,
demotes those prior Plans to ``is_default=False``."""
...

View file

@ -0,0 +1,17 @@
# Failed modelling_e2e Subtasks
| Subtask ID | Task ID | Updated At | Property ID | UPRN | Inputs |
|-----------|---------|------------|-------------|------|--------|
| ce6a5844-cf31-495e-9e45-91a83aedb8e7 | a520b8a0-2b7a-4a99-a87d-bfe4bf785de6 | 2026-06-23 11:35:58.448106+00:00 | 719897 | 100021919718 | {"property_ids": [719897], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| 31e687bb-9b9d-4fcc-b640-f68f878cf49a | b7dde417-2e5d-42ee-b695-c9d7a9ca81c1 | 2026-06-23 11:35:51.384121+00:00 | 733315 | 100020401711 | {"property_ids": [733315], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| b337172c-9dc5-48a0-9eb8-8021893a0ef1 | 01266210-d44a-4715-bf21-eda88a67a5e7 | 2026-06-23 11:35:41.863424+00:00 | 723589 | 100020407755 | {"property_ids": [723589], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| d2af286a-8964-4040-8429-e289d215c635 | 5f408513-be4d-4ce0-96b0-ceb654563ca2 | 2026-06-23 11:35:39.939416+00:00 | 726592 | 100021918195 | {"property_ids": [726592], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| 1d13c3a2-b269-4ca4-a3f7-f19479081444 | cebcff9b-4a46-48f0-a648-f40b030951b2 | 2026-06-23 11:35:18.760333+00:00 | 711228 | 100020416477 | {"property_ids": [711228], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| f920f079-edd0-4e3e-893e-809e57a57292 | e6a75a2f-2165-4c6a-b929-f485db08b5a2 | 2026-06-23 11:35:11.908026+00:00 | 717435 | 22010468 | {"property_ids": [717435], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| 36ec4a16-be95-4fda-8279-d7d33ed5a556 | 693c3886-efc6-48c1-b99d-576b5736c7e9 | 2026-06-23 11:35:06.439654+00:00 | 710339 | 10013151061 | {"property_ids": [710339], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| 8960e551-10af-48ac-b4ad-821154a79a1c | 17791ee1-1ec6-49ea-a6fc-6f1e8d20914d | 2026-06-23 11:34:58.830828+00:00 | 721815 | 22086690 | {"property_ids": [721815], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| 453ca0df-3b0d-427d-abf5-8462194f770b | 4712998d-d130-4472-860f-5b1d2471b3e3 | 2026-06-23 11:34:47.106853+00:00 | 712401 | 100020394694 | {"property_ids": [712401], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| 1f5ec8fb-7202-44d6-8794-44515f9b4d82 | e50d8753-fd8b-4735-ac89-225428989ec5 | 2026-06-23 11:34:39.473828+00:00 | 723881 | 22005280 | {"property_ids": [723881], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| 6ddff51f-1c29-439c-9335-b5befee64836 | dbac1632-3648-46da-97bc-0ca572bc9c45 | 2026-06-23 11:34:35.708295+00:00 | 715891 | 22082258 | {"property_ids": [715891], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| 3255673d-7cba-49f8-ba88-106301dfa029 | 620f5571-d6c5-469a-afe3-8986b53dd041 | 2026-06-23 11:34:32.737237+00:00 | 716049 | 22104161 | {"property_ids": [716049], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| 9b1a8289-de92-4935-afd6-b93a89f400e6 | 35abffbe-7574-448a-b8cf-89586bf9057d | 2026-06-23 11:34:31.003170+00:00 | 730259 | 100061905741 | {"property_ids": [730259], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |

View file

@ -28,7 +28,7 @@ SESSION_DIR = HERE / ".elmhurst-session"
SAMPLE_DIR = (
HERE.parent.parent
/ "backend/epc_api/json_samples/real_life_examples"
/ "RdSAP-Schema-17.1/uprn_10010215568"
/ "SAP-Schema-16.0/uprn_10070004512"
)
ASSESSMENT_GUID = "B44A0DB4-4C08-4241-B818-86F060172105"

View file

@ -0,0 +1,47 @@
"""Print a dict of postcode → property IDs for a portfolio, sorted by group size.
Edit PORTFOLIO_ID below, then hit Run.
"""
from __future__ import annotations
# ---------------------------------------------------------------------------
# CONFIG
# ---------------------------------------------------------------------------
PORTFOLIO_ID: int = 796
# ---------------------------------------------------------------------------
import sys
from collections import defaultdict
from pathlib import Path
_REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_REPO_ROOT))
from sqlalchemy import text # noqa: E402
from scripts.e2e_common import ENV_PATH, build_engine, load_env # noqa: E402
load_env(ENV_PATH)
engine = build_engine()
with engine.connect() as conn:
rows = conn.execute(
text("SELECT id, postcode FROM property WHERE portfolio_id = :pid ORDER BY postcode, id"),
{"pid": PORTFOLIO_ID},
).fetchall()
by_postcode: dict[str, list[int]] = defaultdict(list)
for pid, postcode in rows:
by_postcode[postcode or "UNKNOWN"].append(int(pid))
sorted_dict = dict(sorted(by_postcode.items(), key=lambda kv: len(kv[1])))
output_path = _REPO_ROOT / "scripts" / f"properties_by_postcode_{PORTFOLIO_ID}.txt"
lines = [f"{postcode!r}: {ids}" for postcode, ids in sorted_dict.items()]
lines.append(
f"\nTotal postcodes: {len(sorted_dict)}, total properties: {sum(len(v) for v in sorted_dict.values())}"
)
output_path.write_text("\n".join(lines), encoding="utf-8")
print(f"Saved to {output_path}")

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,103 @@
"""Query failed modelling_e2e subtasks and write a markdown report.
Joins sub_task tasks, pulls property_ids from the inputs JSON, then looks up
UPRNs from the property table.
Hit Run output written to scripts/failed_modelling_e2e.md
"""
from __future__ import annotations
import json
import sys
from pathlib import Path
_REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_REPO_ROOT))
from sqlalchemy import text # noqa: E402
from scripts.e2e_common import ENV_PATH, build_engine, load_env # noqa: E402
_OUTPUT = _REPO_ROOT / "scripts" / "failed_modelling_e2e.md"
load_env(ENV_PATH)
engine = build_engine()
with engine.connect() as conn:
subtask_rows = conn.execute(text("""
SELECT
st.id AS subtask_id,
st.task_id,
st.inputs,
st.updated_at
FROM sub_task st
JOIN tasks t ON t.id = st.task_id
WHERE t.task_source = 'modelling_e2e'
AND st.status = 'failed'
ORDER BY st.updated_at DESC
""")).fetchall()
if not subtask_rows:
print("No failed modelling_e2e subtasks found.")
_OUTPUT.write_text(
"# Failed modelling_e2e Subtasks\n\nNone found.\n", encoding="utf-8"
)
exit(0)
# Collect all property_ids across all rows
all_property_ids: list[int] = []
parsed: list[tuple[str, str, list[int], str, str]] = []
for subtask_id, task_id, inputs_raw, updated_at in subtask_rows:
try:
inputs = (
json.loads(inputs_raw)
if isinstance(inputs_raw, str)
else (inputs_raw or {})
)
property_ids: list[int] = [
int(p) for p in (inputs.get("property_ids") or [])
]
except Exception:
property_ids = []
parsed.append(
(
str(subtask_id),
str(task_id),
property_ids,
str(updated_at),
inputs_raw or "",
)
)
all_property_ids.extend(property_ids)
# Look up UPRNs
uprn_map: dict[int, int] = {}
if all_property_ids:
uprn_rows = conn.execute(
text("SELECT id, uprn FROM property WHERE id = ANY(:ids)"),
{"ids": all_property_ids},
).fetchall()
uprn_map = {int(r[0]): int(r[1]) for r in uprn_rows}
lines: list[str] = [
"# Failed modelling_e2e Subtasks\n",
f"| Subtask ID | Task ID | Updated At | Property ID | UPRN | Inputs |",
f"|-----------|---------|------------|-------------|------|--------|",
]
for subtask_id, task_id, property_ids, updated_at, inputs_raw in parsed:
inputs_cell = (inputs_raw or "").replace("|", "\\|")
if property_ids:
for pid in property_ids:
uprn = uprn_map.get(pid, "unknown")
lines.append(
f"| {subtask_id} | {task_id} | {updated_at} | {pid} | {uprn} | {inputs_cell} |"
)
else:
lines.append(
f"| {subtask_id} | {task_id} | {updated_at} | — | — | {inputs_cell} |"
)
_OUTPUT.write_text("\n".join(lines) + "\n", encoding="utf-8")
print(f"Written {len(parsed)} failed subtasks → {_OUTPUT}")

View file

@ -1,37 +1,41 @@
"""Enqueue one SQS message per property for the modelling_e2e Lambda.
"""Enqueue one SQS message per postcode group for the modelling_e2e Lambda.
Reads all property IDs for the given portfolio from the DB and sends a batch of
SQS messages, one per property. The Lambda then processes each message
independently, enabling concurrent modelling at scale.
Reads postcode property ID groups from the file produced by
list_properties_by_postcode.py, queries the DB for already-completed
property IDs, then sends one SQS message per postcode batch containing only
the properties that still need processing.
Edit the CONFIG block below, then run via VSCode Run button or Jupyter.
AWS creds come from the ambient ~/.aws profile; DB creds from backend/.env.
Edit the CONFIG block below, then hit Run.
AWS creds come from the ambient ~/.aws profile.
"""
from __future__ import annotations
# --------------------------------------------------------------------------
from utilities.logger import setup_logger
# ---------------------------------------------------------------------------
# CONFIG — edit these before running
# ---------------------------------------------------------------------------
PORTFOLIO_ID: int = 785
SCENARIO_ID: int = 1266
SQS_URL: str = "https://sqs.eu-west-2.amazonaws.com/ACCOUNT_ID/modelling-e2e-STAGE"
PORTFOLIO_ID: int = 796
SCENARIO_ID: int = 1268
SQS_QUEUE_NAME: str = "modelling_e2e-queue-dev"
# Set to a positive integer to enqueue only the first N properties (trial run).
LIMIT: int | None = 10
# Number of postcodes to process this run (postcodes where all properties are
# already completed are skipped and do not count toward this limit).
POSTCODES_LIMIT: int = 1000
# True → Lambda runs the full pipeline but skips all DB writes (safe for testing).
DRY_RUN: bool = True
DRY_RUN: bool = False
# True → Lambda skips the Google Solar fetch.
NO_SOLAR: bool = False
# ---------------------------------------------------------------------------
import ast
import json
import sys
from pathlib import Path
from typing import Any, cast
from uuid import uuid4
_REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_REPO_ROOT))
@ -41,65 +45,97 @@ from sqlalchemy import text # noqa: E402
from scripts.e2e_common import ENV_PATH, build_engine, load_env # noqa: E402
_BATCH_SIZE = 10
logger = setup_logger()
_POSTCODES_FILE = _REPO_ROOT / "scripts" / f"properties_by_postcode_{PORTFOLIO_ID}.txt"
def _property_ids(portfolio_id: int, limit: int | None, engine: object) -> list[int]:
from sqlalchemy.engine import Engine
def _load_postcode_map() -> dict[str, list[int]]:
if not _POSTCODES_FILE.exists():
raise FileNotFoundError(
f"{_POSTCODES_FILE} not found — run list_properties_by_postcode.py first"
)
result: dict[str, list[int]] = {}
for line in _POSTCODES_FILE.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line or line.startswith("Total"):
continue
postcode_repr, ids_repr = line.split(": ", 1)
result[ast.literal_eval(postcode_repr)] = ast.literal_eval(ids_repr)
return result
assert isinstance(engine, Engine)
query = "SELECT id FROM property WHERE portfolio_id = :pid ORDER BY id"
if limit is not None:
query += f" LIMIT {int(limit)}"
def _completed_property_ids() -> set[int]:
"""Return all property IDs with a completed modelling_e2e subtask for this
portfolio + scenario. Single DB round-trip."""
load_env(ENV_PATH)
engine = build_engine()
with engine.connect() as conn:
rows = conn.execute(text(query), {"pid": portfolio_id}).fetchall()
return [int(r[0]) for r in rows]
def _batches(items: list[int], size: int) -> list[list[int]]:
return [items[i : i + size] for i in range(0, len(items), size)]
rows = conn.execute(
text("""
SELECT DISTINCT elem.value::int AS property_id
FROM sub_task st
JOIN tasks t ON t.id = st.task_id
CROSS JOIN jsonb_array_elements_text(
(st.inputs::jsonb)->'property_ids'
) AS elem(value)
WHERE t.task_source = 'modelling_e2e'
AND st.status = 'complete'
AND ((st.inputs::jsonb)->>'portfolio_id')::int = :portfolio_id
AND ((st.inputs::jsonb)->>'scenario_id')::int = :scenario_id
"""),
{
"portfolio_id": PORTFOLIO_ID,
"scenario_id": SCENARIO_ID,
},
).fetchall()
return {int(r[0]) for r in rows}
def main() -> None:
load_env(ENV_PATH)
engine = build_engine()
postcode_map = _load_postcode_map()
completed = _completed_property_ids()
logger.info(f"{len(completed)} property IDs already completed — skipping")
ids = _property_ids(PORTFOLIO_ID, LIMIT, engine)
if not ids:
print(f"no properties found for portfolio {PORTFOLIO_ID}")
batches: list[tuple[str, list[int]]] = []
for postcode, ids in postcode_map.items():
pending = [pid for pid in ids if pid not in completed]
if pending:
batches.append((postcode, pending))
to_process = batches[:POSTCODES_LIMIT]
if not to_process:
logger.info("Nothing left to process.")
return
print(
f"enqueuing {len(ids)} properties "
sqs: Any = cast(
Any, boto3.client("sqs", region_name="eu-west-2")
) # pyright: ignore[reportUnknownMemberType]
sqs_url: str = sqs.get_queue_url(QueueName=SQS_QUEUE_NAME)["QueueUrl"]
logger.info(
f"sending {len(to_process)} messages "
f"(portfolio={PORTFOLIO_ID}, scenario={SCENARIO_ID}, "
f"no_solar={NO_SOLAR}, dry_run={DRY_RUN}) → {SQS_URL}"
f"dry_run={DRY_RUN}, no_solar={NO_SOLAR}) → {sqs_url}"
)
sqs: Any = cast(
Any, boto3.client("sqs")
) # pyright: ignore[reportUnknownMemberType]
sent = 0
for batch in _batches(ids, _BATCH_SIZE):
entries = [
{
"Id": str(uuid4()).replace("-", "")[:8] + str(i),
"MessageBody": json.dumps(
{
"property_id": [pid],
"portfolio_id": PORTFOLIO_ID,
"scenario_id": SCENARIO_ID,
"no_solar": NO_SOLAR,
"dry_run": DRY_RUN,
}
),
}
for i, pid in enumerate(batch)
]
sqs.send_message_batch(QueueUrl=SQS_URL, Entries=entries)
sent += len(batch)
print(f" sent {sent}/{len(ids)}", end="\r")
for postcode, ids in to_process:
sqs.send_message(
QueueUrl=sqs_url,
MessageBody=json.dumps(
{
"property_ids": ids,
"portfolio_id": PORTFOLIO_ID,
"scenario_id": SCENARIO_ID,
"no_solar": NO_SOLAR,
"dry_run": DRY_RUN,
}
),
)
logger.info(f" sent {postcode}: {ids}")
print(f"\ndone — {sent} messages enqueued")
logger.info(f"\ndone — {len(to_process)} messages enqueued")
main()

View file

@ -199,6 +199,99 @@ def test_lodged_epc_path_saves_epc_plan_and_marks_modelled() -> None:
mock_uow.commit.assert_called_once()
def test_skipped_cohort_certs_are_surfaced_in_the_outputs() -> None:
"""Cohort certs the mapper can't consume are skipped (so prediction is not
aborted) and surfaced with cert numbers in the subtask outputs, so the
mapper gaps can be reported and closed."""
from repositories.comparable_properties.epc_comparable_properties_repository import (
SkippedCohortCert,
)
mock_engine = _engine_mock([PROPERTY_ID], [UPRN], [POSTCODE])
mock_plan = _plan_mock()
skipped = [
SkippedCohortCert(
certificate_number="8257-7539-1649-0633-4992",
error="ValueError: RdSapSchema17_1: missing required field 'window'",
)
]
with ExitStack() as stack:
stack.enter_context(
patch("applications.modelling_e2e.handler.os.environ", _ENV)
)
stack.enter_context(
patch(
"applications.modelling_e2e.handler._get_engine",
return_value=mock_engine,
)
)
stack.enter_context(
patch("applications.modelling_e2e.handler.EpcClientService")
).return_value.get_by_uprn.return_value = MagicMock()
stack.enter_context(
patch("applications.modelling_e2e.handler.GeospatialS3Repository")
)
stack.enter_context(
patch("applications.modelling_e2e.handler.GoogleSolarApiClient")
)
stack.enter_context(
patch("applications.modelling_e2e.handler._spatial_for", return_value=None)
)
stack.enter_context(
patch(
"applications.modelling_e2e.handler._solar_insights_for",
return_value=None,
)
)
stack.enter_context(
patch("applications.modelling_e2e.handler.overlays_from", return_value=[])
)
stack.enter_context(
patch("applications.modelling_e2e.handler.PropertyOverridesPostgresReader")
)
stack.enter_context(
patch("applications.modelling_e2e.handler.ScenarioPostgresRepository")
).return_value.get_many.return_value = [MagicMock()]
stack.enter_context(
patch(
"applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides"
)
)
stack.enter_context(patch("applications.modelling_e2e.handler.Session"))
stack.enter_context(
patch(
"applications.modelling_e2e.handler.run_modelling",
return_value=mock_plan,
)
)
# The repo accumulated a skipped (unmappable) cohort cert during the run.
stack.enter_context(
patch("applications.modelling_e2e.handler.EpcComparablePropertiesRepository")
).return_value.skipped = skipped
MockUoW = stack.enter_context(
patch("applications.modelling_e2e.handler.PostgresUnitOfWork")
)
MockUoW.return_value.__enter__.return_value = MagicMock()
MockUoW.return_value.__exit__.return_value = False
# Act
result = _call_handler(_BODY)
# Assert — the handler's return (→ subtask outputs.result) carries the cert
# numbers + errors of every skipped cohort cert.
assert result == {
"skipped_unmappable_cohort_certs": [
{
"certificate_number": "8257-7539-1649-0633-4992",
"error": (
"ValueError: RdSapSchema17_1: missing required field 'window'"
),
}
]
}
# ---------------------------------------------------------------------------
# EPC Prediction path
# ---------------------------------------------------------------------------

View file

@ -581,6 +581,28 @@ _EXPECTATIONS: Final[tuple[RealCertExpectation, ...]] = (
cert_num="9978-7098-7226-2633-3994",
sap_score=75,
),
# UPRN 10002468137 → cert 0215-2818-7357-9703-2145. RdSAP-Schema-17.1,
# all-electric high-heat-retention storage heaters on Economy 7, solid-
# brick uninsulated end-terrace. Ground truth is Elmhurst RdSAP10 = 60,
# reproduced on identical inputs (summary + full SAP 10.2 worksheet saved
# alongside: elmhurst_summary.pdf / elmhurst_worksheet.pdf). The engine
# produces 62 — a +2 over-rating localised to OFF-PEAK WATER HEATING:
# the worksheet (lines 243-246) prices the 7-hour off-peak immersion at a
# Table 13 split (19.36% @ 15.29p high + 80.64% @ 5.5p low), but the engine
# prices 100% at the 5.5p low rate, under-costing the bill (£595.68 vs
# £629.67) → lower ECF (2.69 vs 2.84) → SAP 62 not 60. (Space heating 100%
# off-peak IS correct for storage heaters — the worksheet agrees.) Strict
# xfail until the off-peak water-heating rate split is implemented.
RealCertExpectation(
schema="RdSAP-Schema-17.1",
sample="uprn_10002468137",
cert_num="0215-2818-7357-9703-2145",
sap_score=60,
known_bug_xfail=(
"off-peak (7-hour) water-heating high/low rate split not applied — "
"engine prices 100% at the low rate; see elmhurst_worksheet.pdf (243-246)"
),
),
)

View file

@ -9,6 +9,8 @@ from datetime import date
from pathlib import Path
from typing import Any, Optional
import pytest
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
from datatypes.epc.search.epc_search_result import EpcSearchResult
@ -129,7 +131,7 @@ def test_no_certs_in_the_postcode_yields_no_candidates() -> None:
# ---------------------------------------------------------------------------
# Broadened cohort — candidates_near (ADR-0031 nearby-postcode broadening)
# Broadened cohort — candidates_near (ADR-0034 nearby-postcode broadening)
# ---------------------------------------------------------------------------
@ -241,3 +243,94 @@ def test_candidates_near_without_a_source_uses_only_the_seed() -> None:
# Assert — degrades to the seed postcode alone.
assert client.searched == ["P0"]
assert [c.certificate_number for c in candidates] == ["CERT-1"]
# ---------------------------------------------------------------------------
# Unmappable cohort certs are skipped + recorded, not allowed to sink the cohort
# ---------------------------------------------------------------------------
class _FakeEpcClientRaising:
"""Serves the cohort, but raises the given exception for specific certs."""
def __init__(
self, results: list[EpcSearchResult], failures: dict[str, Exception]
) -> None:
self._results = results
self._failures = failures
def search_by_postcode(self, postcode: str) -> list[EpcSearchResult]:
return self._results
def get_by_certificate_number(self, cert_num: str) -> EpcPropertyData:
if cert_num in self._failures:
raise self._failures[cert_num]
return _epc()
def test_unmappable_cohort_cert_is_skipped_and_recorded() -> None:
# Arrange — two certs share the postcode; one can't be mapped (the mapper
# raises ValueError for a missing required field, as the gov-API 16.x / sparse
# certs do).
here = Coordinates(longitude=-1.55, latitude=53.81)
client = _FakeEpcClientRaising(
[_result("CERT-OK", uprn=12345), _result("CERT-BAD", uprn=67890)],
failures={
"CERT-BAD": ValueError(
"RdSapSchema17_1: missing required field 'window'"
)
},
)
geospatial = _FakeGeospatial({12345: here})
repo = EpcComparablePropertiesRepository(client, geospatial)
# Act
candidates = repo.candidates_for("LS6 1AA")
# Assert — the unmappable cert is excluded (does NOT sink the cohort) and the
# good one survives; the skip is recorded with the cert number + error so the
# mapper gap can be reported and closed later.
assert [c.certificate_number for c in candidates] == ["CERT-OK"]
assert len(repo.skipped) == 1
assert repo.skipped[0].certificate_number == "CERT-BAD"
assert "missing required field 'window'" in repo.skipped[0].error
def test_malformed_cert_shape_attributeerror_is_skipped_and_recorded() -> None:
# Arrange — a cohort cert lodges a field in the wrong shape (e.g.
# `photovoltaic_supply` as a list), so the mapper raises AttributeError
# ("'list' object has no attribute 'none_or_no_details'") — a mapper-shape
# error like the missing-field ValueErrors, so it must skip + record, not abort.
client = _FakeEpcClientRaising(
[_result("CERT-OK", uprn=12345), _result("CERT-BAD", uprn=67890)],
failures={
"CERT-BAD": AttributeError(
"'list' object has no attribute 'none_or_no_details'"
)
},
)
repo = EpcComparablePropertiesRepository(client, _FakeGeospatial({}))
# Act
candidates = repo.candidates_for("LS6 1AA")
# Assert — bad cert skipped + recorded; good cert survives.
assert [c.certificate_number for c in candidates] == ["CERT-OK"]
assert len(repo.skipped) == 1
assert repo.skipped[0].certificate_number == "CERT-BAD"
assert "none_or_no_details" in repo.skipped[0].error
def test_transient_fetch_error_is_not_swallowed_as_unmappable() -> None:
# Arrange — a non-ValueError (e.g. a transient API failure) must NOT be
# silently skipped as "unmappable"; it propagates so the run fails loudly.
client = _FakeEpcClientRaising(
[_result("CERT-1", uprn=12345)],
failures={"CERT-1": RuntimeError("EPC API error 503")},
)
repo = EpcComparablePropertiesRepository(client, _FakeGeospatial({}))
# Act / Assert
with pytest.raises(RuntimeError, match="503"):
repo.candidates_for("LS6 1AA")
assert repo.skipped == []

View file

@ -2,10 +2,11 @@
Plan Measures to the live ``plan`` / ``recommendation`` tables (ADR-0017).
The Plan is the parent; each selected Plan Measure is a ``recommendation`` row
linked by the new ``plan_id`` FK. A re-run replaces (delete the Plan for the
(property, scenario) cascade its recommendations insert fresh), so the
batch write is idempotent (ADR-0012). CO₂ is stored in tonnes (calculator kg
÷ 1000) to match the live column contract.
linked by the new ``plan_id`` FK. A re-run INSERTs a fresh Plan and keeps the
prior one as history (no cascade delete); when the new Plan is the default it
demotes prior default Plans for the (property, scenario) to ``is_default=False``
(ADR-0012). CO₂ is stored in tonnes (calculator kg ÷ 1000) to match the live
column contract.
"""
from __future__ import annotations
@ -147,31 +148,61 @@ def test_save_persists_null_per_measure_savings_when_unbilled(
assert rec_rows[0].energy_cost_savings is None
def test_save_is_idempotent_on_rerun_for_the_same_property_and_scenario(
def test_rerun_keeps_history_and_demotes_the_prior_default_plan(
db_engine: Engine,
) -> None:
# Arrange — first run
# Arrange — first (default) run
with Session(db_engine) as session:
PlanPostgresRepository(session).save(
first_id = PlanPostgresRepository(session).save(
_plan(), property_id=10, scenario_id=7, portfolio_id=1, is_default=True
)
session.commit()
# Act — re-run the same (property, scenario)
# Act — re-run the same (property, scenario) as the default
with Session(db_engine) as session:
PlanPostgresRepository(session).save(
second_id = PlanPostgresRepository(session).save(
_plan(), property_id=10, scenario_id=7, portfolio_id=1, is_default=True
)
session.commit()
# Assert — replaced, not duplicated (cascade removed the old measures)
# Assert — the prior Plan is kept as history (no delete), and only the new
# Plan is the default; exactly one Plan for the pair stays is_default=True.
with Session(db_engine) as session:
plan_rows = session.exec(
select(PlanModel).where(col(PlanModel.property_id) == 10)
).all()
rec_rows = session.exec(
select(RecommendationModel).where(col(RecommendationModel.property_id) == 10)
).all()
by_id = {p.id: p for p in plan_rows}
assert len(plan_rows) == 1
assert len(rec_rows) == 1
assert len(plan_rows) == 2
assert first_id != second_id
assert by_id[first_id].is_default is False # demoted
assert by_id[second_id].is_default is True # the current default
assert sum(1 for p in plan_rows if p.is_default) == 1
def test_rerun_as_non_default_does_not_demote_the_prior_default(
db_engine: Engine,
) -> None:
# Arrange — a default Plan exists
with Session(db_engine) as session:
first_id = PlanPostgresRepository(session).save(
_plan(), property_id=12, scenario_id=7, portfolio_id=1, is_default=True
)
session.commit()
# Act — re-run as NON-default (e.g. a non-default scenario); no demotion runs
with Session(db_engine) as session:
PlanPostgresRepository(session).save(
_plan(), property_id=12, scenario_id=7, portfolio_id=1, is_default=False
)
session.commit()
# Assert — the prior default is untouched (we only demote when saving a default)
with Session(db_engine) as session:
plan_rows = session.exec(
select(PlanModel).where(col(PlanModel.property_id) == 12)
).all()
by_id = {p.id: p for p in plan_rows}
assert len(plan_rows) == 2
assert by_id[first_id].is_default is True

View file

@ -0,0 +1,104 @@
from collections.abc import Generator, Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any
from uuid import UUID
import pytest
from sqlalchemy import Engine
from sqlmodel import Session
from domain.tasks.tasks import Source
from orchestration.task_orchestrator import TaskOrchestrator
from repositories.tasks.subtask_postgres_repository import SubTaskPostgresRepository
from repositories.tasks.task_postgres_repository import TaskPostgresRepository
from utilities.aws_lambda.task_handler import task_handler
@dataclass
class Harness:
orchestrator: TaskOrchestrator
tasks: TaskPostgresRepository
subtasks: SubTaskPostgresRepository
@contextmanager
def factory(self) -> Generator[TaskOrchestrator, None, None]:
yield self.orchestrator
@pytest.fixture
def harness(db_engine: Engine) -> Iterator[Harness]:
with Session(db_engine) as session:
tasks = TaskPostgresRepository(session=session)
subtasks = SubTaskPostgresRepository(session=session)
yield Harness(
orchestrator=TaskOrchestrator(task_repo=tasks, subtask_repo=subtasks),
tasks=tasks,
subtasks=subtasks,
)
def _direct_event(property_id: str) -> dict[str, Any]:
return {"property_id": property_id}
def test_task_handler_records_cloudwatch_url_on_subtask(
harness: Harness, monkeypatch: pytest.MonkeyPatch
) -> None:
# arrange
monkeypatch.setenv("AWS_REGION", "eu-west-2")
monkeypatch.setenv(
"AWS_LAMBDA_LOG_GROUP_NAME", "/aws/lambda/modelling-e2e"
)
monkeypatch.setenv(
"AWS_LAMBDA_LOG_STREAM_NAME", "2026/05/20/[$LATEST]abc123"
)
@task_handler(
task_source="modelling_e2e",
source=Source.PROPERTY,
orchestrator_cm=harness.factory,
)
def handler(body: dict[str, Any], context: Any) -> None:
return None
# act
result = handler(_direct_event("prop-1"), context=None)
# assert
subtask_id = result[0]["subtask_id"]
saved_url = harness.subtasks.get(UUID(subtask_id)).cloud_logs_url
assert saved_url is not None
assert saved_url.startswith(
"https://eu-west-2.console.aws.amazon.com/cloudwatch/home"
)
# Log group / stream are console-encoded ("/" -> "$252F").
assert "$252Faws$252Flambda$252Fmodelling-e2e" in saved_url
assert "$255B$2524LATEST$255D" in saved_url
def test_task_handler_leaves_cloudwatch_url_unset_outside_lambda(
harness: Harness, monkeypatch: pytest.MonkeyPatch
) -> None:
# arrange
for var in (
"AWS_REGION",
"AWS_LAMBDA_LOG_GROUP_NAME",
"AWS_LAMBDA_LOG_STREAM_NAME",
):
monkeypatch.delenv(var, raising=False)
@task_handler(
task_source="modelling_e2e",
source=Source.PROPERTY,
orchestrator_cm=harness.factory,
)
def handler(body: dict[str, Any], context: Any) -> None:
return None
# act
result = handler(_direct_event("prop-1"), context=None)
# assert
subtask_id = result[0]["subtask_id"]
assert harness.subtasks.get(UUID(subtask_id)).cloud_logs_url is None

View file

@ -0,0 +1,27 @@
"""Build a CloudWatch console deep-link for the running Lambda invocation.
Shared by @task_handler and @subtask_handler so both persist the same
`cloud_logs_url` onto the SubTask they run.
"""
import os
from typing import Optional
from urllib.parse import quote
def _console_encode(value: str) -> str:
return quote(value, safe="").replace("%", "$25")
def cloudwatch_url() -> Optional[str]:
"""Deep-link to this invocation's log stream, or None outside Lambda."""
region = os.environ.get("AWS_REGION")
log_group = os.environ.get("AWS_LAMBDA_LOG_GROUP_NAME")
log_stream = os.environ.get("AWS_LAMBDA_LOG_STREAM_NAME")
if not (region and log_group and log_stream):
return None
return (
f"https://{region}.console.aws.amazon.com/cloudwatch/home"
f"?region={region}#logsV2:log-groups/log-group/"
f"{_console_encode(log_group)}/log-events/{_console_encode(log_stream)}"
)

View file

@ -2,6 +2,7 @@ import os
from collections.abc import Generator
from contextlib import contextmanager
from sqlalchemy.pool import NullPool
from sqlmodel import Session
from infrastructure.postgres.config import PostgresConfig
@ -17,8 +18,11 @@ def default_orchestrator() -> Generator[TaskOrchestrator, None, None]:
Connection params come from os.environ via PostgresConfig.from_env. Each
handler invocation gets its own session, cleaned up on context exit.
NullPool is intentional: a new engine is created on every invocation, so
pooling would accumulate idle connections across warm Lambda containers.
"""
engine = make_engine(PostgresConfig.from_env(dict(os.environ)))
engine = make_engine(PostgresConfig.from_env(dict(os.environ)), poolclass=NullPool)
with Session(engine) as session:
yield TaskOrchestrator(
task_repo=TaskPostgresRepository(session=session),

View file

@ -6,12 +6,11 @@ TaskOrchestrator.run_subtask(...) calls.
import json
import logging
import os
from contextlib import AbstractContextManager
from functools import wraps
from typing import Any, Callable, Optional, cast
from urllib.parse import quote
from utilities.aws_lambda.cloud_logs import cloudwatch_url
from utilities.aws_lambda.default_orchestrator import default_orchestrator
from utilities.aws_lambda.subtask_trigger_body import SubtaskTriggerBody
from orchestration.task_orchestrator import TaskOrchestrator
@ -42,7 +41,7 @@ def subtask_handler(
@wraps(func)
def wrapper(event: dict[str, Any], context: Any) -> None:
cloud_logs_url = _cloudwatch_url()
cloud_logs_url = cloudwatch_url()
with factory() as orchestrator:
for record in _records(event):
body = _parse_body(record)
@ -95,20 +94,3 @@ def _records(event: dict[str, Any]) -> list[dict[str, Any]]:
if isinstance(raw_records, list):
return [r for r in cast(list[Any], raw_records) if isinstance(r, dict)]
return [event]
def _console_encode(value: str) -> str:
return quote(value, safe="").replace("%", "$25")
def _cloudwatch_url() -> Optional[str]:
region = os.environ.get("AWS_REGION")
log_group = os.environ.get("AWS_LAMBDA_LOG_GROUP_NAME")
log_stream = os.environ.get("AWS_LAMBDA_LOG_STREAM_NAME")
if not (region and log_group and log_stream):
return None
return (
f"https://{region}.console.aws.amazon.com/cloudwatch/home"
f"?region={region}#logsV2:log-groups/log-group/"
f"{_console_encode(log_group)}/log-events/{_console_encode(log_stream)}"
)

View file

@ -10,6 +10,7 @@ from contextlib import AbstractContextManager
from functools import wraps
from typing import Any, Callable, Optional, cast
from utilities.aws_lambda.cloud_logs import cloudwatch_url
from utilities.aws_lambda.default_orchestrator import default_orchestrator
from domain.tasks.tasks import Source
from orchestration.task_orchestrator import TaskOrchestrator
@ -41,6 +42,7 @@ def task_handler(
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
@wraps(func)
def wrapper(event: dict[str, Any], context: Any) -> Any:
cloud_logs_url = cloudwatch_url()
with factory() as orchestrator:
task_ids: list[dict[str, str]] = []
failures: list[dict[str, Any]] = []
@ -66,6 +68,7 @@ def task_handler(
orchestrator.run_subtask(
subtask.id,
work=lambda body=body: func(body, context),
cloud_logs_url=cloud_logs_url,
)
except Exception:
logger.exception(