Merge pull request #1244 from Hestia-Homes/feature/e2e-runs

Feature/e2e runs
This commit is contained in:
Jun-te Kim 2026-06-17 09:48:55 +01:00 committed by GitHub
commit b11864a990
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 487 additions and 125 deletions

View file

@ -5,6 +5,7 @@ from decimal import ROUND_HALF_UP, Decimal
from typing import Any, Dict, Final, List, Optional, Sequence, TypeVar, Union, cast
from datatypes.epc.schema.helpers import from_dict
from datatypes.epc.domain.epc import Epc
from datatypes.epc.domain.epc_property_data import (
BASEMENT_WALL_CONSTRUCTION_CODE,
Addendum,
@ -2268,61 +2269,53 @@ class EpcPropertyDataMapper:
if schema == "RdSAP-Schema-21.0.1":
from datatypes.epc.schema.rdsap_schema_21_0_1 import RdSapSchema21_0_1
return _clear_basement_flag_when_system_built(
EpcPropertyDataMapper.from_rdsap_schema_21_0_1(
from_dict(RdSapSchema21_0_1, data)
)
mapped = EpcPropertyDataMapper.from_rdsap_schema_21_0_1(
from_dict(RdSapSchema21_0_1, data)
)
if schema == "RdSAP-Schema-21.0.0":
elif schema == "RdSAP-Schema-21.0.0":
from datatypes.epc.schema.rdsap_schema_21_0_0 import RdSapSchema21_0_0
return _clear_basement_flag_when_system_built(
EpcPropertyDataMapper.from_rdsap_schema_21_0_0(
from_dict(RdSapSchema21_0_0, data)
)
mapped = EpcPropertyDataMapper.from_rdsap_schema_21_0_0(
from_dict(RdSapSchema21_0_0, data)
)
if schema == "RdSAP-Schema-20.0.0":
elif schema == "RdSAP-Schema-20.0.0":
from datatypes.epc.schema.rdsap_schema_20_0_0 import RdSapSchema20_0_0
return _clear_basement_flag_when_system_built(
EpcPropertyDataMapper.from_rdsap_schema_20_0_0(
from_dict(RdSapSchema20_0_0, data)
)
mapped = EpcPropertyDataMapper.from_rdsap_schema_20_0_0(
from_dict(RdSapSchema20_0_0, data)
)
if schema == "RdSAP-Schema-19.0":
elif schema == "RdSAP-Schema-19.0":
from datatypes.epc.schema.rdsap_schema_19_0 import RdSapSchema19_0
return _clear_basement_flag_when_system_built(
EpcPropertyDataMapper.from_rdsap_schema_19_0(
from_dict(RdSapSchema19_0, data)
)
mapped = EpcPropertyDataMapper.from_rdsap_schema_19_0(
from_dict(RdSapSchema19_0, data)
)
if schema == "RdSAP-Schema-18.0":
elif schema == "RdSAP-Schema-18.0":
from datatypes.epc.schema.rdsap_schema_18_0 import RdSapSchema18_0
return _clear_basement_flag_when_system_built(
EpcPropertyDataMapper.from_rdsap_schema_18_0(
from_dict(RdSapSchema18_0, data)
)
mapped = EpcPropertyDataMapper.from_rdsap_schema_18_0(
from_dict(RdSapSchema18_0, data)
)
if schema == "RdSAP-Schema-17.1":
elif schema == "RdSAP-Schema-17.1":
from datatypes.epc.schema.rdsap_schema_17_1 import RdSapSchema17_1
return _clear_basement_flag_when_system_built(
EpcPropertyDataMapper.from_rdsap_schema_17_1(
from_dict(RdSapSchema17_1, data)
)
mapped = EpcPropertyDataMapper.from_rdsap_schema_17_1(
from_dict(RdSapSchema17_1, data)
)
if schema == "RdSAP-Schema-17.0":
elif schema == "RdSAP-Schema-17.0":
from datatypes.epc.schema.rdsap_schema_17_0 import RdSapSchema17_0
return _clear_basement_flag_when_system_built(
EpcPropertyDataMapper.from_rdsap_schema_17_0(
from_dict(RdSapSchema17_0, data)
)
mapped = EpcPropertyDataMapper.from_rdsap_schema_17_0(
from_dict(RdSapSchema17_0, data)
)
else:
raise ValueError(f"Unsupported EPC schema: {schema!r}")
raise ValueError(f"Unsupported EPC schema: {schema!r}")
return _clear_basement_flag_when_system_built(
_with_renewable_heat_incentive(
_with_recorded_performance(mapped, data), data
)
)
# ---------------------------------------------------------------------------
@ -2330,6 +2323,85 @@ class EpcPropertyDataMapper:
# ---------------------------------------------------------------------------
def _with_recorded_performance(
    epc: EpcPropertyData, data: Dict[str, Any]
) -> EpcPropertyData:
    """Return a copy of ``epc`` carrying the payload's recorded performance scalars.

    Four top-level keys of the raw API payload are read:
    ``current_energy_efficiency_band``, ``co2_emissions_current``,
    ``energy_consumption_current`` (Primary Energy Intensity) and
    ``energy_rating_current``. Each key that is present (not ``None``) is
    coerced (``Epc`` / ``float`` / ``int`` / ``int``) and overrides the mapped
    value; an absent key leaves the value the per-schema mapper produced.

    Applying the overlay here, once, covers every schema version regardless of
    which scalars an individual mapper copies through.
    """
    raw_band = data.get("current_energy_efficiency_band")
    raw_co2 = data.get("co2_emissions_current")
    raw_consumption = data.get("energy_consumption_current")
    raw_rating = data.get("energy_rating_current")

    # Resolve each field up front: payload value when given, mapped value otherwise.
    if raw_band is None:
        band = epc.current_energy_efficiency_band
    else:
        band = Epc(raw_band)

    if raw_co2 is None:
        co2 = epc.co2_emissions_current
    else:
        co2 = float(raw_co2)

    if raw_consumption is None:
        consumption = epc.energy_consumption_current
    else:
        consumption = int(raw_consumption)

    if raw_rating is None:
        rating = epc.energy_rating_current
    else:
        rating = int(raw_rating)

    return replace(
        epc,
        current_energy_efficiency_band=band,
        co2_emissions_current=co2,
        energy_consumption_current=consumption,
        energy_rating_current=rating,
    )
def _with_renewable_heat_incentive(
    epc: EpcPropertyData, data: Dict[str, Any]
) -> EpcPropertyData:
    """Gap-fill the RHI block (space/water-heating kWh) from the raw payload.

    The EPC is returned untouched when any of these holds:

    * a mapper already set ``epc.renewable_heat_incentive``;
    * the payload's ``renewable_heat_incentive`` value is not a dict;
    * the block lacks ``space_heating_existing_dwelling`` or ``water_heating``.

    Otherwise a copy of ``epc`` is returned with a ``RenewableHeatIncentive``
    built from the block; the three insulation-impact figures are optional and
    stay ``None`` when the payload omits them.
    """
    if epc.renewable_heat_incentive is not None:
        return epc  # a mapper already populated it; never overwrite

    block = data.get("renewable_heat_incentive")
    if not isinstance(block, dict):
        return epc

    rhi = cast(Dict[str, Any], block)
    space_kwh = rhi.get("space_heating_existing_dwelling")
    water_kwh = rhi.get("water_heating")
    if space_kwh is None or water_kwh is None:
        # Both kWh figures are required; a partial block is not used.
        return epc

    incentive = RenewableHeatIncentive(
        space_heating_kwh=float(space_kwh),
        water_heating_kwh=float(water_kwh),
        impact_of_loft_insulation_kwh=_optional_float(
            rhi.get("impact_of_loft_insulation")
        ),
        impact_of_cavity_insulation_kwh=_optional_float(
            rhi.get("impact_of_cavity_insulation")
        ),
        impact_of_solid_wall_insulation_kwh=_optional_float(
            rhi.get("impact_of_solid_wall_insulation")
        ),
    )
    return replace(epc, renewable_heat_incentive=incentive)
def _optional_float(value: Any) -> Optional[float]:
return float(value) if value is not None else None
def _clear_basement_flag_when_system_built(
epc: EpcPropertyData,
) -> EpcPropertyData:

View file

@ -72,26 +72,31 @@ class PropertyBaselinePerformanceModel(SQLModel, table=True):
space_heating_kwh: float
water_heating_kwh: float
# The Fuel Rates snapshot period the bill was priced against (FE-owned column,
# nullable). Not yet threaded through Bill Derivation, so left None for now.
fuel_rates_period: Optional[str] = Field(default=None)
# Bill Derivation block (ADR-0014 §6). Nullable: all None when no calculator
# ran (stub path). The ``bill_`` prefix avoids clashing with the
# ran (stub path). Column names are unprefixed to mirror the FE-owned table —
# the per-section ``heating_kwh`` / ``hot_water_kwh`` do not clash with the
# recorded-demand ``space_heating_kwh`` / ``water_heating_kwh`` above.
bill_heating_kwh: Optional[float] = Field(default=None)
bill_heating_cost_gbp: Optional[float] = Field(default=None)
bill_hot_water_kwh: Optional[float] = Field(default=None)
bill_hot_water_cost_gbp: Optional[float] = Field(default=None)
bill_lighting_kwh: Optional[float] = Field(default=None)
bill_lighting_cost_gbp: Optional[float] = Field(default=None)
bill_appliances_kwh: Optional[float] = Field(default=None)
bill_appliances_cost_gbp: Optional[float] = Field(default=None)
bill_cooking_kwh: Optional[float] = Field(default=None)
bill_cooking_cost_gbp: Optional[float] = Field(default=None)
bill_pumps_fans_kwh: Optional[float] = Field(default=None)
bill_pumps_fans_cost_gbp: Optional[float] = Field(default=None)
bill_cooling_kwh: Optional[float] = Field(default=None)
bill_cooling_cost_gbp: Optional[float] = Field(default=None)
bill_standing_charges_gbp: Optional[float] = Field(default=None)
bill_seg_credit_gbp: Optional[float] = Field(default=None)
bill_total_annual_bill_gbp: Optional[float] = Field(default=None)
heating_kwh: Optional[float] = Field(default=None)
heating_cost_gbp: Optional[float] = Field(default=None)
hot_water_kwh: Optional[float] = Field(default=None)
hot_water_cost_gbp: Optional[float] = Field(default=None)
lighting_kwh: Optional[float] = Field(default=None)
lighting_cost_gbp: Optional[float] = Field(default=None)
appliances_kwh: Optional[float] = Field(default=None)
appliances_cost_gbp: Optional[float] = Field(default=None)
cooking_kwh: Optional[float] = Field(default=None)
cooking_cost_gbp: Optional[float] = Field(default=None)
pumps_fans_kwh: Optional[float] = Field(default=None)
pumps_fans_cost_gbp: Optional[float] = Field(default=None)
cooling_kwh: Optional[float] = Field(default=None)
cooling_cost_gbp: Optional[float] = Field(default=None)
standing_charges_gbp: Optional[float] = Field(default=None)
seg_credit_gbp: Optional[float] = Field(default=None)
total_annual_bill_gbp: Optional[float] = Field(default=None)
@classmethod
def from_domain(
@ -122,15 +127,15 @@ class PropertyBaselinePerformanceModel(SQLModel, table=True):
return
for section, stem in _SECTION_COLUMN_STEM.items():
cost = bill.sections.get(section)
setattr(self, f"bill_{stem}_kwh", cost.kwh if cost is not None else None)
setattr(self, f"{stem}_kwh", cost.kwh if cost is not None else None)
setattr(
self,
f"bill_{stem}_cost_gbp",
f"{stem}_cost_gbp",
cost.cost_gbp if cost is not None else None,
)
self.bill_standing_charges_gbp = bill.standing_charges_gbp
self.bill_seg_credit_gbp = bill.seg_credit_gbp
self.bill_total_annual_bill_gbp = bill.total_gbp
self.standing_charges_gbp = bill.standing_charges_gbp
self.seg_credit_gbp = bill.seg_credit_gbp
self.total_annual_bill_gbp = bill.total_gbp
def to_domain(self) -> PropertyBaselinePerformance:
return PropertyBaselinePerformance(
@ -157,18 +162,18 @@ class PropertyBaselinePerformanceModel(SQLModel, table=True):
not-None discriminator: a persisted bill always sets it, so its absence
means no calculator ran and the bill was None. A section is rebuilt only
when its kWh column is not None (paired with its cost)."""
if self.bill_total_annual_bill_gbp is None:
if self.total_annual_bill_gbp is None:
return None
sections: dict[BillSection, BillSectionCost] = {}
for section, stem in _SECTION_COLUMN_STEM.items():
kwh = cast(Optional[float], getattr(self, f"bill_{stem}_kwh"))
kwh = cast(Optional[float], getattr(self, f"{stem}_kwh"))
if kwh is None:
continue
cost_gbp = cast(float, getattr(self, f"bill_{stem}_cost_gbp"))
cost_gbp = cast(float, getattr(self, f"{stem}_cost_gbp"))
sections[section] = BillSectionCost(kwh=kwh, cost_gbp=cost_gbp)
return Bill(
sections=sections,
standing_charges_gbp=cast(float, self.bill_standing_charges_gbp),
seg_credit_gbp=cast(float, self.bill_seg_credit_gbp),
total_gbp=self.bill_total_annual_bill_gbp,
standing_charges_gbp=cast(float, self.standing_charges_gbp),
seg_credit_gbp=cast(float, self.seg_credit_gbp),
total_gbp=self.total_annual_bill_gbp,
)

View file

@ -1,5 +1,6 @@
from __future__ import annotations
from datetime import datetime
from typing import ClassVar, Optional
from sqlalchemy import Column
@ -48,3 +49,10 @@ class PropertyRow(SQLModel, table=True):
user_inputted_address: Optional[str] = Field(default=None)
user_inputted_postcode: Optional[str] = Field(default=None)
lexiscore: Optional[float] = Field(default=None)
# FE-owned columns the modelling pipeline now WRITES to record a run: the old
# engine set `has_recommendations` (engine.py); we mirror that, and bump
# `updated_at` so a run is datable (a first-run under the new process is
# `updated_at >= 2026-06-01`, the cutoff the old pipeline predates).
has_recommendations: Optional[bool] = Field(default=None)
updated_at: Optional[datetime] = Field(default=None)

View file

@ -126,6 +126,7 @@ class ModellingOrchestrator:
solar_potential: Optional[SolarPotential] = _solar_potential_for(
uow.solar, prop.identity.uprn
)
has_recommendations = False
for scenario in scenarios:
plan = self._plan_for(
scorer,
@ -145,6 +146,13 @@ class ModellingOrchestrator:
portfolio_id=portfolio_id,
is_default=scenario.is_default,
)
has_recommendations = has_recommendations or bool(plan.measures)
# Record the run on the Property: the old engine's per-Property
# `has_recommendations` marker (true if any Scenario yielded a
# measure), with `updated_at` bumped so the run is datable.
uow.property.mark_modelled(
property_id, has_recommendations=has_recommendations
)
uow.commit()
def _plan_for(

View file

@ -1,7 +1,7 @@
from __future__ import annotations
from collections.abc import Sequence
from datetime import date
from datetime import date, datetime
from typing import Optional, Protocol, TypeVar
from sqlmodel import Session, col, delete, select
@ -57,6 +57,24 @@ def _require(value: Optional[_T], field: str) -> _T:
return value
def _as_date(value: object) -> date:
"""Normalise an ``epc_property`` date column value to a ``date``.
The FE-owned date columns (``inspection_date`` / ``completion_date`` /
``registration_date``) are Postgres ``timestamp``s even though the SQLModel
mirror types them ``str`` (it stores the writer's ``isoformat()`` string).
So a read hands back a ``datetime``, while a value still in flight may be
the ISO string accept both.
"""
if isinstance(value, datetime):
return value.date()
if isinstance(value, date):
return value
if isinstance(value, str):
return date.fromisoformat(value)
raise TypeError(f"unexpected inspection_date value: {value!r}")
class _HasEpcPropertyId(Protocol):
epc_property_id: int
@ -425,7 +443,7 @@ class EpcPostgresRepository(EpcRepository):
return EpcPropertyData(
dwelling_type=p.dwelling_type,
inspection_date=date.fromisoformat(p.inspection_date),
inspection_date=_as_date(p.inspection_date),
tenure=p.tenure,
transaction_type=p.transaction_type,
address_line_1=_require(p.address_line_1, "address_line_1"),
@ -480,12 +498,10 @@ class EpcPostgresRepository(EpcRepository):
pressure_test=p.pressure_test,
language_code=p.language_code,
completion_date=(
date.fromisoformat(p.completion_date) if p.completion_date else None
_as_date(p.completion_date) if p.completion_date else None
),
registration_date=(
date.fromisoformat(p.registration_date)
if p.registration_date
else None
_as_date(p.registration_date) if p.registration_date else None
),
measurement_type=p.measurement_type,
conservatory_type=p.conservatory_type,

View file

@ -2,8 +2,9 @@ from __future__ import annotations
from typing import Optional, cast
from sqlalchemy import Table
from sqlalchemy import Table, func
from sqlalchemy import select as sa_select
from sqlalchemy import update as sa_update
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlmodel import Session, col, select
@ -116,6 +117,17 @@ class PropertyPostgresRepository(PropertyRepository):
return {}
return self._spatial_repo.get_for_uprns(uprns)
def mark_modelled(self, property_id: int, *, has_recommendations: bool) -> None:
    """Record a modelling run on the Property row with id ``property_id``.

    Sets ``has_recommendations`` (mirroring the old engine's per-Property
    marker) and stamps ``updated_at`` with the database clock so the run is
    datable against the 2026-06-01 cutoff. Does not commit; the Unit of Work
    owns the transaction.
    """
    properties = self._table
    run_marker = {
        "has_recommendations": has_recommendations,
        "updated_at": func.now(),  # evaluated by the DB, not this process
    }
    stmt = (
        sa_update(properties)
        .where(properties.c.id == property_id)
        .values(**run_marker)
    )
    self._session.execute(stmt)  # pyright: ignore[reportDeprecated]
def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
if not rows:
return 0

View file

@ -47,6 +47,15 @@ class PropertyRepository(ABC):
input ids."""
...
@abstractmethod
def mark_modelled(self, property_id: int, *, has_recommendations: bool) -> None:
    """Record that a Property has been run through the modelling pipeline.

    Implementations set ``has_recommendations`` (the old engine's per-Property
    marker: true when the Plan carries measures) and bump ``updated_at`` so the
    run is datable (a first run under the new process has
    ``updated_at >= 2026-06-01``).

    Idempotent: re-running overwrites the same row.
    """
    ...
@abstractmethod
def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
"""Bulk-insert identity rows, skipping any whose ``(portfolio_id, uprn)``

68
scripts/e2e_common.py Normal file
View file

@ -0,0 +1,68 @@
"""Shared configuration + client plumbing for the local e2e runner scripts
(``run_modelling_e2e`` and ``run_first_run_e2e``).
Loads ``backend/.env`` and builds the DB engine from the FastAPI-layer ``DB_*``
vars (the ``infrastructure/postgres`` layer reads ``POSTGRES_*``, which the .env
does not carry), plus an S3-backed ``ParquetReader`` for the geospatial
repository. Secrets live in the .env and the ambient ``~/.aws`` profile; this
module never hard-codes them.
"""
from __future__ import annotations
import io
import os
from pathlib import Path
from typing import Any, cast
import boto3
import pandas as pd
from sqlalchemy import Engine, create_engine
from repositories.geospatial.geospatial_s3_repository import ParquetReader
_REPO_ROOT = Path(__file__).resolve().parents[1]
ENV_PATH = _REPO_ROOT / "backend" / ".env"
def load_env(path: Path = ENV_PATH) -> None:
    """Load ``KEY=value`` lines from ``path`` into ``os.environ``.

    A missing file is a no-op. Blank lines, ``#`` comment lines and lines
    without ``=`` are skipped. Surrounding whitespace and quote characters are
    stripped from the value. Variables already present in the environment are
    never overridden.
    """
    if not path.exists():
        return
    for entry in map(str.strip, path.read_text(encoding="utf-8").splitlines()):
        if not entry or entry.startswith("#"):
            continue
        name, separator, raw_value = entry.partition("=")
        if not separator:
            continue  # not an assignment
        unquoted = raw_value.strip().strip('"').strip("'")
        # setdefault: anything the caller already exported wins over the file.
        os.environ.setdefault(name.strip(), unquoted)
def db_url() -> str:
    """Build the psycopg2 connection URL from the FastAPI-layer ``DB_*`` env vars.

    Reads ``DB_USERNAME``, ``DB_PASSWORD``, ``DB_HOST``, ``DB_PORT`` and
    ``DB_NAME``. The username and password are percent-encoded so that
    credentials containing URL delimiters (``@``, ``:``, ``/``, spaces, ...) do
    not corrupt the URL; credentials without such characters are unchanged.

    Raises:
        KeyError: a required ``DB_*`` variable is not set.
    """
    from urllib.parse import quote

    env = os.environ
    # safe="" so that "/" is encoded too; %XX escapes are decoded again when
    # the URL is parsed.
    username = quote(env["DB_USERNAME"], safe="")
    password = quote(env["DB_PASSWORD"], safe="")
    return (
        f"postgresql+psycopg2://{username}:{password}"
        f"@{env['DB_HOST']}:{env['DB_PORT']}/{env['DB_NAME']}"
    )
def build_engine() -> Engine:
    """Create a connection-pooled engine for the target DB (``DB_*`` creds).

    ``pool_pre_ping`` is enabled and the connect timeout is 10 seconds.
    """
    url = db_url()
    connect_args = {"connect_timeout": 10}
    return create_engine(url, pool_pre_ping=True, connect_args=connect_args)
def s3_parquet_reader(bucket: str) -> ParquetReader:
    """Return a ``ParquetReader`` (key -> DataFrame) backed by ``bucket`` in S3.

    Intended for the ``GeospatialS3Repository``. AWS creds come from the ambient
    ``~/.aws`` profile. Each call of the returned reader downloads the whole
    object into memory and parses it from the bytes (s3fs is not installed
    here, so the file is not opened through an ``s3://`` URL).
    """
    # boto3 ships only partial type stubs, so the client is an untyped boundary.
    s3 = cast(Any, boto3.client("s3"))  # pyright: ignore[reportUnknownMemberType]

    def read(key: str) -> pd.DataFrame:
        response = s3.get_object(Bucket=bucket, Key=key)
        payload = cast(bytes, response["Body"].read())
        return pd.read_parquet(io.BytesIO(payload))

    return read

View file

@ -0,0 +1,162 @@
"""Run the **full** ``AraFirstRunPipeline`` (Ingestion → Baseline → Modelling)
end-to-end against the real database, locally.
This is the production pipeline the ``ara_first_run`` Lambda runs, driven from a
shell instead of an SQS event. The Lambda ``handler`` itself cannot run locally —
``applications/ara_first_run/handler.py::_source_clients_from_env`` deliberately
raises until the deploy/Terraform wiring lands (#1136). So this script composes
the same pipeline directly via the existing ``build_first_run_pipeline`` seam,
supplying the three source clients that ``run_modelling_e2e`` already proves out
(EPC API, geospatial S3, Google Solar), then calls ``dispatch_first_run``.
How it differs from ``run_modelling_e2e``:
* It runs the **real Ingestion stage** — fetches each Property's EPC by UPRN,
resolves spatial + Google Solar, and **persists** them (``epc_property`` /
``property_details_spatial`` / ``solar``) — then Baseline, then Modelling.
``run_modelling_e2e`` does ingestion inline and only models.
* **There is no inspect-only mode**: the stages persist as they go (ADR-0012),
so any run writes to the DB. This script is gated behind ``--confirm``; without
it the script previews what it would do and exits.
* **The modelling batch is all-or-nothing**: each stage commits once per batch,
so one Property raising aborts the whole batch (no per-Property recovery like
``run_modelling_e2e``). Make sure the inputs are clean first.
Measure scoping comes **only from the Scenario's exclusions** — the pipeline
threads no ``--measures`` override (issue #1130). So if the live ``material``
catalogue cannot price/represent a measure a Property is eligible for (today:
``secondary_heating_removal``, absent from the ``material.type`` enum), that
Property's modelling raises and aborts the batch. Exclude it on the Scenario
first, e.g.::
UPDATE scenario SET exclusions = '{secondary_heating_removal}' WHERE id = 1266;
EPC Prediction (ADR-0031) is left **off** — its Landlord-Override attributes
reader is not wired here, so an EPC-less Property is not gap-filled.
Config + secrets are loaded exactly as ``run_modelling_e2e`` does: ``backend/.env``
for the DB creds (``DB_*``), the EPC Bearer token (``OPEN_EPC_API_TOKEN``), the
Google Solar key (``GOOGLE_SOLAR_API_KEY``) and the S3 bucket (``DATA_BUCKET``);
AWS creds from the ambient ``~/.aws`` profile. Run from the worktree root::
# preview only (no writes): print what would run, then exit
python -m scripts.run_first_run_e2e --scenario-ids 1266 --portfolio-id 785 \
709634 709635 709636
# actually run the full pipeline and persist (Ingestion -> Baseline -> Modelling)
python -m scripts.run_first_run_e2e --scenario-ids 1266 --portfolio-id 785 \
--confirm 709634 709635 709636
"""
from __future__ import annotations
import argparse
import os
import sys
from pathlib import Path
from uuid import uuid4
_REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap
from applications.ara_first_run.ara_first_run_trigger_body import ( # noqa: E402
AraFirstRunTriggerBody,
)
from applications.ara_first_run.handler import ( # noqa: E402
build_first_run_pipeline,
dispatch_first_run,
)
from infrastructure.epc_client.epc_client_service import EpcClientService # noqa: E402
from infrastructure.solar.google_solar_api_client import ( # noqa: E402
GoogleSolarApiClient,
)
from repositories.geospatial.geospatial_s3_repository import ( # noqa: E402
GeospatialS3Repository,
)
from repositories.postgres_unit_of_work import PostgresUnitOfWork # noqa: E402
from scripts.e2e_common import ( # noqa: E402
ENV_PATH,
build_engine,
load_env,
s3_parquet_reader,
)
from sqlmodel import Session # noqa: E402
def _parse_ids(raw: str) -> list[int]:
"""Parse a comma-separated id list (e.g. ``--scenario-ids 1266,1270``)."""
return [int(token.strip()) for token in raw.split(",") if token.strip()]
def main() -> None:
    """Parse the CLI, preview the run and, with ``--confirm``, execute it.

    Without ``--confirm`` the function prints what would run and returns before
    the database engine or any source client is built, so a preview needs
    neither ``DB_*`` credentials nor API tokens. With ``--confirm`` it composes
    the pipeline through ``build_first_run_pipeline`` and calls
    ``dispatch_first_run``, which persists as it goes.

    Raises:
        KeyError: on a confirmed run, a required environment variable
            (``DB_*``, ``OPEN_EPC_API_TOKEN``, ``DATA_BUCKET``,
            ``GOOGLE_SOLAR_API_KEY``) is missing.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "property_ids", type=int, nargs="+", help="Property ids to run"
    )
    parser.add_argument(
        "--scenario-ids",
        required=True,
        help="comma-separated Scenario ids to model against (exclusions come "
        "from each Scenario)",
    )
    parser.add_argument(
        "--portfolio-id", type=int, required=True, help="portfolio id for the run"
    )
    parser.add_argument(
        "--confirm",
        action="store_true",
        default=False,
        help="actually run the pipeline and WRITE to the DB (default: preview only)",
    )
    args = parser.parse_args()
    scenario_ids = _parse_ids(args.scenario_ids)

    load_env(ENV_PATH)

    # Built before the preview gate so a preview still validates the inputs.
    body = AraFirstRunTriggerBody(
        # task/sub_task drive the Lambda SubTask lifecycle only; running the
        # pipeline directly bypasses the @subtask_handler decorator, so synthetic
        # ids satisfy validation without touching the task tables.
        task_id=uuid4(),
        sub_task_id=uuid4(),
        portfolio_id=args.portfolio_id,
        property_ids=args.property_ids,
        scenario_ids=scenario_ids,
    )

    print(
        f"full AraFirstRunPipeline (Ingestion -> Baseline -> Modelling) · "
        f"{len(args.property_ids)} propertie(s) · scenarios {scenario_ids} · "
        f"portfolio {args.portfolio_id}"
    )

    if not args.confirm:
        print(
            "\nPREVIEW ONLY — no writes. This run WOULD fetch + persist EPC/"
            "spatial/solar, rebaseline, and model+persist Plans for:\n"
            f" properties: {args.property_ids}\n"
            "Re-run with --confirm to execute. NOTE: the modelling batch is "
            "all-or-nothing; ensure each Scenario excludes any measure the live "
            "catalogue cannot price (e.g. secondary_heating_removal)."
        )
        return

    # Only a confirmed run talks to the DB, so the engine is created after the
    # preview gate and its pooled connections are released even on failure.
    engine = build_engine()
    try:
        epc_fetcher = EpcClientService(os.environ["OPEN_EPC_API_TOKEN"])
        geospatial_repo = GeospatialS3Repository(
            s3_parquet_reader(os.environ["DATA_BUCKET"])
        )
        solar_fetcher = GoogleSolarApiClient(os.environ["GOOGLE_SOLAR_API_KEY"])

        pipeline = build_first_run_pipeline(
            unit_of_work=lambda: PostgresUnitOfWork(lambda: Session(engine)),
            epc_fetcher=epc_fetcher,
            geospatial_repo=geospatial_repo,
            solar_fetcher=solar_fetcher,
        )

        print(
            "running... (Ingestion -> Baseline -> Modelling, persisting per stage)\n"
        )
        dispatch_first_run(body.model_dump(), pipeline=pipeline)
    finally:
        engine.dispose()
    print("done — EPC/spatial/solar + Baseline + Plans persisted for the batch.")
if __name__ == "__main__":
main()

View file

@ -53,14 +53,10 @@ Google leg.
from __future__ import annotations
import argparse
import io
import os
import sys
from pathlib import Path
from typing import Any, Optional, cast
import boto3
import pandas as pd
from typing import Any, Optional
_REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap
@ -84,7 +80,6 @@ from infrastructure.solar.google_solar_api_client import ( # noqa: E402
)
from repositories.geospatial.geospatial_s3_repository import ( # noqa: E402
GeospatialS3Repository,
ParquetReader,
)
from repositories.product.product_postgres_repository import ( # noqa: E402
ProductPostgresRepository,
@ -93,51 +88,20 @@ from repositories.postgres_unit_of_work import PostgresUnitOfWork # noqa: E402
from repositories.scenario.scenario_postgres_repository import ( # noqa: E402
ScenarioPostgresRepository,
)
from sqlalchemy import Engine, create_engine, text # noqa: E402
from scripts.e2e_common import ( # noqa: E402
ENV_PATH,
build_engine,
load_env,
s3_parquet_reader,
)
from sqlalchemy import Engine, text # noqa: E402
from sqlmodel import Session # noqa: E402
_ENV_PATH = _REPO_ROOT / "backend" / ".env"
_MARKDOWN_PATH = Path("modelling_e2e.md")
_CSV_PATH = Path("modelling_e2e.csv")
_CANDIDATES_CSV_PATH = Path("modelling_e2e_candidates.csv")
def _load_env(path: Path) -> None:
"""Load `KEY=value` lines from `backend/.env` into the environment (without
overriding anything already set), so the DB creds + EPC token are present."""
if not path.exists():
return
for raw in path.read_text(encoding="utf-8").splitlines():
line = raw.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, value = line.split("=", 1)
os.environ.setdefault(key.strip(), value.strip().strip('"').strip("'"))
def _db_url() -> str:
"""The connection string from the FastAPI-layer `DB_*` env vars."""
env = os.environ
return (
f"postgresql+psycopg2://{env['DB_USERNAME']}:{env['DB_PASSWORD']}"
f"@{env['DB_HOST']}:{env['DB_PORT']}/{env['DB_NAME']}"
)
def _s3_parquet_reader(bucket: str) -> ParquetReader:
"""A `ParquetReader` (key -> DataFrame) backed by `bucket` in S3, for the
`GeospatialS3Repository`. AWS creds come from the ambient `~/.aws` profile;
pyarrow reads the parquet bytes (s3fs is not installed here)."""
# boto3 ships only partial type stubs, so the client is an untyped boundary.
client = cast(Any, boto3.client("s3")) # pyright: ignore[reportUnknownMemberType]
def read(key: str) -> pd.DataFrame:
body = cast(bytes, client.get_object(Bucket=bucket, Key=key)["Body"].read())
return pd.read_parquet(io.BytesIO(body))
return read
def _spatial_for(repo: GeospatialS3Repository, uprn: int) -> Optional[SpatialReference]:
"""The UPRN's spatial reference (coordinates + planning protections), or
None when S3 doesn't cover it — a missing reference must not abort the run,
@ -166,13 +130,6 @@ def _solar_insights_for(
return None # no Google solar coverage at this point — model without it
def _engine() -> Engine:
"""A connection-pooled engine to DevAssessmentModelDB (DB_* creds)."""
return create_engine(
_db_url(), pool_pre_ping=True, connect_args={"connect_timeout": 10}
)
def _uprns_for(engine: Engine, property_ids: list[int]) -> dict[int, Optional[int]]:
"""Read each Property's UPRN from the DB (read-only)."""
with engine.connect() as conn:
@ -334,6 +291,12 @@ def _persist(
portfolio_id=portfolio_id,
is_default=scenario.is_default,
)
# Mark the Property as run under the new process (old engine's
# `has_recommendations` marker + a bumped `updated_at`); the modelling
# compute above runs on in-memory fakes, so this DB UoW must set it.
uow.property.mark_modelled(
property_id, has_recommendations=bool(plan.measures)
)
uow.commit()
@ -383,14 +346,14 @@ def main() -> None:
if args.persist and (args.scenario_id is None or args.portfolio_id is None):
parser.error("--persist requires --scenario-id and --portfolio-id")
_load_env(_ENV_PATH)
load_env(ENV_PATH)
# The new gov EPC API (Bearer) authenticates with OPEN_EPC_API_TOKEN — the
# name is misleading; EPC_AUTH_TOKEN is dead (403). Verified against the
# /api/domestic/search endpoint.
epc_client = EpcClientService(os.environ["OPEN_EPC_API_TOKEN"])
geospatial = GeospatialS3Repository(_s3_parquet_reader(os.environ["DATA_BUCKET"]))
geospatial = GeospatialS3Repository(s3_parquet_reader(os.environ["DATA_BUCKET"]))
solar_client = GoogleSolarApiClient(os.environ["GOOGLE_SOLAR_API_KEY"])
engine = _engine()
engine = build_engine()
cli_considered = _resolve_considered(
_parse_measures(args.measures), _parse_measures(args.exclude_measures)
)

View file

@ -63,6 +63,11 @@ class FakePropertyRepo(PropertyRepository):
def get_many(self, property_ids: list[int]) -> Properties:
return Properties([self._hydrate(property_id) for property_id in property_ids])
def mark_modelled(self, property_id: int, *, has_recommendations: bool) -> None:
    """Remember the marker per Property id so tests can assert the pipeline set it.

    The ``modelled`` dict is created on first use; a later call for the same
    ``property_id`` overwrites the earlier value.
    """
    if not hasattr(self, "modelled"):
        self.modelled: dict[int, bool] = {}
    self.modelled[property_id] = has_recommendations
def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
self.inserted: list[PropertyIdentityInsert] = list(rows)
return len(rows)

View file

@ -50,6 +50,11 @@ class FakePropertyRepository(PropertyRepository):
def get_many(self, property_ids: list[int]) -> Properties: # pragma: no cover
raise NotImplementedError
def mark_modelled(  # pragma: no cover
    self, property_id: int, *, has_recommendations: bool
) -> None:
    """Not supported by this fake (excluded from coverage); calling it raises."""
    raise NotImplementedError
class FakeStatusWriter(BulkUploadStatusWriter):
def __init__(self) -> None:

View file

@ -111,3 +111,32 @@ def test_get_many_defaults_to_unrestricted_when_uprn_has_no_spatial_row(
# Assert — an uncovered UPRN means unrestricted, not blocked (per legacy
# `empty_spatial_df`; ADR-0020).
assert properties.items[0].planning_restrictions == PlanningRestrictions()
def test_mark_modelled_sets_has_recommendations_and_bumps_updated_at(
    db_engine: Engine,
) -> None:
    """``mark_modelled`` sets the marker and stamps ``updated_at`` on the row."""
    # Arrange — a freshly-inserted property with no run recorded yet.
    with Session(db_engine) as session:
        seeded = PropertyRow(portfolio_id=7, uprn=12345)
        session.add(seeded)
        session.commit()
        property_id = seeded.id
        assert property_id is not None
        assert seeded.has_recommendations is None
        assert seeded.updated_at is None

    # Act — record a run that produced recommendations.
    with Session(db_engine) as session:
        repository = PropertyPostgresRepository(session)
        repository.mark_modelled(property_id, has_recommendations=True)
        session.commit()

    # Assert — the marker is set and updated_at is stamped, so the run is datable
    # against the 2026-06-01 new-process cutoff.
    with Session(db_engine) as session:
        persisted = session.get(PropertyRow, property_id)
        assert persisted is not None
        assert persisted.has_recommendations is True
        assert persisted.updated_at is not None