feat(modelling): read the gov EPC bulk export via HTTP range requests

The bulk endpoint 302-redirects to a 15.7 GB S3 ZIP with one NDJSON member per
year; each line wraps the per-cert payload in a stringified 'document' that
parses to the same RdSAP-Schema-21.0.1 shape from_api_response already handles.
parse_bulk_line unwraps a record; is_sap_version filters to SAP 10.2; RangeFile
exposes the S3 object as a seekable file so zipfile streams a single year's
member (and a sampler stops early) without downloading the whole archive.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-06-04 12:16:18 +00:00
parent ea3af8d2f4
commit cf8e5b9ec6
2 changed files with 135 additions and 0 deletions

92
harness/epc_bulk.py Normal file
View file

@ -0,0 +1,92 @@
"""Read the gov EPC **bulk** export without downloading the 15.7 GB archive.
The live API's bulk endpoint (`/api/files/domestic/json`) 302-redirects to a
temporary S3 ZIP holding one NDJSON member per year (`certificates-<year>.json`,
e.g. 2026 is ~559 MB compressed / ~7.6 GB uncompressed). Each NDJSON line is a
warehouse record whose per-cert payload is a *stringified* `document` field; the
parsed document is the same shape `EpcPropertyDataMapper.from_api_response`
already handles (`RdSAP-Schema-21.0.1`, `sap_building_parts`,
`energy_rating_current`, ...).
`RangeFile` exposes the S3 object as a seekable file backed by HTTP range
requests, so `zipfile` reads the central directory and streams a single member's
deflate stream and a sampler can stop early after N records, fetching only the
compressed prefix it needs. The line-level parsing is pure and unit-tested here;
the network wiring lives in `scripts/fetch_epc_bulk_sample.py`.
"""
from __future__ import annotations
import io
import json
from typing import Any, Optional
import httpx
def parse_bulk_line(line: str) -> Optional[tuple[str, dict[str, Any]]]:
"""Parse one NDJSON bulk record into `(certificate_number, document)`,
unwrapping the stringified `document`. Blank lines return None."""
stripped: str = line.strip()
if not stripped:
return None
record: dict[str, Any] = json.loads(stripped)
raw_document: Any = record["document"]
document: dict[str, Any] = (
json.loads(raw_document) if isinstance(raw_document, str) else raw_document
)
return record["certificate_number"], document
def is_sap_version(document: dict[str, Any], wanted: str) -> bool:
"""True when the document's `sap_version` equals `wanted` (the export carries
it as a number, so compare on the string form)."""
version: Any = document.get("sap_version")
return version is not None and str(version) == wanted
class RangeFile(io.RawIOBase):
"""A seekable read-only file over an HTTP object that supports byte ranges
(an S3 presigned URL). Each `read` issues a `Range` GET, so `zipfile` can
parse the central directory and stream one member without downloading the
whole archive."""
def __init__(self, url: str, size: int) -> None:
self._url = url
self._size = size
self._pos = 0
self._client = httpx.Client(timeout=120)
def seekable(self) -> bool:
return True
def readable(self) -> bool:
return True
def tell(self) -> int:
return self._pos
def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
if whence == io.SEEK_SET:
self._pos = offset
elif whence == io.SEEK_CUR:
self._pos += offset
elif whence == io.SEEK_END:
self._pos = self._size + offset
return self._pos
def read(self, size: Optional[int] = -1) -> bytes:
if size is None or size < 0:
size = self._size - self._pos
if size == 0 or self._pos >= self._size:
return b""
end: int = min(self._pos + size, self._size) - 1
resp = self._client.get(self._url, headers={"Range": f"bytes={self._pos}-{end}"})
resp.raise_for_status()
data: bytes = resp.content
self._pos += len(data)
return data
def close(self) -> None:
self._client.close()
super().close()

View file

@ -0,0 +1,43 @@
"""Parse records from the gov EPC bulk export (NDJSON, stringified `document`)."""
from __future__ import annotations
import json
from harness.epc_bulk import is_sap_version, parse_bulk_line
def test_parse_bulk_line_unwraps_the_stringified_document() -> None:
# Arrange — a bulk record wraps the per-cert payload in a `document` string.
inner: dict[str, object] = {
"schema_type": "RdSAP-Schema-21.0.1",
"sap_version": 10.2,
"energy_rating_current": 71,
}
line: str = json.dumps(
{"certificate_number": "0000-1111-2222-3333-4444", "document": json.dumps(inner)}
)
# Act
parsed = parse_bulk_line(line)
# Assert — the cert number and the parsed inner document come back.
assert parsed is not None
cert_number, document = parsed
assert cert_number == "0000-1111-2222-3333-4444"
assert document["schema_type"] == "RdSAP-Schema-21.0.1"
assert document["energy_rating_current"] == 71
def test_parse_bulk_line_ignores_blank_lines() -> None:
# Arrange / Act / Assert — trailing/blank NDJSON lines are skipped.
assert parse_bulk_line("") is None
assert parse_bulk_line(" \n") is None
def test_is_sap_version_matches_regardless_of_numeric_or_string_form() -> None:
# Arrange / Act / Assert — the export carries sap_version as a number.
assert is_sap_version({"sap_version": 10.2}, "10.2") is True
assert is_sap_version({"sap_version": "10.2"}, "10.2") is True
assert is_sap_version({"sap_version": 10.1}, "10.2") is False
assert is_sap_version({}, "10.2") is False