From cf8e5b9ec63297961a142b51eb2b7b5397e833c5 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Thu, 4 Jun 2026 12:16:18 +0000 Subject: [PATCH] feat(modelling): read the gov EPC bulk export via HTTP range requests The bulk endpoint 302-redirects to a 15.7 GB S3 ZIP with one NDJSON member per year; each line wraps the per-cert payload in a stringified 'document' that parses to the same RdSAP-Schema-21.0.1 shape from_api_response already handles. parse_bulk_line unwraps a record; is_sap_version filters to SAP 10.2; RangeFile exposes the S3 object as a seekable file so zipfile streams a single year's member (and a sampler stops early) without downloading the whole archive. Co-Authored-By: Claude Opus 4.8 --- harness/epc_bulk.py | 92 ++++++++++++++++++++++++++++++++++ tests/harness/test_epc_bulk.py | 43 ++++++++++++++++ 2 files changed, 135 insertions(+) create mode 100644 harness/epc_bulk.py create mode 100644 tests/harness/test_epc_bulk.py diff --git a/harness/epc_bulk.py b/harness/epc_bulk.py new file mode 100644 index 00000000..84ccc313 --- /dev/null +++ b/harness/epc_bulk.py @@ -0,0 +1,92 @@ +"""Read the gov EPC **bulk** export without downloading the 15.7 GB archive. + +The live API's bulk endpoint (`/api/files/domestic/json`) 302-redirects to a +temporary S3 ZIP holding one NDJSON member per year (`certificates-.json`, +e.g. 2026 is ~559 MB compressed / ~7.6 GB uncompressed). Each NDJSON line is a +warehouse record whose per-cert payload is a *stringified* `document` field; the +parsed document is the same shape `EpcPropertyDataMapper.from_api_response` +already handles (`RdSAP-Schema-21.0.1`, `sap_building_parts`, +`energy_rating_current`, ...). + +`RangeFile` exposes the S3 object as a seekable file backed by HTTP range +requests, so `zipfile` reads the central directory and streams a single member's +deflate stream — and a sampler can stop early after N records, fetching only the +compressed prefix it needs. The line-level parsing is pure and unit-tested here; +the network wiring lives in `scripts/fetch_epc_bulk_sample.py`. +""" + +from __future__ import annotations + +import io +import json +from typing import Any, Optional + +import httpx + + +def parse_bulk_line(line: str) -> Optional[tuple[str, dict[str, Any]]]: + """Parse one NDJSON bulk record into `(certificate_number, document)`, + unwrapping the stringified `document`. Blank lines return None.""" + stripped: str = line.strip() + if not stripped: + return None + record: dict[str, Any] = json.loads(stripped) + raw_document: Any = record["document"] + document: dict[str, Any] = ( + json.loads(raw_document) if isinstance(raw_document, str) else raw_document + ) + return record["certificate_number"], document + + +def is_sap_version(document: dict[str, Any], wanted: str) -> bool: + """True when the document's `sap_version` equals `wanted` (the export carries + it as a number, so compare on the string form).""" + version: Any = document.get("sap_version") + return version is not None and str(version) == wanted + + +class RangeFile(io.RawIOBase): + """A seekable read-only file over an HTTP object that supports byte ranges + (an S3 presigned URL). Each `read` issues a `Range` GET, so `zipfile` can + parse the central directory and stream one member without downloading the + whole archive.""" + + def __init__(self, url: str, size: int) -> None: + self._url = url + self._size = size + self._pos = 0 + self._client = httpx.Client(timeout=120) + + def seekable(self) -> bool: + return True + + def readable(self) -> bool: + return True + + def tell(self) -> int: + return self._pos + + def seek(self, offset: int, whence: int = io.SEEK_SET) -> int: + if whence == io.SEEK_SET: + self._pos = offset + elif whence == io.SEEK_CUR: + self._pos += offset + elif whence == io.SEEK_END: + self._pos = self._size + offset + return self._pos + + def read(self, size: Optional[int] = -1) -> bytes: + if size is None or size < 0: + size = self._size - self._pos + if size == 0 or self._pos >= self._size: + return b"" + end: int = min(self._pos + size, self._size) - 1 + resp = self._client.get(self._url, headers={"Range": f"bytes={self._pos}-{end}"}) + resp.raise_for_status() + data: bytes = resp.content + self._pos += len(data) + return data + + def close(self) -> None: + self._client.close() + super().close() diff --git a/tests/harness/test_epc_bulk.py b/tests/harness/test_epc_bulk.py new file mode 100644 index 00000000..f4d71cb6 --- /dev/null +++ b/tests/harness/test_epc_bulk.py @@ -0,0 +1,43 @@ +"""Parse records from the gov EPC bulk export (NDJSON, stringified `document`).""" + +from __future__ import annotations + +import json + +from harness.epc_bulk import is_sap_version, parse_bulk_line + + +def test_parse_bulk_line_unwraps_the_stringified_document() -> None: + # Arrange — a bulk record wraps the per-cert payload in a `document` string. + inner: dict[str, object] = { + "schema_type": "RdSAP-Schema-21.0.1", + "sap_version": 10.2, + "energy_rating_current": 71, + } + line: str = json.dumps( + {"certificate_number": "0000-1111-2222-3333-4444", "document": json.dumps(inner)} + ) + + # Act + parsed = parse_bulk_line(line) + + # Assert — the cert number and the parsed inner document come back. + assert parsed is not None + cert_number, document = parsed + assert cert_number == "0000-1111-2222-3333-4444" + assert document["schema_type"] == "RdSAP-Schema-21.0.1" + assert document["energy_rating_current"] == 71 + + +def test_parse_bulk_line_ignores_blank_lines() -> None: + # Arrange / Act / Assert — trailing/blank NDJSON lines are skipped. + assert parse_bulk_line("") is None + assert parse_bulk_line(" \n") is None + + +def test_is_sap_version_matches_regardless_of_numeric_or_string_form() -> None: + # Arrange / Act / Assert — the export carries sap_version as a number. + assert is_sap_version({"sap_version": 10.2}, "10.2") is True + assert is_sap_version({"sap_version": "10.2"}, "10.2") is True + assert is_sap_version({"sap_version": 10.1}, "10.2") is False + assert is_sap_version({}, "10.2") is False