slice 14h: handle real bulk-JSON shape (NDJSON wrappers + document payload)

Each bulk entry is an NDJSON stream of wrapper records, not a JSON array. Each
wrapper carries certificate_number, assessment_type, and a stringified document
holding the actual EPC schema payload. Filter to RdSAP, unwrap document, then map.

remote_bulk_fetcher: per-entry presigned-URL refresh (30s S3 TTL).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-05-16 19:45:52 +00:00
parent 9eb70cede1
commit 611c07de94
5 changed files with 129 additions and 79 deletions

View file

@ -1,14 +1,17 @@
"""JSON certs -> EpcPropertyData -> feature+target DataFrame.
"""Wrapper records -> EpcPropertyData -> feature+target DataFrame.
Wires three pieces from elsewhere in the repo:
- BulkZipReader streams raw cert dicts out of the gov bulk ZIP.
- EpcPropertyDataMapper.from_api_response parses each cert into EpcPropertyData.
- EpcMlTransform.to_rows produces the ML feature+target DataFrame.
Each wrapper record from the bulk ZIP carries the cert metadata plus a
JSON-encoded `document` payload. This function:
A leading `certificate_number` column is prepended so every row stays traceable
back to its source cert through the rest of the pipeline (parquet, training).
- Filters out non-RdSAP assessments (our mapper only handles RdSAP-Schema-21.x).
- Parses `document` and feeds it to EpcPropertyDataMapper.from_api_response.
- Calls EpcMlTransform.to_rows over the parsed properties.
- Prepends a `certificate_number` column so every row is traceable to its source.
"""
import json
from typing import Any, cast
import pandas as pd
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
@ -16,6 +19,8 @@ from datatypes.epc.domain.epc_property_data import EpcPropertyData
from domain.ml.transform import EpcMlTransform
from ml_training_data.bulk_zip_reader import BulkZipReader
_RDSAP_ASSESSMENT_TYPE = "RdSAP"
def build_features(
bulk_reader: BulkZipReader,
@ -26,15 +31,24 @@ def build_features(
transform = EpcMlTransform()
properties: list[EpcPropertyData] = []
cert_nums: list[str] = []
for cert in bulk_reader.iter_certificates_filtered(certificate_numbers):
for record in bulk_reader.iter_certificates_filtered(certificate_numbers):
if record.get("assessment_type") != _RDSAP_ASSESSMENT_TYPE:
continue
document_field = record.get("document")
if isinstance(document_field, str):
document = cast(dict[str, Any], json.loads(document_field))
elif isinstance(document_field, dict):
document = cast(dict[str, Any], document_field)
else:
continue
try:
prop = EpcPropertyDataMapper.from_api_response(cert)
prop = EpcPropertyDataMapper.from_api_response(document)
except ValueError:
if skip_unsupported_schemas:
continue
raise
properties.append(prop)
cert_nums.append(str(cert["certificate_number"]))
cert_nums.append(str(record["certificate_number"]))
df = transform.to_rows(properties)
df["certificate_number"] = cert_nums
return df[["certificate_number", *[c for c in df.columns if c != "certificate_number"]]]

View file

@ -1,9 +1,17 @@
"""Stream EPC certificates from the gov bulk JSON ZIP.
"""Stream EPC certificate wrappers from the gov bulk JSON ZIP.
The bulk ZIP from /api/files/domestic/json is ZIP64, ~15 GB, with one JSON entry
per inspection year (certificates-YYYY.json). Each entry is a JSON array of
certificate dicts. The reader streams entries via ijson so 5GB+ yearly files
never have to fit in memory.
The bulk ZIP from /api/files/domestic/json contains one entry per inspection year
(certificates-YYYY.json). Each entry is a stream of concatenated JSON objects
(NDJSON-shaped, no enclosing array) where every wrapper has:
- certificate_number: str
- assessment_type: "RdSAP" | "SAP" | ...
- document: str — a JSON-encoded payload with the full EPC schema document
- warehouse_created_at, updated_at: provenance timestamps
The reader yields those wrappers as-is. Unwrapping `document` and filtering by
assessment_type belong at the build_features layer, where the domain knowledge
of which assessment types our mapper supports already lives.
"""
import zipfile
@ -27,7 +35,7 @@ class BulkZipReader:
def iter_certificates(self, entry: str) -> Iterator[dict[str, Any]]:
with zipfile.ZipFile(self._storage.open_read(self._zip_key)) as zf:
with zf.open(entry) as f:
for item in ijson.items(f, "item", use_float=True):
for item in ijson.items(f, "", use_float=True, multiple_values=True):
yield cast(dict[str, Any], item)
def iter_certificates_filtered(

View file

@ -1,63 +1,68 @@
"""Extract specific yearly entries from the gov bulk JSON ZIP without downloading
the whole 15 GB archive.
The gov endpoint returns a 302 to a pre-signed S3 URL. remotezip uses HTTP Range
requests against that URL to read only the central directory + the bytes for the
requested entries, so disk usage stays at "size of the entries we actually want"
instead of the full archive.
The gov endpoint redirects to a pre-signed S3 URL; remotezip uses HTTP Range
requests so only the bytes for the requested entries traverse the wire. Each entry
is streamed in chunks and re-zipped into a *separate* local archive (one per year):
Entries are streamed via zipfile.ZipExtFile.read(chunk) so partial-network failures
during the multi-GB read don't waste the whole transfer, and so we never hold the
full entry in memory.
- Smaller disk footprint than the uncompressed JSON would need (5-8 GB per year).
- Per-year resumability: a partial-network failure on year N doesn't waste years
that already completed; rerun and the existing zips are skipped.
- BulkZipReader keeps working unchanged on each per-year local ZIP.
"""
import zipfile
from pathlib import Path
from tempfile import NamedTemporaryFile
import httpx
from remotezip import RemoteZip # type: ignore[import-untyped] # pyright: ignore[reportMissingTypeStubs]
from ml_training_data.storage import Storage
_BULK_JSON_URL = (
"https://api.get-energy-performance-data.communities.gov.uk/api/files/domestic/json"
)
_READ_CHUNK_BYTES = 8 * 1024 * 1024 # 8 MB
def extract_entries(
def extract_entries_to_local_zips(
auth_token: str,
entry_names: list[str],
storage: Storage,
key_prefix: str,
output_dir: Path,
) -> dict[str, int]:
presigned_url = _resolve_presigned_url(auth_token)
output_dir.mkdir(parents=True, exist_ok=True)
sizes: dict[str, int] = {}
with RemoteZip(presigned_url) as zf: # pyright: ignore[reportUnknownVariableType]
for entry in entry_names:
n_bytes = _stream_entry_to_storage(zf, entry, storage, f"{key_prefix}{entry}")
sizes[entry] = n_bytes
for entry in entry_names:
out_path = output_dir / f"{entry}.zip"
if out_path.exists():
sizes[entry] = out_path.stat().st_size
continue
# The S3 presigned URL is short-lived (X-Amz-Expires=30s), so refresh it
# per entry — extracting one entry can easily take longer than the TTL.
presigned_url = _resolve_presigned_url(auth_token)
with RemoteZip(presigned_url) as in_zf: # pyright: ignore[reportUnknownVariableType]
_stream_entry_to_zip(in_zf, entry, out_path)
sizes[entry] = out_path.stat().st_size
return sizes
def _stream_entry_to_storage(
zf: RemoteZip, # pyright: ignore[reportUnknownParameterType]
def _stream_entry_to_zip(
in_zf: RemoteZip, # pyright: ignore[reportUnknownParameterType]
entry: str,
storage: Storage,
output_key: str,
) -> int:
with NamedTemporaryFile(delete=False) as tmp:
tmp_path = Path(tmp.name)
with zf.open(entry) as src: # pyright: ignore[reportUnknownMemberType,reportUnknownVariableType]
while True:
chunk: bytes = src.read(_READ_CHUNK_BYTES) # pyright: ignore[reportUnknownVariableType,reportUnknownMemberType]
if not chunk:
break
tmp.write(chunk)
total = tmp_path.stat().st_size
storage.write_bytes(output_key, tmp_path.read_bytes())
tmp_path.unlink()
return total
out_path: Path,
) -> None:
tmp_path = out_path.with_suffix(out_path.suffix + ".part")
try:
with zipfile.ZipFile(tmp_path, "w", zipfile.ZIP_DEFLATED) as out_zf:
with in_zf.open(entry) as src, out_zf.open(entry, "w", force_zip64=True) as dst: # pyright: ignore[reportUnknownMemberType,reportUnknownVariableType]
while True:
chunk: bytes = src.read(_READ_CHUNK_BYTES) # pyright: ignore[reportUnknownVariableType,reportUnknownMemberType]
if not chunk:
break
dst.write(chunk)
tmp_path.rename(out_path)
except BaseException:
if tmp_path.exists():
tmp_path.unlink()
raise
def _resolve_presigned_url(auth_token: str) -> str:
@ -75,5 +80,3 @@ def _resolve_presigned_url(auth_token: str) -> str:
if not location:
raise RuntimeError("Bulk JSON 302 had no Location header")
return location

View file

@ -1,9 +1,10 @@
"""Tests for build_features() — JSON cert → EpcPropertyData → feature row.
"""Tests for build_features() — wrapper record → EpcPropertyData → feature row.
build_features wires three existing pieces: BulkZipReader yields JSON cert dicts,
EpcPropertyDataMapper.from_api_response parses them into EpcPropertyData, and
EpcMlTransform.to_rows produces the feature+target DataFrame. The function adds a
leading `certificate_number` column so each row stays traceable to its source cert.
build_features wires three existing pieces: BulkZipReader yields wrapper records
out of the bulk ZIP, EpcPropertyDataMapper.from_api_response parses the
JSON-encoded `document` payload into EpcPropertyData, and EpcMlTransform.to_rows
produces the feature+target DataFrame. The function adds a leading
`certificate_number` column so each row stays traceable to its source cert.
"""
import io
@ -20,25 +21,30 @@ from ml_training_data.storage import LocalStorage
_FIXTURE_PATH = Path("datatypes/epc/schema/tests/fixtures/21_0_0.json")
def _load_fixture_cert(cert_number: str) -> dict[str, object]:
cert = json.loads(_FIXTURE_PATH.read_text())
cert["certificate_number"] = cert_number
return cert
def _wrapper(cert_number: str, *, assessment_type: str = "RdSAP", document: str | None = None) -> dict[str, object]:
if document is None:
document = _FIXTURE_PATH.read_text()
return {
"certificate_number": cert_number,
"assessment_type": assessment_type,
"document": document,
}
def _write_zip(storage: LocalStorage, key: str, entries: dict[str, list[dict[str, object]]]) -> None:
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
for entry_name, payload in entries.items():
zf.writestr(entry_name, json.dumps(payload))
for entry_name, records in entries.items():
ndjson = "\n".join(json.dumps(r) for r in records)
zf.writestr(entry_name, ndjson)
storage.write_bytes(key, buf.getvalue())
def test_build_features_returns_one_row_per_supported_cert(tmp_path: Path) -> None:
# Arrange
storage = LocalStorage(root=tmp_path)
cert = _load_fixture_cert("CERT-001")
_write_zip(storage, "bulk.zip", {"certificates-2024.json": [cert]})
record = _wrapper("CERT-001")
_write_zip(storage, "bulk.zip", {"certificates-2024.json": [record]})
reader = BulkZipReader(storage=storage, zip_key="bulk.zip")
# Act
@ -50,12 +56,28 @@ def test_build_features_returns_one_row_per_supported_cert(tmp_path: Path) -> No
assert "certificate_number" == df.columns[0]
def test_build_features_skips_non_rdsap_assessment_types(tmp_path: Path) -> None:
# Arrange
storage = LocalStorage(root=tmp_path)
rdsap = _wrapper("OK-1", assessment_type="RdSAP")
sap = _wrapper("SAP-1", assessment_type="SAP")
_write_zip(storage, "bulk.zip", {"certificates-2024.json": [rdsap, sap]})
reader = BulkZipReader(storage=storage, zip_key="bulk.zip")
# Act
df = build_features(reader, certificate_numbers={"OK-1", "SAP-1"})
# Assert
assert df["certificate_number"].tolist() == ["OK-1"]
def test_build_features_skips_unsupported_schemas_by_default(tmp_path: Path) -> None:
# Arrange
storage = LocalStorage(root=tmp_path)
supported = _load_fixture_cert("OK-1")
unsupported = _load_fixture_cert("BAD-1")
unsupported["schema_type"] = "RdSAP-Schema-19.0.0" # not in mapper dispatch
bad_doc = json.loads(_FIXTURE_PATH.read_text())
bad_doc["schema_type"] = "RdSAP-Schema-19.0.0" # not in mapper dispatch
supported = _wrapper("OK-1")
unsupported = _wrapper("BAD-1", document=json.dumps(bad_doc))
_write_zip(
storage,
"bulk.zip",
@ -73,8 +95,9 @@ def test_build_features_skips_unsupported_schemas_by_default(tmp_path: Path) ->
def test_build_features_raises_when_skip_unsupported_schemas_is_false(tmp_path: Path) -> None:
# Arrange
storage = LocalStorage(root=tmp_path)
unsupported = _load_fixture_cert("BAD-1")
unsupported["schema_type"] = "RdSAP-Schema-19.0.0"
bad_doc = json.loads(_FIXTURE_PATH.read_text())
bad_doc["schema_type"] = "RdSAP-Schema-19.0.0"
unsupported = _wrapper("BAD-1", document=json.dumps(bad_doc))
_write_zip(storage, "bulk.zip", {"certificates-2024.json": [unsupported]})
reader = BulkZipReader(storage=storage, zip_key="bulk.zip")

View file

@ -1,15 +1,14 @@
"""Tests for BulkZipReader — stream EPC certificates from the gov bulk JSON ZIP.
"""Tests for BulkZipReader — stream EPC certificate wrappers from the gov bulk JSON ZIP.
The real bulk ZIP is ZIP64, ~15 GB, with one JSON entry per inspection year
(certificates-YYYY.json). Each entry is a JSON array of certificate dicts.
We test against small synthetic ZIPs of the same shape.
The real bulk ZIP holds one entry per inspection year. Each entry is a sequence of
concatenated JSON objects (NDJSON-shaped, no enclosing array). Tests use small
synthetic ZIPs of the same shape.
"""
import io
import json
import zipfile
from pathlib import Path
from typing import Any
import pytest
@ -17,11 +16,14 @@ from ml_training_data.bulk_zip_reader import BulkZipReader
from ml_training_data.storage import LocalStorage
def _write_zip(storage: LocalStorage, key: str, entries: dict[str, Any]) -> None:
def _write_zip(
storage: LocalStorage, key: str, entries: dict[str, list[dict[str, object]]]
) -> None:
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
for entry_name, payload in entries.items():
zf.writestr(entry_name, json.dumps(payload))
for entry_name, records in entries.items():
ndjson = "\n".join(json.dumps(r) for r in records)
zf.writestr(entry_name, ndjson)
storage.write_bytes(key, buf.getvalue())