diff --git a/datatypes/epc/domain/mapper.py b/datatypes/epc/domain/mapper.py index b3fc944f..159c0f92 100644 --- a/datatypes/epc/domain/mapper.py +++ b/datatypes/epc/domain/mapper.py @@ -5,6 +5,7 @@ from decimal import ROUND_HALF_UP, Decimal from typing import Any, Dict, Final, List, Optional, Sequence, TypeVar, Union, cast from datatypes.epc.schema.helpers import from_dict +from datatypes.epc.domain.epc import Epc from datatypes.epc.domain.epc_property_data import ( BASEMENT_WALL_CONSTRUCTION_CODE, Addendum, @@ -2268,61 +2269,53 @@ class EpcPropertyDataMapper: if schema == "RdSAP-Schema-21.0.1": from datatypes.epc.schema.rdsap_schema_21_0_1 import RdSapSchema21_0_1 - return _clear_basement_flag_when_system_built( - EpcPropertyDataMapper.from_rdsap_schema_21_0_1( - from_dict(RdSapSchema21_0_1, data) - ) + mapped = EpcPropertyDataMapper.from_rdsap_schema_21_0_1( + from_dict(RdSapSchema21_0_1, data) ) - if schema == "RdSAP-Schema-21.0.0": + elif schema == "RdSAP-Schema-21.0.0": from datatypes.epc.schema.rdsap_schema_21_0_0 import RdSapSchema21_0_0 - return _clear_basement_flag_when_system_built( - EpcPropertyDataMapper.from_rdsap_schema_21_0_0( - from_dict(RdSapSchema21_0_0, data) - ) + mapped = EpcPropertyDataMapper.from_rdsap_schema_21_0_0( + from_dict(RdSapSchema21_0_0, data) ) - if schema == "RdSAP-Schema-20.0.0": + elif schema == "RdSAP-Schema-20.0.0": from datatypes.epc.schema.rdsap_schema_20_0_0 import RdSapSchema20_0_0 - return _clear_basement_flag_when_system_built( - EpcPropertyDataMapper.from_rdsap_schema_20_0_0( - from_dict(RdSapSchema20_0_0, data) - ) + mapped = EpcPropertyDataMapper.from_rdsap_schema_20_0_0( + from_dict(RdSapSchema20_0_0, data) ) - if schema == "RdSAP-Schema-19.0": + elif schema == "RdSAP-Schema-19.0": from datatypes.epc.schema.rdsap_schema_19_0 import RdSapSchema19_0 - return _clear_basement_flag_when_system_built( - EpcPropertyDataMapper.from_rdsap_schema_19_0( - from_dict(RdSapSchema19_0, data) - ) + mapped = 
EpcPropertyDataMapper.from_rdsap_schema_19_0( + from_dict(RdSapSchema19_0, data) ) - if schema == "RdSAP-Schema-18.0": + elif schema == "RdSAP-Schema-18.0": from datatypes.epc.schema.rdsap_schema_18_0 import RdSapSchema18_0 - return _clear_basement_flag_when_system_built( - EpcPropertyDataMapper.from_rdsap_schema_18_0( - from_dict(RdSapSchema18_0, data) - ) + mapped = EpcPropertyDataMapper.from_rdsap_schema_18_0( + from_dict(RdSapSchema18_0, data) ) - if schema == "RdSAP-Schema-17.1": + elif schema == "RdSAP-Schema-17.1": from datatypes.epc.schema.rdsap_schema_17_1 import RdSapSchema17_1 - return _clear_basement_flag_when_system_built( - EpcPropertyDataMapper.from_rdsap_schema_17_1( - from_dict(RdSapSchema17_1, data) - ) + mapped = EpcPropertyDataMapper.from_rdsap_schema_17_1( + from_dict(RdSapSchema17_1, data) ) - if schema == "RdSAP-Schema-17.0": + elif schema == "RdSAP-Schema-17.0": from datatypes.epc.schema.rdsap_schema_17_0 import RdSapSchema17_0 - return _clear_basement_flag_when_system_built( - EpcPropertyDataMapper.from_rdsap_schema_17_0( - from_dict(RdSapSchema17_0, data) - ) + mapped = EpcPropertyDataMapper.from_rdsap_schema_17_0( + from_dict(RdSapSchema17_0, data) ) + else: + raise ValueError(f"Unsupported EPC schema: {schema!r}") - raise ValueError(f"Unsupported EPC schema: {schema!r}") + return _clear_basement_flag_when_system_built( + _with_renewable_heat_incentive( + _with_recorded_performance(mapped, data), data + ) + ) # --------------------------------------------------------------------------- @@ -2330,6 +2323,85 @@ class EpcPropertyDataMapper: # --------------------------------------------------------------------------- +def _with_recorded_performance( + epc: EpcPropertyData, data: Dict[str, Any] +) -> EpcPropertyData: + """Overlay the recorded current-performance scalars from the raw API payload. 
+ + The current SAP rating, EPC band, Primary Energy Intensity + (``energy_consumption_current``) and CO2 are top-level fields on every RdSAP + schema response, but only a couple of the per-schema mappers copy them + through (and none map the band). Baseline's Lodged Performance reads all four + off the EPC, so map them here, once, for every schema version. An absent key + leaves the mapped value untouched. + """ + band = data.get("current_energy_efficiency_band") + co2 = data.get("co2_emissions_current") + consumption = data.get("energy_consumption_current") + rating = data.get("energy_rating_current") + return replace( + epc, + current_energy_efficiency_band=( + Epc(band) if band is not None else epc.current_energy_efficiency_band + ), + co2_emissions_current=( + float(co2) if co2 is not None else epc.co2_emissions_current + ), + energy_consumption_current=( + int(consumption) + if consumption is not None + else epc.energy_consumption_current + ), + energy_rating_current=( + int(rating) if rating is not None else epc.energy_rating_current + ), + ) + + +def _with_renewable_heat_incentive( + epc: EpcPropertyData, data: Dict[str, Any] +) -> EpcPropertyData: + """Gap-fill the RHI block (baseline space/water-heating kWh) from the raw + payload. + + The ``renewable_heat_incentive`` object is present on every schema response, + but only the 21.x mappers copy it through; Baseline reads + ``space_heating_kwh`` / ``water_heating_kwh`` off it. Only fills when a mapper + left it unset, and only when the block carries both required kWh figures — + otherwise the EPC is returned untouched. 
+ """ + if epc.renewable_heat_incentive is not None: + return epc + rhi = data.get("renewable_heat_incentive") + if not isinstance(rhi, dict): + return epc + rhi_obj = cast(Dict[str, Any], rhi) + space = rhi_obj.get("space_heating_existing_dwelling") + water = rhi_obj.get("water_heating") + if space is None or water is None: + return epc + return replace( + epc, + renewable_heat_incentive=RenewableHeatIncentive( + space_heating_kwh=float(space), + water_heating_kwh=float(water), + impact_of_loft_insulation_kwh=_optional_float( + rhi_obj.get("impact_of_loft_insulation") + ), + impact_of_cavity_insulation_kwh=_optional_float( + rhi_obj.get("impact_of_cavity_insulation") + ), + impact_of_solid_wall_insulation_kwh=_optional_float( + rhi_obj.get("impact_of_solid_wall_insulation") + ), + ), + ) + + +def _optional_float(value: Any) -> Optional[float]: + return float(value) if value is not None else None + + def _clear_basement_flag_when_system_built( epc: EpcPropertyData, ) -> EpcPropertyData: diff --git a/infrastructure/postgres/property_baseline_performance_table.py b/infrastructure/postgres/property_baseline_performance_table.py index 03906c0c..89019478 100644 --- a/infrastructure/postgres/property_baseline_performance_table.py +++ b/infrastructure/postgres/property_baseline_performance_table.py @@ -72,26 +72,31 @@ class PropertyBaselinePerformanceModel(SQLModel, table=True): space_heating_kwh: float water_heating_kwh: float + # The Fuel Rates snapshot period the bill was priced against (FE-owned column, + # nullable). Not yet threaded through Bill Derivation, so left None for now. + fuel_rates_period: Optional[str] = Field(default=None) + # Bill Derivation block (ADR-0014 §6). Nullable: all None when no calculator - # ran (stub path). The ``bill_`` prefix avoids clashing with the + # ran (stub path). 
Column names are unprefixed to mirror the FE-owned table — + # the per-section ``heating_kwh`` / ``hot_water_kwh`` do not clash with the # recorded-demand ``space_heating_kwh`` / ``water_heating_kwh`` above. - bill_heating_kwh: Optional[float] = Field(default=None) - bill_heating_cost_gbp: Optional[float] = Field(default=None) - bill_hot_water_kwh: Optional[float] = Field(default=None) - bill_hot_water_cost_gbp: Optional[float] = Field(default=None) - bill_lighting_kwh: Optional[float] = Field(default=None) - bill_lighting_cost_gbp: Optional[float] = Field(default=None) - bill_appliances_kwh: Optional[float] = Field(default=None) - bill_appliances_cost_gbp: Optional[float] = Field(default=None) - bill_cooking_kwh: Optional[float] = Field(default=None) - bill_cooking_cost_gbp: Optional[float] = Field(default=None) - bill_pumps_fans_kwh: Optional[float] = Field(default=None) - bill_pumps_fans_cost_gbp: Optional[float] = Field(default=None) - bill_cooling_kwh: Optional[float] = Field(default=None) - bill_cooling_cost_gbp: Optional[float] = Field(default=None) - bill_standing_charges_gbp: Optional[float] = Field(default=None) - bill_seg_credit_gbp: Optional[float] = Field(default=None) - bill_total_annual_bill_gbp: Optional[float] = Field(default=None) + heating_kwh: Optional[float] = Field(default=None) + heating_cost_gbp: Optional[float] = Field(default=None) + hot_water_kwh: Optional[float] = Field(default=None) + hot_water_cost_gbp: Optional[float] = Field(default=None) + lighting_kwh: Optional[float] = Field(default=None) + lighting_cost_gbp: Optional[float] = Field(default=None) + appliances_kwh: Optional[float] = Field(default=None) + appliances_cost_gbp: Optional[float] = Field(default=None) + cooking_kwh: Optional[float] = Field(default=None) + cooking_cost_gbp: Optional[float] = Field(default=None) + pumps_fans_kwh: Optional[float] = Field(default=None) + pumps_fans_cost_gbp: Optional[float] = Field(default=None) + cooling_kwh: Optional[float] = 
Field(default=None) + cooling_cost_gbp: Optional[float] = Field(default=None) + standing_charges_gbp: Optional[float] = Field(default=None) + seg_credit_gbp: Optional[float] = Field(default=None) + total_annual_bill_gbp: Optional[float] = Field(default=None) @classmethod def from_domain( @@ -122,15 +127,15 @@ class PropertyBaselinePerformanceModel(SQLModel, table=True): return for section, stem in _SECTION_COLUMN_STEM.items(): cost = bill.sections.get(section) - setattr(self, f"bill_{stem}_kwh", cost.kwh if cost is not None else None) + setattr(self, f"{stem}_kwh", cost.kwh if cost is not None else None) setattr( self, - f"bill_{stem}_cost_gbp", + f"{stem}_cost_gbp", cost.cost_gbp if cost is not None else None, ) - self.bill_standing_charges_gbp = bill.standing_charges_gbp - self.bill_seg_credit_gbp = bill.seg_credit_gbp - self.bill_total_annual_bill_gbp = bill.total_gbp + self.standing_charges_gbp = bill.standing_charges_gbp + self.seg_credit_gbp = bill.seg_credit_gbp + self.total_annual_bill_gbp = bill.total_gbp def to_domain(self) -> PropertyBaselinePerformance: return PropertyBaselinePerformance( @@ -157,18 +162,18 @@ class PropertyBaselinePerformanceModel(SQLModel, table=True): not-None discriminator: a persisted bill always sets it, so its absence means no calculator ran and the bill was None. 
A section is rebuilt only when its kWh column is not None (paired with its cost).""" - if self.bill_total_annual_bill_gbp is None: + if self.total_annual_bill_gbp is None: return None sections: dict[BillSection, BillSectionCost] = {} for section, stem in _SECTION_COLUMN_STEM.items(): - kwh = cast(Optional[float], getattr(self, f"bill_{stem}_kwh")) + kwh = cast(Optional[float], getattr(self, f"{stem}_kwh")) if kwh is None: continue - cost_gbp = cast(float, getattr(self, f"bill_{stem}_cost_gbp")) + cost_gbp = cast(float, getattr(self, f"{stem}_cost_gbp")) sections[section] = BillSectionCost(kwh=kwh, cost_gbp=cost_gbp) return Bill( sections=sections, - standing_charges_gbp=cast(float, self.bill_standing_charges_gbp), - seg_credit_gbp=cast(float, self.bill_seg_credit_gbp), - total_gbp=self.bill_total_annual_bill_gbp, + standing_charges_gbp=cast(float, self.standing_charges_gbp), + seg_credit_gbp=cast(float, self.seg_credit_gbp), + total_gbp=self.total_annual_bill_gbp, ) diff --git a/infrastructure/postgres/property_table.py b/infrastructure/postgres/property_table.py index c333cad4..56d0b2fa 100644 --- a/infrastructure/postgres/property_table.py +++ b/infrastructure/postgres/property_table.py @@ -1,5 +1,6 @@ from __future__ import annotations +from datetime import datetime from typing import ClassVar, Optional from sqlalchemy import Column @@ -48,3 +49,10 @@ class PropertyRow(SQLModel, table=True): user_inputted_address: Optional[str] = Field(default=None) user_inputted_postcode: Optional[str] = Field(default=None) lexiscore: Optional[float] = Field(default=None) + + # FE-owned columns the modelling pipeline now WRITES to record a run: the old + # engine set `has_recommendations` (engine.py); we mirror that, and bump + # `updated_at` so a run is datable (a first-run under the new process is + # `updated_at >= 2026-06-01`, the cutoff the old pipeline predates). 
+ has_recommendations: Optional[bool] = Field(default=None) + updated_at: Optional[datetime] = Field(default=None) diff --git a/orchestration/modelling_orchestrator.py b/orchestration/modelling_orchestrator.py index 867cb8b2..55ae531d 100644 --- a/orchestration/modelling_orchestrator.py +++ b/orchestration/modelling_orchestrator.py @@ -126,6 +126,7 @@ class ModellingOrchestrator: solar_potential: Optional[SolarPotential] = _solar_potential_for( uow.solar, prop.identity.uprn ) + has_recommendations = False for scenario in scenarios: plan = self._plan_for( scorer, @@ -145,6 +146,13 @@ class ModellingOrchestrator: portfolio_id=portfolio_id, is_default=scenario.is_default, ) + has_recommendations = has_recommendations or bool(plan.measures) + # Record the run on the Property: the old engine's per-Property + # `has_recommendations` marker (true if any Scenario yielded a + # measure), with `updated_at` bumped so the run is datable. + uow.property.mark_modelled( + property_id, has_recommendations=has_recommendations + ) uow.commit() def _plan_for( diff --git a/repositories/epc/epc_postgres_repository.py b/repositories/epc/epc_postgres_repository.py index 8e38c32b..faa86323 100644 --- a/repositories/epc/epc_postgres_repository.py +++ b/repositories/epc/epc_postgres_repository.py @@ -1,7 +1,7 @@ from __future__ import annotations from collections.abc import Sequence -from datetime import date +from datetime import date, datetime from typing import Optional, Protocol, TypeVar from sqlmodel import Session, col, delete, select @@ -57,6 +57,24 @@ def _require(value: Optional[_T], field: str) -> _T: return value +def _as_date(value: object) -> date: + """Normalise an ``epc_property`` date column value to a ``date``. + + The FE-owned date columns (``inspection_date`` / ``completion_date`` / + ``registration_date``) are Postgres ``timestamp``s even though the SQLModel + mirror types them ``str`` (it stores the writer's ``isoformat()`` string). 
+ So a read hands back a ``datetime``, while a value still in flight may be + the ISO string — accept both. + """ + if isinstance(value, datetime): + return value.date() + if isinstance(value, date): + return value + if isinstance(value, str): + return date.fromisoformat(value) + raise TypeError(f"unexpected inspection_date value: {value!r}") + + class _HasEpcPropertyId(Protocol): epc_property_id: int @@ -425,7 +443,7 @@ class EpcPostgresRepository(EpcRepository): return EpcPropertyData( dwelling_type=p.dwelling_type, - inspection_date=date.fromisoformat(p.inspection_date), + inspection_date=_as_date(p.inspection_date), tenure=p.tenure, transaction_type=p.transaction_type, address_line_1=_require(p.address_line_1, "address_line_1"), @@ -480,12 +498,10 @@ class EpcPostgresRepository(EpcRepository): pressure_test=p.pressure_test, language_code=p.language_code, completion_date=( - date.fromisoformat(p.completion_date) if p.completion_date else None + _as_date(p.completion_date) if p.completion_date else None ), registration_date=( - date.fromisoformat(p.registration_date) - if p.registration_date - else None + _as_date(p.registration_date) if p.registration_date else None ), measurement_type=p.measurement_type, conservatory_type=p.conservatory_type, diff --git a/repositories/property/property_postgres_repository.py b/repositories/property/property_postgres_repository.py index 3549d0fc..cca62df6 100644 --- a/repositories/property/property_postgres_repository.py +++ b/repositories/property/property_postgres_repository.py @@ -2,8 +2,9 @@ from __future__ import annotations from typing import Optional, cast -from sqlalchemy import Table +from sqlalchemy import Table, func from sqlalchemy import select as sa_select +from sqlalchemy import update as sa_update from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlmodel import Session, col, select @@ -116,6 +117,17 @@ class PropertyPostgresRepository(PropertyRepository): return {} return 
def mark_modelled(self, property_id: int, *, has_recommendations: bool) -> None:
    """Stamp the Property row as having been through a modelling run.

    Executes one UPDATE on ``self._table`` for the row whose ``id`` equals
    ``property_id``, setting ``has_recommendations`` to the given flag and
    ``updated_at`` to the SQL ``now()`` function (so the timestamp comes from
    the database, not this process). Nothing is committed here — the caller's
    Unit of Work owns the transaction.
    """
    table = self._table
    run_marker = {
        "has_recommendations": has_recommendations,
        "updated_at": func.now(),
    }
    targeted = sa_update(table).where(table.c.id == property_id)
    self._session.execute(  # pyright: ignore[reportDeprecated]
        targeted.values(**run_marker)
    )
_REPO_ROOT = Path(__file__).resolve().parents[1]
ENV_PATH = _REPO_ROOT / "backend" / ".env"


def load_env(path: Path = ENV_PATH) -> None:
    """Load `KEY=value` lines from `backend/.env` into the environment.

    Blank lines, lines starting with ``#`` and lines without an ``=`` are
    skipped. Keys and values are stripped of surrounding whitespace, and the
    value additionally of surrounding single/double quotes. Variables already
    present in ``os.environ`` are never overridden. A missing file is a no-op.

    Args:
        path: The env file to read; defaults to ``ENV_PATH``.
    """
    if not path.exists():
        return
    for raw in path.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        os.environ.setdefault(key.strip(), value.strip().strip('"').strip("'"))


def db_url() -> str:
    """The connection string from the FastAPI-layer `DB_*` env vars.

    The username and password are percent-encoded before being placed in the
    URL: a raw ``@``, ``/`` or ``:`` in a credential would otherwise be read
    as a URL delimiter and corrupt the host/port/database parts. Credentials
    made only of unreserved characters come out unchanged.

    NOTE(review): assumes the ``.env`` holds the *raw* credentials; a value
    that was already percent-encoded by hand would now be encoded twice.

    Returns:
        A ``postgresql+psycopg2://`` URL.

    Raises:
        KeyError: one of ``DB_USERNAME`` / ``DB_PASSWORD`` / ``DB_HOST`` /
            ``DB_PORT`` / ``DB_NAME`` is not set.
    """
    # Local import keeps this helper self-contained (no file-level change).
    from urllib.parse import quote

    env = os.environ
    # safe="" so that "/" is encoded too; plain quote (not quote_plus) so a
    # space becomes %20 rather than "+".
    username = quote(env["DB_USERNAME"], safe="")
    password = quote(env["DB_PASSWORD"], safe="")
    return (
        f"postgresql+psycopg2://{username}:{password}"
        f"@{env['DB_HOST']}:{env['DB_PORT']}/{env['DB_NAME']}"
    )


def build_engine() -> Engine:
    """A connection-pooled engine to the target DB (DB_* creds).

    Built from ``db_url()`` with ``pool_pre_ping=True`` and a 10-second
    ``connect_timeout`` passed through ``connect_args``.
    """
    return create_engine(
        db_url(), pool_pre_ping=True, connect_args={"connect_timeout": 10}
    )


def s3_parquet_reader(bucket: str) -> ParquetReader:
    """A `ParquetReader` (key -> DataFrame) backed by `bucket` in S3, for the
    `GeospatialS3Repository`. AWS creds come from the ambient `~/.aws` profile;
    pyarrow reads the parquet bytes (s3fs is not installed here).

    Args:
        bucket: Name of the S3 bucket every key is read from.

    Returns:
        A callable that downloads the object at ``key`` in full and parses it
        with ``pandas.read_parquet``.
    """
    # boto3 ships only partial type stubs, so the client is an untyped boundary.
    client = cast(Any, boto3.client("s3"))  # pyright: ignore[reportUnknownMemberType]

    def read(key: str) -> pd.DataFrame:
        body = cast(bytes, client.get_object(Bucket=bucket, Key=key)["Body"].read())
        return pd.read_parquet(io.BytesIO(body))

    return read
+ * **The modelling batch is all-or-nothing**: each stage commits once per batch, + so one Property raising aborts the whole batch (no per-Property recovery like + ``run_modelling_e2e``). Make sure the inputs are clean first. + +Measure scoping comes **only from the Scenario's exclusions** — the pipeline +threads no ``--measures`` override (issue #1130). So if the live ``material`` +catalogue cannot price/represent a measure a Property is eligible for (today: +``secondary_heating_removal``, absent from the ``material.type`` enum), that +Property's modelling raises and aborts the batch. Exclude it on the Scenario +first, e.g.:: + + UPDATE scenario SET exclusions = '{secondary_heating_removal}' WHERE id = 1266; + +EPC Prediction (ADR-0031) is left **off** — its Landlord-Override attributes +reader is not wired here, so an EPC-less Property is not gap-filled. + +Config + secrets are loaded exactly as ``run_modelling_e2e`` does: ``backend/.env`` +for the DB creds (``DB_*``), the EPC Bearer token (``OPEN_EPC_API_TOKEN``), the +Google Solar key (``GOOGLE_SOLAR_API_KEY``) and the S3 bucket (``DATA_BUCKET``); +AWS creds from the ambient ``~/.aws`` profile. 
Run from the worktree root:: + + # preview only (no writes): print what would run, then exit + python -m scripts.run_first_run_e2e --scenario-ids 1266 --portfolio-id 785 \ + 709634 709635 709636 + # actually run the full pipeline and persist (Ingestion -> Baseline -> Modelling) + python -m scripts.run_first_run_e2e --scenario-ids 1266 --portfolio-id 785 \ + --confirm 709634 709635 709636 +""" + +from __future__ import annotations + +import argparse +import os +import sys +from pathlib import Path +from uuid import uuid4 + +_REPO_ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap + +from applications.ara_first_run.ara_first_run_trigger_body import ( # noqa: E402 + AraFirstRunTriggerBody, +) +from applications.ara_first_run.handler import ( # noqa: E402 + build_first_run_pipeline, + dispatch_first_run, +) +from infrastructure.epc_client.epc_client_service import EpcClientService # noqa: E402 +from infrastructure.solar.google_solar_api_client import ( # noqa: E402 + GoogleSolarApiClient, +) +from repositories.geospatial.geospatial_s3_repository import ( # noqa: E402 + GeospatialS3Repository, +) +from repositories.postgres_unit_of_work import PostgresUnitOfWork # noqa: E402 +from scripts.e2e_common import ( # noqa: E402 + ENV_PATH, + build_engine, + load_env, + s3_parquet_reader, +) +from sqlmodel import Session # noqa: E402 + + +def _parse_ids(raw: str) -> list[int]: + """Parse a comma-separated id list (e.g. 
``--scenario-ids 1266,1270``).""" + return [int(token.strip()) for token in raw.split(",") if token.strip()] + + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "property_ids", type=int, nargs="+", help="Property ids to run" + ) + parser.add_argument( + "--scenario-ids", + required=True, + help="comma-separated Scenario ids to model against (exclusions come " + "from each Scenario)", + ) + parser.add_argument( + "--portfolio-id", type=int, required=True, help="portfolio id for the run" + ) + parser.add_argument( + "--confirm", + action="store_true", + default=False, + help="actually run the pipeline and WRITE to the DB (default: preview only)", + ) + args = parser.parse_args() + + scenario_ids = _parse_ids(args.scenario_ids) + + load_env(ENV_PATH) + engine = build_engine() + + body = AraFirstRunTriggerBody( + # task/sub_task drive the Lambda SubTask lifecycle only; running the + # pipeline directly bypasses the @subtask_handler decorator, so synthetic + # ids satisfy validation without touching the task tables. + task_id=uuid4(), + sub_task_id=uuid4(), + portfolio_id=args.portfolio_id, + property_ids=args.property_ids, + scenario_ids=scenario_ids, + ) + + print( + f"full AraFirstRunPipeline (Ingestion -> Baseline -> Modelling) · " + f"{len(args.property_ids)} propertie(s) · scenarios {scenario_ids} · " + f"portfolio {args.portfolio_id}" + ) + if not args.confirm: + print( + "\nPREVIEW ONLY — no writes. This run WOULD fetch + persist EPC/" + "spatial/solar, rebaseline, and model+persist Plans for:\n" + f" properties: {args.property_ids}\n" + "Re-run with --confirm to execute. NOTE: the modelling batch is " + "all-or-nothing; ensure each Scenario excludes any measure the live " + "catalogue cannot price (e.g. secondary_heating_removal)." 
+ ) + return + + epc_fetcher = EpcClientService(os.environ["OPEN_EPC_API_TOKEN"]) + geospatial_repo = GeospatialS3Repository( + s3_parquet_reader(os.environ["DATA_BUCKET"]) + ) + solar_fetcher = GoogleSolarApiClient(os.environ["GOOGLE_SOLAR_API_KEY"]) + + pipeline = build_first_run_pipeline( + unit_of_work=lambda: PostgresUnitOfWork(lambda: Session(engine)), + epc_fetcher=epc_fetcher, + geospatial_repo=geospatial_repo, + solar_fetcher=solar_fetcher, + ) + + print("running... (Ingestion -> Baseline -> Modelling, persisting per stage)\n") + dispatch_first_run(body.model_dump(), pipeline=pipeline) + print("done — EPC/spatial/solar + Baseline + Plans persisted for the batch.") + + +if __name__ == "__main__": + main() diff --git a/scripts/run_modelling_e2e.py b/scripts/run_modelling_e2e.py index a9e39fa6..fb919f82 100644 --- a/scripts/run_modelling_e2e.py +++ b/scripts/run_modelling_e2e.py @@ -53,14 +53,10 @@ Google leg. from __future__ import annotations import argparse -import io import os import sys from pathlib import Path -from typing import Any, Optional, cast - -import boto3 -import pandas as pd +from typing import Any, Optional _REPO_ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap @@ -84,7 +80,6 @@ from infrastructure.solar.google_solar_api_client import ( # noqa: E402 ) from repositories.geospatial.geospatial_s3_repository import ( # noqa: E402 GeospatialS3Repository, - ParquetReader, ) from repositories.product.product_postgres_repository import ( # noqa: E402 ProductPostgresRepository, @@ -93,51 +88,20 @@ from repositories.postgres_unit_of_work import PostgresUnitOfWork # noqa: E402 from repositories.scenario.scenario_postgres_repository import ( # noqa: E402 ScenarioPostgresRepository, ) -from sqlalchemy import Engine, create_engine, text # noqa: E402 +from scripts.e2e_common import ( # noqa: E402 + ENV_PATH, + build_engine, + load_env, + s3_parquet_reader, +) +from sqlalchemy 
import Engine, text # noqa: E402 from sqlmodel import Session # noqa: E402 -_ENV_PATH = _REPO_ROOT / "backend" / ".env" _MARKDOWN_PATH = Path("modelling_e2e.md") _CSV_PATH = Path("modelling_e2e.csv") _CANDIDATES_CSV_PATH = Path("modelling_e2e_candidates.csv") -def _load_env(path: Path) -> None: - """Load `KEY=value` lines from `backend/.env` into the environment (without - overriding anything already set), so the DB creds + EPC token are present.""" - if not path.exists(): - return - for raw in path.read_text(encoding="utf-8").splitlines(): - line = raw.strip() - if not line or line.startswith("#") or "=" not in line: - continue - key, value = line.split("=", 1) - os.environ.setdefault(key.strip(), value.strip().strip('"').strip("'")) - - -def _db_url() -> str: - """The connection string from the FastAPI-layer `DB_*` env vars.""" - env = os.environ - return ( - f"postgresql+psycopg2://{env['DB_USERNAME']}:{env['DB_PASSWORD']}" - f"@{env['DB_HOST']}:{env['DB_PORT']}/{env['DB_NAME']}" - ) - - -def _s3_parquet_reader(bucket: str) -> ParquetReader: - """A `ParquetReader` (key -> DataFrame) backed by `bucket` in S3, for the - `GeospatialS3Repository`. AWS creds come from the ambient `~/.aws` profile; - pyarrow reads the parquet bytes (s3fs is not installed here).""" - # boto3 ships only partial type stubs, so the client is an untyped boundary. 
- client = cast(Any, boto3.client("s3")) # pyright: ignore[reportUnknownMemberType] - - def read(key: str) -> pd.DataFrame: - body = cast(bytes, client.get_object(Bucket=bucket, Key=key)["Body"].read()) - return pd.read_parquet(io.BytesIO(body)) - - return read - - def _spatial_for(repo: GeospatialS3Repository, uprn: int) -> Optional[SpatialReference]: """The UPRN's spatial reference (coordinates + planning protections), or None when S3 doesn't cover it — a missing reference must not abort the run, @@ -166,13 +130,6 @@ def _solar_insights_for( return None # no Google solar coverage at this point — model without it -def _engine() -> Engine: - """A connection-pooled engine to DevAssessmentModelDB (DB_* creds).""" - return create_engine( - _db_url(), pool_pre_ping=True, connect_args={"connect_timeout": 10} - ) - - def _uprns_for(engine: Engine, property_ids: list[int]) -> dict[int, Optional[int]]: """Read each Property's UPRN from the DB (read-only).""" with engine.connect() as conn: @@ -334,6 +291,12 @@ def _persist( portfolio_id=portfolio_id, is_default=scenario.is_default, ) + # Mark the Property as run under the new process (old engine's + # `has_recommendations` marker + a bumped `updated_at`); the modelling + # compute above runs on in-memory fakes, so this DB UoW must set it. + uow.property.mark_modelled( + property_id, has_recommendations=bool(plan.measures) + ) uow.commit() @@ -383,14 +346,14 @@ def main() -> None: if args.persist and (args.scenario_id is None or args.portfolio_id is None): parser.error("--persist requires --scenario-id and --portfolio-id") - _load_env(_ENV_PATH) + load_env(ENV_PATH) # The new gov EPC API (Bearer) authenticates with OPEN_EPC_API_TOKEN — the # name is misleading; EPC_AUTH_TOKEN is dead (403). Verified against the # /api/domestic/search endpoint. 
def mark_modelled(self, property_id: int, *, has_recommendations: bool) -> None:
    """Remember the marker per Property id so tests can assert it was set.

    The ``modelled`` mapping is created on first use and reused afterwards; a
    repeated call for the same id overwrites the earlier flag.
    """
    if not hasattr(self, "modelled"):
        self.modelled: dict[int, bool] = {}
    self.modelled[property_id] = has_recommendations
def test_mark_modelled_sets_has_recommendations_and_bumps_updated_at(
    db_engine: Engine,
) -> None:
    """``mark_modelled`` sets the flag and stamps ``updated_at`` on the row."""
    # Arrange — seed a property on which no run has been recorded.
    with Session(db_engine) as arrange_session:
        seeded = PropertyRow(portfolio_id=7, uprn=12345)
        arrange_session.add(seeded)
        arrange_session.commit()
        property_id = seeded.id
        assert property_id is not None
        assert seeded.has_recommendations is None
        assert seeded.updated_at is None

    # Act — record a run that produced recommendations; the repository does
    # not commit, so the test commits on its behalf.
    with Session(db_engine) as act_session:
        repository = PropertyPostgresRepository(act_session)
        repository.mark_modelled(property_id, has_recommendations=True)
        act_session.commit()

    # Assert — read back through a fresh session: the marker is set and
    # updated_at is stamped, so the run is datable against the 2026-06-01
    # new-process cutoff.
    with Session(db_engine) as assert_session:
        reloaded = assert_session.get(PropertyRow, property_id)
        assert reloaded is not None
        assert reloaded.has_recommendations is True
        assert reloaded.updated_at is not None