From 168e7f18a12633b4e1b431e347a36d20932a5a1a Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 26 May 2026 10:41:00 +0000 Subject: [PATCH] deleted scaffolding services folder --- services/README.md | 13 - services/ara/Dockerfile | 12 - services/ara/README.md | 30 --- services/ara/pyproject.toml | 28 -- services/ara/src/ara/__init__.py | 4 - services/ara/src/ara/lambdas/__init__.py | 5 - .../ara/src/ara/orchestrators/__init__.py | 5 - services/ara/src/ara/services/__init__.py | 9 - services/ara/tests/__init__.py | 0 services/ara/tests/fakes/__init__.py | 4 - services/ara/tests/integration/__init__.py | 0 services/ara/tests/unit/__init__.py | 0 services/ml_training_data/pyproject.toml | 26 -- .../src/ml_training_data/__init__.py | 5 - .../src/ml_training_data/build_features.py | 74 ------ .../src/ml_training_data/bulk_zip_reader.py | 54 ---- .../ml_training_data/remote_bulk_fetcher.py | 82 ------ .../src/ml_training_data/sample.py | 33 --- .../src/ml_training_data/sap_parity_probe.py | 243 ------------------ .../src/ml_training_data/storage.py | 49 ---- .../src/ml_training_data/train_baseline.py | 189 -------------- .../src/ml_training_data/write_parquet.py | 54 ---- services/ml_training_data/tests/__init__.py | 0 .../ml_training_data/tests/unit/__init__.py | 0 .../tests/unit/test_build_features.py | 110 -------- .../tests/unit/test_bulk_zip_reader.py | 91 ------- .../tests/unit/test_sample.py | 98 ------- .../tests/unit/test_sap_parity_probe.py | 50 ---- .../tests/unit/test_storage.py | 90 ------- .../tests/unit/test_train_baseline.py | 235 ----------------- .../tests/unit/test_write_parquet.py | 98 ------- 31 files changed, 1691 deletions(-) delete mode 100644 services/README.md delete mode 100644 services/ara/Dockerfile delete mode 100644 services/ara/README.md delete mode 100644 services/ara/pyproject.toml delete mode 100644 services/ara/src/ara/__init__.py delete mode 100644 services/ara/src/ara/lambdas/__init__.py delete mode 100644 services/ara/src/ara/orchestrators/__init__.py delete mode 100644 services/ara/src/ara/services/__init__.py delete mode 100644 services/ara/tests/__init__.py delete mode 100644 services/ara/tests/fakes/__init__.py delete mode 100644 services/ara/tests/integration/__init__.py delete mode 100644 services/ara/tests/unit/__init__.py delete mode 100644 services/ml_training_data/pyproject.toml delete mode 100644 services/ml_training_data/src/ml_training_data/__init__.py delete mode 100644 services/ml_training_data/src/ml_training_data/build_features.py delete mode 100644 services/ml_training_data/src/ml_training_data/bulk_zip_reader.py delete mode 100644 services/ml_training_data/src/ml_training_data/remote_bulk_fetcher.py delete mode 100644 services/ml_training_data/src/ml_training_data/sample.py delete mode 100644 services/ml_training_data/src/ml_training_data/sap_parity_probe.py delete mode 100644 services/ml_training_data/src/ml_training_data/storage.py delete mode 100644 services/ml_training_data/src/ml_training_data/train_baseline.py delete mode 100644 services/ml_training_data/src/ml_training_data/write_parquet.py delete mode 100644 services/ml_training_data/tests/__init__.py delete mode 100644 services/ml_training_data/tests/unit/__init__.py delete mode 100644 services/ml_training_data/tests/unit/test_build_features.py delete mode 100644 services/ml_training_data/tests/unit/test_bulk_zip_reader.py delete mode 100644 services/ml_training_data/tests/unit/test_sample.py delete mode 100644 services/ml_training_data/tests/unit/test_sap_parity_probe.py delete mode 100644 services/ml_training_data/tests/unit/test_storage.py delete mode 100644 services/ml_training_data/tests/unit/test_train_baseline.py delete mode 100644 services/ml_training_data/tests/unit/test_write_parquet.py diff --git a/services/README.md b/services/README.md deleted file mode 100644 index c82ef6a4..00000000 --- a/services/README.md +++ /dev/null @@ -1,13 +0,0 @@ -# Services - -Each subdirectory is a deployable unit — typically a Lambda image. Own `pyproject.toml`, own `Dockerfile`, own deps. Lambda bundle contains only that service's deps + its workspace deps. - -| Service | Purpose | -|---------|---------| -| [`ara/`](./ara/) | The Domna retrofit modelling backend — ingestion + modelling pipelines, all 9 services in [PRD §9.2](../ara_backend_design.md). | - -Other Domna services (address2uprn, hubspot, pashub, ecmk, magicplan) live in the legacy `backend/` and `etl/` trees for now; they are slated to migrate here as their owners pick them up — see [PRD §11](../ara_backend_design.md). When that work starts, scaffold the service under `services//` and add it to the workspace members in the root `pyproject.toml`. - -## Service boundary - -A service can `import domain.*`, `import repos.*`, `import fetchers.*`, `import utils.*` (workspace deps). It **cannot** import another service's modules — they are separate distributions with no cross-import path. This is the structural enforcement of the modelling/ingestion separation ([ADR-0003](../docs/adr/0003-strict-ingestion-modelling-separation.md)). diff --git a/services/ara/Dockerfile b/services/ara/Dockerfile deleted file mode 100644 index c45d6bc1..00000000 --- a/services/ara/Dockerfile +++ /dev/null @@ -1,12 +0,0 @@ -# Lambda image for the Ara modelling backend. -# -# This is a scaffold — final image will install only ara + its workspace deps -# (domna-domain, domna-repos, domna-fetchers, domna-utils) plus ML/data libraries. -# Build via uv to keep cold-start size contained. - -FROM public.ecr.aws/lambda/python:3.11 - -# TODO: install uv, sync this service's deps from the workspace lock file, -# copy src/ara/ into ${LAMBDA_TASK_ROOT}/, set CMD to the Lambda handler. - -CMD ["ara.lambdas.handler.handler"] diff --git a/services/ara/README.md b/services/ara/README.md deleted file mode 100644 index 71e71a5d..00000000 --- a/services/ara/README.md +++ /dev/null @@ -1,30 +0,0 @@ -# ara - -The Domna retrofit modelling backend. Replaces the legacy `backend/engine/engine.py` monolith with a service-oriented pipeline that survives the 30 May 2026 gov EPC API cut-over and that other team members can read, fix, and extend. - -Design document: [`../../ara_backend_design.md`](../../ara_backend_design.md). -Domain glossary: [`../../CONTEXT.md`](../../CONTEXT.md). - -## Layout - -``` -src/ara/ -├── services/ # the 9 domain services from PRD §9.2: -│ # EpcRemappingService, EpcPredictionService, -│ # FeatureBuilder, EpcEnergyDerivationService, -│ # RebaseliningService, RecommendationService, -│ # ImpactPredictionService, OptimiserService, -│ # ValuationService, ResultsPersister -├── orchestrators/ # IngestionPipeline, ModellingPipeline, RefreshOrchestrator -└── lambdas/ # one handler.py per Lambda + the event-shape contracts -``` - -## Pipeline - -See [PRD §9.4](../../ara_backend_design.md) for the per-batch step order. Briefly: per-property setup (steps 1–6) runs once per Property; the per-scenario × per-phase loop (steps 7–10) re-derives candidates and impact predictions against the rolling Effective EPC state; results are persisted under one Unit of Work per (Plan, Scenario). - -## Testing - -- `tests/unit/` — service tests against fakes from `tests/fakes/`. No DB, no network, no ML lambda. -- `tests/integration/` — real Postgres (testcontainers / localstack), fake fetchers + fake ML lambdas. -- ML transform contract tests live with `domain.ml.transform` in `packages/domain/`. diff --git a/services/ara/pyproject.toml b/services/ara/pyproject.toml deleted file mode 100644 index 3556a15f..00000000 --- a/services/ara/pyproject.toml +++ /dev/null @@ -1,28 +0,0 @@ -[project] -name = "ara" -version = "0.1.0" -description = "The Domna retrofit modelling backend. Ingestion + modelling pipelines." -requires-python = ">=3.11" -dependencies = [ - "domna-domain", - "domna-repos", - "domna-fetchers", - "domna-utils", - "pandas>=2.0", - "pandas-stubs", - "numpy>=1.26", - "pydantic>=2.0", -] - -[tool.uv.sources] -domna-domain = { workspace = true } -domna-repos = { workspace = true } -domna-fetchers = { workspace = true } -domna-utils = { workspace = true } - -[build-system] -requires = ["hatchling"] -build-backend = "hatchling.build" - -[tool.hatch.build.targets.wheel] -packages = ["src/ara"] diff --git a/services/ara/src/ara/__init__.py b/services/ara/src/ara/__init__.py deleted file mode 100644 index 26856c73..00000000 --- a/services/ara/src/ara/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -"""The Domna retrofit modelling backend. - -See README.md and ara_backend_design.md (repo root) for the architecture. -""" diff --git a/services/ara/src/ara/lambdas/__init__.py b/services/ara/src/ara/lambdas/__init__.py deleted file mode 100644 index 93b08582..00000000 --- a/services/ara/src/ara/lambdas/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Lambda handlers + event-shape contracts. - -One handler per deployable Lambda. See PRD §4.6 for the ModelTriggerRequest -shape. -""" diff --git a/services/ara/src/ara/orchestrators/__init__.py b/services/ara/src/ara/orchestrators/__init__.py deleted file mode 100644 index 4d2c9a60..00000000 --- a/services/ara/src/ara/orchestrators/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Orchestrators for the Ara pipeline. - -IngestionPipeline, ModellingPipeline, RefreshOrchestrator. The only place -where step order is encoded and where fetchers + services + repos meet. -""" diff --git a/services/ara/src/ara/services/__init__.py b/services/ara/src/ara/services/__init__.py deleted file mode 100644 index b561f336..00000000 --- a/services/ara/src/ara/services/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -"""Domain services for the Ara modelling pipeline (PRD §9.2). - -EpcRemappingService, EpcPredictionService, FeatureBuilder, -EpcEnergyDerivationService, RebaseliningService, RecommendationService, -ImpactPredictionService, OptimiserService, ValuationService, ResultsPersister. - -Each service operates on `Properties` and depends only on repos + other services -+ domain objects. No external IO (per ADR-0003). -""" diff --git a/services/ara/tests/__init__.py b/services/ara/tests/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/services/ara/tests/fakes/__init__.py b/services/ara/tests/fakes/__init__.py deleted file mode 100644 index cc032044..00000000 --- a/services/ara/tests/fakes/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -"""Fake repos and fetchers for unit tests. - -One FakeRepo per real repo; dict-backed; no DB. Same for fetchers. -""" diff --git a/services/ara/tests/integration/__init__.py b/services/ara/tests/integration/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/services/ara/tests/unit/__init__.py b/services/ara/tests/unit/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/services/ml_training_data/pyproject.toml b/services/ml_training_data/pyproject.toml deleted file mode 100644 index 12b412a5..00000000 --- a/services/ml_training_data/pyproject.toml +++ /dev/null @@ -1,26 +0,0 @@ -[project] -name = "ml-training-data" -version = "0.1.0" -description = "Pipeline that turns the EPC open-data CSV into ML training parquet + baseline models." -requires-python = ">=3.11" -dependencies = [ - "domna-domain", - "pandas>=2.0", - "pandas-stubs", - "ijson>=3.2", - "pyarrow>=15", - "lightgbm>=4.0", - "scikit-learn>=1.4", - "httpx", - "remotezip>=0.12", -] - -[tool.uv.sources] -domna-domain = { workspace = true } - -[build-system] -requires = ["hatchling"] -build-backend = "hatchling.build" - -[tool.hatch.build.targets.wheel] -packages = ["src/ml_training_data"] diff --git a/services/ml_training_data/src/ml_training_data/__init__.py b/services/ml_training_data/src/ml_training_data/__init__.py deleted file mode 100644 index 3ac79700..00000000 --- a/services/ml_training_data/src/ml_training_data/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""EPC CSV → training-data pipeline. - -Produces parquet + schema.json + manifest.json for baseline LightGBM training. -See ara_backend_design.md (repo root) for the pipeline shape. -""" diff --git a/services/ml_training_data/src/ml_training_data/build_features.py b/services/ml_training_data/src/ml_training_data/build_features.py deleted file mode 100644 index 17a69cbc..00000000 --- a/services/ml_training_data/src/ml_training_data/build_features.py +++ /dev/null @@ -1,74 +0,0 @@ -"""Wrapper records -> EpcPropertyData -> feature+target DataFrame. - -Each wrapper record from the bulk ZIP carries the cert metadata plus a -JSON-encoded `document` payload. This function: - - - Filters out non-RdSAP assessments (our mapper only handles RdSAP-Schema-21.x). - - Parses `document` and feeds it to EpcPropertyDataMapper.from_api_response. - - Calls EpcMlTransform.to_row per cert (streaming) and discards the heavy - EpcPropertyData immediately so memory stays O(row-dict) per cert rather than - O(EpcPropertyData * n) — critical for the 500k+ cert full-year runs. - - Prepends a `certificate_number` column so every row is traceable to its source. -""" - -import json -from typing import Any, cast - -import pandas as pd - -from datatypes.epc.domain.mapper import EpcPropertyDataMapper -from domain.ml.schema import TransformSchema -from domain.ml.transform import EpcMlTransform -from ml_training_data.bulk_zip_reader import BulkZipReader - -_RDSAP_ASSESSMENT_TYPE = "RdSAP" - - -def build_features( - bulk_reader: BulkZipReader, - certificate_numbers: set[str], - *, - skip_unsupported_schemas: bool = True, -) -> pd.DataFrame: - transform = EpcMlTransform() - rows: list[dict[str, Any]] = [] - cert_nums: list[str] = [] - for record in bulk_reader.iter_certificates_filtered(certificate_numbers): - if record.get("assessment_type") != _RDSAP_ASSESSMENT_TYPE: - continue - document_field = record.get("document") - if isinstance(document_field, str): - document = cast(dict[str, Any], json.loads(document_field)) - elif isinstance(document_field, dict): - document = cast(dict[str, Any], document_field) - else: - continue - try: - prop = EpcPropertyDataMapper.from_api_response(document) - except ValueError: - if skip_unsupported_schemas: - continue - raise - rows.append(transform.to_row(prop)) - cert_nums.append(str(record["certificate_number"])) - # prop and document drop out of scope; GC reclaims before the next iter. - df = _frame_from_rows(rows, transform.schema()) - df["certificate_number"] = cert_nums - return df[["certificate_number", *[c for c in df.columns if c != "certificate_number"]]] - - -def _frame_from_rows(rows: list[dict[str, Any]], schema: TransformSchema) -> pd.DataFrame: - """Build the typed DataFrame from streamed row dicts. - - Mirrors EpcMlTransform.to_rows post-processing: full column set even when empty, - and pd.Categorical casts for any column flagged categorical in the schema. - """ - all_columns = list(schema.feature_columns.keys()) + list(schema.target_columns.keys()) - df = pd.DataFrame(rows, columns=all_columns) - for name, spec in schema.feature_columns.items(): - if spec.categorical: - df[name] = df[name].astype("category") - for name, spec in schema.target_columns.items(): - if spec.categorical: - df[name] = df[name].astype("category") - return df diff --git a/services/ml_training_data/src/ml_training_data/bulk_zip_reader.py b/services/ml_training_data/src/ml_training_data/bulk_zip_reader.py deleted file mode 100644 index d3e7d80d..00000000 --- a/services/ml_training_data/src/ml_training_data/bulk_zip_reader.py +++ /dev/null @@ -1,54 +0,0 @@ -"""Stream EPC certificate wrappers from the gov bulk JSON ZIP. - -The bulk ZIP from /api/files/domestic/json contains one entry per inspection year -(certificates-YYYY.json). Each entry is a stream of concatenated JSON objects -(NDJSON-shaped, no enclosing array) where every wrapper has: - - - certificate_number: str - - assessment_type: "RdSAP" | "SAP" | ... - - document: str — a JSON-encoded payload with the full EPC schema document - - warehouse_created_at, updated_at: provenance timestamps - -The reader yields those wrappers as-is. Unwrapping `document` and filtering by -assessment_type belongs at the build_features layer, where the domain knowledge -of which assessment types our mapper supports already lives. -""" - -import zipfile -from collections.abc import Iterator -from typing import Any, cast - -import ijson # type: ignore[import-untyped] - -from ml_training_data.storage import Storage - - -class BulkZipReader: - def __init__(self, storage: Storage, zip_key: str) -> None: - self._storage = storage - self._zip_key = zip_key - - def list_entries(self) -> list[str]: - with zipfile.ZipFile(self._storage.open_read(self._zip_key)) as zf: - return zf.namelist() - - def iter_certificates(self, entry: str) -> Iterator[dict[str, Any]]: - with zipfile.ZipFile(self._storage.open_read(self._zip_key)) as zf: - with zf.open(entry) as f: - for item in ijson.items(f, "", use_float=True, multiple_values=True): - yield cast(dict[str, Any], item) - - def iter_certificates_filtered( - self, certificate_numbers: set[str] - ) -> Iterator[dict[str, Any]]: - remaining = set(certificate_numbers) - for entry in self.list_entries(): - if not remaining: - return - for cert in self.iter_certificates(entry): - cert_num = cert.get("certificate_number") - if cert_num in remaining: - remaining.discard(cert_num) - yield cert - if not remaining: - return diff --git a/services/ml_training_data/src/ml_training_data/remote_bulk_fetcher.py b/services/ml_training_data/src/ml_training_data/remote_bulk_fetcher.py deleted file mode 100644 index 1a8fcb25..00000000 --- a/services/ml_training_data/src/ml_training_data/remote_bulk_fetcher.py +++ /dev/null @@ -1,82 +0,0 @@ -"""Extract specific yearly entries from the gov bulk JSON ZIP without downloading -the whole 15 GB archive. - -The gov endpoint redirects to a pre-signed S3 URL; remotezip uses HTTP Range -requests so only the bytes for the requested entries traverse the wire. Each entry -is streamed in chunks and re-zipped into a *separate* local archive (one per year): - - - Smaller disk footprint than the uncompressed JSON would need (5-8 GB per year). - - Per-year resumability: a partial-network failure on year N doesn't waste years - that already completed; rerun and the existing zips are skipped. - - BulkZipReader keeps working unchanged on each per-year local ZIP. -""" - -import zipfile -from pathlib import Path - -import httpx -from remotezip import RemoteZip # type: ignore[import-untyped] # pyright: ignore[reportMissingTypeStubs] - -_BULK_JSON_URL = ( - "https://api.get-energy-performance-data.communities.gov.uk/api/files/domestic/json" -) -_READ_CHUNK_BYTES = 8 * 1024 * 1024 # 8 MB - - -def extract_entries_to_local_zips( - auth_token: str, - entry_names: list[str], - output_dir: Path, -) -> dict[str, int]: - output_dir.mkdir(parents=True, exist_ok=True) - sizes: dict[str, int] = {} - for entry in entry_names: - out_path = output_dir / f"{entry}.zip" - if out_path.exists(): - sizes[entry] = out_path.stat().st_size - continue - # The S3 presigned URL is short-lived (X-Amz-Expires=30s), so refresh it - # per entry — extracting one entry can easily take longer than the TTL. - presigned_url = _resolve_presigned_url(auth_token) - with RemoteZip(presigned_url) as in_zf: # pyright: ignore[reportUnknownVariableType] - _stream_entry_to_zip(in_zf, entry, out_path) - sizes[entry] = out_path.stat().st_size - return sizes - - -def _stream_entry_to_zip( - in_zf: RemoteZip, # pyright: ignore[reportUnknownParameterType] - entry: str, - out_path: Path, -) -> None: - tmp_path = out_path.with_suffix(out_path.suffix + ".part") - try: - with zipfile.ZipFile(tmp_path, "w", zipfile.ZIP_DEFLATED) as out_zf: - with in_zf.open(entry) as src, out_zf.open(entry, "w", force_zip64=True) as dst: # pyright: ignore[reportUnknownMemberType,reportUnknownVariableType] - while True: - chunk: bytes = src.read(_READ_CHUNK_BYTES) # pyright: ignore[reportUnknownVariableType,reportUnknownMemberType] - if not chunk: - break - dst.write(chunk) - tmp_path.rename(out_path) - except BaseException: - if tmp_path.exists(): - tmp_path.unlink() - raise - - -def _resolve_presigned_url(auth_token: str) -> str: - response = httpx.get( - _BULK_JSON_URL, - headers={"Authorization": f"Bearer {auth_token}"}, - follow_redirects=False, - timeout=30, - ) - if response.status_code != 302: - raise RuntimeError( - f"Bulk JSON endpoint did not redirect: {response.status_code} {response.text[:200]}" - ) - location = response.headers.get("location") - if not location: - raise RuntimeError("Bulk JSON 302 had no Location header") - return location diff --git a/services/ml_training_data/src/ml_training_data/sample.py b/services/ml_training_data/src/ml_training_data/sample.py deleted file mode 100644 index 1cb9cabd..00000000 --- a/services/ml_training_data/src/ml_training_data/sample.py +++ /dev/null @@ -1,33 +0,0 @@ -"""Sample certificate rows from the EPC flat-register CSV. - -The flat-register CSV (2.4M rows) is the *only* exhaustive list of certificate -numbers; per-certificate detail is fetched separately downstream. sample() returns -a thin DataFrame keyed by certificate_number so later stages know which records to -fetch and how to join their per-cert JSON back to register-side metadata. -""" - -from collections.abc import Mapping, Sequence -from pathlib import Path -from typing import Optional - -import pandas as pd - - -def sample( - csv_path: Path, - n: int, - seed: int, - filters: Optional[Mapping[str, Sequence[str]]] = None, -) -> pd.DataFrame: - df: pd.DataFrame = pd.read_csv(csv_path, dtype=str, keep_default_na=False) - - if filters: - for column, allowed in filters.items(): - if column not in df.columns: - raise KeyError(f"filter column not present in CSV: {column!r}") - df = df[df[column].isin(list(allowed))] - - if len(df) <= n: - return df.reset_index(drop=True) - - return df.sample(n=n, random_state=seed).reset_index(drop=True) diff --git a/services/ml_training_data/src/ml_training_data/sap_parity_probe.py b/services/ml_training_data/src/ml_training_data/sap_parity_probe.py deleted file mode 100644 index beb64783..00000000 --- a/services/ml_training_data/src/ml_training_data/sap_parity_probe.py +++ /dev/null @@ -1,243 +0,0 @@ -"""Sap10Calculator parity probe over N random certs from the corpus. - -ADR-0009 Session B exploratory tool. Loads the v18a parquet, samples N -certs from the typical sap-score range, streams them from the bulk JSON -ZIPs, runs the calculator, and prints the residual distribution + -worst-N residuals for spec-iteration triage. - -Usage (from repo root, with the workspace venv active): - python -m ml_training_data.sap_parity_probe # N=100, seed=7 - python -m ml_training_data.sap_parity_probe 500 13 # custom N + seed - -Findings get written up in docs/sap-spec/PARITY_FINDINGS.md. -""" -from __future__ import annotations - -import json -import sys -import time -from pathlib import Path -from typing import Any, cast - -import pandas as pd - -from datatypes.epc.domain.mapper import EpcPropertyDataMapper -from domain.sap.calculator import calculate_sap_from_inputs -from domain.sap.rdsap.cert_to_inputs import ( - PriceTable, - SAP_10_2_SPEC_PRICES, - cert_to_inputs, -) -from ml_training_data.bulk_zip_reader import BulkZipReader -from ml_training_data.storage import LocalStorage - - -_REPO = Path(__file__).resolve().parents[4] -_PARQUET = _REPO / "data" / "ml_training" / "runs" / "2025_2026_n250000_v18a" / "data.parquet" -_BULK = _REPO / "data" / "ml_training" / "bulk" -_ZIP_KEYS = ("certificates-2025.json.zip", "certificates-2026.json.zip") - - -def predict_sap_for_cert( - cert_document: dict[str, Any], *, prices: PriceTable -) -> int: - """Run the mapper → cert_to_inputs → calculator pipeline on a single - cert document and return the rounded RdSAP-style SAP score. The - pure-function seam the corpus probe and any future per-cert dev - tools share.""" - epc = EpcPropertyDataMapper.from_api_response(cert_document) - inputs = cert_to_inputs(epc, prices=prices) - result = calculate_sap_from_inputs(inputs) - return result.sap_score - - -def _sample_certs(n: int, seed: int) -> dict[str, int]: - df = pd.read_parquet(_PARQUET, columns=["certificate_number", "sap_score"]) - # Wide range so the sample includes full-SAP new-builds (sap_score 90+) - # and the deepest-tail heritage/anomaly certs (sap_score ≤ 20). Earlier - # `between(20, 95)` excluded the populations where the calculator's - # biggest spec gaps tend to live. - df = df[df["sap_score"].between(5, 99)] - s = df.sample(n, random_state=seed) - return dict(zip(s["certificate_number"], s["sap_score"].astype(int))) - - -def main(argv: list[str] | None = None) -> None: - args = argv if argv is not None else sys.argv[1:] - n = int(args[0]) if args else 100 - seed = int(args[1]) if len(args) > 1 else 7 - - targets = _sample_certs(n, seed) - print( - f"Sampling {len(targets)} certs (seed={seed}) — using SAP 10.2 " - f"(14-03-2025) spec prices per ADR-0010" - ) - storage = LocalStorage(_BULK) - prices = SAP_10_2_SPEC_PRICES - results: list[dict[str, Any]] = [] - errors: list[dict[str, Any]] = [] - remaining = set(targets) - t0 = time.monotonic() - for zip_key in _ZIP_KEYS: - if not remaining: - break - if not storage.exists(zip_key): - print(f"!! missing {zip_key}", file=sys.stderr) - continue - reader = BulkZipReader(storage, zip_key) - for cert in reader.iter_certificates_filtered(remaining): - cn = cert["certificate_number"] - actual = targets[cn] - doc_field = cert.get("document") - document = cast( - dict[str, Any], - json.loads(doc_field) if isinstance(doc_field, str) else doc_field, - ) - try: - epc = EpcPropertyDataMapper.from_api_response(document) - inputs = cert_to_inputs(epc, prices=prices) - result = calculate_sap_from_inputs(inputs) - cert_primary = epc.energy_consumption_current - tfa = epc.total_floor_area_m2 or 1.0 - pf_space = inputs.space_heating_primary_factor - pf_hw = inputs.hot_water_primary_factor - pf_other = inputs.other_primary_factor - main_detail = ( - epc.sap_heating.main_heating_details[0] - if epc.sap_heating and epc.sap_heating.main_heating_details - else None - ) - age_band = ( - epc.sap_building_parts[0].construction_age_band - if epc.sap_building_parts - else None - ) - results.append({ - "cert": cn, - "actual": actual, - "predicted": result.sap_score, - "residual": result.sap_score - actual, - "ecf": round(result.ecf, 3), - "tfa": epc.total_floor_area_m2, - "ext": epc.extensions_count, - "dwelling": epc.dwelling_type, - "our_primary_kwh_m2": round(result.primary_energy_kwh_per_m2, 1), - "cert_primary_kwh_m2": cert_primary, - "primary_resid": ( - round(result.primary_energy_kwh_per_m2 - cert_primary, 1) - if cert_primary is not None - else None - ), - # End-use primary-energy split, kWh/m² of TFA (calculator side). - "space_pe_m2": round( - (result.main_heating_fuel_kwh_per_yr + result.secondary_heating_fuel_kwh_per_yr) - * pf_space / tfa, 1 - ), - "hw_pe_m2": round(result.hot_water_kwh_per_yr * pf_hw / tfa, 1), - "lighting_pe_m2": round(result.lighting_kwh_per_yr * pf_other / tfa, 1), - "pumps_pe_m2": round(result.pumps_fans_kwh_per_yr * pf_other / tfa, 1), - "pv_pe_m2": round( - -inputs.pv_generation_kwh_per_yr * pf_other / tfa, 1 - ), - # Strata for residual decomposition. - "main_cat": main_detail.main_heating_category if main_detail else None, - "main_fuel": main_detail.main_fuel_type if main_detail else None, - "age": age_band, - "water_code": epc.sap_heating.water_heating_code if epc.sap_heating else None, - "has_cyl": epc.sap_heating.cylinder_size is not None if epc.sap_heating else False, - }) - except Exception as e: # noqa: BLE001 — exploratory probe - errors.append({"cert": cn, "actual": actual, "error": f"{type(e).__name__}: {e}"}) - remaining.discard(cn) - elapsed = time.monotonic() - t0 - df = pd.DataFrame(results) - print(f"\nelapsed {elapsed:.1f}s; calculated={len(results)}, errored={len(errors)}, not_found={len(remaining)}") - if not df.empty: - df["abs_resid"] = df["residual"].abs() - print(f"\nMAE: {df['residual'].abs().mean():.2f}") - print(f"RMSE: {((df['residual'] ** 2).mean()) ** 0.5:.2f}") - print(f"bias: {df['residual'].mean():.2f}") - for thr in (1, 3, 5, 10): - pct = (df["abs_resid"] <= thr).mean() * 100 - print(f"within ±{thr}: {pct:.1f}%") - print("\nresidual distribution:") - print(df["residual"].describe(percentiles=[0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95])) - print("\nworst 15 by |residual| (SAP):") - print(df.nlargest(15, "abs_resid")[ - ["cert", "actual", "predicted", "residual", "ecf", "tfa", "ext", "dwelling"] - ].to_string(index=False)) - - # ------------------------------------------------------------------ - # Primary-energy decomposition. The cert reports a single PEUI - # (`energy_consumption_current`, kWh/m² TFA), so we can't see the - # cert's per-end-use mix — but we can see ours, and stratify the - # residual by cert attributes to localise which population drives - # the bias. - # ------------------------------------------------------------------ - pe_df = df.dropna(subset=["primary_resid"]).copy() - if not pe_df.empty: - pe_df["abs_pe_resid"] = pe_df["primary_resid"].abs() - print(f"\n=== Primary energy (kWh/m² TFA) ===") - print(f"PE MAE: {pe_df['abs_pe_resid'].mean():.2f}") - print(f"PE bias: {pe_df['primary_resid'].mean():.2f}") - print(f"PE RMSE: {((pe_df['primary_resid'] ** 2).mean()) ** 0.5:.2f}") - print("PE residual distribution:") - print(pe_df["primary_resid"].describe( - percentiles=[0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95] - )) - print("\nCorpus mean OUR primary energy by end-use (kWh/m² TFA):") - mix_cols = ["space_pe_m2", "hw_pe_m2", "lighting_pe_m2", "pumps_pe_m2", "pv_pe_m2"] - for col in mix_cols: - print(f" {col:14s} {pe_df[col].mean():6.1f}") - print(f" {'TOTAL (ours)':14s} {pe_df['our_primary_kwh_m2'].mean():6.1f}") - print(f" {'TOTAL (cert)':14s} {pe_df['cert_primary_kwh_m2'].mean():6.1f}") - - # Stratified bias by main heating category. - print("\nPE residual stratified by main_heating_category:") - by_cat = pe_df.groupby("main_cat", dropna=False).agg( - n=("cert", "count"), - pe_mae=("abs_pe_resid", "mean"), - pe_bias=("primary_resid", "mean"), - our_pe=("our_primary_kwh_m2", "mean"), - cert_pe=("cert_primary_kwh_m2", "mean"), - space_pe=("space_pe_m2", "mean"), - hw_pe=("hw_pe_m2", "mean"), - light_pe=("lighting_pe_m2", "mean"), - ).round(1) - print(by_cat.to_string()) - - # Stratified bias by age band — narrows the envelope-side hypothesis. - print("\nPE residual stratified by construction_age_band:") - by_age = pe_df.groupby("age", dropna=False).agg( - n=("cert", "count"), - pe_mae=("abs_pe_resid", "mean"), - pe_bias=("primary_resid", "mean"), - space_pe=("space_pe_m2", "mean"), - ).round(1).sort_index() - print(by_age.to_string()) - - # Stratified bias by dwelling type — flat vs detached should split - # if envelope-side (party-wall surfaces) is the dominant residual. - print("\nPE residual stratified by dwelling_type:") - by_dwel = pe_df.groupby("dwelling", dropna=False).agg( - n=("cert", "count"), - pe_mae=("abs_pe_resid", "mean"), - pe_bias=("primary_resid", "mean"), - space_pe=("space_pe_m2", "mean"), - ).round(1) - print(by_dwel.to_string()) - - print("\nworst 15 by |PE residual|:") - print(pe_df.nlargest(15, "abs_pe_resid")[ - ["cert", "actual", "primary_resid", - "our_primary_kwh_m2", "cert_primary_kwh_m2", - "space_pe_m2", "hw_pe_m2", "main_cat", "dwelling"] - ].to_string(index=False)) - if errors: - print("\nerrors:") - for e in errors[:10]: - print(" ", e) - - -if __name__ == "__main__": - main() diff --git a/services/ml_training_data/src/ml_training_data/storage.py b/services/ml_training_data/src/ml_training_data/storage.py deleted file mode 100644 index 8e7e3994..00000000 --- a/services/ml_training_data/src/ml_training_data/storage.py +++ /dev/null @@ -1,49 +0,0 @@ -"""Storage protocol + LocalStorage impl for the training-data pipeline. - -The protocol is the seam between local-dev (filesystem, gitignored ./data/) and a -future S3-backed implementation. Downstream stages depend only on the protocol so -the swap is a constructor change, not a callsite rewrite. -""" - -from collections.abc import Iterator -from pathlib import Path -from typing import IO, Protocol - - -class Storage(Protocol): - def write_bytes(self, key: str, data: bytes) -> None: ... - def read_bytes(self, key: str) -> bytes: ... - def exists(self, key: str) -> bool: ... - def iter_keys(self, prefix: str = "") -> Iterator[str]: ... - def open_read(self, key: str) -> IO[bytes]: ... - - -class LocalStorage: - def __init__(self, root: Path) -> None: - self._root = root - - def _path(self, key: str) -> Path: - return self._root / key - - def write_bytes(self, key: str, data: bytes) -> None: - target = self._path(key) - target.parent.mkdir(parents=True, exist_ok=True) - target.write_bytes(data) - - def read_bytes(self, key: str) -> bytes: - return self._path(key).read_bytes() - - def exists(self, key: str) -> bool: - return self._path(key).exists() - - def open_read(self, key: str) -> IO[bytes]: - return self._path(key).open("rb") - - def iter_keys(self, prefix: str = "") -> Iterator[str]: - if not self._root.exists(): - return - for p in self._root.rglob("*"): - if p.is_file(): - key = p.relative_to(self._root).as_posix() - if key.startswith(prefix): - yield key diff --git a/services/ml_training_data/src/ml_training_data/train_baseline.py b/services/ml_training_data/src/ml_training_data/train_baseline.py deleted file mode 100644 index 019c8bdd..00000000 --- a/services/ml_training_data/src/ml_training_data/train_baseline.py +++ /dev/null @@ -1,189 +0,0 @@ -"""Fit one LightGBM regressor per target; emit metrics + feature importance. - -This is the final stage of the training-data pipeline. Inputs: a feature DataFrame -(produced by build_features + persisted by write_training_dataset) plus a list of -target columns to fit. Output: a metrics dict (MAPE + R^2 per target) and a -per-target JSON file of feature importances written via Storage. - -The certificate_number column, if present, is dropped before fitting so it never -leaks into the model as a feature. -""" - -import json -from typing import Any, Callable, Optional, cast - -import lightgbm as lgb # type: ignore[import-untyped] -import numpy as np -import pandas as pd -import sklearn.metrics as _sk_metrics # type: ignore[import-untyped] # pyright: ignore[reportMissingTypeStubs] -import sklearn.model_selection as _sk_model_selection # type: ignore[import-untyped] # pyright: ignore[reportMissingTypeStubs] - -mean_absolute_error: Any = _sk_metrics.mean_absolute_error # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] -mean_absolute_percentage_error: Any = _sk_metrics.mean_absolute_percentage_error # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] -mean_squared_error: Any = _sk_metrics.mean_squared_error # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] -r2_score: Any = _sk_metrics.r2_score # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] -train_test_split: Any = _sk_model_selection.train_test_split # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] - -from ml_training_data.storage import Storage - -_CERT_NUM_COLUMN = "certificate_number" - -# Per-target LightGBM objective overrides. Initially (slice 16g) we switched -# sap_score + peui_ucl to 'mape' to align objective with reporting metric; -# the 250k v16 ablation (slice 16h) showed 'mape' loses ~0.6 percentage -# points of global MAPE because it over-weights the low-SAP tail at the -# expense of the body. Reverted to the default 'regression' for all targets. -# Tail bias needs a different fix (sample weights / stratified loss) — slice 16i. -_OBJECTIVE_OVERRIDES: dict[str, str] = {} - - -SampleWeightFn = Callable[["pd.Series[Any]"], "pd.Series[Any]"] - - -# Default tail-bucket weight curve for slice 16i. Boundary 58 picked from -# slice 16h's per-decile residuals (decile 0 = SAP 1-58 carries 17% MAPE -# vs <5% in the body); 3x multiplier is the lightest weight that demonstrably -# reduces the +3.1 bias at decile 0 without inflating body MAPE. Configurable -# via sample_weight_fn for ablation runs. -_DEFAULT_LOW_SAP_THRESHOLD: float = 58.0 -_DEFAULT_LOW_SAP_WEIGHT: float = 3.0 - - -def low_sap_tail_weight( - y: "pd.Series[Any]", - threshold: float = _DEFAULT_LOW_SAP_THRESHOLD, - weight: float = _DEFAULT_LOW_SAP_WEIGHT, -) -> "pd.Series[Any]": - """Return per-row weights: `weight` where y < threshold, 1.0 otherwise. - - Use as `train_baseline(..., sample_weight_fn=low_sap_tail_weight)` to - apply the slice 16i tail strategy. - """ - arr = np.asarray(y, dtype=float) - return pd.Series(np.where(arr < threshold, weight, 1.0), index=y.index) - - -def train_baseline( - df: pd.DataFrame, - targets: list[str], - storage: Storage, - run_key: str, - *, - test_size: float = 0.2, - seed: int = 42, - n_estimators: int = 200, - sample_weight_fn: Optional[SampleWeightFn] = None, -) -> dict[str, dict[str, float]]: - feature_cols = [c for c in df.columns if c not in targets and c != _CERT_NUM_COLUMN] - # LightGBM needs numeric (or pd.Categorical) dtypes. Coerce object columns whose - # contents are numeric-or-None to numeric (None -> NaN). Pandas object columns - # that are *actually* string categoricals are left alone if coercion would - # destroy data — pd.Categorical features pass through LightGBM correctly. - for col in [*feature_cols, *targets]: - if df[col].dtype == "object": - df[col] = pd.to_numeric(df[col], errors="coerce") - metrics: dict[str, dict[str, float]] = {} - - for target in targets: - # Drop rows where this target is null so LightGBM doesn't trip on label NaN. - target_df = df.dropna(subset=[target]) - x = target_df[feature_cols] - y = target_df[target] - split = cast( - tuple[pd.DataFrame, pd.DataFrame, "pd.Series[Any]", "pd.Series[Any]"], - train_test_split(x, y, test_size=test_size, random_state=seed), - ) - x_train, x_test, y_train, y_test = split - - objective = _OBJECTIVE_OVERRIDES.get(target, "regression") - model: Any = lgb.LGBMRegressor( - n_estimators=n_estimators, random_state=seed, verbose=-1, objective=objective, - ) - sample_weight = sample_weight_fn(y_train) if sample_weight_fn is not None else None - model.fit(x_train, y_train, sample_weight=sample_weight) - preds: np.ndarray[Any, Any] = np.asarray(model.predict(x_test)) - - metrics[target] = { - "mape": float(cast(float, mean_absolute_percentage_error(y_test, preds))), - "smape": _smape(y_test, preds), - "mae": float(cast(float, mean_absolute_error(y_test, preds))), - "rmse": float(np.sqrt(cast(float, mean_squared_error(y_test, preds)))), - "r2": float(cast(float, r2_score(y_test, preds))), - } - - importance_arr = np.asarray(model.feature_importances_, dtype=float) - importance = {col: float(score) for col, score in zip(feature_cols, importance_arr)} - storage.write_bytes( - f"{run_key}importance_{target}.json", - json.dumps(importance, indent=2).encode("utf-8"), - ) - - residuals = _per_decile_residuals(np.asarray(y_test, dtype=float), preds) - storage.write_bytes( - f"{run_key}residuals_{target}.json", - json.dumps({"buckets": residuals}, indent=2).encode("utf-8"), - ) - - storage.write_bytes( - f"{run_key}metrics.json", - json.dumps(metrics, indent=2).encode("utf-8"), - ) - return metrics - - -def _smape(y_true: Any, y_pred: Any) -> float: - """Symmetric MAPE: mean(|y - yhat| / ((|y| + |yhat|) / 2)). - - Bounded in [0, 2] (often reported as 0-200%). Stable when |y| is near zero, - so it's a better summary than MAPE for low-magnitude targets like - `hot_water_kwh` in well-insulated homes. - """ - y_t = np.asarray(y_true, dtype=float) - y_p = np.asarray(y_pred, dtype=float) - denom = (np.abs(y_t) + np.abs(y_p)) / 2.0 - mask = denom > 0 - if not mask.any(): - return 0.0 - return float(np.mean(np.abs(y_t[mask] - y_p[mask]) / denom[mask])) - - -def _per_decile_residuals( - y_true: np.ndarray[Any, Any], y_pred: np.ndarray[Any, Any] -) -> list[dict[str, float]]: - """Bucket the test set by deciles of the true target value, then report - MAPE / MAE / mean residual / count per bucket. - - Lets us tell whether errors concentrate in the tails of the true distribution - (e.g. SAP<40 / SAP>85) vs the mid-band — which the global MAPE alone hides. - """ - order = np.argsort(y_true, kind="stable") - y_t = y_true[order] - y_p = y_pred[order] - n = len(y_t) - bucket_size = n // 10 # last bucket absorbs the remainder - buckets: list[dict[str, float]] = [] - for i in range(10): - start = i * bucket_size - stop = n if i == 9 else (i + 1) * bucket_size - slice_t = y_t[start:stop] - slice_p = y_p[start:stop] - count = len(slice_t) - if count == 0: - continue - abs_err = np.abs(slice_t - slice_p) - mae = float(np.mean(abs_err)) - mean_residual = float(np.mean(slice_p - slice_t)) - mape_mask = slice_t != 0 - mape = float(np.mean(abs_err[mape_mask] / np.abs(slice_t[mape_mask]))) if mape_mask.any() else 0.0 - buckets.append( - { - "decile": float(i), - "true_min": float(slice_t[0]), - "true_max": float(slice_t[-1]), - "count": float(count), - "mape": mape, - "mae": mae, - "mean_residual": mean_residual, - } - ) - return buckets diff --git a/services/ml_training_data/src/ml_training_data/write_parquet.py b/services/ml_training_data/src/ml_training_data/write_parquet.py deleted file mode 100644 index b180a380..00000000 --- a/services/ml_training_data/src/ml_training_data/write_parquet.py +++ /dev/null @@ -1,54 +0,0 @@ -"""Persist a training-feature DataFrame as parquet + schema.json + manifest.json. - -The output triple is the artefact contract this pipeline hands to downstream model -training: parquet for the data, schema.json for dtype intent, manifest.json for -run provenance. Writes go through Storage so the same code lands on local-fs or S3 -without a callsite change. -""" - -import io -import json -from datetime import datetime, timezone -from typing import Any, Optional - -import pandas as pd - -from domain.ml.schema import ColumnSpec, TransformSchema -from ml_training_data.storage import Storage - - -def write_training_dataset( - df: pd.DataFrame, - storage: Storage, - run_key: str, - *, - schema: TransformSchema, - source_info: Optional[dict[str, Any]] = None, -) -> None: - buf = io.BytesIO() - df.to_parquet(buf, engine="pyarrow", index=False) - storage.write_bytes(f"{run_key}data.parquet", buf.getvalue()) - - schema_doc = { - "transform_version": schema.transform_version, - "features": {name: _column_to_json(spec) for name, spec in schema.feature_columns.items()}, - "targets": {name: _column_to_json(spec) for name, spec in schema.target_columns.items()}, - } - storage.write_bytes(f"{run_key}schema.json", json.dumps(schema_doc, indent=2).encode("utf-8")) - - manifest = { - "transform_version": schema.transform_version, - "row_count": len(df), - "written_at": datetime.now(timezone.utc).isoformat(), - "source_info": source_info or {}, - } - storage.write_bytes(f"{run_key}manifest.json", json.dumps(manifest, indent=2).encode("utf-8")) - - -def _column_to_json(spec: ColumnSpec) -> dict[str, object]: - return { - "dtype": spec.dtype.__name__, - "nullable": spec.nullable, - "categorical": spec.categorical, - "description": spec.description, - } diff --git a/services/ml_training_data/tests/__init__.py b/services/ml_training_data/tests/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/services/ml_training_data/tests/unit/__init__.py b/services/ml_training_data/tests/unit/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/services/ml_training_data/tests/unit/test_build_features.py b/services/ml_training_data/tests/unit/test_build_features.py deleted file mode 100644 index 50c28089..00000000 --- a/services/ml_training_data/tests/unit/test_build_features.py +++ /dev/null @@ -1,110 +0,0 @@ -"""Tests for build_features() — wrapper record → EpcPropertyData → feature row. - -build_features wires three existing pieces: BulkZipReader yields wrapper records -out of the bulk ZIP, EpcPropertyDataMapper.from_api_response parses the -JSON-encoded `document` payload into EpcPropertyData, and EpcMlTransform.to_rows -produces the feature+target DataFrame. The function adds a leading -`certificate_number` column so each row stays traceable to its source cert. -""" - -import io -import json -import zipfile -from pathlib import Path - -import pytest - -from ml_training_data.bulk_zip_reader import BulkZipReader -from ml_training_data.build_features import build_features -from ml_training_data.storage import LocalStorage - -_FIXTURE_PATH = Path("datatypes/epc/schema/tests/fixtures/21_0_0.json") - - -def _wrapper(cert_number: str, *, assessment_type: str = "RdSAP", document: str | None = None) -> dict[str, object]: - if document is None: - document = _FIXTURE_PATH.read_text() - return { - "certificate_number": cert_number, - "assessment_type": assessment_type, - "document": document, - } - - -def _write_zip(storage: LocalStorage, key: str, entries: dict[str, list[dict[str, object]]]) -> None: - buf = io.BytesIO() - with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf: - for entry_name, records in entries.items(): - ndjson = "\n".join(json.dumps(r) for r in records) - zf.writestr(entry_name, ndjson) - storage.write_bytes(key, buf.getvalue()) - - -def test_build_features_returns_one_row_per_supported_cert(tmp_path: Path) -> None: - # Arrange - storage = LocalStorage(root=tmp_path) - record = _wrapper("CERT-001") - _write_zip(storage, "bulk.zip", {"certificates-2024.json": [record]}) - reader = BulkZipReader(storage=storage, zip_key="bulk.zip") - - # Act - df = build_features(reader, certificate_numbers={"CERT-001"}) - - # Assert - assert len(df) == 1 - assert df.iloc[0]["certificate_number"] == "CERT-001" - assert "certificate_number" == df.columns[0] - - -def test_build_features_skips_non_rdsap_assessment_types(tmp_path: Path) -> None: - # Arrange - storage = LocalStorage(root=tmp_path) - rdsap = _wrapper("OK-1", assessment_type="RdSAP") - sap = _wrapper("SAP-1", assessment_type="SAP") - _write_zip(storage, "bulk.zip", {"certificates-2024.json": [rdsap, sap]}) - reader = BulkZipReader(storage=storage, zip_key="bulk.zip") - - # Act - df = build_features(reader, certificate_numbers={"OK-1", "SAP-1"}) - - # Assert - assert df["certificate_number"].tolist() == ["OK-1"] - - -def test_build_features_skips_unsupported_schemas_by_default(tmp_path: Path) -> None: - # Arrange - storage = LocalStorage(root=tmp_path) - bad_doc = json.loads(_FIXTURE_PATH.read_text()) - bad_doc["schema_type"] = "RdSAP-Schema-19.0.0" # not in mapper dispatch - supported = _wrapper("OK-1") - unsupported = _wrapper("BAD-1", document=json.dumps(bad_doc)) - _write_zip( - storage, - "bulk.zip", - {"certificates-2024.json": [supported, unsupported]}, - ) - reader = BulkZipReader(storage=storage, zip_key="bulk.zip") - - # Act - df = build_features(reader, certificate_numbers={"OK-1", "BAD-1"}) - - # Assert - assert df["certificate_number"].tolist() == ["OK-1"] - - -def test_build_features_raises_when_skip_unsupported_schemas_is_false(tmp_path: Path) -> None: - # Arrange - storage = LocalStorage(root=tmp_path) - bad_doc = json.loads(_FIXTURE_PATH.read_text()) - bad_doc["schema_type"] = "RdSAP-Schema-19.0.0" - unsupported = _wrapper("BAD-1", document=json.dumps(bad_doc)) - _write_zip(storage, "bulk.zip", {"certificates-2024.json": [unsupported]}) - reader = BulkZipReader(storage=storage, zip_key="bulk.zip") - - # Act / Assert - with pytest.raises(ValueError, match="Unsupported EPC schema"): - build_features( - reader, - certificate_numbers={"BAD-1"}, - skip_unsupported_schemas=False, - ) diff --git a/services/ml_training_data/tests/unit/test_bulk_zip_reader.py b/services/ml_training_data/tests/unit/test_bulk_zip_reader.py deleted file mode 100644 index 9f43a6dd..00000000 --- a/services/ml_training_data/tests/unit/test_bulk_zip_reader.py +++ /dev/null @@ -1,91 +0,0 @@ -"""Tests for BulkZipReader — stream EPC certificate wrappers from the gov bulk JSON ZIP. - -The real bulk ZIP holds one entry per inspection year. Each entry is a sequence of -concatenated JSON objects (NDJSON-shaped, no enclosing array). Tests use small -synthetic ZIPs of the same shape. -""" - -import io -import json -import zipfile -from pathlib import Path - -import pytest - -from ml_training_data.bulk_zip_reader import BulkZipReader -from ml_training_data.storage import LocalStorage - - -def _write_zip( - storage: LocalStorage, key: str, entries: dict[str, list[dict[str, object]]] -) -> None: - buf = io.BytesIO() - with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf: - for entry_name, records in entries.items(): - ndjson = "\n".join(json.dumps(r) for r in records) - zf.writestr(entry_name, ndjson) - storage.write_bytes(key, buf.getvalue()) - - -def test_list_entries_returns_zip_member_names_in_archive_order(tmp_path: Path) -> None: - # Arrange - storage = LocalStorage(root=tmp_path) - _write_zip( - storage, - "bulk.zip", - {"certificates-2024.json": [], "certificates-2025.json": []}, - ) - reader = BulkZipReader(storage=storage, zip_key="bulk.zip") - - # Act - entries = reader.list_entries() - - # Assert - assert entries == ["certificates-2024.json", "certificates-2025.json"] - - -def test_iter_certificates_yields_every_cert_in_entry(tmp_path: Path) -> None: - # Arrange - storage = LocalStorage(root=tmp_path) - certs = [ - {"certificate_number": "CN-0001", "postcode": "AA1 1AA"}, - {"certificate_number": "CN-0002", "postcode": "BB2 2BB"}, - {"certificate_number": "CN-0003", "postcode": "CC3 3CC"}, - ] - _write_zip(storage, "bulk.zip", {"certificates-2025.json": certs}) - reader = BulkZipReader(storage=storage, zip_key="bulk.zip") - - # Act - out = list(reader.iter_certificates("certificates-2025.json")) - - # Assert - assert out == certs - - -def test_iter_certificates_filtered_yields_only_requested_across_entries(tmp_path: Path) -> None: - # Arrange - storage = LocalStorage(root=tmp_path) - _write_zip( - storage, - "bulk.zip", - { - "certificates-2024.json": [ - {"certificate_number": "A-1", "postcode": "AA1"}, - {"certificate_number": "A-2", "postcode": "AA2"}, - ], - "certificates-2025.json": [ - {"certificate_number": "B-1", "postcode": "BB1"}, - {"certificate_number": "B-2", "postcode": "BB2"}, - ], - }, - ) - reader = BulkZipReader(storage=storage, zip_key="bulk.zip") - - # Act - out = sorted( - reader.iter_certificates_filtered({"A-1", "B-2", "MISSING-9"}), - key=lambda c: c["certificate_number"], - ) - - # Assert - assert [c["certificate_number"] for c in out] == ["A-1", "B-2"] diff --git a/services/ml_training_data/tests/unit/test_sample.py b/services/ml_training_data/tests/unit/test_sample.py deleted file mode 100644 index fada269a..00000000 --- a/services/ml_training_data/tests/unit/test_sample.py +++ /dev/null @@ -1,98 +0,0 @@ -"""Tests for sample.sample() — random + filterable selection over the EPC flat-register CSV. - -sample() is the entry point of the training-data pipeline: it produces a thin DataFrame -of certificate rows that downstream stages (fetch -> build_features -> write_parquet) -operate on. Filtering supports excluding obviously-wrong cohorts before paying the -per-cert fetch cost. -""" - -from pathlib import Path - -import pandas as pd -import pytest - -from ml_training_data.sample import sample - - -def _write_csv(path: Path, rows: list[dict[str, str]]) -> None: - df = pd.DataFrame(rows) - df.to_csv(path, index=False) - - -def test_sample_returns_n_rows_when_no_filter(tmp_path: Path) -> None: - # Arrange - csv = tmp_path / "register.csv" - rows = [ - {"certificate_number": f"CN-{i:04d}", "property_type": "House", "postcode": "AB1 2CD"} - for i in range(100) - ] - _write_csv(csv, rows) - - # Act - out = sample(csv_path=csv, n=10, seed=42) - - # Assert - assert len(out) == 10 - assert set(out.columns) == {"certificate_number", "property_type", "postcode"} - - -def test_sample_is_deterministic_with_same_seed(tmp_path: Path) -> None: - # Arrange - csv = tmp_path / "register.csv" - _write_csv( - csv, - [{"certificate_number": f"CN-{i:04d}", "property_type": "House"} for i in range(200)], - ) - - # Act - first = sample(csv_path=csv, n=20, seed=7) - second = sample(csv_path=csv, n=20, seed=7) - - # Assert - assert first["certificate_number"].tolist() == second["certificate_number"].tolist() - - -def test_sample_filter_selects_only_matching_rows(tmp_path: Path) -> None: - # Arrange - csv = tmp_path / "register.csv" - rows: list[dict[str, str]] = [] - for i in range(50): - rows.append({"certificate_number": f"H-{i:03d}", "property_type": "House"}) - for i in range(50): - rows.append({"certificate_number": f"F-{i:03d}", "property_type": "Flat"}) - _write_csv(csv, rows) - - # Act - out = sample(csv_path=csv, n=30, seed=1, filters={"property_type": ["House"]}) - - # Assert - assert len(out) == 30 - assert (out["property_type"] == "House").all() - - -def test_sample_returns_fewer_than_n_when_filter_too_strict(tmp_path: Path) -> None: - # Arrange - csv = tmp_path / "register.csv" - rows: list[dict[str, str]] = [] - for i in range(3): - rows.append({"certificate_number": f"BG-{i}", "property_type": "Bungalow"}) - for i in range(50): - rows.append({"certificate_number": f"H-{i:03d}", "property_type": "House"}) - _write_csv(csv, rows) - - # Act - out = sample(csv_path=csv, n=100, seed=1, filters={"property_type": ["Bungalow"]}) - - # Assert - assert len(out) == 3 - assert (out["property_type"] == "Bungalow").all() - - -def test_sample_raises_when_filter_column_missing(tmp_path: Path) -> None: - # Arrange - csv = tmp_path / "register.csv" - _write_csv(csv, [{"certificate_number": "CN-001", "property_type": "House"}]) - - # Act / Assert - with pytest.raises(KeyError): - sample(csv_path=csv, n=1, seed=1, filters={"nonexistent": ["x"]}) diff --git a/services/ml_training_data/tests/unit/test_sap_parity_probe.py b/services/ml_training_data/tests/unit/test_sap_parity_probe.py deleted file mode 100644 index 0403331d..00000000 --- a/services/ml_training_data/tests/unit/test_sap_parity_probe.py +++ /dev/null @@ -1,50 +0,0 @@ -"""Tests for sap_parity_probe — the per-cert SAP10 prediction pipeline -exercised by the corpus-wide parity probe. - -P2.1 (ADR-0010): the probe now uses SAP 10.2 (14-03-2025) spec prices -exclusively. The extracted `predict_sap_for_cert` pure function exists -so the spec-prices path is unit-testable in isolation. -""" -from __future__ import annotations - -import json -from pathlib import Path -from typing import Any, cast - -from domain.sap.rdsap.cert_to_inputs import SAP_10_2_SPEC_PRICES -from ml_training_data.sap_parity_probe import predict_sap_for_cert - - -_REPO_ROOT = Path(__file__).resolve().parents[4] -_GOLDEN_FIXTURES = ( - _REPO_ROOT - / "packages/domain/src/domain/sap/rdsap/tests/fixtures/golden" -) - - -def _load_cert_document(cert_number: str) -> dict[str, Any]: - path = _GOLDEN_FIXTURES / f"{cert_number}.json" - return cast(dict[str, Any], json.loads(path.read_text())) - - -def test_predict_sap_for_cert_returns_spec_prices_score_for_6035_7729() -> None: - # Arrange - # Cert 6035-7729-2309-0879-2296: mid-terrace, age band A, gas combi - # boiler code 104, TFA 128 m². One of the previously-retired golden - # fixtures; the cert JSON is preserved on disk as reference data - # (ADR-0010 §10), the test against it is new and pins the - # spec-prices SAP score (not the cert SAP). - cert_document = _load_cert_document("6035-7729-2309-0879-2296") - - # Act - score = predict_sap_for_cert(cert_document, prices=SAP_10_2_SPEC_PRICES) - - # Assert - # Pinned in GREEN of P2.1. If this drifts, either the calculator's - # spec-correct output for this fixture has genuinely moved (a real - # behavioural change worth investigating) or the SAP 10.2 spec - # prices in table_12.py have changed. - assert score == _EXPECTED_SCORE_6035_7729 - - -_EXPECTED_SCORE_6035_7729: int = 67 diff --git a/services/ml_training_data/tests/unit/test_storage.py b/services/ml_training_data/tests/unit/test_storage.py deleted file mode 100644 index c254c2c9..00000000 --- a/services/ml_training_data/tests/unit/test_storage.py +++ /dev/null @@ -1,90 +0,0 @@ -"""Tests for LocalStorage — fs-backed Storage protocol for the training pipeline. - -Storage is the swap-point between local-dev (LocalStorage rooted at ./data/) and the -eventual S3-backed impl. Downstream stages (bulk_fetch, write_parquet) talk to the -Storage protocol only, not Path. -""" - -from pathlib import Path - -import pytest - -from ml_training_data.storage import LocalStorage - - -def test_write_bytes_then_read_bytes_returns_same_data(tmp_path: Path) -> None: - # Arrange - storage = LocalStorage(root=tmp_path) - payload = b"hello world" - - # Act - storage.write_bytes("greetings/hello.txt", payload) - out = storage.read_bytes("greetings/hello.txt") - - # Assert - assert out == payload - - -def test_exists_is_false_before_write_and_true_after(tmp_path: Path) -> None: - # Arrange - storage = LocalStorage(root=tmp_path) - - # Act - before = storage.exists("a/b.bin") - storage.write_bytes("a/b.bin", b"x") - after = storage.exists("a/b.bin") - - # Assert - assert before is False - assert after is True - - -def test_iter_keys_yields_every_written_key(tmp_path: Path) -> None: - # Arrange - storage = LocalStorage(root=tmp_path) - storage.write_bytes("certs/a.json", b"1") - storage.write_bytes("certs/b.json", b"2") - storage.write_bytes("manifest.json", b"3") - - # Act - keys = sorted(storage.iter_keys()) - - # Assert - assert keys == ["certs/a.json", "certs/b.json", "manifest.json"] - - -def test_iter_keys_filters_by_prefix(tmp_path: Path) -> None: - # Arrange - storage = LocalStorage(root=tmp_path) - storage.write_bytes("certs/a.json", b"1") - storage.write_bytes("certs/b.json", b"2") - storage.write_bytes("manifest.json", b"3") - - # Act - keys = sorted(storage.iter_keys(prefix="certs/")) - - # Assert - assert keys == ["certs/a.json", "certs/b.json"] - - -def test_read_bytes_raises_filenotfound_for_missing_key(tmp_path: Path) -> None: - # Arrange - storage = LocalStorage(root=tmp_path) - - # Act / Assert - with pytest.raises(FileNotFoundError): - storage.read_bytes("nope.bin") - - -def test_open_read_returns_seekable_binary_stream(tmp_path: Path) -> None: - # Arrange - storage = LocalStorage(root=tmp_path) - storage.write_bytes("big.bin", b"abcdefghij") - - # Act - with storage.open_read("big.bin") as f: - f.seek(4) - chunk = f.read(3) - - # Assert - assert chunk == b"efg" diff --git a/services/ml_training_data/tests/unit/test_train_baseline.py b/services/ml_training_data/tests/unit/test_train_baseline.py deleted file mode 100644 index 969f3785..00000000 --- a/services/ml_training_data/tests/unit/test_train_baseline.py +++ /dev/null @@ -1,235 +0,0 @@ -"""Tests for train_baseline() — fits one LightGBM regressor per target. - -train_baseline produces the baseline metrics (MAPE + R^2) and dumps per-target -feature-importance JSON to storage. This is the only stage that pulls in -LightGBM + sklearn; downstream training repos read the metrics + parquet only. -""" - -import json -from pathlib import Path - -import numpy as np -import pandas as pd - -from ml_training_data.storage import LocalStorage -from ml_training_data.train_baseline import train_baseline - - -def _synthetic_dataset(n: int = 200, seed: int = 0) -> pd.DataFrame: - rng = np.random.default_rng(seed) - floor_area = rng.uniform(40, 200, size=n) - walls = rng.integers(1, 5, size=n) - # sap_score correlates with floor_area + walls, plus noise. - sap_score = (100 - 0.2 * floor_area + 3 * walls + rng.normal(0, 2, size=n)).astype(int) - return pd.DataFrame( - { - "certificate_number": [f"CN-{i:04d}" for i in range(n)], - "total_floor_area_m2": floor_area, - "wall_count": walls, - "sap_score": sap_score, - } - ) - - -def test_train_baseline_returns_mape_and_r2_per_target(tmp_path: Path) -> None: - # Arrange - storage = LocalStorage(root=tmp_path) - df = _synthetic_dataset() - - # Act - metrics = train_baseline( - df=df, - targets=["sap_score"], - storage=storage, - run_key="runs/2026-05-16/", - seed=42, - ) - - # Assert - assert "sap_score" in metrics - assert "mape" in metrics["sap_score"] - assert "r2" in metrics["sap_score"] - assert metrics["sap_score"]["r2"] > 0.0 # learns something on a correlated signal - - -def test_low_sap_tail_weight_returns_3x_for_rows_below_58_else_1x() -> None: - # Arrange — exposed helper so callers wanting the default tail strategy - # can plug it straight into train_baseline. SAP-rating boundary 58 chosen - # from slice 16h's per-decile residuals: decile 0 (SAP 1-58) carries 17% - # MAPE; deciles 1-9 are all below 5%. - import pandas as pd # noqa: PLC0415 - - from ml_training_data.train_baseline import low_sap_tail_weight # noqa: PLC0415 - - # Act - weights = low_sap_tail_weight(pd.Series([20, 50, 58, 60, 90])) - - # Assert - assert list(weights) == [3.0, 3.0, 1.0, 1.0, 1.0] - - -def test_train_baseline_accepts_sample_weight_fn_per_target(tmp_path: Path) -> None: - # Arrange — sample_weight_fn is a callable taking the training-label Series - # and returning a Series of weights the same length. When supplied, the - # weights flow into LGBMRegressor.fit's sample_weight argument and the - # model emphasizes the heavily-weighted rows. We verify the indirection - # works by training twice (no weights vs heavy-weighted tail) and - # confirming the predictions differ on the tail subset. - import numpy as np # noqa: PLC0415 - import pandas as pd # noqa: PLC0415 - - storage = LocalStorage(root=tmp_path) - df = _synthetic_dataset(n=600, seed=0) - - def weight_tail(y: "pd.Series[Any]") -> "pd.Series[Any]": - return pd.Series(np.where(np.asarray(y, dtype=float) < 60, 10.0, 1.0), index=y.index) - - # Act - m_unweighted = train_baseline( - df=df.copy(), targets=["sap_score"], storage=storage, - run_key="runs/unw/", seed=42, - ) - m_weighted = train_baseline( - df=df.copy(), targets=["sap_score"], storage=storage, - run_key="runs/w/", seed=42, sample_weight_fn=weight_tail, - ) - - # Assert — global MAE should differ between weighted and unweighted runs. - # (Direction depends on data; we just need to see that the weight reached LGBM.) - assert m_unweighted["sap_score"]["mae"] != m_weighted["sap_score"]["mae"] - - -def test_train_baseline_reports_mae_and_rmse_per_target(tmp_path: Path) -> None: - # Arrange — MAE gives users-facing "predicted SAP within N points" meaning; - # RMSE penalises large errors quadratically. Both should be reported next - # to MAPE so we can read the residual without inverting MAPE math by hand. - storage = LocalStorage(root=tmp_path) - df = _synthetic_dataset() - - # Act - metrics = train_baseline( - df=df, targets=["sap_score"], storage=storage, - run_key="runs/2026-05-16/", seed=42, - ) - - # Assert - assert "mae" in metrics["sap_score"] - assert "rmse" in metrics["sap_score"] - assert metrics["sap_score"]["mae"] > 0 - assert metrics["sap_score"]["rmse"] >= metrics["sap_score"]["mae"] # always true mathematically - - -def test_train_baseline_writes_feature_importance_per_target(tmp_path: Path) -> None: - # Arrange - storage = LocalStorage(root=tmp_path) - df = _synthetic_dataset() - - # Act - train_baseline( - df=df, - targets=["sap_score"], - storage=storage, - run_key="runs/2026-05-16/", - seed=42, - ) - - # Assert - importance = json.loads(storage.read_bytes("runs/2026-05-16/importance_sap_score.json")) - assert set(importance.keys()) == {"total_floor_area_m2", "wall_count"} - assert all(isinstance(v, (int, float)) for v in importance.values()) - - -def test_train_baseline_handles_multiple_targets_independently(tmp_path: Path) -> None: - # Arrange - storage = LocalStorage(root=tmp_path) - df = _synthetic_dataset() - df["co2_emissions"] = df["sap_score"] * 0.1 + 1.0 # second correlated target - - # Act - metrics = train_baseline( - df=df, - targets=["sap_score", "co2_emissions"], - storage=storage, - run_key="runs/2026-05-16/", - seed=42, - ) - - # Assert - assert set(metrics.keys()) == {"sap_score", "co2_emissions"} - assert storage.exists("runs/2026-05-16/importance_sap_score.json") - assert storage.exists("runs/2026-05-16/importance_co2_emissions.json") - assert storage.exists("runs/2026-05-16/metrics.json") - - -def test_train_baseline_writes_per_decile_residuals_per_target(tmp_path: Path) -> None: - # Arrange - storage = LocalStorage(root=tmp_path) - df = _synthetic_dataset(n=500) - - # Act - train_baseline( - df=df, - targets=["sap_score"], - storage=storage, - run_key="runs/2026-05-16/", - seed=42, - ) - - # Assert - residuals = json.loads(storage.read_bytes("runs/2026-05-16/residuals_sap_score.json")) - assert "buckets" in residuals - assert len(residuals["buckets"]) == 10 - expected_keys = {"decile", "true_min", "true_max", "count", "mape", "mae", "mean_residual"} - for bucket in residuals["buckets"]: - assert expected_keys <= set(bucket.keys()) - # The 10 bucket counts sum to the test-set size (20% of df). - assert sum(b["count"] for b in residuals["buckets"]) == int(len(df) * 0.2) - # Buckets are ordered by true_min ascending. - true_mins = [b["true_min"] for b in residuals["buckets"]] - assert true_mins == sorted(true_mins) - - -def test_train_baseline_uses_default_regression_objective_per_slice_16h(tmp_path: Path) -> None: - # Arrange — slice 16g originally switched sap_score + peui_ucl to - # objective='mape'; slice 16h's 250k ablation showed that lost ~0.6 pts - # of global MAPE because mape over-weights the low-SAP tail. Reverted - # to default 'regression' for all targets; tail strategy moves to - # sample weights in slice 16i. - storage = LocalStorage(root=tmp_path) - df = _synthetic_dataset(n=300) - df["peui_ucl"] = df["sap_score"].astype(float) + 5.0 - - # Act - metrics = train_baseline( - df=df, - targets=["sap_score", "peui_ucl"], - storage=storage, - run_key="runs/2026-05-16/", - seed=42, - ) - - # Assert - assert "sap_score" in metrics - assert "peui_ucl" in metrics - from ml_training_data.train_baseline import _OBJECTIVE_OVERRIDES # noqa: PLC0415 - assert _OBJECTIVE_OVERRIDES == {} - - -def test_train_baseline_residuals_emitted_per_target_independently(tmp_path: Path) -> None: - # Arrange - storage = LocalStorage(root=tmp_path) - df = _synthetic_dataset(n=500) - df["co2_emissions"] = df["sap_score"] * 0.1 + 1.0 - - # Act - train_baseline( - df=df, - targets=["sap_score", "co2_emissions"], - storage=storage, - run_key="runs/2026-05-16/", - seed=42, - ) - - # Assert - assert storage.exists("runs/2026-05-16/residuals_sap_score.json") - assert storage.exists("runs/2026-05-16/residuals_co2_emissions.json") diff --git a/services/ml_training_data/tests/unit/test_write_parquet.py b/services/ml_training_data/tests/unit/test_write_parquet.py deleted file mode 100644 index 571d969f..00000000 --- a/services/ml_training_data/tests/unit/test_write_parquet.py +++ /dev/null @@ -1,98 +0,0 @@ -"""Tests for write_training_dataset() — parquet + schema.json + manifest.json. - -The output triple is the contract handed to the AutoGluon training repo: - - data.parquet: feature+target rows - - schema.json: column specs (categorical flags, target list) so the consumer - can reconstruct dtype intent without re-parsing the transform module. - - manifest.json: run metadata (when, transform version, source bulk-ZIP info). -""" - -import json -from pathlib import Path - -import pandas as pd - -from domain.ml.schema import ColumnSpec, TransformSchema -from ml_training_data.storage import LocalStorage -from ml_training_data.write_parquet import write_training_dataset - - -def _toy_schema() -> "TransformSchema": - return TransformSchema( - transform_version="0.1.0", - feature_columns={ - "total_floor_area_m2": ColumnSpec( - dtype=float, nullable=False, description="floor area" - ), - "property_type": ColumnSpec( - dtype=str, nullable=True, description="cat", categorical=True - ), - }, - target_columns={ - "sap_score": ColumnSpec(dtype=int, nullable=False, description="target"), - }, - ) - - -def test_write_training_dataset_persists_dataframe_to_parquet(tmp_path: Path) -> None: - # Arrange - storage = LocalStorage(root=tmp_path) - df = pd.DataFrame( - { - "certificate_number": ["A-1", "A-2"], - "total_floor_area_m2": [80.0, 120.0], - "sap_score": [70, 85], - } - ) - - # Act - write_training_dataset( - df=df, storage=storage, run_key="runs/2026-05-16/", schema=_toy_schema() - ) - - # Assert - assert storage.exists("runs/2026-05-16/data.parquet") - roundtrip = pd.read_parquet(tmp_path / "runs/2026-05-16/data.parquet") - assert roundtrip["certificate_number"].tolist() == ["A-1", "A-2"] - assert roundtrip["sap_score"].tolist() == [70, 85] - - -def test_write_training_dataset_writes_schema_json_alongside_parquet(tmp_path: Path) -> None: - # Arrange - storage = LocalStorage(root=tmp_path) - df = pd.DataFrame({"total_floor_area_m2": [80.0], "property_type": ["House"], "sap_score": [70]}) - - # Act - write_training_dataset( - df=df, storage=storage, run_key="runs/2026-05-16/", schema=_toy_schema() - ) - - # Assert - schema_doc = json.loads(storage.read_bytes("runs/2026-05-16/schema.json")) - assert schema_doc["transform_version"] == "0.1.0" - assert "total_floor_area_m2" in schema_doc["features"] - assert schema_doc["features"]["property_type"]["categorical"] is True - assert schema_doc["targets"]["sap_score"]["dtype"] == "int" - - -def test_write_training_dataset_writes_manifest_with_row_count_and_source(tmp_path: Path) -> None: - # Arrange - storage = LocalStorage(root=tmp_path) - df = pd.DataFrame({"total_floor_area_m2": [80.0, 95.0, 110.0], "sap_score": [70, 75, 80]}) - source_info = {"bulk_zip_last_updated": "2026-05-14T09:59:32Z", "bulk_zip_size_bytes": 15_642_371_075} - - # Act - write_training_dataset( - df=df, - storage=storage, - run_key="runs/2026-05-16/", - schema=_toy_schema(), - source_info=source_info, - ) - - # Assert - manifest = json.loads(storage.read_bytes("runs/2026-05-16/manifest.json")) - assert manifest["row_count"] == 3 - assert manifest["transform_version"] == "0.1.0" - assert manifest["source_info"] == source_info - assert "written_at" in manifest