deleted scaffolding services folder

This commit is contained in:
Khalim Conn-Kowlessar 2026-05-26 10:41:00 +00:00
parent a75052dcca
commit 168e7f18a1
31 changed files with 0 additions and 1691 deletions

View file

@ -1,13 +0,0 @@
# Services
Each subdirectory is a deployable unit — typically a Lambda image. Own `pyproject.toml`, own `Dockerfile`, own deps. Lambda bundle contains only that service's deps + its workspace deps.
| Service | Purpose |
|---------|---------|
| [`ara/`](./ara/) | The Domna retrofit modelling backend — ingestion + modelling pipelines, all 9 services in [PRD §9.2](../ara_backend_design.md). |
Other Domna services (address2uprn, hubspot, pashub, ecmk, magicplan) live in the legacy `backend/` and `etl/` trees for now; they are slated to migrate here as their owners pick them up — see [PRD §11](../ara_backend_design.md). When that work starts, scaffold the service under `services/<name>/` and add it to the workspace members in the root `pyproject.toml`.
## Service boundary
A service can `import domain.*`, `import repos.*`, `import fetchers.*`, `import utils.*` (workspace deps). It **cannot** import another service's modules — they are separate distributions with no cross-import path. This is the structural enforcement of the modelling/ingestion separation ([ADR-0003](../docs/adr/0003-strict-ingestion-modelling-separation.md)).
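
The boundary rule above could also be checked mechanically. What follows is a minimal sketch, not the repository's actual enforcement: the `SERVICE_PACKAGES` set and the `forbidden_imports` helper are hypothetical names introduced for illustration, and the check only looks at absolute imports using the standard-library `ast` module.

```python
import ast

# Assumed for illustration: top-level packages that are services
# (as opposed to workspace deps such as domain, repos, fetchers, utils).
SERVICE_PACKAGES = {"ara", "ml_training_data"}


def forbidden_imports(source: str, own_package: str) -> list[str]:
    """Return the other services' top-level packages imported by `source`."""
    found: list[str] = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            roots = [alias.name.split(".")[0] for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
            roots = [node.module.split(".")[0]]
        else:
            continue
        for root in roots:
            # Importing your own package is fine; another service is not.
            if root in SERVICE_PACKAGES and root != own_package:
                found.append(root)
    return found
```

A check like this would only be a backstop; the separate-distribution layout described above is what actually prevents the import from resolving.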

View file

@ -1,12 +0,0 @@
# Lambda image for the Ara modelling backend.
#
# This is a scaffold — final image will install only ara + its workspace deps
# (domna-domain, domna-repos, domna-fetchers, domna-utils) plus ML/data libraries.
# Build via uv to keep cold-start size contained.
FROM public.ecr.aws/lambda/python:3.11
# TODO: install uv, sync this service's deps from the workspace lock file,
# copy src/ara/ into ${LAMBDA_TASK_ROOT}/, set CMD to the Lambda handler.
CMD ["ara.lambdas.handler.handler"]

View file

@ -1,30 +0,0 @@
# ara
The Domna retrofit modelling backend. Replaces the legacy `backend/engine/engine.py` monolith with a service-oriented pipeline that survives the 30 May 2026 gov EPC API cut-over and that other team members can read, fix, and extend.
Design document: [`../../ara_backend_design.md`](../../ara_backend_design.md).
Domain glossary: [`../../CONTEXT.md`](../../CONTEXT.md).
## Layout
```
src/ara/
├── services/        # the 9 domain services from PRD §9.2:
│                    #   EpcRemappingService, EpcPredictionService,
│                    #   FeatureBuilder, EpcEnergyDerivationService,
│                    #   RebaseliningService, RecommendationService,
│                    #   ImpactPredictionService, OptimiserService,
│                    #   ValuationService, ResultsPersister
├── orchestrators/   # IngestionPipeline, ModellingPipeline, RefreshOrchestrator
└── lambdas/         # one handler.py per Lambda + the event-shape contracts
```
## Pipeline
See [PRD §9.4](../../ara_backend_design.md) for the per-batch step order. Briefly: per-property setup (steps 1–6) runs once per Property; the per-scenario × per-phase loop (steps 7–10) re-derives candidates and impact predictions against the rolling Effective EPC state; results are persisted under one Unit of Work per (Plan, Scenario).
## Testing
- `tests/unit/` — service tests against fakes from `tests/fakes/`. No DB, no network, no ML lambda.
- `tests/integration/` — real Postgres (testcontainers / localstack), fake fetchers + fake ML lambdas.
- ML transform contract tests live with `domain.ml.transform` in `packages/domain/`.
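
The unit-test layer above relies on dict-backed fakes. As a rough sketch of what such a fake might look like (the `Property` dataclass, the `FakePropertyRepo` class and its method names are all hypothetical, since the real repo interfaces are not shown in this commit):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Property:
    """Hypothetical stand-in for a domain object."""

    uprn: str
    postcode: str


class FakePropertyRepo:
    """Dict-backed fake: keeps rows in memory, never touches a database."""

    def __init__(self) -> None:
        self._rows: dict[str, Property] = {}

    def save(self, prop: Property) -> None:
        # Last write wins, keyed by UPRN.
        self._rows[prop.uprn] = prop

    def get(self, uprn: str) -> Optional[Property]:
        return self._rows.get(uprn)

    def list_all(self) -> list[Property]:
        return list(self._rows.values())
```

A service under test would receive the fake through its constructor in place of the real repo, so the test needs no Postgres container.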

View file

@ -1,28 +0,0 @@
[project]
name = "ara"
version = "0.1.0"
description = "The Domna retrofit modelling backend. Ingestion + modelling pipelines."
requires-python = ">=3.11"
dependencies = [
"domna-domain",
"domna-repos",
"domna-fetchers",
"domna-utils",
"pandas>=2.0",
"pandas-stubs",
"numpy>=1.26",
"pydantic>=2.0",
]
[tool.uv.sources]
domna-domain = { workspace = true }
domna-repos = { workspace = true }
domna-fetchers = { workspace = true }
domna-utils = { workspace = true }
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/ara"]

View file

@ -1,4 +0,0 @@
"""The Domna retrofit modelling backend.
See README.md and ara_backend_design.md (repo root) for the architecture.
"""

View file

@ -1,5 +0,0 @@
"""Lambda handlers + event-shape contracts.
One handler per deployable Lambda. See PRD §4.6 for the ModelTriggerRequest
shape.
"""

View file

@ -1,5 +0,0 @@
"""Orchestrators for the Ara pipeline.
IngestionPipeline, ModellingPipeline, RefreshOrchestrator. The only place
where step order is encoded and where fetchers + services + repos meet.
"""

View file

@ -1,9 +0,0 @@
"""Domain services for the Ara modelling pipeline (PRD §9.2).
EpcRemappingService, EpcPredictionService, FeatureBuilder,
EpcEnergyDerivationService, RebaseliningService, RecommendationService,
ImpactPredictionService, OptimiserService, ValuationService, ResultsPersister.
Each service operates on `Properties` and depends only on repos + other services
+ domain objects. No external IO (per ADR-0003).
"""

View file

@ -1,4 +0,0 @@
"""Fake repos and fetchers for unit tests.
One Fake<Name>Repo per real repo; dict-backed; no DB. Same for fetchers.
"""

View file

@ -1,26 +0,0 @@
[project]
name = "ml-training-data"
version = "0.1.0"
description = "Pipeline that turns the EPC open-data CSV into ML training parquet + baseline models."
requires-python = ">=3.11"
dependencies = [
"domna-domain",
"pandas>=2.0",
"pandas-stubs",
"ijson>=3.2",
"pyarrow>=15",
"lightgbm>=4.0",
"scikit-learn>=1.4",
"httpx",
"remotezip>=0.12",
]
[tool.uv.sources]
domna-domain = { workspace = true }
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/ml_training_data"]

View file

@ -1,5 +0,0 @@
"""EPC CSV → training-data pipeline.
Produces parquet + schema.json + manifest.json for baseline LightGBM training.
See ara_backend_design.md (repo root) for the pipeline shape.
"""

View file

@ -1,74 +0,0 @@
"""Wrapper records -> EpcPropertyData -> feature+target DataFrame.
Each wrapper record from the bulk ZIP carries the cert metadata plus a
JSON-encoded `document` payload. This function:
- Filters out non-RdSAP assessments (our mapper only handles RdSAP-Schema-21.x).
- Parses `document` and feeds it to EpcPropertyDataMapper.from_api_response.
- Calls EpcMlTransform.to_row per cert (streaming) and discards the heavy
  EpcPropertyData immediately so memory stays O(row-dict) per cert rather than
  O(EpcPropertyData * n), which is critical for the 500k+ cert full-year runs.
- Prepends a `certificate_number` column so every row is traceable to its source.
"""
import json
from typing import Any, cast

import pandas as pd
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
from domain.ml.schema import TransformSchema
from domain.ml.transform import EpcMlTransform

from ml_training_data.bulk_zip_reader import BulkZipReader

_RDSAP_ASSESSMENT_TYPE = "RdSAP"


def build_features(
    bulk_reader: BulkZipReader,
    certificate_numbers: set[str],
    *,
    skip_unsupported_schemas: bool = True,
) -> pd.DataFrame:
    transform = EpcMlTransform()
    rows: list[dict[str, Any]] = []
    cert_nums: list[str] = []
    for record in bulk_reader.iter_certificates_filtered(certificate_numbers):
        if record.get("assessment_type") != _RDSAP_ASSESSMENT_TYPE:
            continue
        document_field = record.get("document")
        if isinstance(document_field, str):
            document = cast(dict[str, Any], json.loads(document_field))
        elif isinstance(document_field, dict):
            document = cast(dict[str, Any], document_field)
        else:
            continue
        try:
            prop = EpcPropertyDataMapper.from_api_response(document)
        except ValueError:
            if skip_unsupported_schemas:
                continue
            raise
        rows.append(transform.to_row(prop))
        cert_nums.append(str(record["certificate_number"]))
        # prop and document drop out of scope; GC reclaims before the next iter.
    df = _frame_from_rows(rows, transform.schema())
    df["certificate_number"] = cert_nums
    return df[["certificate_number", *[c for c in df.columns if c != "certificate_number"]]]


def _frame_from_rows(rows: list[dict[str, Any]], schema: TransformSchema) -> pd.DataFrame:
    """Build the typed DataFrame from streamed row dicts.

    Mirrors EpcMlTransform.to_rows post-processing: full column set even when empty,
    and pd.Categorical casts for any column flagged categorical in the schema.
    """
    all_columns = list(schema.feature_columns.keys()) + list(schema.target_columns.keys())
    df = pd.DataFrame(rows, columns=all_columns)
    for name, spec in schema.feature_columns.items():
        if spec.categorical:
            df[name] = df[name].astype("category")
    for name, spec in schema.target_columns.items():
        if spec.categorical:
            df[name] = df[name].astype("category")
    return df
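
The record-unwrapping step in `build_features` can be looked at in isolation. The sketch below is a standalone illustration using only the standard library; `unwrap_document` is a hypothetical helper, not a function from this commit, and it adds one assumption of its own: a `document` string that decodes to something other than a JSON object is skipped.

```python
import json
from typing import Any, Optional

RDSAP = "RdSAP"


def unwrap_document(record: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Return the decoded `document` for RdSAP records, else None."""
    if record.get("assessment_type") != RDSAP:
        return None  # non-RdSAP assessments are filtered out
    field = record.get("document")
    if isinstance(field, str):
        decoded = json.loads(field)
        # Assumption for this sketch: only JSON objects are usable payloads.
        return decoded if isinstance(decoded, dict) else None
    if isinstance(field, dict):
        return field  # already decoded upstream
    return None  # missing or unexpected payload type
```

Keeping this as a pure function would make the str-or-dict handling testable without a bulk ZIP or the mapper.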

View file

@ -1,54 +0,0 @@
"""Stream EPC certificate wrappers from the gov bulk JSON ZIP.
The bulk ZIP from /api/files/domestic/json contains one entry per inspection year
(certificates-YYYY.json). Each entry is a stream of concatenated JSON objects
(NDJSON-shaped, no enclosing array) where every wrapper has:
- certificate_number: str
- assessment_type: "RdSAP" | "SAP" | ...
- document: str, a JSON-encoded payload with the full EPC schema document
- warehouse_created_at, updated_at: provenance timestamps
The reader yields those wrappers as-is. Unwrapping `document` and filtering by
assessment_type belongs at the build_features layer, where the domain knowledge
of which assessment types our mapper supports already lives.
"""
import zipfile
from collections.abc import Iterator
from typing import Any, cast

import ijson  # type: ignore[import-untyped]

from ml_training_data.storage import Storage


class BulkZipReader:
    def __init__(self, storage: Storage, zip_key: str) -> None:
        self._storage = storage
        self._zip_key = zip_key

    def list_entries(self) -> list[str]:
        with zipfile.ZipFile(self._storage.open_read(self._zip_key)) as zf:
            return zf.namelist()

    def iter_certificates(self, entry: str) -> Iterator[dict[str, Any]]:
        with zipfile.ZipFile(self._storage.open_read(self._zip_key)) as zf:
            with zf.open(entry) as f:
                for item in ijson.items(f, "", use_float=True, multiple_values=True):
                    yield cast(dict[str, Any], item)

    def iter_certificates_filtered(
        self, certificate_numbers: set[str]
    ) -> Iterator[dict[str, Any]]:
        remaining = set(certificate_numbers)
        for entry in self.list_entries():
            if not remaining:
                return
            for cert in self.iter_certificates(entry):
                cert_num = cert.get("certificate_number")
                if cert_num in remaining:
                    remaining.discard(cert_num)
                    yield cert
                    if not remaining:
                        return
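
To make the "concatenated JSON objects, no enclosing array" shape concrete, here is a small standard-library sketch. It deliberately swaps in `json.JSONDecoder.raw_decode` for `ijson`, and it reads the whole entry into memory, so it illustrates the format rather than the streaming behaviour of the reader above. The helper names `iter_concatenated_json` and `read_entry` are hypothetical.

```python
import io
import json
import zipfile
from collections.abc import Iterator
from typing import Any


def iter_concatenated_json(text: str) -> Iterator[Any]:
    """Yield each top-level JSON value from back-to-back values in `text`."""
    decoder = json.JSONDecoder()
    pos = 0
    while True:
        # raw_decode does not skip leading whitespace, so do it by hand.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos == len(text):
            return
        value, pos = decoder.raw_decode(text, pos)
        yield value


def read_entry(zip_bytes: bytes, entry: str) -> list[Any]:
    """Decode every wrapper object stored in one ZIP entry."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        with zf.open(entry) as f:
            return list(iter_concatenated_json(f.read().decode("utf-8")))
```

For multi-gigabyte yearly entries an incremental parser remains the appropriate tool; this version is only suitable for small fixtures.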

View file

@ -1,82 +0,0 @@
"""Extract specific yearly entries from the gov bulk JSON ZIP without downloading
the whole 15 GB archive.
The gov endpoint redirects to a pre-signed S3 URL; remotezip uses HTTP Range
requests so only the bytes for the requested entries traverse the wire. Each entry
is streamed in chunks and re-zipped into a *separate* local archive (one per year):
- Smaller disk footprint than the uncompressed JSON would need (5-8 GB per year).
- Per-year resumability: a partial-network failure on year N doesn't waste years
that already completed; rerun and the existing zips are skipped.
- BulkZipReader keeps working unchanged on each per-year local ZIP.
"""
import zipfile
from pathlib import Path

import httpx
from remotezip import RemoteZip  # type: ignore[import-untyped]  # pyright: ignore[reportMissingTypeStubs]

_BULK_JSON_URL = (
    "https://api.get-energy-performance-data.communities.gov.uk/api/files/domestic/json"
)
_READ_CHUNK_BYTES = 8 * 1024 * 1024  # 8 MB


def extract_entries_to_local_zips(
    auth_token: str,
    entry_names: list[str],
    output_dir: Path,
) -> dict[str, int]:
    output_dir.mkdir(parents=True, exist_ok=True)
    sizes: dict[str, int] = {}
    for entry in entry_names:
        out_path = output_dir / f"{entry}.zip"
        if out_path.exists():
            sizes[entry] = out_path.stat().st_size
            continue
        # The S3 presigned URL is short-lived (X-Amz-Expires=30s), so refresh it
        # per entry — extracting one entry can easily take longer than the TTL.
        presigned_url = _resolve_presigned_url(auth_token)
        with RemoteZip(presigned_url) as in_zf:  # pyright: ignore[reportUnknownVariableType]
            _stream_entry_to_zip(in_zf, entry, out_path)
        sizes[entry] = out_path.stat().st_size
    return sizes


def _stream_entry_to_zip(
    in_zf: RemoteZip,  # pyright: ignore[reportUnknownParameterType]
    entry: str,
    out_path: Path,
) -> None:
    tmp_path = out_path.with_suffix(out_path.suffix + ".part")
    try:
        with zipfile.ZipFile(tmp_path, "w", zipfile.ZIP_DEFLATED) as out_zf:
            with in_zf.open(entry) as src, out_zf.open(entry, "w", force_zip64=True) as dst:  # pyright: ignore[reportUnknownMemberType,reportUnknownVariableType]
                while True:
                    chunk: bytes = src.read(_READ_CHUNK_BYTES)  # pyright: ignore[reportUnknownVariableType,reportUnknownMemberType]
                    if not chunk:
                        break
                    dst.write(chunk)
        tmp_path.rename(out_path)
    except BaseException:
        if tmp_path.exists():
            tmp_path.unlink()
        raise


def _resolve_presigned_url(auth_token: str) -> str:
    response = httpx.get(
        _BULK_JSON_URL,
        headers={"Authorization": f"Bearer {auth_token}"},
        follow_redirects=False,
        timeout=30,
    )
    if response.status_code != 302:
        raise RuntimeError(
            f"Bulk JSON endpoint did not redirect: {response.status_code} {response.text[:200]}"
        )
    location = response.headers.get("location")
    if not location:
        raise RuntimeError("Bulk JSON 302 had no Location header")
    return location
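
The per-year resumability described in the module docstring rests on two small ideas: skip outputs that already exist, and write to a `.part` file that is renamed only on success. The sketch below isolates that pattern with the standard library only; `write_atomically` and its `produce` callback are hypothetical names, and no network or ZIP handling is involved.

```python
from pathlib import Path
from typing import Callable


def write_atomically(out_path: Path, produce: Callable[[Path], object]) -> bool:
    """Run `produce(tmp_path)`, then rename into place.

    Returns False when `out_path` already exists and the work was skipped.
    """
    if out_path.exists():
        return False  # completed on an earlier run, nothing to redo
    tmp_path = out_path.with_suffix(out_path.suffix + ".part")
    try:
        produce(tmp_path)
        tmp_path.rename(out_path)
    except BaseException:
        # Remove the partial file so a rerun starts clean, then re-raise.
        if tmp_path.exists():
            tmp_path.unlink()
        raise
    return True
```

Because the final name only ever appears after a successful rename, the existence check at the top can be trusted as a completion marker.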

View file

@ -1,33 +0,0 @@
"""Sample certificate rows from the EPC flat-register CSV.
The flat-register CSV (2.4M rows) is the *only* exhaustive list of certificate
numbers; per-certificate detail is fetched separately downstream. sample() returns
a thin DataFrame keyed by certificate_number so later stages know which records to
fetch and how to join their per-cert JSON back to register-side metadata.
"""
from collections.abc import Mapping, Sequence
from pathlib import Path
from typing import Optional

import pandas as pd


def sample(
    csv_path: Path,
    n: int,
    seed: int,
    filters: Optional[Mapping[str, Sequence[str]]] = None,
) -> pd.DataFrame:
    df: pd.DataFrame = pd.read_csv(csv_path, dtype=str, keep_default_na=False)
    if filters:
        for column, allowed in filters.items():
            if column not in df.columns:
                raise KeyError(f"filter column not present in CSV: {column!r}")
            df = df[df[column].isin(list(allowed))]
    if len(df) <= n:
        return df.reset_index(drop=True)
    return df.sample(n=n, random_state=seed).reset_index(drop=True)

View file

@ -1,243 +0,0 @@
"""Sap10Calculator parity probe over N random certs from the corpus.
ADR-0009 Session B exploratory tool. Loads the v18a parquet, samples N
certs from the typical sap-score range, streams them from the bulk JSON
ZIPs, runs the calculator, and prints the residual distribution +
worst-N residuals for spec-iteration triage.
Usage (from repo root, with the workspace venv active):
python -m ml_training_data.sap_parity_probe # N=100, seed=7
python -m ml_training_data.sap_parity_probe 500 13 # custom N + seed
Findings get written up in docs/sap-spec/PARITY_FINDINGS.md.
"""
from __future__ import annotations

import json
import sys
import time
from pathlib import Path
from typing import Any, cast

import pandas as pd
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
from domain.sap.calculator import calculate_sap_from_inputs
from domain.sap.rdsap.cert_to_inputs import (
    PriceTable,
    SAP_10_2_SPEC_PRICES,
    cert_to_inputs,
)

from ml_training_data.bulk_zip_reader import BulkZipReader
from ml_training_data.storage import LocalStorage

_REPO = Path(__file__).resolve().parents[4]
_PARQUET = _REPO / "data" / "ml_training" / "runs" / "2025_2026_n250000_v18a" / "data.parquet"
_BULK = _REPO / "data" / "ml_training" / "bulk"
_ZIP_KEYS = ("certificates-2025.json.zip", "certificates-2026.json.zip")


def predict_sap_for_cert(
    cert_document: dict[str, Any], *, prices: PriceTable
) -> int:
    """Run the mapper → cert_to_inputs → calculator pipeline on a single
    cert document and return the rounded RdSAP-style SAP score. The
    pure-function seam the corpus probe and any future per-cert dev
    tools share."""
    epc = EpcPropertyDataMapper.from_api_response(cert_document)
    inputs = cert_to_inputs(epc, prices=prices)
    result = calculate_sap_from_inputs(inputs)
    return result.sap_score


def _sample_certs(n: int, seed: int) -> dict[str, int]:
    df = pd.read_parquet(_PARQUET, columns=["certificate_number", "sap_score"])
    # Wide range so the sample includes full-SAP new-builds (sap_score 90+)
    # and the deepest-tail heritage/anomaly certs (sap_score ≤ 20). Earlier
    # `between(20, 95)` excluded the populations where the calculator's
    # biggest spec gaps tend to live.
    df = df[df["sap_score"].between(5, 99)]
    s = df.sample(n, random_state=seed)
    return dict(zip(s["certificate_number"], s["sap_score"].astype(int)))


def main(argv: list[str] | None = None) -> None:
    args = argv if argv is not None else sys.argv[1:]
    n = int(args[0]) if args else 100
    seed = int(args[1]) if len(args) > 1 else 7
    targets = _sample_certs(n, seed)
    print(
        f"Sampling {len(targets)} certs (seed={seed}) — using SAP 10.2 "
        f"(14-03-2025) spec prices per ADR-0010"
    )
    storage = LocalStorage(_BULK)
    prices = SAP_10_2_SPEC_PRICES
    results: list[dict[str, Any]] = []
    errors: list[dict[str, Any]] = []
    remaining = set(targets)
    t0 = time.monotonic()
    for zip_key in _ZIP_KEYS:
        if not remaining:
            break
        if not storage.exists(zip_key):
            print(f"!! missing {zip_key}", file=sys.stderr)
            continue
        reader = BulkZipReader(storage, zip_key)
        for cert in reader.iter_certificates_filtered(remaining):
            cn = cert["certificate_number"]
            actual = targets[cn]
            doc_field = cert.get("document")
            document = cast(
                dict[str, Any],
                json.loads(doc_field) if isinstance(doc_field, str) else doc_field,
            )
            try:
                epc = EpcPropertyDataMapper.from_api_response(document)
                inputs = cert_to_inputs(epc, prices=prices)
                result = calculate_sap_from_inputs(inputs)
                cert_primary = epc.energy_consumption_current
                tfa = epc.total_floor_area_m2 or 1.0
                pf_space = inputs.space_heating_primary_factor
                pf_hw = inputs.hot_water_primary_factor
                pf_other = inputs.other_primary_factor
                main_detail = (
                    epc.sap_heating.main_heating_details[0]
                    if epc.sap_heating and epc.sap_heating.main_heating_details
                    else None
                )
                age_band = (
                    epc.sap_building_parts[0].construction_age_band
                    if epc.sap_building_parts
                    else None
                )
                results.append({
                    "cert": cn,
                    "actual": actual,
                    "predicted": result.sap_score,
                    "residual": result.sap_score - actual,
                    "ecf": round(result.ecf, 3),
                    "tfa": epc.total_floor_area_m2,
                    "ext": epc.extensions_count,
                    "dwelling": epc.dwelling_type,
                    "our_primary_kwh_m2": round(result.primary_energy_kwh_per_m2, 1),
                    "cert_primary_kwh_m2": cert_primary,
                    "primary_resid": (
                        round(result.primary_energy_kwh_per_m2 - cert_primary, 1)
                        if cert_primary is not None
                        else None
                    ),
                    # End-use primary-energy split, kWh/m² of TFA (calculator side).
                    "space_pe_m2": round(
                        (result.main_heating_fuel_kwh_per_yr + result.secondary_heating_fuel_kwh_per_yr)
                        * pf_space / tfa, 1
                    ),
                    "hw_pe_m2": round(result.hot_water_kwh_per_yr * pf_hw / tfa, 1),
                    "lighting_pe_m2": round(result.lighting_kwh_per_yr * pf_other / tfa, 1),
                    "pumps_pe_m2": round(result.pumps_fans_kwh_per_yr * pf_other / tfa, 1),
                    "pv_pe_m2": round(
                        -inputs.pv_generation_kwh_per_yr * pf_other / tfa, 1
                    ),
                    # Strata for residual decomposition.
                    "main_cat": main_detail.main_heating_category if main_detail else None,
                    "main_fuel": main_detail.main_fuel_type if main_detail else None,
                    "age": age_band,
                    "water_code": epc.sap_heating.water_heating_code if epc.sap_heating else None,
                    "has_cyl": epc.sap_heating.cylinder_size is not None if epc.sap_heating else False,
                })
            except Exception as e:  # noqa: BLE001 — exploratory probe
                errors.append({"cert": cn, "actual": actual, "error": f"{type(e).__name__}: {e}"})
            remaining.discard(cn)
    elapsed = time.monotonic() - t0
    df = pd.DataFrame(results)
    print(f"\nelapsed {elapsed:.1f}s; calculated={len(results)}, errored={len(errors)}, not_found={len(remaining)}")
    if not df.empty:
        df["abs_resid"] = df["residual"].abs()
        print(f"\nMAE: {df['residual'].abs().mean():.2f}")
        print(f"RMSE: {((df['residual'] ** 2).mean()) ** 0.5:.2f}")
        print(f"bias: {df['residual'].mean():.2f}")
        for thr in (1, 3, 5, 10):
            pct = (df["abs_resid"] <= thr).mean() * 100
            print(f"within ±{thr}: {pct:.1f}%")
        print("\nresidual distribution:")
        print(df["residual"].describe(percentiles=[0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95]))
        print("\nworst 15 by |residual| (SAP):")
        print(df.nlargest(15, "abs_resid")[
            ["cert", "actual", "predicted", "residual", "ecf", "tfa", "ext", "dwelling"]
        ].to_string(index=False))
        # ------------------------------------------------------------------
        # Primary-energy decomposition. The cert reports a single PEUI
        # (`energy_consumption_current`, kWh/m² TFA), so we can't see the
        # cert's per-end-use mix — but we can see ours, and stratify the
        # residual by cert attributes to localise which population drives
        # the bias.
        # ------------------------------------------------------------------
        pe_df = df.dropna(subset=["primary_resid"]).copy()
        if not pe_df.empty:
            pe_df["abs_pe_resid"] = pe_df["primary_resid"].abs()
            print(f"\n=== Primary energy (kWh/m² TFA) ===")
            print(f"PE MAE: {pe_df['abs_pe_resid'].mean():.2f}")
            print(f"PE bias: {pe_df['primary_resid'].mean():.2f}")
            print(f"PE RMSE: {((pe_df['primary_resid'] ** 2).mean()) ** 0.5:.2f}")
            print("PE residual distribution:")
            print(pe_df["primary_resid"].describe(
                percentiles=[0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95]
            ))
            print("\nCorpus mean OUR primary energy by end-use (kWh/m² TFA):")
            mix_cols = ["space_pe_m2", "hw_pe_m2", "lighting_pe_m2", "pumps_pe_m2", "pv_pe_m2"]
            for col in mix_cols:
                print(f" {col:14s} {pe_df[col].mean():6.1f}")
            print(f" {'TOTAL (ours)':14s} {pe_df['our_primary_kwh_m2'].mean():6.1f}")
            print(f" {'TOTAL (cert)':14s} {pe_df['cert_primary_kwh_m2'].mean():6.1f}")
            # Stratified bias by main heating category.
            print("\nPE residual stratified by main_heating_category:")
            by_cat = pe_df.groupby("main_cat", dropna=False).agg(
                n=("cert", "count"),
                pe_mae=("abs_pe_resid", "mean"),
                pe_bias=("primary_resid", "mean"),
                our_pe=("our_primary_kwh_m2", "mean"),
                cert_pe=("cert_primary_kwh_m2", "mean"),
                space_pe=("space_pe_m2", "mean"),
                hw_pe=("hw_pe_m2", "mean"),
                light_pe=("lighting_pe_m2", "mean"),
            ).round(1)
            print(by_cat.to_string())
            # Stratified bias by age band — narrows the envelope-side hypothesis.
            print("\nPE residual stratified by construction_age_band:")
            by_age = pe_df.groupby("age", dropna=False).agg(
                n=("cert", "count"),
                pe_mae=("abs_pe_resid", "mean"),
                pe_bias=("primary_resid", "mean"),
                space_pe=("space_pe_m2", "mean"),
            ).round(1).sort_index()
            print(by_age.to_string())
            # Stratified bias by dwelling type — flat vs detached should split
            # if envelope-side (party-wall surfaces) is the dominant residual.
            print("\nPE residual stratified by dwelling_type:")
            by_dwel = pe_df.groupby("dwelling", dropna=False).agg(
                n=("cert", "count"),
                pe_mae=("abs_pe_resid", "mean"),
                pe_bias=("primary_resid", "mean"),
                space_pe=("space_pe_m2", "mean"),
            ).round(1)
            print(by_dwel.to_string())
            print("\nworst 15 by |PE residual|:")
            print(pe_df.nlargest(15, "abs_pe_resid")[
                ["cert", "actual", "primary_resid",
                 "our_primary_kwh_m2", "cert_primary_kwh_m2",
                 "space_pe_m2", "hw_pe_m2", "main_cat", "dwelling"]
            ].to_string(index=False))
    if errors:
        print("\nerrors:")
        for e in errors[:10]:
            print(" ", e)


if __name__ == "__main__":
    main()

View file

@ -1,49 +0,0 @@
"""Storage protocol + LocalStorage impl for the training-data pipeline.
The protocol is the seam between local-dev (filesystem, gitignored ./data/) and a
future S3-backed implementation. Downstream stages depend only on the protocol so
the swap is a constructor change, not a callsite rewrite.
"""
from collections.abc import Iterator
from pathlib import Path
from typing import IO, Protocol


class Storage(Protocol):
    def write_bytes(self, key: str, data: bytes) -> None: ...
    def read_bytes(self, key: str) -> bytes: ...
    def exists(self, key: str) -> bool: ...
    def iter_keys(self, prefix: str = "") -> Iterator[str]: ...
    def open_read(self, key: str) -> IO[bytes]: ...


class LocalStorage:
    def __init__(self, root: Path) -> None:
        self._root = root

    def _path(self, key: str) -> Path:
        return self._root / key

    def write_bytes(self, key: str, data: bytes) -> None:
        target = self._path(key)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(data)

    def read_bytes(self, key: str) -> bytes:
        return self._path(key).read_bytes()

    def exists(self, key: str) -> bool:
        return self._path(key).exists()

    def open_read(self, key: str) -> IO[bytes]:
        return self._path(key).open("rb")

    def iter_keys(self, prefix: str = "") -> Iterator[str]:
        if not self._root.exists():
            return
        for p in self._root.rglob("*"):
            if p.is_file():
                key = p.relative_to(self._root).as_posix()
                if key.startswith(prefix):
                    yield key
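
The docstring's claim that swapping backends is "a constructor change, not a callsite rewrite" can be illustrated with a second implementation of the protocol. The sketch below uses an in-memory dict rather than S3; `InMemoryStorage` and `count_keys` are hypothetical names that do not appear in the commit, and the protocol is repeated only so the block is self-contained.

```python
import io
from collections.abc import Iterator
from typing import IO, Protocol


class Storage(Protocol):
    # Repeated from the module above so this sketch runs on its own.
    def write_bytes(self, key: str, data: bytes) -> None: ...
    def read_bytes(self, key: str) -> bytes: ...
    def exists(self, key: str) -> bool: ...
    def iter_keys(self, prefix: str = "") -> Iterator[str]: ...
    def open_read(self, key: str) -> IO[bytes]: ...


class InMemoryStorage:
    """Hypothetical dict-backed implementation, useful as a test double."""

    def __init__(self) -> None:
        self._blobs: dict[str, bytes] = {}

    def write_bytes(self, key: str, data: bytes) -> None:
        self._blobs[key] = data

    def read_bytes(self, key: str) -> bytes:
        return self._blobs[key]

    def exists(self, key: str) -> bool:
        return key in self._blobs

    def iter_keys(self, prefix: str = "") -> Iterator[str]:
        for key in sorted(self._blobs):
            if key.startswith(prefix):
                yield key

    def open_read(self, key: str) -> IO[bytes]:
        return io.BytesIO(self._blobs[key])


def count_keys(storage: Storage, prefix: str) -> int:
    """A caller typed against the protocol accepts any implementation."""
    return sum(1 for _ in storage.iter_keys(prefix))
```

Because `Storage` is a structural protocol, neither implementation needs to inherit from it; matching method signatures are enough for a type checker.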

View file

@ -1,189 +0,0 @@
"""Fit one LightGBM regressor per target; emit metrics + feature importance.
This is the final stage of the training-data pipeline. Inputs: a feature DataFrame
(produced by build_features + persisted by write_training_dataset) plus a list of
target columns to fit. Output: a metrics dict (MAPE, sMAPE, MAE, RMSE and R^2 per
target), also written as metrics.json, plus per-target JSON files of feature
importances and per-decile residuals written via Storage.
The certificate_number column, if present, is dropped before fitting so it never
leaks into the model as a feature.
"""
import json
from typing import Any, Callable, Optional, cast
import lightgbm as lgb # type: ignore[import-untyped]
import numpy as np
import pandas as pd
import sklearn.metrics as _sk_metrics # type: ignore[import-untyped] # pyright: ignore[reportMissingTypeStubs]
import sklearn.model_selection as _sk_model_selection # type: ignore[import-untyped] # pyright: ignore[reportMissingTypeStubs]
mean_absolute_error: Any = _sk_metrics.mean_absolute_error # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
mean_absolute_percentage_error: Any = _sk_metrics.mean_absolute_percentage_error # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
mean_squared_error: Any = _sk_metrics.mean_squared_error # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
r2_score: Any = _sk_metrics.r2_score # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
train_test_split: Any = _sk_model_selection.train_test_split # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
from ml_training_data.storage import Storage
_CERT_NUM_COLUMN = "certificate_number"
# Per-target LightGBM objective overrides. Initially (slice 16g) we switched
# sap_score + peui_ucl to 'mape' to align objective with reporting metric;
# the 250k v16 ablation (slice 16h) showed 'mape' loses ~0.6 percentage
# points of global MAPE because it over-weights the low-SAP tail at the
# expense of the body. Reverted to the default 'regression' for all targets.
# Tail bias needs a different fix (sample weights / stratified loss) — slice 16i.
_OBJECTIVE_OVERRIDES: dict[str, str] = {}
SampleWeightFn = Callable[["pd.Series[Any]"], "pd.Series[Any]"]
# Default tail-bucket weight curve for slice 16i. Boundary 58 picked from
# slice 16h's per-decile residuals (decile 0 = SAP 1-58 carries 17% MAPE
# vs <5% in the body); 3x multiplier is the lightest weight that demonstrably
# reduces the +3.1 bias at decile 0 without inflating body MAPE. Configurable
# via sample_weight_fn for ablation runs.
_DEFAULT_LOW_SAP_THRESHOLD: float = 58.0
_DEFAULT_LOW_SAP_WEIGHT: float = 3.0
def low_sap_tail_weight(
    y: "pd.Series[Any]",
    threshold: float = _DEFAULT_LOW_SAP_THRESHOLD,
    weight: float = _DEFAULT_LOW_SAP_WEIGHT,
) -> "pd.Series[Any]":
    """Return per-row weights: `weight` where y < threshold, 1.0 otherwise.

    Use as `train_baseline(..., sample_weight_fn=low_sap_tail_weight)` to
    apply the slice 16i tail strategy.
    """
    arr = np.asarray(y, dtype=float)
    return pd.Series(np.where(arr < threshold, weight, 1.0), index=y.index)
def train_baseline(
    df: pd.DataFrame,
    targets: list[str],
    storage: Storage,
    run_key: str,
    *,
    test_size: float = 0.2,
    seed: int = 42,
    n_estimators: int = 200,
    sample_weight_fn: Optional[SampleWeightFn] = None,
) -> dict[str, dict[str, float]]:
    feature_cols = [c for c in df.columns if c not in targets and c != _CERT_NUM_COLUMN]
    # LightGBM needs numeric (or pd.Categorical) dtypes. Coerce object columns to
    # numeric (None -> NaN). Note that errors="coerce" also turns non-numeric
    # strings into NaN, so genuine string categoricals must already be cast to
    # pd.Categorical upstream; category-dtype columns are not touched here and
    # pass through LightGBM correctly.
    for col in [*feature_cols, *targets]:
        if df[col].dtype == "object":
            df[col] = pd.to_numeric(df[col], errors="coerce")
    metrics: dict[str, dict[str, float]] = {}
    for target in targets:
        # Drop rows where this target is null so LightGBM doesn't trip on label NaN.
        target_df = df.dropna(subset=[target])
        x = target_df[feature_cols]
        y = target_df[target]
        split = cast(
            tuple[pd.DataFrame, pd.DataFrame, "pd.Series[Any]", "pd.Series[Any]"],
            train_test_split(x, y, test_size=test_size, random_state=seed),
        )
        x_train, x_test, y_train, y_test = split
        objective = _OBJECTIVE_OVERRIDES.get(target, "regression")
        model: Any = lgb.LGBMRegressor(
            n_estimators=n_estimators, random_state=seed, verbose=-1, objective=objective,
        )
        sample_weight = sample_weight_fn(y_train) if sample_weight_fn is not None else None
        model.fit(x_train, y_train, sample_weight=sample_weight)
        preds: np.ndarray[Any, Any] = np.asarray(model.predict(x_test))
        metrics[target] = {
            "mape": float(cast(float, mean_absolute_percentage_error(y_test, preds))),
            "smape": _smape(y_test, preds),
            "mae": float(cast(float, mean_absolute_error(y_test, preds))),
            "rmse": float(np.sqrt(cast(float, mean_squared_error(y_test, preds)))),
            "r2": float(cast(float, r2_score(y_test, preds))),
        }
        importance_arr = np.asarray(model.feature_importances_, dtype=float)
        importance = {col: float(score) for col, score in zip(feature_cols, importance_arr)}
        storage.write_bytes(
            f"{run_key}importance_{target}.json",
            json.dumps(importance, indent=2).encode("utf-8"),
        )
        residuals = _per_decile_residuals(np.asarray(y_test, dtype=float), preds)
        storage.write_bytes(
            f"{run_key}residuals_{target}.json",
            json.dumps({"buckets": residuals}, indent=2).encode("utf-8"),
        )
    storage.write_bytes(
        f"{run_key}metrics.json",
        json.dumps(metrics, indent=2).encode("utf-8"),
    )
    return metrics
def _smape(y_true: Any, y_pred: Any) -> float:
    """Symmetric MAPE: mean(|y - yhat| / ((|y| + |yhat|) / 2)).

    Bounded in [0, 2] (often reported as 0-200%). Stable when |y| is near zero,
    so it's a better summary than MAPE for low-magnitude targets like
    `hot_water_kwh` in well-insulated homes.
    """
    y_t = np.asarray(y_true, dtype=float)
    y_p = np.asarray(y_pred, dtype=float)
    denom = (np.abs(y_t) + np.abs(y_p)) / 2.0
    mask = denom > 0
    if not mask.any():
        return 0.0
    return float(np.mean(np.abs(y_t[mask] - y_p[mask]) / denom[mask]))


def _per_decile_residuals(
    y_true: np.ndarray[Any, Any], y_pred: np.ndarray[Any, Any]
) -> list[dict[str, float]]:
    """Bucket the test set by deciles of the true target value, then report
    MAPE / MAE / mean residual / count per bucket.

    Lets us tell whether errors concentrate in the tails of the true distribution
    (e.g. SAP<40 / SAP>85) vs the mid-band which the global MAPE alone hides.
    """
    order = np.argsort(y_true, kind="stable")
    y_t = y_true[order]
    y_p = y_pred[order]
    n = len(y_t)
    bucket_size = n // 10  # last bucket absorbs the remainder
    buckets: list[dict[str, float]] = []
    for i in range(10):
        start = i * bucket_size
        stop = n if i == 9 else (i + 1) * bucket_size
        slice_t = y_t[start:stop]
        slice_p = y_p[start:stop]
        count = len(slice_t)
        if count == 0:
            continue
        abs_err = np.abs(slice_t - slice_p)
        mae = float(np.mean(abs_err))
        mean_residual = float(np.mean(slice_p - slice_t))
        mape_mask = slice_t != 0
        mape = float(np.mean(abs_err[mape_mask] / np.abs(slice_t[mape_mask]))) if mape_mask.any() else 0.0
        buckets.append(
            {
                "decile": float(i),
                "true_min": float(slice_t[0]),
                "true_max": float(slice_t[-1]),
                "count": float(count),
                "mape": mape,
                "mae": mae,
                "mean_residual": mean_residual,
            }
        )
    return buckets
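
For readers who want to check the sMAPE definition and the tail-weight rule by hand, here is a dependency-free restatement. It is a sketch in plain Python lists rather than NumPy or pandas, and the function names `smape` and `tail_weights` are illustrative; the default threshold and weight simply repeat the values of `_DEFAULT_LOW_SAP_THRESHOLD` and `_DEFAULT_LOW_SAP_WEIGHT` above.

```python
def smape(y_true: list[float], y_pred: list[float]) -> float:
    """Symmetric MAPE over the pairs whose denominator is non-zero."""
    terms: list[float] = []
    for y, yhat in zip(y_true, y_pred):
        denom = (abs(y) + abs(yhat)) / 2.0
        if denom > 0:  # pairs where both values are zero are skipped
            terms.append(abs(y - yhat) / denom)
    return sum(terms) / len(terms) if terms else 0.0


def tail_weights(
    y: list[float], threshold: float = 58.0, weight: float = 3.0
) -> list[float]:
    """`weight` where y < threshold, 1.0 otherwise."""
    return [weight if value < threshold else 1.0 for value in y]
```

With a true value of 100 and a prediction of 50, the single term is 50 / 75, about 0.667; with a true value of 0 and any non-zero prediction the term is 2.0, which is the upper bound quoted in the docstring.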

View file

@ -1,54 +0,0 @@
"""Persist a training-feature DataFrame as parquet + schema.json + manifest.json.
The output triple is the artefact contract this pipeline hands to downstream model
training: parquet for the data, schema.json for dtype intent, manifest.json for
run provenance. Writes go through Storage so the same code lands on local-fs or S3
without a callsite change.
"""
import io
import json
from datetime import datetime, timezone
from typing import Any, Optional
import pandas as pd
from domain.ml.schema import ColumnSpec, TransformSchema
from ml_training_data.storage import Storage
def write_training_dataset(
    df: pd.DataFrame,
    storage: Storage,
    run_key: str,
    *,
    schema: TransformSchema,
    source_info: Optional[dict[str, Any]] = None,
) -> None:
    # run_key is used as a raw key prefix, so it should end with "/".
    # 1. Data: serialise the frame to parquet in memory, then hand the bytes to Storage.
    buf = io.BytesIO()
    df.to_parquet(buf, engine="pyarrow", index=False)
    storage.write_bytes(f"{run_key}data.parquet", buf.getvalue())

    # 2. Schema: dtype intent for every feature and target column.
    schema_doc = {
        "transform_version": schema.transform_version,
        "features": {name: _column_to_json(spec) for name, spec in schema.feature_columns.items()},
        "targets": {name: _column_to_json(spec) for name, spec in schema.target_columns.items()},
    }
    storage.write_bytes(f"{run_key}schema.json", json.dumps(schema_doc, indent=2).encode("utf-8"))

    # 3. Manifest: run provenance (row count, UTC timestamp, caller-supplied source info).
    manifest = {
        "transform_version": schema.transform_version,
        "row_count": len(df),
        "written_at": datetime.now(timezone.utc).isoformat(),
        "source_info": source_info or {},
    }
    storage.write_bytes(f"{run_key}manifest.json", json.dumps(manifest, indent=2).encode("utf-8"))


def _column_to_json(spec: ColumnSpec) -> dict[str, object]:
    # Store the dtype by name (e.g. "int") so the JSON stays language-neutral.
    return {
        "dtype": spec.dtype.__name__,
        "nullable": spec.nullable,
        "categorical": spec.categorical,
        "description": spec.description,
    }
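
To make the artefact contract concrete, here is a stdlib-only sketch of the two JSON sidecars described in the module docstring. `ColumnSpecSketch`, `build_schema_doc` and `build_manifest` are hypothetical names invented for this illustration; the real `ColumnSpec` and `TransformSchema` live in `domain.ml.schema` and may carry more fields than assumed here.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Optional


@dataclass(frozen=True)
class ColumnSpecSketch:
    """Hypothetical stand-in for domain.ml.schema.ColumnSpec."""

    dtype: type
    nullable: bool
    description: str
    categorical: bool = False


def column_to_json(spec: ColumnSpecSketch) -> dict[str, object]:
    # The dtype is stored by name so a consumer needs no Python type objects.
    return {
        "dtype": spec.dtype.__name__,
        "nullable": spec.nullable,
        "categorical": spec.categorical,
        "description": spec.description,
    }


def build_schema_doc(
    version: str,
    features: dict[str, ColumnSpecSketch],
    targets: dict[str, ColumnSpecSketch],
) -> dict[str, Any]:
    return {
        "transform_version": version,
        "features": {name: column_to_json(spec) for name, spec in features.items()},
        "targets": {name: column_to_json(spec) for name, spec in targets.items()},
    }


def build_manifest(
    version: str, row_count: int, source_info: Optional[dict[str, Any]] = None
) -> dict[str, Any]:
    return {
        "transform_version": version,
        "row_count": row_count,
        # Timezone-aware UTC timestamp in ISO 8601 form.
        "written_at": datetime.now(timezone.utc).isoformat(),
        "source_info": source_info or {},
    }


schema_doc = build_schema_doc(
    "0.1.0",
    features={"total_floor_area_m2": ColumnSpecSketch(float, False, "floor area")},
    targets={"sap_score": ColumnSpecSketch(int, False, "target")},
)
manifest = build_manifest("0.1.0", row_count=3)
print(json.dumps(schema_doc, indent=2))
print(json.dumps(manifest, indent=2))
```

Keeping both documents as plain dictionaries means they can be encoded once and written through any byte-oriented storage backend.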

@ -1,110 +0,0 @@
"""Tests for build_features() — wrapper record → EpcPropertyData → feature row.
build_features wires three existing pieces: BulkZipReader yields wrapper records
out of the bulk ZIP, EpcPropertyDataMapper.from_api_response parses the
JSON-encoded `document` payload into EpcPropertyData, and EpcMlTransform.to_rows
produces the feature+target DataFrame. The function adds a leading
`certificate_number` column so each row stays traceable to its source cert.
"""
import io
import json
import zipfile
from pathlib import Path
import pytest
from ml_training_data.bulk_zip_reader import BulkZipReader
from ml_training_data.build_features import build_features
from ml_training_data.storage import LocalStorage
_FIXTURE_PATH = Path("datatypes/epc/schema/tests/fixtures/21_0_0.json")
def _wrapper(cert_number: str, *, assessment_type: str = "RdSAP", document: str | None = None) -> dict[str, object]:
    if document is None:
        document = _FIXTURE_PATH.read_text()
    return {
        "certificate_number": cert_number,
        "assessment_type": assessment_type,
        "document": document,
    }


def _write_zip(storage: LocalStorage, key: str, entries: dict[str, list[dict[str, object]]]) -> None:
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
        for entry_name, records in entries.items():
            ndjson = "\n".join(json.dumps(r) for r in records)
            zf.writestr(entry_name, ndjson)
    storage.write_bytes(key, buf.getvalue())


def test_build_features_returns_one_row_per_supported_cert(tmp_path: Path) -> None:
    # Arrange
    storage = LocalStorage(root=tmp_path)
    record = _wrapper("CERT-001")
    _write_zip(storage, "bulk.zip", {"certificates-2024.json": [record]})
    reader = BulkZipReader(storage=storage, zip_key="bulk.zip")

    # Act
    df = build_features(reader, certificate_numbers={"CERT-001"})

    # Assert
    assert len(df) == 1
    assert df.iloc[0]["certificate_number"] == "CERT-001"
    assert df.columns[0] == "certificate_number"


def test_build_features_skips_non_rdsap_assessment_types(tmp_path: Path) -> None:
    # Arrange
    storage = LocalStorage(root=tmp_path)
    rdsap = _wrapper("OK-1", assessment_type="RdSAP")
    sap = _wrapper("SAP-1", assessment_type="SAP")
    _write_zip(storage, "bulk.zip", {"certificates-2024.json": [rdsap, sap]})
    reader = BulkZipReader(storage=storage, zip_key="bulk.zip")

    # Act
    df = build_features(reader, certificate_numbers={"OK-1", "SAP-1"})

    # Assert
    assert df["certificate_number"].tolist() == ["OK-1"]


def test_build_features_skips_unsupported_schemas_by_default(tmp_path: Path) -> None:
    # Arrange
    storage = LocalStorage(root=tmp_path)
    bad_doc = json.loads(_FIXTURE_PATH.read_text())
    bad_doc["schema_type"] = "RdSAP-Schema-19.0.0"  # not in mapper dispatch
    supported = _wrapper("OK-1")
    unsupported = _wrapper("BAD-1", document=json.dumps(bad_doc))
    _write_zip(
        storage,
        "bulk.zip",
        {"certificates-2024.json": [supported, unsupported]},
    )
    reader = BulkZipReader(storage=storage, zip_key="bulk.zip")

    # Act
    df = build_features(reader, certificate_numbers={"OK-1", "BAD-1"})

    # Assert
    assert df["certificate_number"].tolist() == ["OK-1"]


def test_build_features_raises_when_skip_unsupported_schemas_is_false(tmp_path: Path) -> None:
    # Arrange
    storage = LocalStorage(root=tmp_path)
    bad_doc = json.loads(_FIXTURE_PATH.read_text())
    bad_doc["schema_type"] = "RdSAP-Schema-19.0.0"
    unsupported = _wrapper("BAD-1", document=json.dumps(bad_doc))
    _write_zip(storage, "bulk.zip", {"certificates-2024.json": [unsupported]})
    reader = BulkZipReader(storage=storage, zip_key="bulk.zip")

    # Act / Assert
    with pytest.raises(ValueError, match="Unsupported EPC schema"):
        build_features(
            reader,
            certificate_numbers={"BAD-1"},
            skip_unsupported_schemas=False,
        )

@ -1,91 +0,0 @@
"""Tests for BulkZipReader — stream EPC certificate wrappers from the gov bulk JSON ZIP.
The real bulk ZIP holds one entry per inspection year. Each entry is a sequence of
concatenated JSON objects (NDJSON-shaped, no enclosing array). Tests use small
synthetic ZIPs of the same shape.
"""
import io
import json
import zipfile
from pathlib import Path
import pytest
from ml_training_data.bulk_zip_reader import BulkZipReader
from ml_training_data.storage import LocalStorage
def _write_zip(
    storage: LocalStorage, key: str, entries: dict[str, list[dict[str, object]]]
) -> None:
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
        for entry_name, records in entries.items():
            ndjson = "\n".join(json.dumps(r) for r in records)
            zf.writestr(entry_name, ndjson)
    storage.write_bytes(key, buf.getvalue())


def test_list_entries_returns_zip_member_names_in_archive_order(tmp_path: Path) -> None:
    # Arrange
    storage = LocalStorage(root=tmp_path)
    _write_zip(
        storage,
        "bulk.zip",
        {"certificates-2024.json": [], "certificates-2025.json": []},
    )
    reader = BulkZipReader(storage=storage, zip_key="bulk.zip")

    # Act
    entries = reader.list_entries()

    # Assert
    assert entries == ["certificates-2024.json", "certificates-2025.json"]


def test_iter_certificates_yields_every_cert_in_entry(tmp_path: Path) -> None:
    # Arrange
    storage = LocalStorage(root=tmp_path)
    certs = [
        {"certificate_number": "CN-0001", "postcode": "AA1 1AA"},
        {"certificate_number": "CN-0002", "postcode": "BB2 2BB"},
        {"certificate_number": "CN-0003", "postcode": "CC3 3CC"},
    ]
    _write_zip(storage, "bulk.zip", {"certificates-2025.json": certs})
    reader = BulkZipReader(storage=storage, zip_key="bulk.zip")

    # Act
    out = list(reader.iter_certificates("certificates-2025.json"))

    # Assert
    assert out == certs


def test_iter_certificates_filtered_yields_only_requested_across_entries(tmp_path: Path) -> None:
    # Arrange
    storage = LocalStorage(root=tmp_path)
    _write_zip(
        storage,
        "bulk.zip",
        {
            "certificates-2024.json": [
                {"certificate_number": "A-1", "postcode": "AA1"},
                {"certificate_number": "A-2", "postcode": "AA2"},
            ],
            "certificates-2025.json": [
                {"certificate_number": "B-1", "postcode": "BB1"},
                {"certificate_number": "B-2", "postcode": "BB2"},
            ],
        },
    )
    reader = BulkZipReader(storage=storage, zip_key="bulk.zip")

    # Act
    out = sorted(
        reader.iter_certificates_filtered({"A-1", "B-2", "MISSING-9"}),
        key=lambda c: c["certificate_number"],
    )

    # Assert
    assert [c["certificate_number"] for c in out] == ["A-1", "B-2"]
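
The module docstring describes each ZIP entry as a run of concatenated JSON objects with no enclosing array. One way such an entry could be decoded is sketched below with `json.JSONDecoder.raw_decode`. The helper `iter_concatenated_json` is hypothetical and is not taken from `BulkZipReader`; it also loads the whole entry into memory for brevity, whereas a reader for a multi-gigabyte archive would more plausibly stream.

```python
import io
import json
import zipfile
from typing import Any, Iterator


def iter_concatenated_json(text: str) -> Iterator[Any]:
    """Yield each top-level JSON value from a string of back-to-back values."""
    decoder = json.JSONDecoder()
    pos = 0
    end = len(text)
    while pos < end:
        # raw_decode does not skip leading whitespace, so step over separators first.
        while pos < end and text[pos].isspace():
            pos += 1
        if pos >= end:
            break
        # raw_decode returns the parsed value and the index just past it.
        obj, pos = decoder.raw_decode(text, pos)
        yield obj


# Build a tiny synthetic archive with the same shape as the test fixtures.
records = [{"certificate_number": "A-1"}, {"certificate_number": "A-2"}]
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
    zf.writestr("certificates-2024.json", "\n".join(json.dumps(r) for r in records))

# Read it back: list the members, then decode the first entry.
with zipfile.ZipFile(io.BytesIO(buf.getvalue())) as zf:
    names = zf.namelist()
    text = zf.read(names[0]).decode("utf-8")
parsed = list(iter_concatenated_json(text))
print(names, parsed)
```

Because the decoder tracks its own position, the same helper handles newline-separated records and records written back to back with no separator at all.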

@ -1,98 +0,0 @@
"""Tests for sample.sample() — random + filterable selection over the EPC flat-register CSV.
sample() is the entry point of the training-data pipeline: it produces a thin DataFrame
of certificate rows that downstream stages (fetch -> build_features -> write_parquet)
operate on. Filtering supports excluding obviously-wrong cohorts before paying the
per-cert fetch cost.
"""
from pathlib import Path
import pandas as pd
import pytest
from ml_training_data.sample import sample
def _write_csv(path: Path, rows: list[dict[str, str]]) -> None:
    df = pd.DataFrame(rows)
    df.to_csv(path, index=False)


def test_sample_returns_n_rows_when_no_filter(tmp_path: Path) -> None:
    # Arrange
    csv = tmp_path / "register.csv"
    rows = [
        {"certificate_number": f"CN-{i:04d}", "property_type": "House", "postcode": "AB1 2CD"}
        for i in range(100)
    ]
    _write_csv(csv, rows)

    # Act
    out = sample(csv_path=csv, n=10, seed=42)

    # Assert
    assert len(out) == 10
    assert set(out.columns) == {"certificate_number", "property_type", "postcode"}


def test_sample_is_deterministic_with_same_seed(tmp_path: Path) -> None:
    # Arrange
    csv = tmp_path / "register.csv"
    _write_csv(
        csv,
        [{"certificate_number": f"CN-{i:04d}", "property_type": "House"} for i in range(200)],
    )

    # Act
    first = sample(csv_path=csv, n=20, seed=7)
    second = sample(csv_path=csv, n=20, seed=7)

    # Assert
    assert first["certificate_number"].tolist() == second["certificate_number"].tolist()


def test_sample_filter_selects_only_matching_rows(tmp_path: Path) -> None:
    # Arrange
    csv = tmp_path / "register.csv"
    rows: list[dict[str, str]] = []
    for i in range(50):
        rows.append({"certificate_number": f"H-{i:03d}", "property_type": "House"})
    for i in range(50):
        rows.append({"certificate_number": f"F-{i:03d}", "property_type": "Flat"})
    _write_csv(csv, rows)

    # Act
    out = sample(csv_path=csv, n=30, seed=1, filters={"property_type": ["House"]})

    # Assert
    assert len(out) == 30
    assert (out["property_type"] == "House").all()


def test_sample_returns_fewer_than_n_when_filter_too_strict(tmp_path: Path) -> None:
    # Arrange
    csv = tmp_path / "register.csv"
    rows: list[dict[str, str]] = []
    for i in range(3):
        rows.append({"certificate_number": f"BG-{i}", "property_type": "Bungalow"})
    for i in range(50):
        rows.append({"certificate_number": f"H-{i:03d}", "property_type": "House"})
    _write_csv(csv, rows)

    # Act
    out = sample(csv_path=csv, n=100, seed=1, filters={"property_type": ["Bungalow"]})

    # Assert
    assert len(out) == 3
    assert (out["property_type"] == "Bungalow").all()


def test_sample_raises_when_filter_column_missing(tmp_path: Path) -> None:
    # Arrange
    csv = tmp_path / "register.csv"
    _write_csv(csv, [{"certificate_number": "CN-001", "property_type": "House"}])

    # Act / Assert
    with pytest.raises(KeyError):
        sample(csv_path=csv, n=1, seed=1, filters={"nonexistent": ["x"]})
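
The behaviour these tests pin down (filter first, then a seeded draw that is capped at the number of surviving rows) could be satisfied by something like the sketch below. `sample_sketch` is a hypothetical stand-in that takes an in-memory DataFrame rather than a `csv_path`; the real `sample()` may be structured quite differently.

```python
from typing import Optional

import pandas as pd


def sample_sketch(
    register: pd.DataFrame,
    n: int,
    seed: int,
    filters: Optional[dict[str, list[str]]] = None,
) -> pd.DataFrame:
    """Filter on allowed values per column, then draw a reproducible random subset."""
    selected = register
    for column, allowed in (filters or {}).items():
        if column not in selected.columns:
            # Fail loudly on a misspelt filter column instead of returning nothing.
            raise KeyError(column)
        selected = selected[selected[column].isin(allowed)]
    # Cap at the surviving row count so an over-strict filter yields fewer than n rows.
    take = min(n, len(selected))
    return selected.sample(n=take, random_state=seed)


register = pd.DataFrame(
    [{"certificate_number": f"H-{i:03d}", "property_type": "House"} for i in range(50)]
    + [{"certificate_number": f"BG-{i}", "property_type": "Bungalow"} for i in range(3)]
)
first = sample_sketch(register, n=10, seed=7)
second = sample_sketch(register, n=10, seed=7)
bungalows = sample_sketch(register, n=100, seed=1, filters={"property_type": ["Bungalow"]})
print(len(first), len(bungalows))
```

Filtering before sampling is what lets the caller exclude unwanted cohorts without spending any of the `n` budget on them.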

@ -1,50 +0,0 @@
"""Tests for sap_parity_probe — the per-cert SAP10 prediction pipeline
exercised by the corpus-wide parity probe.
P2.1 (ADR-0010): the probe now uses SAP 10.2 (14-03-2025) spec prices
exclusively. The extracted `predict_sap_for_cert` pure function exists
so the spec-prices path is unit-testable in isolation.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, cast
from domain.sap.rdsap.cert_to_inputs import SAP_10_2_SPEC_PRICES
from ml_training_data.sap_parity_probe import predict_sap_for_cert
_REPO_ROOT = Path(__file__).resolve().parents[4]
_GOLDEN_FIXTURES = (
    _REPO_ROOT
    / "packages/domain/src/domain/sap/rdsap/tests/fixtures/golden"
)


def _load_cert_document(cert_number: str) -> dict[str, Any]:
    path = _GOLDEN_FIXTURES / f"{cert_number}.json"
    return cast(dict[str, Any], json.loads(path.read_text()))


def test_predict_sap_for_cert_returns_spec_prices_score_for_6035_7729() -> None:
    # Arrange
    # Cert 6035-7729-2309-0879-2296: mid-terrace, age band A, gas combi
    # boiler code 104, TFA 128 m². One of the previously-retired golden
    # fixtures; the cert JSON is preserved on disk as reference data
    # (ADR-0010 §10), the test against it is new and pins the
    # spec-prices SAP score (not the cert SAP).
    cert_document = _load_cert_document("6035-7729-2309-0879-2296")

    # Act
    score = predict_sap_for_cert(cert_document, prices=SAP_10_2_SPEC_PRICES)

    # Assert
    # Pinned in GREEN of P2.1. If this drifts, either the calculator's
    # spec-correct output for this fixture has genuinely moved (a real
    # behavioural change worth investigating) or the SAP 10.2 spec
    # prices in table_12.py have changed.
    assert score == _EXPECTED_SCORE_6035_7729


_EXPECTED_SCORE_6035_7729: int = 67

@ -1,90 +0,0 @@
"""Tests for LocalStorage — fs-backed Storage protocol for the training pipeline.
Storage is the swap-point between local-dev (LocalStorage rooted at ./data/) and the
eventual S3-backed impl. Downstream stages (bulk_fetch, write_parquet) talk to the
Storage protocol only, not Path.
"""
from pathlib import Path
import pytest
from ml_training_data.storage import LocalStorage
def test_write_bytes_then_read_bytes_returns_same_data(tmp_path: Path) -> None:
    # Arrange
    storage = LocalStorage(root=tmp_path)
    payload = b"hello world"

    # Act
    storage.write_bytes("greetings/hello.txt", payload)
    out = storage.read_bytes("greetings/hello.txt")

    # Assert
    assert out == payload


def test_exists_is_false_before_write_and_true_after(tmp_path: Path) -> None:
    # Arrange
    storage = LocalStorage(root=tmp_path)

    # Act
    before = storage.exists("a/b.bin")
    storage.write_bytes("a/b.bin", b"x")
    after = storage.exists("a/b.bin")

    # Assert
    assert before is False
    assert after is True


def test_iter_keys_yields_every_written_key(tmp_path: Path) -> None:
    # Arrange
    storage = LocalStorage(root=tmp_path)
    storage.write_bytes("certs/a.json", b"1")
    storage.write_bytes("certs/b.json", b"2")
    storage.write_bytes("manifest.json", b"3")

    # Act
    keys = sorted(storage.iter_keys())

    # Assert
    assert keys == ["certs/a.json", "certs/b.json", "manifest.json"]


def test_iter_keys_filters_by_prefix(tmp_path: Path) -> None:
    # Arrange
    storage = LocalStorage(root=tmp_path)
    storage.write_bytes("certs/a.json", b"1")
    storage.write_bytes("certs/b.json", b"2")
    storage.write_bytes("manifest.json", b"3")

    # Act
    keys = sorted(storage.iter_keys(prefix="certs/"))

    # Assert
    assert keys == ["certs/a.json", "certs/b.json"]


def test_read_bytes_raises_filenotfound_for_missing_key(tmp_path: Path) -> None:
    # Arrange
    storage = LocalStorage(root=tmp_path)

    # Act / Assert
    with pytest.raises(FileNotFoundError):
        storage.read_bytes("nope.bin")


def test_open_read_returns_seekable_binary_stream(tmp_path: Path) -> None:
    # Arrange
    storage = LocalStorage(root=tmp_path)
    storage.write_bytes("big.bin", b"abcdefghij")

    # Act
    with storage.open_read("big.bin") as f:
        f.seek(4)
        chunk = f.read(3)

    # Assert
    assert chunk == b"efg"
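
The swap-point idea from the module docstring can be sketched with `typing.Protocol`. The method names below are inferred from the calls these tests make; `StorageSketch`, `InMemoryStorage` and `copy_blob` are hypothetical names for illustration only, and the default `prefix=""` is an assumption rather than the real signature.

```python
import io
from typing import BinaryIO, Iterator, Protocol


class StorageSketch(Protocol):
    """Structural interface: any class with these methods qualifies."""

    def write_bytes(self, key: str, data: bytes) -> None: ...
    def read_bytes(self, key: str) -> bytes: ...
    def exists(self, key: str) -> bool: ...
    def iter_keys(self, prefix: str = "") -> Iterator[str]: ...
    def open_read(self, key: str) -> BinaryIO: ...


class InMemoryStorage:
    """Dict-backed implementation, handy as a test double."""

    def __init__(self) -> None:
        self._blobs: dict[str, bytes] = {}

    def write_bytes(self, key: str, data: bytes) -> None:
        self._blobs[key] = data

    def read_bytes(self, key: str) -> bytes:
        try:
            return self._blobs[key]
        except KeyError:
            # Match the filesystem behaviour the tests expect for a missing key.
            raise FileNotFoundError(key) from None

    def exists(self, key: str) -> bool:
        return key in self._blobs

    def iter_keys(self, prefix: str = "") -> Iterator[str]:
        for key in sorted(self._blobs):
            if key.startswith(prefix):
                yield key

    def open_read(self, key: str) -> BinaryIO:
        # BytesIO is seekable, like an opened local file.
        return io.BytesIO(self.read_bytes(key))


def copy_blob(storage: StorageSketch, src: str, dst: str) -> None:
    """A downstream stage that depends only on the protocol, never on Path."""
    storage.write_bytes(dst, storage.read_bytes(src))


store = InMemoryStorage()
store.write_bytes("certs/a.json", b"abcdefghij")
copy_blob(store, "certs/a.json", "backup/a.json")
print(list(store.iter_keys(prefix="certs/")))
```

Because the protocol is structural, a filesystem-backed or S3-backed class needs no shared base class to be accepted by `copy_blob`.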

@ -1,235 +0,0 @@
"""Tests for train_baseline() — fits one LightGBM regressor per target.
train_baseline produces the baseline metrics (MAPE + R^2) and dumps per-target
feature-importance JSON to storage. This is the only stage that pulls in
LightGBM + sklearn; downstream training repos read the metrics + parquet only.
"""
import json
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd

from ml_training_data.storage import LocalStorage
from ml_training_data.train_baseline import train_baseline


def _synthetic_dataset(n: int = 200, seed: int = 0) -> pd.DataFrame:
    rng = np.random.default_rng(seed)
    floor_area = rng.uniform(40, 200, size=n)
    walls = rng.integers(1, 5, size=n)
    # sap_score correlates with floor_area + walls, plus noise.
    sap_score = (100 - 0.2 * floor_area + 3 * walls + rng.normal(0, 2, size=n)).astype(int)
    return pd.DataFrame(
        {
            "certificate_number": [f"CN-{i:04d}" for i in range(n)],
            "total_floor_area_m2": floor_area,
            "wall_count": walls,
            "sap_score": sap_score,
        }
    )


def test_train_baseline_returns_mape_and_r2_per_target(tmp_path: Path) -> None:
    # Arrange
    storage = LocalStorage(root=tmp_path)
    df = _synthetic_dataset()

    # Act
    metrics = train_baseline(
        df=df,
        targets=["sap_score"],
        storage=storage,
        run_key="runs/2026-05-16/",
        seed=42,
    )

    # Assert
    assert "sap_score" in metrics
    assert "mape" in metrics["sap_score"]
    assert "r2" in metrics["sap_score"]
    assert metrics["sap_score"]["r2"] > 0.0  # learns something on a correlated signal


def test_low_sap_tail_weight_returns_3x_for_rows_below_58_else_1x() -> None:
    # Arrange: exposed helper so callers wanting the default tail strategy
    # can plug it straight into train_baseline. SAP-rating boundary 58 chosen
    # from slice 16h's per-decile residuals: decile 0 (SAP 1-58) carries 17%
    # MAPE; deciles 1-9 are all below 5%.
    from ml_training_data.train_baseline import low_sap_tail_weight  # noqa: PLC0415

    # Act
    weights = low_sap_tail_weight(pd.Series([20, 50, 58, 60, 90]))

    # Assert: the boundary is exclusive, so a score of exactly 58 keeps weight 1.0.
    assert list(weights) == [3.0, 3.0, 1.0, 1.0, 1.0]


def test_train_baseline_accepts_sample_weight_fn_per_target(tmp_path: Path) -> None:
    # Arrange: sample_weight_fn is a callable taking the training-label Series
    # and returning a Series of weights the same length. When supplied, the
    # weights flow into LGBMRegressor.fit's sample_weight argument and the
    # model emphasizes the heavily-weighted rows. We verify the indirection
    # works by training twice (no weights vs heavy-weighted tail) and
    # confirming the reported metrics differ between the two runs.
    storage = LocalStorage(root=tmp_path)
    df = _synthetic_dataset(n=600, seed=0)

    def weight_tail(y: "pd.Series[Any]") -> "pd.Series[Any]":
        return pd.Series(np.where(np.asarray(y, dtype=float) < 60, 10.0, 1.0), index=y.index)

    # Act
    m_unweighted = train_baseline(
        df=df.copy(), targets=["sap_score"], storage=storage,
        run_key="runs/unw/", seed=42,
    )
    m_weighted = train_baseline(
        df=df.copy(), targets=["sap_score"], storage=storage,
        run_key="runs/w/", seed=42, sample_weight_fn=weight_tail,
    )

    # Assert: global MAE should differ between weighted and unweighted runs.
    # (Direction depends on data; we just need to see that the weight reached LGBM.)
    assert m_unweighted["sap_score"]["mae"] != m_weighted["sap_score"]["mae"]
def test_train_baseline_reports_mae_and_rmse_per_target(tmp_path: Path) -> None:
    # Arrange: MAE gives the user-facing "predicted SAP within N points" meaning;
    # RMSE penalises large errors quadratically. Both should be reported next
    # to MAPE so we can read the residual without inverting MAPE math by hand.
    storage = LocalStorage(root=tmp_path)
    df = _synthetic_dataset()

    # Act
    metrics = train_baseline(
        df=df, targets=["sap_score"], storage=storage,
        run_key="runs/2026-05-16/", seed=42,
    )

    # Assert
    assert "mae" in metrics["sap_score"]
    assert "rmse" in metrics["sap_score"]
    assert metrics["sap_score"]["mae"] > 0
    assert metrics["sap_score"]["rmse"] >= metrics["sap_score"]["mae"]  # always true mathematically


def test_train_baseline_writes_feature_importance_per_target(tmp_path: Path) -> None:
    # Arrange
    storage = LocalStorage(root=tmp_path)
    df = _synthetic_dataset()

    # Act
    train_baseline(
        df=df,
        targets=["sap_score"],
        storage=storage,
        run_key="runs/2026-05-16/",
        seed=42,
    )

    # Assert
    importance = json.loads(storage.read_bytes("runs/2026-05-16/importance_sap_score.json"))
    assert set(importance.keys()) == {"total_floor_area_m2", "wall_count"}
    assert all(isinstance(v, (int, float)) for v in importance.values())


def test_train_baseline_handles_multiple_targets_independently(tmp_path: Path) -> None:
    # Arrange
    storage = LocalStorage(root=tmp_path)
    df = _synthetic_dataset()
    df["co2_emissions"] = df["sap_score"] * 0.1 + 1.0  # second correlated target

    # Act
    metrics = train_baseline(
        df=df,
        targets=["sap_score", "co2_emissions"],
        storage=storage,
        run_key="runs/2026-05-16/",
        seed=42,
    )

    # Assert
    assert set(metrics.keys()) == {"sap_score", "co2_emissions"}
    assert storage.exists("runs/2026-05-16/importance_sap_score.json")
    assert storage.exists("runs/2026-05-16/importance_co2_emissions.json")
    assert storage.exists("runs/2026-05-16/metrics.json")


def test_train_baseline_writes_per_decile_residuals_per_target(tmp_path: Path) -> None:
    # Arrange
    storage = LocalStorage(root=tmp_path)
    df = _synthetic_dataset(n=500)

    # Act
    train_baseline(
        df=df,
        targets=["sap_score"],
        storage=storage,
        run_key="runs/2026-05-16/",
        seed=42,
    )

    # Assert
    residuals = json.loads(storage.read_bytes("runs/2026-05-16/residuals_sap_score.json"))
    assert "buckets" in residuals
    assert len(residuals["buckets"]) == 10
    expected_keys = {"decile", "true_min", "true_max", "count", "mape", "mae", "mean_residual"}
    for bucket in residuals["buckets"]:
        assert expected_keys <= set(bucket.keys())
    # The 10 bucket counts sum to the test-set size (20% of df).
    assert sum(b["count"] for b in residuals["buckets"]) == int(len(df) * 0.2)
    # Buckets are ordered by true_min ascending.
    true_mins = [b["true_min"] for b in residuals["buckets"]]
    assert true_mins == sorted(true_mins)


def test_train_baseline_uses_default_regression_objective_per_slice_16h(tmp_path: Path) -> None:
    # Arrange: slice 16g originally switched sap_score + peui_ucl to
    # objective='mape'; slice 16h's 250k ablation showed that lost ~0.6 pts
    # of global MAPE because mape over-weights the low-SAP tail. Reverted
    # to default 'regression' for all targets; tail strategy moves to
    # sample weights in slice 16i.
    storage = LocalStorage(root=tmp_path)
    df = _synthetic_dataset(n=300)
    df["peui_ucl"] = df["sap_score"].astype(float) + 5.0

    # Act
    metrics = train_baseline(
        df=df,
        targets=["sap_score", "peui_ucl"],
        storage=storage,
        run_key="runs/2026-05-16/",
        seed=42,
    )

    # Assert
    assert "sap_score" in metrics
    assert "peui_ucl" in metrics
    from ml_training_data.train_baseline import _OBJECTIVE_OVERRIDES  # noqa: PLC0415

    assert _OBJECTIVE_OVERRIDES == {}


def test_train_baseline_residuals_emitted_per_target_independently(tmp_path: Path) -> None:
    # Arrange
    storage = LocalStorage(root=tmp_path)
    df = _synthetic_dataset(n=500)
    df["co2_emissions"] = df["sap_score"] * 0.1 + 1.0

    # Act
    train_baseline(
        df=df,
        targets=["sap_score", "co2_emissions"],
        storage=storage,
        run_key="runs/2026-05-16/",
        seed=42,
    )

    # Assert
    assert storage.exists("runs/2026-05-16/residuals_sap_score.json")
    assert storage.exists("runs/2026-05-16/residuals_co2_emissions.json")
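
The tail-weighting behaviour pinned by `test_low_sap_tail_weight_returns_3x_for_rows_below_58_else_1x` can be reproduced with a few lines of NumPy. `tail_weight_sketch` is a hypothetical stand-in, and its `threshold` and `weight` parameters are additions for this example; the real `low_sap_tail_weight` may be written differently.

```python
import numpy as np
import pandas as pd


def tail_weight_sketch(y: pd.Series, threshold: float = 58.0, weight: float = 3.0) -> pd.Series:
    """Return `weight` for labels strictly below `threshold`, else 1.0."""
    values = np.asarray(y, dtype=float)
    # Keep the original index so the weights stay aligned with the training rows.
    return pd.Series(np.where(values < threshold, weight, 1.0), index=y.index)


labels = pd.Series([20, 50, 58, 60, 90])
weights = tail_weight_sketch(labels)
print(weights.tolist())
```

A Series built this way has the shape the tests describe for `sample_weight_fn`: one weight per training label, in the same order.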

@ -1,98 +0,0 @@
"""Tests for write_training_dataset() — parquet + schema.json + manifest.json.
The output triple is the contract handed to the AutoGluon training repo:
- data.parquet: feature+target rows
- schema.json: column specs (categorical flags, target list) so the consumer
can reconstruct dtype intent without re-parsing the transform module.
- manifest.json: run metadata (when, transform version, source bulk-ZIP info).
"""
import json
from pathlib import Path
import pandas as pd
from domain.ml.schema import ColumnSpec, TransformSchema
from ml_training_data.storage import LocalStorage
from ml_training_data.write_parquet import write_training_dataset
def _toy_schema() -> "TransformSchema":
    return TransformSchema(
        transform_version="0.1.0",
        feature_columns={
            "total_floor_area_m2": ColumnSpec(
                dtype=float, nullable=False, description="floor area"
            ),
            "property_type": ColumnSpec(
                dtype=str, nullable=True, description="cat", categorical=True
            ),
        },
        target_columns={
            "sap_score": ColumnSpec(dtype=int, nullable=False, description="target"),
        },
    )


def test_write_training_dataset_persists_dataframe_to_parquet(tmp_path: Path) -> None:
    # Arrange
    storage = LocalStorage(root=tmp_path)
    df = pd.DataFrame(
        {
            "certificate_number": ["A-1", "A-2"],
            "total_floor_area_m2": [80.0, 120.0],
            "sap_score": [70, 85],
        }
    )

    # Act
    write_training_dataset(
        df=df, storage=storage, run_key="runs/2026-05-16/", schema=_toy_schema()
    )

    # Assert
    assert storage.exists("runs/2026-05-16/data.parquet")
    roundtrip = pd.read_parquet(tmp_path / "runs/2026-05-16/data.parquet")
    assert roundtrip["certificate_number"].tolist() == ["A-1", "A-2"]
    assert roundtrip["sap_score"].tolist() == [70, 85]


def test_write_training_dataset_writes_schema_json_alongside_parquet(tmp_path: Path) -> None:
    # Arrange
    storage = LocalStorage(root=tmp_path)
    df = pd.DataFrame({"total_floor_area_m2": [80.0], "property_type": ["House"], "sap_score": [70]})

    # Act
    write_training_dataset(
        df=df, storage=storage, run_key="runs/2026-05-16/", schema=_toy_schema()
    )

    # Assert
    schema_doc = json.loads(storage.read_bytes("runs/2026-05-16/schema.json"))
    assert schema_doc["transform_version"] == "0.1.0"
    assert "total_floor_area_m2" in schema_doc["features"]
    assert schema_doc["features"]["property_type"]["categorical"] is True
    assert schema_doc["targets"]["sap_score"]["dtype"] == "int"


def test_write_training_dataset_writes_manifest_with_row_count_and_source(tmp_path: Path) -> None:
    # Arrange
    storage = LocalStorage(root=tmp_path)
    df = pd.DataFrame({"total_floor_area_m2": [80.0, 95.0, 110.0], "sap_score": [70, 75, 80]})
    source_info = {
        "bulk_zip_last_updated": "2026-05-14T09:59:32Z",
        "bulk_zip_size_bytes": 15_642_371_075,
    }

    # Act
    write_training_dataset(
        df=df,
        storage=storage,
        run_key="runs/2026-05-16/",
        schema=_toy_schema(),
        source_info=source_info,
    )

    # Assert
    manifest = json.loads(storage.read_bytes("runs/2026-05-16/manifest.json"))
    assert manifest["row_count"] == 3
    assert manifest["transform_version"] == "0.1.0"
    assert manifest["source_info"] == source_info
    assert "written_at" in manifest