diff --git a/services/ml_training_data/src/ml_training_data/build_features.py b/services/ml_training_data/src/ml_training_data/build_features.py index 98678c75..a5ce40f2 100644 --- a/services/ml_training_data/src/ml_training_data/build_features.py +++ b/services/ml_training_data/src/ml_training_data/build_features.py @@ -1,14 +1,17 @@ -"""JSON certs -> EpcPropertyData -> feature+target DataFrame. +"""Wrapper records -> EpcPropertyData -> feature+target DataFrame. -Wires three pieces from elsewhere in the repo: - - BulkZipReader streams raw cert dicts out of the gov bulk ZIP. - - EpcPropertyDataMapper.from_api_response parses each cert into EpcPropertyData. - - EpcMlTransform.to_rows produces the ML feature+target DataFrame. +Each wrapper record from the bulk ZIP carries the cert metadata plus a +JSON-encoded `document` payload. This function: -A leading `certificate_number` column is prepended so every row stays traceable -back to its source cert through the rest of the pipeline (parquet, training). + - Filters out non-RdSAP assessments (our mapper only handles RdSAP-Schema-21.x). + - Parses `document` and feeds it to EpcPropertyDataMapper.from_api_response. + - Calls EpcMlTransform.to_rows over the parsed properties. + - Prepends a `certificate_number` column so every row is traceable to its source. 
""" +import json +from typing import Any, cast + import pandas as pd from datatypes.epc.domain.mapper import EpcPropertyDataMapper @@ -16,6 +19,8 @@ from datatypes.epc.domain.epc_property_data import EpcPropertyData from domain.ml.transform import EpcMlTransform from ml_training_data.bulk_zip_reader import BulkZipReader +_RDSAP_ASSESSMENT_TYPE = "RdSAP" + def build_features( bulk_reader: BulkZipReader, @@ -26,15 +31,24 @@ def build_features( transform = EpcMlTransform() properties: list[EpcPropertyData] = [] cert_nums: list[str] = [] - for cert in bulk_reader.iter_certificates_filtered(certificate_numbers): + for record in bulk_reader.iter_certificates_filtered(certificate_numbers): + if record.get("assessment_type") != _RDSAP_ASSESSMENT_TYPE: + continue + document_field = record.get("document") + if isinstance(document_field, str): + document = cast(dict[str, Any], json.loads(document_field)) + elif isinstance(document_field, dict): + document = cast(dict[str, Any], document_field) + else: + continue try: - prop = EpcPropertyDataMapper.from_api_response(cert) + prop = EpcPropertyDataMapper.from_api_response(document) except ValueError: if skip_unsupported_schemas: continue raise properties.append(prop) - cert_nums.append(str(cert["certificate_number"])) + cert_nums.append(str(record["certificate_number"])) df = transform.to_rows(properties) df["certificate_number"] = cert_nums return df[["certificate_number", *[c for c in df.columns if c != "certificate_number"]]] diff --git a/services/ml_training_data/src/ml_training_data/bulk_zip_reader.py b/services/ml_training_data/src/ml_training_data/bulk_zip_reader.py index 267f77e5..d3e7d80d 100644 --- a/services/ml_training_data/src/ml_training_data/bulk_zip_reader.py +++ b/services/ml_training_data/src/ml_training_data/bulk_zip_reader.py @@ -1,9 +1,17 @@ -"""Stream EPC certificates from the gov bulk JSON ZIP. +"""Stream EPC certificate wrappers from the gov bulk JSON ZIP. 
def iter_certificates(self, entry: str) -> Iterator[dict[str, Any]]:
    """Lazily yield each wrapper record stored in one entry of the bulk ZIP.

    The archive is opened through ``self._storage`` under ``self._zip_key``
    and ``entry`` is read as a stream, so the caller receives records one at
    a time instead of a fully materialised list.

    Args:
        entry: Name of the archive member to read (e.g. a per-year file).

    Yields:
        Each top-level JSON value from the entry, typed as a dict wrapper.
    """
    archive_stream = self._storage.open_read(self._zip_key)
    with zipfile.ZipFile(archive_stream) as archive, archive.open(entry) as member:
        # Entries are concatenated JSON objects with no enclosing array, so
        # the empty prefix plus multiple_values=True is used to pick up each
        # top-level value in turn.
        wrappers = ijson.items(member, "", use_float=True, multiple_values=True)
        for wrapper in wrappers:
            yield cast(dict[str, Any], wrapper)
remotezip uses HTTP Range -requests against that URL to read only the central directory + the bytes for the -requested entries, so disk usage stays at "size of the entries we actually want" -instead of the full archive. +The gov endpoint redirects to a pre-signed S3 URL; remotezip uses HTTP Range +requests so only the bytes for the requested entries traverse the wire. Each entry +is streamed in chunks and re-zipped into a *separate* local archive (one per year): -Entries are streamed via zipfile.ZipExtFile.read(chunk) so partial-network failures -during the multi-GB read don't waste the whole transfer, and so we never hold the -full entry in memory. + - Smaller disk footprint than the uncompressed JSON would need (5-8 GB per year). + - Per-year resumability: a partial-network failure on year N doesn't waste years + that already completed; rerun and the existing zips are skipped. + - BulkZipReader keeps working unchanged on each per-year local ZIP. """ +import zipfile from pathlib import Path -from tempfile import NamedTemporaryFile import httpx from remotezip import RemoteZip # type: ignore[import-untyped] # pyright: ignore[reportMissingTypeStubs] -from ml_training_data.storage import Storage - _BULK_JSON_URL = ( "https://api.get-energy-performance-data.communities.gov.uk/api/files/domestic/json" ) _READ_CHUNK_BYTES = 8 * 1024 * 1024 # 8 MB -def extract_entries( +def extract_entries_to_local_zips( auth_token: str, entry_names: list[str], - storage: Storage, - key_prefix: str, + output_dir: Path, ) -> dict[str, int]: - presigned_url = _resolve_presigned_url(auth_token) + output_dir.mkdir(parents=True, exist_ok=True) sizes: dict[str, int] = {} - with RemoteZip(presigned_url) as zf: # pyright: ignore[reportUnknownVariableType] - for entry in entry_names: - n_bytes = _stream_entry_to_storage(zf, entry, storage, f"{key_prefix}{entry}") - sizes[entry] = n_bytes + for entry in entry_names: + out_path = output_dir / f"{entry}.zip" + if out_path.exists(): + sizes[entry] = 
out_path.stat().st_size + continue + # The S3 presigned URL is short-lived (X-Amz-Expires=30s), so refresh it + # per entry — extracting one entry can easily take longer than the TTL. + presigned_url = _resolve_presigned_url(auth_token) + with RemoteZip(presigned_url) as in_zf: # pyright: ignore[reportUnknownVariableType] + _stream_entry_to_zip(in_zf, entry, out_path) + sizes[entry] = out_path.stat().st_size return sizes -def _stream_entry_to_storage( - zf: RemoteZip, # pyright: ignore[reportUnknownParameterType] +def _stream_entry_to_zip( + in_zf: RemoteZip, # pyright: ignore[reportUnknownParameterType] entry: str, - storage: Storage, - output_key: str, -) -> int: - with NamedTemporaryFile(delete=False) as tmp: - tmp_path = Path(tmp.name) - with zf.open(entry) as src: # pyright: ignore[reportUnknownMemberType,reportUnknownVariableType] - while True: - chunk: bytes = src.read(_READ_CHUNK_BYTES) # pyright: ignore[reportUnknownVariableType,reportUnknownMemberType] - if not chunk: - break - tmp.write(chunk) - total = tmp_path.stat().st_size - storage.write_bytes(output_key, tmp_path.read_bytes()) - tmp_path.unlink() - return total + out_path: Path, +) -> None: + tmp_path = out_path.with_suffix(out_path.suffix + ".part") + try: + with zipfile.ZipFile(tmp_path, "w", zipfile.ZIP_DEFLATED) as out_zf: + with in_zf.open(entry) as src, out_zf.open(entry, "w", force_zip64=True) as dst: # pyright: ignore[reportUnknownMemberType,reportUnknownVariableType] + while True: + chunk: bytes = src.read(_READ_CHUNK_BYTES) # pyright: ignore[reportUnknownVariableType,reportUnknownMemberType] + if not chunk: + break + dst.write(chunk) + tmp_path.rename(out_path) + except BaseException: + if tmp_path.exists(): + tmp_path.unlink() + raise def _resolve_presigned_url(auth_token: str) -> str: @@ -75,5 +80,3 @@ def _resolve_presigned_url(auth_token: str) -> str: if not location: raise RuntimeError("Bulk JSON 302 had no Location header") return location - - diff --git 
a/services/ml_training_data/tests/unit/test_build_features.py b/services/ml_training_data/tests/unit/test_build_features.py index 1806666b..50c28089 100644 --- a/services/ml_training_data/tests/unit/test_build_features.py +++ b/services/ml_training_data/tests/unit/test_build_features.py @@ -1,9 +1,10 @@ -"""Tests for build_features() — JSON cert → EpcPropertyData → feature row. +"""Tests for build_features() — wrapper record → EpcPropertyData → feature row. -build_features wires three existing pieces: BulkZipReader yields JSON cert dicts, -EpcPropertyDataMapper.from_api_response parses them into EpcPropertyData, and -EpcMlTransform.to_rows produces the feature+target DataFrame. The function adds a -leading `certificate_number` column so each row stays traceable to its source cert. +build_features wires three existing pieces: BulkZipReader yields wrapper records +out of the bulk ZIP, EpcPropertyDataMapper.from_api_response parses the +JSON-encoded `document` payload into EpcPropertyData, and EpcMlTransform.to_rows +produces the feature+target DataFrame. The function adds a leading +`certificate_number` column so each row stays traceable to its source cert. 
""" import io @@ -20,25 +21,30 @@ from ml_training_data.storage import LocalStorage _FIXTURE_PATH = Path("datatypes/epc/schema/tests/fixtures/21_0_0.json") -def _load_fixture_cert(cert_number: str) -> dict[str, object]: - cert = json.loads(_FIXTURE_PATH.read_text()) - cert["certificate_number"] = cert_number - return cert +def _wrapper(cert_number: str, *, assessment_type: str = "RdSAP", document: str | None = None) -> dict[str, object]: + if document is None: + document = _FIXTURE_PATH.read_text() + return { + "certificate_number": cert_number, + "assessment_type": assessment_type, + "document": document, + } def _write_zip(storage: LocalStorage, key: str, entries: dict[str, list[dict[str, object]]]) -> None: buf = io.BytesIO() with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf: - for entry_name, payload in entries.items(): - zf.writestr(entry_name, json.dumps(payload)) + for entry_name, records in entries.items(): + ndjson = "\n".join(json.dumps(r) for r in records) + zf.writestr(entry_name, ndjson) storage.write_bytes(key, buf.getvalue()) def test_build_features_returns_one_row_per_supported_cert(tmp_path: Path) -> None: # Arrange storage = LocalStorage(root=tmp_path) - cert = _load_fixture_cert("CERT-001") - _write_zip(storage, "bulk.zip", {"certificates-2024.json": [cert]}) + record = _wrapper("CERT-001") + _write_zip(storage, "bulk.zip", {"certificates-2024.json": [record]}) reader = BulkZipReader(storage=storage, zip_key="bulk.zip") # Act @@ -50,12 +56,28 @@ def test_build_features_returns_one_row_per_supported_cert(tmp_path: Path) -> No assert "certificate_number" == df.columns[0] +def test_build_features_skips_non_rdsap_assessment_types(tmp_path: Path) -> None: + # Arrange + storage = LocalStorage(root=tmp_path) + rdsap = _wrapper("OK-1", assessment_type="RdSAP") + sap = _wrapper("SAP-1", assessment_type="SAP") + _write_zip(storage, "bulk.zip", {"certificates-2024.json": [rdsap, sap]}) + reader = BulkZipReader(storage=storage, zip_key="bulk.zip") + 
+ # Act + df = build_features(reader, certificate_numbers={"OK-1", "SAP-1"}) + + # Assert + assert df["certificate_number"].tolist() == ["OK-1"] + + def test_build_features_skips_unsupported_schemas_by_default(tmp_path: Path) -> None: # Arrange storage = LocalStorage(root=tmp_path) - supported = _load_fixture_cert("OK-1") - unsupported = _load_fixture_cert("BAD-1") - unsupported["schema_type"] = "RdSAP-Schema-19.0.0" # not in mapper dispatch + bad_doc = json.loads(_FIXTURE_PATH.read_text()) + bad_doc["schema_type"] = "RdSAP-Schema-19.0.0" # not in mapper dispatch + supported = _wrapper("OK-1") + unsupported = _wrapper("BAD-1", document=json.dumps(bad_doc)) _write_zip( storage, "bulk.zip", @@ -73,8 +95,9 @@ def test_build_features_skips_unsupported_schemas_by_default(tmp_path: Path) -> def test_build_features_raises_when_skip_unsupported_schemas_is_false(tmp_path: Path) -> None: # Arrange storage = LocalStorage(root=tmp_path) - unsupported = _load_fixture_cert("BAD-1") - unsupported["schema_type"] = "RdSAP-Schema-19.0.0" + bad_doc = json.loads(_FIXTURE_PATH.read_text()) + bad_doc["schema_type"] = "RdSAP-Schema-19.0.0" + unsupported = _wrapper("BAD-1", document=json.dumps(bad_doc)) _write_zip(storage, "bulk.zip", {"certificates-2024.json": [unsupported]}) reader = BulkZipReader(storage=storage, zip_key="bulk.zip") diff --git a/services/ml_training_data/tests/unit/test_bulk_zip_reader.py b/services/ml_training_data/tests/unit/test_bulk_zip_reader.py index a7ac0409..9f43a6dd 100644 --- a/services/ml_training_data/tests/unit/test_bulk_zip_reader.py +++ b/services/ml_training_data/tests/unit/test_bulk_zip_reader.py @@ -1,15 +1,14 @@ -"""Tests for BulkZipReader — stream EPC certificates from the gov bulk JSON ZIP. +"""Tests for BulkZipReader — stream EPC certificate wrappers from the gov bulk JSON ZIP. -The real bulk ZIP is ZIP64, ~15 GB, with one JSON entry per inspection year -(certificates-YYYY.json). Each entry is a JSON array of certificate dicts. 
def _write_zip(
    storage: LocalStorage, key: str, entries: dict[str, list[dict[str, object]]]
) -> None:
    """Assemble a synthetic bulk ZIP in memory and store it under ``key``.

    Each item of ``entries`` becomes one archive member whose content is the
    member's records serialised as JSON and joined with newlines (no
    enclosing array, no trailing newline).

    Args:
        storage: Destination; only its ``write_bytes(key, data)`` is used.
        key: Storage key for the finished archive.
        entries: Mapping of member name to the records written into it.
    """
    archive_bytes = io.BytesIO()
    with zipfile.ZipFile(archive_bytes, mode="w", compression=zipfile.ZIP_DEFLATED) as archive:
        for member_name, wrappers in entries.items():
            serialised = [json.dumps(wrapper) for wrapper in wrappers]
            archive.writestr(member_name, "\n".join(serialised))
    storage.write_bytes(key, archive_bytes.getvalue())