Model/scripts/fetch_epc_bulk_sample.py
Khalim Conn-Kowlessar afabfa0147 feat(modelling): sample a year from the EPC bulk export, offline-ready
fetch_epc_bulk_sample streams certificates-<year>.json out of the bulk ZIP via
range requests, keeps the first N SAP-version matches, and writes each cert's
inner document to <out>/<cert>.json for run_property_report. Stops after N, so
only the member prefix transfers, not the 15.7 GB archive (RangeFile.bytes_read
reports the true transfer vs the absolute ZIP offset). Verified on 2026: 100
SAP-10.2 certs -> report ran 81 scorable (MAE 2.03), 46 flagged, 19 raises
(11 full-SAP schema 19.1.0, 7 unmapped floor_construction 0/3, 1 missing
post_town) — real shadow-validation signal vs the curated golden 57.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-04 12:20:57 +00:00

127 lines
4.7 KiB
Python

"""Sample a year of EPCs from the gov bulk export into a dump dir, offline-ready.
Streams `certificates-<year>.json` out of the live bulk ZIP via HTTP range
requests (see `harness.epc_bulk`), keeps the first N records matching the wanted
SAP version, and writes each cert's inner `document` to
`<out>/<certificate_number>.json` — exactly the shape
`scripts.run_property_report` (and `from_api_response`) reads. Because it stops
after N matches, only the compressed prefix of the year is downloaded, never the
15.7 GB archive.
# 100 SAP-10.2 certs from 2026 (guarantees SAP 10.2) into epc_dump/
python -m scripts.fetch_epc_bulk_sample --year 2026 --limit 100
python -m scripts.fetch_epc_bulk_sample --year 2026 --limit 250 --out epc_dump_2026
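Reads the Bearer token from OPEN_EPC_API_TOKEN (environment, or backend/.env).
Run from the worktree root: the script puts that root first on sys.path so
`harness` is imported from this checkout rather than a same-named package
elsewhere on the path (the 'import trap').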
"""
from __future__ import annotations

import argparse
import json
import os
import sys
import zipfile
from collections.abc import Iterator
from pathlib import Path
from typing import IO

import httpx
from dotenv import load_dotenv
# The directory one level above this script's folder (the parent of scripts/).
# The default dump dir and backend/.env are resolved relative to it.
_REPO_ROOT = Path(__file__).resolve().parents[1]
# Insert at position 0 so modules under the worktree root (e.g. `harness`,
# imported right after this) take precedence over any same-named package
# elsewhere on sys.path.
sys.path.insert(0, str(_REPO_ROOT))  # worktree root first — avoid the import trap
from harness.epc_bulk import RangeFile, is_sap_version, parse_bulk_line # noqa: E402
# Gov bulk-download endpoint for domestic EPCs. It answers with a redirect to a
# temporary S3 object, which _fresh_s3_object resolves.
_BULK_URL = (
    "https://api.get-energy-performance-data.communities.gov.uk"
    "/api/files/domestic/json"
)
# Dump directory used when --out is not given.
_DEFAULT_OUT = _REPO_ROOT.joinpath("epc_dump")
# Size (4 MiB) passed to each read() on the zipped member stream. It caps the
# *decompressed* bytes returned per call. The intent is to keep the underlying
# compressed range reads around ~4 MB each; confirm against RangeFile if tuning.
_CHUNK = 4 << 20
def _fresh_s3_object(token: str) -> tuple[str, int]:
    """Resolve the bulk endpoint's 302 to its temporary S3 URL and total size.

    Args:
        token: Bearer token for the EPC bulk API.

    Returns:
        ``(s3_url, total)``: the redirect target, and the object's size in bytes
        taken from the ``Content-Range`` header of a one-byte range probe.

    Raises:
        SystemExit: if the bulk endpoint does not answer with a 302, or the S3
            object does not answer the probe with a 206 partial response that
            carries a numeric total size.
    """
    redirect = httpx.get(
        _BULK_URL,
        headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
        timeout=30,
        follow_redirects=False,
    )
    if redirect.status_code != 302:
        raise SystemExit(f"expected 302 from bulk endpoint, got {redirect.status_code}")
    s3_url: str = redirect.headers["location"]

    # Stream the probe so only headers are read. If the server ignores Range and
    # replies 200 with the full body, httpx.get would buffer the whole
    # multi-GB archive before we could reject it.
    with httpx.stream("GET", s3_url, headers={"Range": "bytes=0-0"}, timeout=60) as probe:
        status = probe.status_code
        content_range = probe.headers.get("content-range", "")
    if status != 206:
        raise SystemExit(f"expected 206 from S3 range probe, got {status}")

    # Content-Range looks like "bytes 0-0/<total>". The total may be "*" when
    # the server does not know it, which gives us nothing to range over.
    _, _, total_text = content_range.rpartition("/")
    if not total_text.isdigit():
        raise SystemExit(f"S3 range probe returned no usable total size: {content_range!r}")
    return s3_url, int(total_text)
def _parse_args() -> argparse.Namespace:
    """Read the CLI flags: certificate year, sample size, SAP filter, output dir."""
    cli = argparse.ArgumentParser(
        description="Sample a year from the EPC bulk export."
    )
    add = cli.add_argument
    add("--year", default="2026", help="certificate year (default 2026)")
    add("--limit", type=int, default=100, help="certs to keep")
    add("--sap-version", default="10.2", help="SAP version filter")
    add("--out", type=Path, default=_DEFAULT_OUT, help="dump directory")
    namespace = cli.parse_args()
    return namespace
def _iter_lines(stream: IO[bytes], chunk_size: int) -> Iterator[str]:
    """Yield the newline-separated lines of a binary stream, decoded as UTF-8.

    Reads ``chunk_size`` bytes per ``read()`` call. Lines are split on the raw
    bytes *before* decoding, which is safe because the newline byte never occurs
    inside a multi-byte UTF-8 sequence. As a result, a character straddling a
    chunk boundary is decoded intact rather than turned into U+FFFD. A final line
    with no trailing newline is still yielded. Stops reading as soon as the
    caller stops iterating.
    """
    pending = b""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        pending += chunk
        # The last piece is an incomplete line; carry it into the next chunk.
        *complete, pending = pending.split(b"\n")
        for raw in complete:
            yield raw.decode("utf-8", errors="replace")
    if pending:
        yield pending.decode("utf-8", errors="replace")


def _write_matching_certs(
    stream: IO[bytes], out: Path, sap_version: str, limit: int
) -> tuple[int, int, int]:
    """Write the first ``limit`` certs matching ``sap_version`` from ``stream`` into ``out``.

    Each parsed line's ``document`` is written to ``<out>/<certificate_number>.json``.
    Once ``limit`` certs are written, no further chunks are read. A non-positive
    ``limit`` reads nothing.

    Returns:
        ``(written, scanned, skipped_version)``: certs written, lines that
        parsed as certs, and parsed certs rejected by the SAP-version filter.
    """
    written = scanned = skipped_version = 0
    if limit <= 0:
        return written, scanned, skipped_version
    for line in _iter_lines(stream, _CHUNK):
        parsed = parse_bulk_line(line)
        if parsed is None:
            continue
        scanned += 1
        cert_number, document = parsed
        if not is_sap_version(document, sap_version):
            skipped_version += 1
            continue
        (out / f"{cert_number}.json").write_text(json.dumps(document), encoding="utf-8")
        written += 1
        if written >= limit:
            break
    return written, scanned, skipped_version


def main() -> int:
    """Sample the first ``--limit`` matching certificates of ``--year`` into ``--out``.

    Resolves a fresh S3 URL for the bulk ZIP, then streams
    ``certificates-<year>.json`` out of it through ``RangeFile``. Each cert
    matching ``--sap-version`` has its document written to
    ``<out>/<certificate_number>.json``.

    Returns:
        Process exit code: 0 on success, or 2 if ``OPEN_EPC_API_TOKEN`` is unset
        or the archive has no member for the requested year.
    """
    args = _parse_args()
    load_dotenv(_REPO_ROOT / "backend" / ".env")
    token = os.environ.get("OPEN_EPC_API_TOKEN")
    if not token:
        print("OPEN_EPC_API_TOKEN is not set (backend/.env) — cannot fetch", file=sys.stderr)
        return 2
    out: Path = args.out
    out.mkdir(parents=True, exist_ok=True)
    member = f"certificates-{args.year}.json"
    print(f"resolving bulk archive for {member} (SAP {args.sap_version}, first {args.limit})...")
    s3_url, total = _fresh_s3_object(token)
    print(f"archive {total / 1e9:.2f} GB — streaming {member} (range requests, early stop)\n")

    range_file = RangeFile(s3_url, total)
    try:
        # ZipFile does not close a file object it was handed, so range_file is
        # closed separately in the finally clause, even on errors.
        with zipfile.ZipFile(range_file) as archive:
            if member not in archive.namelist():
                print(f"{member} not found in the bulk archive — cannot fetch", file=sys.stderr)
                return 2
            with archive.open(member) as stream:
                written, scanned, skipped_version = _write_matching_certs(
                    stream, out, args.sap_version, args.limit
                )
        print(
            f"wrote {written} certs ({scanned} scanned, {skipped_version} non-SAP-{args.sap_version}) "
            f"-> {out.resolve()}\n"
            f"transferred ~{range_file.bytes_read / 1e6:.0f} MB (early stop; full archive is {total / 1e9:.1f} GB)"
        )
        print(f"\nnow run: python -m scripts.run_property_report {out}")
    finally:
        range_file.close()
    return 0
if __name__ == "__main__":
    # main() returns the process exit code; SystemExit propagates it to the shell.
    raise SystemExit(main())