From ea72ee97bf7b7a8bad4febf052a2f90df9502115 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 16 Jun 2026 23:45:23 +0000 Subject: [PATCH] feat(scripts): add full AraFirstRunPipeline local runner MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit scripts/run_first_run_e2e.py runs the real Ingestion -> Baseline -> Modelling pipeline against the DB by composing build_first_run_pipeline + dispatch_first_run with the live source clients (the Lambda handler can't run locally — its _source_clients_from_env still raises, #1136). Unlike run_modelling_e2e it runs real ingestion (persists EPC/spatial/solar) and has no inspect-only mode, so it's gated behind --confirm (preview otherwise); measure scoping comes only from the Scenario's exclusions (the pipeline threads no --measures), and the modelling batch is all-or-nothing, both documented. Extract the shared env/engine/S3 plumbing into scripts/e2e_common.py (public load_env/build_engine/s3_parquet_reader) so both runners share one source and neither imports the other's privates. Co-Authored-By: Claude Opus 4.8 --- scripts/e2e_common.py | 68 +++++++++++++++ scripts/run_first_run_e2e.py | 162 +++++++++++++++++++++++++++++++++++ scripts/run_modelling_e2e.py | 65 +++----------- 3 files changed, 241 insertions(+), 54 deletions(-) create mode 100644 scripts/e2e_common.py create mode 100644 scripts/run_first_run_e2e.py diff --git a/scripts/e2e_common.py b/scripts/e2e_common.py new file mode 100644 index 00000000..56c82543 --- /dev/null +++ b/scripts/e2e_common.py @@ -0,0 +1,68 @@ +"""Shared configuration + client plumbing for the local e2e runner scripts +(``run_modelling_e2e`` and ``run_first_run_e2e``). + +Loads ``backend/.env`` and builds the DB engine from the FastAPI-layer ``DB_*`` +vars (the ``infrastructure/postgres`` layer reads ``POSTGRES_*``, which the .env +does not carry), plus an S3-backed ``ParquetReader`` for the geospatial +repository. Secrets live in the .env and the ambient ``~/.aws`` profile; this +module never hard-codes them. +""" + +from __future__ import annotations + +import io +import os +from pathlib import Path +from typing import Any, cast + +import boto3 +import pandas as pd +from sqlalchemy import Engine, create_engine + +from repositories.geospatial.geospatial_s3_repository import ParquetReader + +_REPO_ROOT = Path(__file__).resolve().parents[1] +ENV_PATH = _REPO_ROOT / "backend" / ".env" + + +def load_env(path: Path = ENV_PATH) -> None: + """Load `KEY=value` lines from `backend/.env` into the environment (without + overriding anything already set), so the DB creds + API tokens are present.""" + if not path.exists(): + return + for raw in path.read_text(encoding="utf-8").splitlines(): + line = raw.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, value = line.split("=", 1) + os.environ.setdefault(key.strip(), value.strip().strip('"').strip("'")) + + +def db_url() -> str: + """The connection string from the FastAPI-layer `DB_*` env vars.""" + env = os.environ + return ( + f"postgresql+psycopg2://{env['DB_USERNAME']}:{env['DB_PASSWORD']}" + f"@{env['DB_HOST']}:{env['DB_PORT']}/{env['DB_NAME']}" + ) + + +def build_engine() -> Engine: + """A connection-pooled engine to the target DB (DB_* creds).""" + return create_engine( + db_url(), pool_pre_ping=True, connect_args={"connect_timeout": 10} + ) + + +def s3_parquet_reader(bucket: str) -> ParquetReader: + """A `ParquetReader` (key -> DataFrame) backed by `bucket` in S3, for the + `GeospatialS3Repository`. AWS creds come from the ambient `~/.aws` profile; + pyarrow reads the parquet bytes (s3fs is not installed here).""" + # boto3 ships only partial type stubs, so the client is an untyped boundary. + client = cast(Any, boto3.client("s3")) # pyright: ignore[reportUnknownMemberType] + + def read(key: str) -> pd.DataFrame: + body = cast(bytes, client.get_object(Bucket=bucket, Key=key)["Body"].read()) + return pd.read_parquet(io.BytesIO(body)) + + return read diff --git a/scripts/run_first_run_e2e.py b/scripts/run_first_run_e2e.py new file mode 100644 index 00000000..b4c46f90 --- /dev/null +++ b/scripts/run_first_run_e2e.py @@ -0,0 +1,162 @@ +"""Run the **full** ``AraFirstRunPipeline`` (Ingestion → Baseline → Modelling) +end-to-end against the real database, locally. + +This is the production pipeline the ``ara_first_run`` Lambda runs, driven from a +shell instead of an SQS event. The Lambda ``handler`` itself cannot run locally — +``applications/ara_first_run/handler.py::_source_clients_from_env`` deliberately +raises until the deploy/Terraform wiring lands (#1136). So this script composes +the same pipeline directly via the existing ``build_first_run_pipeline`` seam, +supplying the three source clients that ``run_modelling_e2e`` already proves out +(EPC API, geospatial S3, Google Solar), then calls ``dispatch_first_run``. + +How it differs from ``run_modelling_e2e``: + * It runs the **real Ingestion stage** — fetches each Property's EPC by UPRN, + resolves spatial + Google Solar, and **persists** them (``epc_property`` / + ``property_details_spatial`` / ``solar``) — then Baseline, then Modelling. + ``run_modelling_e2e`` does ingestion inline and only models. + * **There is no inspect-only mode**: the stages persist as they go (ADR-0012), + so any run writes to the DB. This script is gated behind ``--confirm``; without + it the script previews what it would do and exits. + * **The modelling batch is all-or-nothing**: each stage commits once per batch, + so one Property raising aborts the whole batch (no per-Property recovery like + ``run_modelling_e2e``). Make sure the inputs are clean first. + +Measure scoping comes **only from the Scenario's exclusions** — the pipeline +threads no ``--measures`` override (issue #1130). So if the live ``material`` +catalogue cannot price/represent a measure a Property is eligible for (today: +``secondary_heating_removal``, absent from the ``material.type`` enum), that +Property's modelling raises and aborts the batch. Exclude it on the Scenario +first, e.g.:: + + UPDATE scenario SET exclusions = '{secondary_heating_removal}' WHERE id = 1266; + +EPC Prediction (ADR-0031) is left **off** — its Landlord-Override attributes +reader is not wired here, so an EPC-less Property is not gap-filled. + +Config + secrets are loaded exactly as ``run_modelling_e2e`` does: ``backend/.env`` +for the DB creds (``DB_*``), the EPC Bearer token (``OPEN_EPC_API_TOKEN``), the +Google Solar key (``GOOGLE_SOLAR_API_KEY``) and the S3 bucket (``DATA_BUCKET``); +AWS creds from the ambient ``~/.aws`` profile. Run from the worktree root:: + + # preview only (no writes): print what would run, then exit + python -m scripts.run_first_run_e2e --scenario-ids 1266 --portfolio-id 785 \ + 709634 709635 709636 + # actually run the full pipeline and persist (Ingestion -> Baseline -> Modelling) + python -m scripts.run_first_run_e2e --scenario-ids 1266 --portfolio-id 785 \ + --confirm 709634 709635 709636 +""" + +from __future__ import annotations + +import argparse +import os +import sys +from pathlib import Path +from uuid import uuid4 + +_REPO_ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap + +from applications.ara_first_run.ara_first_run_trigger_body import ( # noqa: E402 + AraFirstRunTriggerBody, +) +from applications.ara_first_run.handler import ( # noqa: E402 + build_first_run_pipeline, + dispatch_first_run, +) +from infrastructure.epc_client.epc_client_service import EpcClientService # noqa: E402 +from infrastructure.solar.google_solar_api_client import ( # noqa: E402 + GoogleSolarApiClient, +) +from repositories.geospatial.geospatial_s3_repository import ( # noqa: E402 + GeospatialS3Repository, +) +from repositories.postgres_unit_of_work import PostgresUnitOfWork # noqa: E402 +from scripts.e2e_common import ( # noqa: E402 + ENV_PATH, + build_engine, + load_env, + s3_parquet_reader, +) +from sqlmodel import Session # noqa: E402 + + +def _parse_ids(raw: str) -> list[int]: + """Parse a comma-separated id list (e.g. ``--scenario-ids 1266,1270``).""" + return [int(token.strip()) for token in raw.split(",") if token.strip()] + + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "property_ids", type=int, nargs="+", help="Property ids to run" + ) + parser.add_argument( + "--scenario-ids", + required=True, + help="comma-separated Scenario ids to model against (exclusions come " + "from each Scenario)", + ) + parser.add_argument( + "--portfolio-id", type=int, required=True, help="portfolio id for the run" + ) + parser.add_argument( + "--confirm", + action="store_true", + default=False, + help="actually run the pipeline and WRITE to the DB (default: preview only)", + ) + args = parser.parse_args() + + scenario_ids = _parse_ids(args.scenario_ids) + + load_env(ENV_PATH) + engine = build_engine() + + body = AraFirstRunTriggerBody( + # task/sub_task drive the Lambda SubTask lifecycle only; running the + # pipeline directly bypasses the @subtask_handler decorator, so synthetic + # ids satisfy validation without touching the task tables. + task_id=uuid4(), + sub_task_id=uuid4(), + portfolio_id=args.portfolio_id, + property_ids=args.property_ids, + scenario_ids=scenario_ids, + ) + + print( + f"full AraFirstRunPipeline (Ingestion -> Baseline -> Modelling) · " + f"{len(args.property_ids)} propertie(s) · scenarios {scenario_ids} · " + f"portfolio {args.portfolio_id}" + ) + if not args.confirm: + print( + "\nPREVIEW ONLY — no writes. This run WOULD fetch + persist EPC/" + "spatial/solar, rebaseline, and model+persist Plans for:\n" + f" properties: {args.property_ids}\n" + "Re-run with --confirm to execute. NOTE: the modelling batch is " + "all-or-nothing; ensure each Scenario excludes any measure the live " + "catalogue cannot price (e.g. secondary_heating_removal)." + ) + return + + epc_fetcher = EpcClientService(os.environ["OPEN_EPC_API_TOKEN"]) + geospatial_repo = GeospatialS3Repository( + s3_parquet_reader(os.environ["DATA_BUCKET"]) + ) + solar_fetcher = GoogleSolarApiClient(os.environ["GOOGLE_SOLAR_API_KEY"]) + + pipeline = build_first_run_pipeline( + unit_of_work=lambda: PostgresUnitOfWork(lambda: Session(engine)), + epc_fetcher=epc_fetcher, + geospatial_repo=geospatial_repo, + solar_fetcher=solar_fetcher, + ) + + print("running... (Ingestion -> Baseline -> Modelling, persisting per stage)\n") + dispatch_first_run(body.model_dump(), pipeline=pipeline) + print("done — EPC/spatial/solar + Baseline + Plans persisted for the batch.") + + +if __name__ == "__main__": + main() diff --git a/scripts/run_modelling_e2e.py b/scripts/run_modelling_e2e.py index 6dab17d4..fb919f82 100644 --- a/scripts/run_modelling_e2e.py +++ b/scripts/run_modelling_e2e.py @@ -53,14 +53,10 @@ Google leg. from __future__ import annotations import argparse -import io import os import sys from pathlib import Path -from typing import Any, Optional, cast - -import boto3 -import pandas as pd +from typing import Any, Optional _REPO_ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(_REPO_ROOT)) # worktree root first — avoid the import trap @@ -84,7 +80,6 @@ from infrastructure.solar.google_solar_api_client import ( # noqa: E402 ) from repositories.geospatial.geospatial_s3_repository import ( # noqa: E402 GeospatialS3Repository, - ParquetReader, ) from repositories.product.product_postgres_repository import ( # noqa: E402 ProductPostgresRepository, @@ -93,51 +88,20 @@ from repositories.postgres_unit_of_work import PostgresUnitOfWork # noqa: E402 from repositories.scenario.scenario_postgres_repository import ( # noqa: E402 ScenarioPostgresRepository, ) -from sqlalchemy import Engine, create_engine, text # noqa: E402 +from scripts.e2e_common import ( # noqa: E402 + ENV_PATH, + build_engine, + load_env, + s3_parquet_reader, +) +from sqlalchemy import Engine, text # noqa: E402 from sqlmodel import Session # noqa: E402 -_ENV_PATH = _REPO_ROOT / "backend" / ".env" _MARKDOWN_PATH = Path("modelling_e2e.md") _CSV_PATH = Path("modelling_e2e.csv") _CANDIDATES_CSV_PATH = Path("modelling_e2e_candidates.csv") -def _load_env(path: Path) -> None: - """Load `KEY=value` lines from `backend/.env` into the environment (without - overriding anything already set), so the DB creds + EPC token are present.""" - if not path.exists(): - return - for raw in path.read_text(encoding="utf-8").splitlines(): - line = raw.strip() - if not line or line.startswith("#") or "=" not in line: - continue - key, value = line.split("=", 1) - os.environ.setdefault(key.strip(), value.strip().strip('"').strip("'")) - - -def _db_url() -> str: - """The connection string from the FastAPI-layer `DB_*` env vars.""" - env = os.environ - return ( - f"postgresql+psycopg2://{env['DB_USERNAME']}:{env['DB_PASSWORD']}" - f"@{env['DB_HOST']}:{env['DB_PORT']}/{env['DB_NAME']}" - ) - - -def _s3_parquet_reader(bucket: str) -> ParquetReader: - """A `ParquetReader` (key -> DataFrame) backed by `bucket` in S3, for the - `GeospatialS3Repository`. AWS creds come from the ambient `~/.aws` profile; - pyarrow reads the parquet bytes (s3fs is not installed here).""" - # boto3 ships only partial type stubs, so the client is an untyped boundary. - client = cast(Any, boto3.client("s3")) # pyright: ignore[reportUnknownMemberType] - - def read(key: str) -> pd.DataFrame: - body = cast(bytes, client.get_object(Bucket=bucket, Key=key)["Body"].read()) - return pd.read_parquet(io.BytesIO(body)) - - return read - - def _spatial_for(repo: GeospatialS3Repository, uprn: int) -> Optional[SpatialReference]: """The UPRN's spatial reference (coordinates + planning protections), or None when S3 doesn't cover it — a missing reference must not abort the run, @@ -166,13 +130,6 @@ def _solar_insights_for( return None # no Google solar coverage at this point — model without it -def _engine() -> Engine: - """A connection-pooled engine to DevAssessmentModelDB (DB_* creds).""" - return create_engine( - _db_url(), pool_pre_ping=True, connect_args={"connect_timeout": 10} - ) - - def _uprns_for(engine: Engine, property_ids: list[int]) -> dict[int, Optional[int]]: """Read each Property's UPRN from the DB (read-only).""" with engine.connect() as conn: @@ -389,14 +346,14 @@ def main() -> None: if args.persist and (args.scenario_id is None or args.portfolio_id is None): parser.error("--persist requires --scenario-id and --portfolio-id") - _load_env(_ENV_PATH) + load_env(ENV_PATH) # The new gov EPC API (Bearer) authenticates with OPEN_EPC_API_TOKEN — the # name is misleading; EPC_AUTH_TOKEN is dead (403). Verified against the # /api/domestic/search endpoint. epc_client = EpcClientService(os.environ["OPEN_EPC_API_TOKEN"]) - geospatial = GeospatialS3Repository(_s3_parquet_reader(os.environ["DATA_BUCKET"])) + geospatial = GeospatialS3Repository(s3_parquet_reader(os.environ["DATA_BUCKET"])) solar_client = GoogleSolarApiClient(os.environ["GOOGLE_SOLAR_API_KEY"]) - engine = _engine() + engine = build_engine() cli_considered = _resolve_considered( _parse_measures(args.measures), _parse_measures(args.exclude_measures) )