Merge pull request #1261 from Hestia-Homes/feature/e2e-runs

Feature/e2e runs
This commit is contained in:
Jun-te Kim 2026-06-23 12:24:18 +01:00 committed by GitHub
commit 1a080254e7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 445 additions and 67 deletions

View file

@ -12,9 +12,10 @@ When no lodged EPC is found, EPC Prediction (Path 3, ADR-0031) synthesises one
from the postcode cohort. ``_cohort_cache`` is module-level so warm Lambda
containers re-processing the same postcode avoid redundant fetches.
``secondary_heating_removal`` is excluded unconditionally: the live ``material``
catalogue does not yet carry this measure type, causing a crash during catalogue
reads for properties with a lodged secondary heater.
All Measure Types are considered: pricing goes through
``catalogue_with_off_catalogue_overrides`` so the measures the live ``material``
catalogue cannot supply (``secondary_heating_removal``, the glazing and heating
gaps) are priced from the committed off-catalogue overlay instead of crashing.
DB engine is module-scoped so the connection pool is reused across warm
invocations (ADR-0012).
@ -45,7 +46,6 @@ from domain.epc_prediction.prediction_target import build_prediction_target
from domain.geospatial.coordinates import Coordinates
from domain.geospatial.planning_restrictions import PlanningRestrictions
from domain.geospatial.spatial_reference import SpatialReference
from domain.modelling.measure_type import MeasureType
from domain.property.property import Property, PropertyIdentity
from domain.tasks.tasks import Source
from harness.console import run_modelling
@ -67,7 +67,9 @@ from repositories.geospatial.geospatial_s3_repository import (
ParquetReader,
)
from repositories.postgres_unit_of_work import PostgresUnitOfWork
from repositories.product.product_postgres_repository import ProductPostgresRepository
from repositories.product.composite_product_repository import (
catalogue_with_off_catalogue_overrides,
)
from repositories.property.landlord_override_overlays import overlays_from
from repositories.property.override_backed_prediction_attributes_reader import (
OverrideBackedPredictionAttributesReader,
@ -212,7 +214,7 @@ def handler(body: dict[str, Any], context: Any) -> None:
read_session = Session(engine)
try:
scenario = ScenarioPostgresRepository(read_session).get_many([scenario_id])[0]
products = ProductPostgresRepository(read_session)
products = catalogue_with_off_catalogue_overrides(read_session)
errors: list[int] = []
@ -282,21 +284,14 @@ def handler(body: dict[str, Any], context: Any) -> None:
None if no_solar else _solar_insights_for(solar_client, spatial)
)
# secondary_heating_removal is absent from the live material.type
# enum; exclude unconditionally until the catalogue gap is resolved.
# system_tune_up and system_tune_up_zoned have no active product.
considered: Optional[frozenset[MeasureType]] = (
frozenset(MeasureType)
- {MeasureType.SECONDARY_HEATING_REMOVAL}
- {MeasureType.SYSTEM_TUNE_UP_ZONED}
- {MeasureType.SYSTEM_TUNE_UP}
)
# All Measure Types are considered: the off-catalogue overlay
# (catalogue_with_off_catalogue_overrides) prices the measures the
# live material catalogue cannot supply, so none need excluding.
plan = run_modelling(
effective_epc,
planning_restrictions=restrictions,
solar_insights=solar_insights,
considered_measures=considered,
considered_measures=None,
products=products,
scenario=scenario,
print_table=False,

View file

@ -18,6 +18,8 @@ from datatypes.epc.search import EpcSearchResult
class EpcClientService:
BASE_URL = "https://api.get-energy-performance-data.communities.gov.uk"
# The gov API's per-UPRN search latency is variable: usually ~0.2s but with
# intermittent slow spells.
REQUEST_TIMEOUT = 10.0
def __init__(self, auth_token: str) -> None:

View file

@ -11,8 +11,8 @@ from repositories.property_baseline.property_baseline_postgres_repository import
)
from repositories.epc.epc_postgres_repository import EpcPostgresRepository
from repositories.plan.plan_postgres_repository import PlanPostgresRepository
from repositories.product.product_postgres_repository import (
ProductPostgresRepository,
from repositories.product.composite_product_repository import (
catalogue_with_off_catalogue_overrides,
)
from repositories.property.property_postgres_repository import (
PropertyPostgresRepository,
@ -49,7 +49,7 @@ class PostgresUnitOfWork(UnitOfWork):
self.spatial = spatial_repo
self.property_baseline = PropertyBaselinePostgresRepository(self._session)
self.scenario = ScenarioPostgresRepository(self._session)
self.product = ProductPostgresRepository(self._session)
self.product = catalogue_with_off_catalogue_overrides(self._session)
self.plan = PlanPostgresRepository(self._session)
return self

View file

@ -0,0 +1,51 @@
from __future__ import annotations
from pathlib import Path
from sqlmodel import Session
from domain.modelling.product import Product
from repositories.product.product_json_repository import ProductJsonRepository
from repositories.product.product_postgres_repository import (
ProductPostgresRepository,
)
from repositories.product.product_repository import ProductRepository
# The committed off-catalogue cost overlay — Measure Types the live ``material``
# catalogue cannot supply, priced from a static JSON until the ETL stocks them.
OFF_CATALOGUE_COSTS_PATH: Path = Path(__file__).resolve().parent / "off_catalogue_costs.json"
class CompositeProductRepository(ProductRepository):
"""Resolves a Product from an ``override`` source first, then a ``fallback``.
The override holds off-catalogue costs the live ``material`` catalogue cannot
supply today ``secondary_heating_removal``, whose Measure Type the FE-owned
``material.type`` pgEnum does not carry. Querying that value against the DB
raises a ``DataError`` that poisons the session's transaction (it is not a
benign ``ProductNotFound``), so checking the override **first** keeps that
Measure Type away from the catalogue entirely. Every other Measure Type
misses the override and falls through to the catalogue unchanged.
"""
def __init__(
self, override: ProductRepository, fallback: ProductRepository
) -> None:
self._override = override
self._fallback = fallback
def get(self, measure_type: str) -> Product:
product = self._override.get_optional(measure_type)
if product is not None:
return product
return self._fallback.get(measure_type)
def catalogue_with_off_catalogue_overrides(session: Session) -> ProductRepository:
"""The live ``material`` catalogue with the committed off-catalogue JSON costs
layered on top the single composition point both the pipeline's Unit of
Work and the local e2e runner price through."""
return CompositeProductRepository(
override=ProductJsonRepository(OFF_CATALOGUE_COSTS_PATH),
fallback=ProductPostgresRepository(session),
)

View file

@ -0,0 +1,8 @@
{
"secondary_heating_removal": { "unit_cost_per_m2": 270.0 },
"double_glazing": { "unit_cost_per_m2": 550.0 },
"secondary_glazing": { "unit_cost_per_m2": 400.0 },
"gas_boiler_upgrade": { "unit_cost_per_m2": 3000.0 },
"system_tune_up": { "unit_cost_per_m2": 500.0 },
"system_tune_up_zoned": { "unit_cost_per_m2": 900.0 }
}

View file

@ -9,7 +9,12 @@ from S3, and fetches Google Solar — then runs the Modelling stage (every
Recommendation Generator the Optimiser a costed, attributed Plan). The same
local computation runs whether or not you store the result: by default it
persists **nothing** (the run is for inspecting recommendations); pass
`--persist` to write the inputs + the Plan to the DB.
`--persist` to write the inputs + the Plan to the DB. With `--persist`, a
lodged-EPC Property **also** gets its Baseline Performance row written (lodged
vs calculator-effective SAP + the bill block) via the same orchestrator the
first-run pipeline uses run per Property so one bad cert does not abort the
batch. A predicted (EPC-less) Property has no lodged figures, so it gets a Plan
but no baseline row.
To keep the inspected recommendations identical to what gets stored, **both
modes price against the live ``material`` catalogue (read-only)** and model
@ -21,12 +26,9 @@ the run synthesises an Increasing-EPC-to-``--goal`` Scenario with no exclusions.
`--measures` / `--exclude-measures` are optional overlays layered on top of the
Scenario's own exclusions.
KNOWN GOTCHA: the live ``material.type`` enum does not yet carry
``secondary_heating_removal``, so a Property with a lodged secondary heater
crashes the catalogue read for that measure. Until the catalogue stocks it, pass
``--exclude-measures secondary_heating_removal`` (an ASHP row is also absent, but
ASHP is priced off the rate sheet so it degrades to ``material_id=None`` rather
than crashing no flag needed).
``secondary_heating_removal`` is priced from the committed off-catalogue JSON
overlay (the live ``material.type`` enum cannot carry it), so no exclusion flag
is needed. ASHP is priced off the rate sheet (``material_id=None``), also fine.
Config: loads `backend/.env` for the DB creds (`DB_*`), the EPC API token
(`OPEN_EPC_API_TOKEN` the Bearer token for the new gov API), the Google Solar
@ -104,13 +106,24 @@ from repositories.comparable_properties.epc_comparable_properties_repository imp
from repositories.geospatial.geospatial_s3_repository import ( # noqa: E402
GeospatialS3Repository,
)
from repositories.product.composite_product_repository import ( # noqa: E402
catalogue_with_off_catalogue_overrides,
)
from repositories.product.product_repository import ProductRepository # noqa: E402
from repositories.property.override_backed_prediction_attributes_reader import ( # noqa: E402
OverrideBackedPredictionAttributesReader,
)
from repositories.product.product_postgres_repository import ( # noqa: E402
ProductPostgresRepository,
)
from repositories.postgres_unit_of_work import PostgresUnitOfWork # noqa: E402
from orchestration.property_baseline_orchestrator import ( # noqa: E402
PropertyBaselineOrchestrator,
)
from domain.property_baseline.calculator_rebaseliner import ( # noqa: E402
CalculatorRebaseliner,
)
from domain.sap10_calculator.calculator import Sap10Calculator # noqa: E402
from repositories.fuel_rates.fuel_rates_static_file_repository import ( # noqa: E402
FuelRatesStaticFileRepository,
)
from repositories.scenario.scenario_postgres_repository import ( # noqa: E402
ScenarioPostgresRepository,
)
@ -418,6 +431,161 @@ def _predict_epc(
return predicted
def _run_from_db(
args: argparse.Namespace,
*,
engine: Engine,
products: ProductRepository,
scenario: Optional[Scenario],
considered: Optional[frozenset[MeasureType]],
baseline_orchestrator: Optional[PropertyBaselineOrchestrator],
md_path: Path,
csv_path: Path,
candidates_path: Path,
target: str,
measures_note: str,
) -> None:
"""Re-model from already-persisted inputs — **zero gov-API calls**.
Reads each Property's Effective EPC (lodged-or-predicted, overrides folded),
planning protections and solar straight from the DB (a prior ``--persist``
ingestion must have stored them), runs the same modelling, and with
``--persist`` re-writes the Plan and, for lodged-EPC Properties, the
Baseline. A predicted Property has no lodged figures, so it gets no baseline
row (same rule as the live path). One bad property is logged and skipped.
"""
md_lines: list[str] = [f"# Modelling recommendations ({target}, {measures_note})\n"]
csv_rows: list[str] = [
"property_id,uprn,api_sap,baseline_sap,sap_delta,post_sap,measures,"
"measure_types,cost_of_works"
]
candidate_csv_rows: list[str] = [
"property_id,uprn,surface,measure_type,cost_total,contingency_rate,"
"selected,description"
]
total = len(args.property_ids)
run_start = time.monotonic()
errors = 0
for index, property_id in enumerate(args.property_ids, start=1):
elapsed = time.monotonic() - run_start
eta = (elapsed / (index - 1)) * (total - index + 1) if index > 1 else 0.0
print(
f"[{index}/{total}] · {errors} err · elapsed {elapsed / 60:.1f}m "
f"· ETA {eta / 60:.1f}m · property {property_id} (from DB)",
flush=True,
)
try:
with PostgresUnitOfWork(lambda: Session(engine)) as uow:
prop = uow.property.get(property_id)
effective_epc: EpcPropertyData = prop.effective_epc
restrictions: PlanningRestrictions = prop.planning_restrictions
uprn: Optional[int] = prop.identity.uprn
epc: Optional[EpcPropertyData] = prop.epc
solar_insights: Optional[dict[str, Any]] = (
uow.solar.get(uprn) if uprn is not None else None
)
predicted = epc is None
plan: Plan = run_modelling(
effective_epc,
goal_band=args.goal,
planning_restrictions=restrictions,
solar_insights=solar_insights,
considered_measures=considered,
products=products,
scenario=scenario,
print_table=False,
)
candidates: list[Recommendation] = candidate_recommendations(
epc if epc is not None else effective_epc,
planning_restrictions=restrictions,
solar_insights=solar_insights,
considered_measures=considered,
products=products,
)
if args.persist:
assert scenario is not None # guaranteed by the --persist guard
with PostgresUnitOfWork(lambda: Session(engine)) as uow:
uow.plan.save(
plan,
property_id=property_id,
scenario_id=scenario.id,
portfolio_id=args.portfolio_id,
is_default=scenario.is_default,
)
uow.property.mark_modelled(
property_id, has_recommendations=bool(plan.measures)
)
uow.commit()
# Lodged EPC also gets its Baseline Performance re-established from
# the persisted EPC; predicted Properties have no lodged figures.
if epc is not None:
assert baseline_orchestrator is not None
baseline_orchestrator.run([property_id])
except Exception as error: # noqa: BLE001 — one bad property must not stop the run
errors += 1
line = f"property {property_id}: ERROR — {type(error).__name__}: {error}"
print(line + "\n")
md_lines.append(f"## Property {property_id}\n\n`{line}`\n")
csv_rows.append(f"{property_id},,,,,,,ERROR,")
continue
measure_types = [m.measure_type for m in plan.measures]
selected: set[MeasureType] = {m.measure_type for m in plan.measures}
flags = [
name
for name, on in (
("conservation", restrictions.in_conservation_area),
("listed", restrictions.is_listed),
("heritage", restrictions.is_heritage),
)
if on
]
context = (
f"{', '.join(flags) if flags else 'unrestricted'}; "
f"{'solar ✓' if solar_insights is not None else 'no solar'}"
)
source_tag = " · ⚠ PREDICTED (no lodged EPC)" if predicted else ""
candidate_lines = _candidate_lines(candidates, selected)
print(
f"=== Property {property_id} (uprn {uprn}) === "
f"SAP {plan.baseline.sap_continuous:.1f} -> {plan.post_sap_continuous:.1f} "
f"· {len(plan.measures)} measure(s) · £{plan.cost_of_works:,.0f} "
f"· {context}{source_tag}"
)
print(format_plan_table(plan))
md_lines.append(f"## Property {property_id} (uprn {uprn}){source_tag}\n")
md_lines.append(
f"SAP {plan.baseline.sap_continuous:.1f}{plan.post_sap_continuous:.1f} "
f"· {len(plan.measures)} measure(s) · cost £{plan.cost_of_works:,.0f} "
f"· {context}\n"
)
md_lines.append("**Selected Plan**\n")
md_lines.extend(_measure_summary(m) for m in plan.measures)
md_lines.append("")
md_lines.append("**All candidate measures (cost per measure)**\n")
md_lines.extend(candidate_lines)
md_lines.append("")
api_sap: Optional[int] = epc.energy_rating_current if epc is not None else None
calc_sap: float = plan.baseline.sap_continuous
api_cell = "" if api_sap is None else str(api_sap)
delta_cell = "" if api_sap is None else f"{calc_sap - api_sap:.2f}"
csv_rows.append(
f"{property_id},{uprn},{api_cell},{calc_sap:.2f},{delta_cell},"
f"{plan.post_sap_continuous:.2f},{len(plan.measures)},"
f"{'|'.join(measure_types)},{plan.cost_of_works:.0f}"
)
candidate_csv_rows.extend(
_candidate_csv_rows(property_id, uprn, candidates, selected)
)
md_path.write_text("\n".join(md_lines) + "\n", encoding="utf-8")
csv_path.write_text("\n".join(csv_rows) + "\n", encoding="utf-8")
candidates_path.write_text("\n".join(candidate_csv_rows) + "\n", encoding="utf-8")
print(f"wrote {md_path.resolve()}")
print(f"wrote {csv_path.resolve()}")
print(f"wrote {candidates_path.resolve()}")
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
@ -466,6 +634,16 @@ def main() -> None:
"(parent dirs created) instead of ./modelling_e2e.*; lets batched runs "
"keep separate, durable output files",
)
parser.add_argument(
"--from-db",
action="store_true",
default=False,
help="re-model from already-persisted inputs: read each Property's "
"Effective EPC + planning protections + solar from the DB and skip the "
"live EPC/spatial/solar fetch entirely (zero gov-API calls). Requires a "
"prior --persist ingestion run; with --persist it re-writes the Plan "
"(and Baseline for lodged-EPC Properties) without re-fetching.",
)
args = parser.parse_args()
if args.persist and (args.scenario_id is None or args.portfolio_id is None):
@ -519,7 +697,19 @@ def main() -> None:
# One read-only session for the live `material` catalogue, reused across the
# batch so both store and no-store runs price against the same DB rows.
catalogue_session = Session(engine)
products = ProductPostgresRepository(catalogue_session)
products = catalogue_with_off_catalogue_overrides(catalogue_session)
# When persisting, a lodged-EPC Property also gets a Baseline Performance row
# via the production orchestrator (lodged-vs-calculator SAP, the bill block) —
# the same establish-and-persist the first-run pipeline runs, here per Property
# so one bad cert doesn't abort the batch. Predicted Properties have no lodged
# figures, so they are skipped below.
baseline_orchestrator: Optional[PropertyBaselineOrchestrator] = None
if args.persist:
baseline_orchestrator = PropertyBaselineOrchestrator(
unit_of_work=lambda: PostgresUnitOfWork(lambda: Session(engine)),
rebaseliner=CalculatorRebaseliner(Sap10Calculator()),
fuel_rates=FuelRatesStaticFileRepository(),
)
scenario: Optional[Scenario] = (
_scenario_for(catalogue_session, args.scenario_id)
if args.scenario_id is not None
@ -539,14 +729,35 @@ def main() -> None:
)
measures_note = ",".join(sorted(considered)) if considered else "all measures"
mode = "PERSISTING to DB" if args.persist else "no DB writes"
source = "persisted DB inputs" if args.from_db else "live EPC/solar"
print(
f"modelling {len(args.property_ids)} propertie(s) · {target} · {measures_note} · "
f"{mode} (DB material catalogue, live EPC/solar)...\n"
f"{mode} (DB material catalogue, {source})...\n"
)
if args.from_db:
# Read inputs from the DB and skip every live fetcher (no gov-API calls).
# Self-contained loop + file writing; the live path below is left as-is.
_run_from_db(
args,
engine=engine,
products=products,
scenario=scenario,
considered=considered,
baseline_orchestrator=baseline_orchestrator,
md_path=md_path,
csv_path=csv_path,
candidates_path=candidates_path,
target=target,
measures_note=measures_note,
)
catalogue_session.close()
return
md_lines: list[str] = [f"# Modelling recommendations ({target}, {measures_note})\n"]
csv_rows: list[str] = [
"property_id,uprn,baseline_sap,post_sap,measures,measure_types,cost_of_works"
"property_id,uprn,api_sap,baseline_sap,sap_delta,post_sap,measures,"
"measure_types,cost_of_works"
]
candidate_csv_rows: list[str] = [
"property_id,uprn,surface,measure_type,cost_total,contingency_rate,"
@ -690,6 +901,12 @@ def main() -> None:
solar_insights=solar_insights,
plan=plan,
)
# A lodged EPC also gets its Baseline Performance persisted
# (reads the EPC just saved above). Predicted Properties have no
# lodged figures to baseline, so they are skipped.
if epc is not None:
assert baseline_orchestrator is not None
baseline_orchestrator.run([property_id])
except (
Exception
) as error: # noqa: BLE001 — one bad property must not stop the run
@ -702,7 +919,7 @@ def main() -> None:
line = f"property {property_id} (uprn {uprn}): ERROR — {type(error).__name__}: {error}"
print(line + "\n")
md_lines.append(f"## Property {property_id}\n\n`{line}`\n")
csv_rows.append(f"{property_id},{uprn or ''},,,,ERROR,")
csv_rows.append(f"{property_id},{uprn or ''},,,,,,ERROR,")
continue
measure_types = [m.measure_type for m in plan.measures]
@ -737,8 +954,16 @@ def main() -> None:
md_lines.append("**All candidate measures (cost per measure)**\n")
md_lines.extend(candidate_lines)
md_lines.append("")
# api_sap is the lodged/register SAP (off the cert); a predicted Property
# has none, so it and the delta are left blank. baseline_sap is the
# calculator's score on the Effective EPC — the two whose divergence the
# run is for reviewing (mirrors lodged vs effective in the baseline table).
api_sap: Optional[int] = epc.energy_rating_current if epc is not None else None
calc_sap: float = plan.baseline.sap_continuous
api_sap_cell = "" if api_sap is None else str(api_sap)
sap_delta_cell = "" if api_sap is None else f"{calc_sap - api_sap:.2f}"
csv_rows.append(
f"{property_id},{uprn},{plan.baseline.sap_continuous:.2f},"
f"{property_id},{uprn},{api_sap_cell},{calc_sap:.2f},{sap_delta_cell},"
f"{plan.post_sap_continuous:.2f},{len(plan.measures)},"
f"{'|'.join(measure_types)},{plan.cost_of_works:.0f}"
)

View file

@ -167,7 +167,7 @@ def test_lodged_epc_path_saves_epc_plan_and_marks_modelled() -> None:
patch("applications.modelling_e2e.handler.ScenarioPostgresRepository")
).return_value.get_many.return_value = [MagicMock()]
stack.enter_context(
patch("applications.modelling_e2e.handler.ProductPostgresRepository")
patch("applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides")
)
stack.enter_context(
patch("applications.modelling_e2e.handler.Session")
@ -285,7 +285,7 @@ def test_prediction_path_saves_plan_without_epc_save() -> None:
patch("applications.modelling_e2e.handler.ScenarioPostgresRepository")
).return_value.get_many.return_value = [MagicMock()]
stack.enter_context(
patch("applications.modelling_e2e.handler.ProductPostgresRepository")
patch("applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides")
)
stack.enter_context(
patch("applications.modelling_e2e.handler.Session")
@ -381,7 +381,7 @@ def test_empty_cohort_gates_property_out_and_raises() -> None:
patch("applications.modelling_e2e.handler.ScenarioPostgresRepository")
).return_value.get_many.return_value = [MagicMock()]
stack.enter_context(
patch("applications.modelling_e2e.handler.ProductPostgresRepository")
patch("applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides")
)
stack.enter_context(patch("applications.modelling_e2e.handler.Session"))
MockUoW = stack.enter_context(
@ -462,7 +462,7 @@ def test_partial_batch_failure_raises_runtime_error_listing_failed_ids() -> None
patch("applications.modelling_e2e.handler.ScenarioPostgresRepository")
).return_value.get_many.return_value = [MagicMock()]
stack.enter_context(
patch("applications.modelling_e2e.handler.ProductPostgresRepository")
patch("applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides")
)
stack.enter_context(patch("applications.modelling_e2e.handler.Session"))
stack.enter_context(
@ -576,7 +576,7 @@ def test_cohort_cache_prevents_duplicate_candidates_for_calls() -> None:
patch("applications.modelling_e2e.handler.ScenarioPostgresRepository")
).return_value.get_many.return_value = [MagicMock()]
stack.enter_context(
patch("applications.modelling_e2e.handler.ProductPostgresRepository")
patch("applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides")
)
stack.enter_context(patch("applications.modelling_e2e.handler.Session"))
stack.enter_context(
@ -659,7 +659,7 @@ def test_dry_run_skips_all_db_writes() -> None:
patch("applications.modelling_e2e.handler.ScenarioPostgresRepository")
).return_value.get_many.return_value = [MagicMock()]
stack.enter_context(
patch("applications.modelling_e2e.handler.ProductPostgresRepository")
patch("applications.modelling_e2e.handler.catalogue_with_off_catalogue_overrides")
)
stack.enter_context(patch("applications.modelling_e2e.handler.Session"))
stack.enter_context(

View file

@ -192,14 +192,8 @@ def test_first_run_baselines_through_repos_and_is_idempotent_on_rerun(
is_active=True,
description="Zoned heating controls + cylinder tune-up",
),
MaterialRow(
id=9,
type="secondary_heating_removal",
total_cost=250.0,
cost_unit="gbp_per_unit",
is_active=True,
description="Secondary heating removal",
),
# secondary_heating_removal is off-catalogue (priced from the
# JSON overlay, not the pgEnum-constrained material table).
]
)
session.commit()
@ -324,14 +318,10 @@ def test_modelling_optimises_and_persists_a_multi_measure_plan(
is_active=True,
description="LED bulb",
),
MaterialRow(
id=9,
type="secondary_heating_removal",
total_cost=250.0,
cost_unit="gbp_per_unit",
is_active=True,
description="Secondary heating removal",
),
# No secondary_heating_removal row: the FE-owned ``material.type``
# pgEnum cannot carry that Measure Type, so it is priced from the
# committed off-catalogue JSON overlay (£270, no material_id) that
# the Unit of Work layers over the catalogue, not from the DB.
]
)
session.commit()
@ -402,13 +392,17 @@ def test_modelling_optimises_and_persists_a_multi_measure_plan(
"air_source_heat_pump",
"secondary_heating_removal",
}
# Each persisted measure carries the catalogue id of the Product it installs
# Each catalogue-sourced measure carries the id of the Product it installs
# (the MaterialRow ids seeded above), replacing the retired
# recommendation_materials BOM with a single material_id on the row.
assert by_type["air_source_heat_pump"].material_id == 5
assert by_type["suspended_floor_insulation"].material_id == 2
assert by_type["low_energy_lighting"].material_id == 4
assert by_type["secondary_heating_removal"].material_id == 9
# Secondary heating removal is priced from the off-catalogue JSON overlay
# (£270 flat per-dwelling, ADR-0028), so it carries no catalogue material_id.
assert by_type["secondary_heating_removal"].material_id is None
assert by_type["secondary_heating_removal"].estimated_cost is not None
assert abs(by_type["secondary_heating_removal"].estimated_cost - 270.0) <= 1e-6
for rec in rec_rows:
assert rec.default is True
assert rec.already_installed is False
@ -498,14 +492,8 @@ def test_modelling_recommends_nothing_when_already_at_the_target_band(
is_active=True,
description="LED bulb",
),
MaterialRow(
id=9,
type="secondary_heating_removal",
total_cost=250.0,
cost_unit="gbp_per_unit",
is_active=True,
description="Secondary heating removal",
),
# secondary_heating_removal is off-catalogue (priced from the
# JSON overlay, not the pgEnum-constrained material table).
]
)
session.commit()

View file

@ -0,0 +1,109 @@
"""Behaviour of the composite ProductRepository: an off-catalogue override
source checked first, the live catalogue as fallback. The override holds costs
the ``material`` catalogue cannot supply (today ``secondary_heating_removal``,
whose Measure Type the ``material.type`` pgEnum does not carry). Resolving it
from the override keeps that Measure Type away from the DB entirely, while every
other Measure Type falls through to the catalogue unchanged."""
import pytest
from domain.modelling.product import Product
from repositories.product.composite_product_repository import (
OFF_CATALOGUE_COSTS_PATH,
CompositeProductRepository,
)
from repositories.product.product_json_repository import ProductJsonRepository
from repositories.product.product_repository import ProductNotFound, ProductRepository
class _StaticRepo(ProductRepository):
"""A ProductRepository over a fixed measure-type → Product map."""
def __init__(self, products: dict[str, Product]) -> None:
self._products = products
def get(self, measure_type: str) -> Product:
product = self._products.get(measure_type)
if product is None:
raise ProductNotFound(f"no product for measure type {measure_type!r}")
return product
class _ExplodingRepo(ProductRepository):
"""A fallback that must never be reached — any read fails the test loudly,
standing in for the catalogue whose pgEnum query would poison the session."""
def get(self, measure_type: str) -> Product:
raise AssertionError(
f"fallback was queried for {measure_type!r}; the override should have "
"resolved it without touching the catalogue"
)
def _product(measure_type: str, unit_cost: float) -> Product:
return Product(
measure_type=measure_type, unit_cost_per_m2=unit_cost, contingency_rate=0.25
)
def test_get_returns_the_override_without_touching_the_fallback() -> None:
# Arrange — the override supplies the off-catalogue cost; the fallback would
# raise if consulted (mirroring the live pgEnum DataError on this type).
override = _StaticRepo(
{"secondary_heating_removal": _product("secondary_heating_removal", 270.0)}
)
repo = CompositeProductRepository(override=override, fallback=_ExplodingRepo())
# Act
product: Product = repo.get("secondary_heating_removal")
# Assert
assert product.measure_type == "secondary_heating_removal"
assert abs(product.unit_cost_per_m2 - 270.0) <= 1e-9
def test_get_falls_through_to_the_fallback_when_the_override_misses() -> None:
# Arrange — the override has nothing for this type; the catalogue does.
override = _StaticRepo({})
fallback = _StaticRepo(
{"cavity_wall_insulation": _product("cavity_wall_insulation", 18.5)}
)
repo = CompositeProductRepository(override=override, fallback=fallback)
# Act
product: Product = repo.get("cavity_wall_insulation")
# Assert
assert product.measure_type == "cavity_wall_insulation"
assert abs(product.unit_cost_per_m2 - 18.5) <= 1e-9
def test_get_raises_product_not_found_when_neither_source_has_it() -> None:
# Arrange
repo = CompositeProductRepository(
override=_StaticRepo({}), fallback=_StaticRepo({})
)
# Act / Assert
with pytest.raises(ProductNotFound):
repo.get("loft_insulation")
def test_committed_override_file_prices_secondary_heating_removal() -> None:
# The committed overlay carries the flat per-dwelling decommission cost
# (ADR-0028, ported from the legacy heater_removal: (£25 + £200) × 1.2 VAT);
# contingency 0.25 is joined from config.
product: Product = ProductJsonRepository(OFF_CATALOGUE_COSTS_PATH).get(
"secondary_heating_removal"
)
assert abs(product.unit_cost_per_m2 - 270.0) <= 1e-9
assert abs(product.contingency_rate - 0.25) <= 1e-9
def test_committed_override_file_prices_the_glazing_gaps() -> None:
# Glazing is absent from the live catalogue (no active rows), so the overlay
# carries per-window costs the generator uses directly (count x unit cost).
repo = ProductJsonRepository(OFF_CATALOGUE_COSTS_PATH)
assert abs(repo.get("double_glazing").unit_cost_per_m2 - 550.0) <= 1e-9
assert abs(repo.get("secondary_glazing").unit_cost_per_m2 - 400.0) <= 1e-9