"""Sample a year of EPCs from the gov bulk export into a dump dir, offline-ready.

Streams `certificates-<year>.json` out of the live bulk ZIP via HTTP range
requests (see `harness.epc_bulk`), keeps the first N records matching the
wanted SAP version, and writes each cert's inner `document` to
`<out>/<certificate-number>.json` — exactly the shape
`scripts.run_property_report` (and `from_api_response`) reads. Because it
stops after N matches, only a compressed prefix of that year's member is
downloaded, never the whole 15.7 GB archive.

    # 100 SAP-10.2 certs from 2026 (guarantees SAP 10.2) into epc_dump/
    python -m scripts.fetch_epc_bulk_sample --year 2026 --limit 100
    python -m scripts.fetch_epc_bulk_sample --year 2026 --limit 250 --out epc_dump_2026

Reads the Bearer token from OPEN_EPC_API_TOKEN (backend/.env). Run from the
worktree root (import trap).
"""

from __future__ import annotations

import argparse
import json
import os
import sys
import zipfile
from pathlib import Path

import httpx
from dotenv import load_dotenv

# Two levels up from this file: the directory containing this script's folder.
_REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_REPO_ROOT))  # worktree root first — avoid the import trap

# Must come after the sys.path tweak above so `harness` resolves to this worktree.
from harness.epc_bulk import RangeFile, is_sap_version, parse_bulk_line  # noqa: E402

# Authenticated bulk-download endpoint; it answers with a 302 to a temporary S3 URL.
_BULK_URL = (
    "https://api.get-energy-performance-data.communities.gov.uk/api/files/domestic/json"
)
# Default dump directory when --out is not given.
_DEFAULT_OUT = _REPO_ROOT / "epc_dump"

# Request size for each read of the zip member: ~4 MB of *decompressed* output
# per call. Assumption: this also bounds how much compressed data each underlying
# RangeFile read (and so each HTTP range request) fetches — confirm against
# harness.epc_bulk.RangeFile.
_CHUNK = 4 * 1024 * 1024


def _fresh_s3_object(token: str) -> tuple[str, int]:
    """Resolve the bulk endpoint's 302 to its temporary S3 URL and total size.

    Args:
        token: Bearer token for the EPC API.

    Returns:
        ``(s3_url, total)``: the URL from the redirect's ``Location`` header and
        the object's size in bytes, read from a one-byte range probe.

    Raises:
        SystemExit: if the endpoint does not answer 302 with a ``Location``
            header, or the range probe does not yield a numeric total size.
    """
    redirect = httpx.get(
        _BULK_URL,
        headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
        timeout=30,
        follow_redirects=False,
    )
    if redirect.status_code != 302:
        raise SystemExit(f"expected 302 from bulk endpoint, got {redirect.status_code}")
    s3_url = redirect.headers.get("location")
    if not s3_url:
        raise SystemExit("bulk endpoint returned 302 without a Location header")

    # Stream the probe and never read its body: if the server ignored `Range` and
    # answered 200 with the full object, a plain GET would buffer the whole archive.
    with httpx.stream("GET", s3_url, headers={"Range": "bytes=0-0"}, timeout=60) as probe:
        status = probe.status_code
        content_range = probe.headers.get("content-range", "")
    # Expected form "bytes 0-0/<total>"; "*" (unknown size) or a missing header fails.
    size_text = content_range.rpartition("/")[2]
    if status != 206 or not size_text.isdigit():
        raise SystemExit(
            f"range probe of S3 object failed (status {status}, "
            f"content-range {content_range!r})"
        )
    return s3_url, int(size_text)


def _parse_args() -> argparse.Namespace:
    """Parse --year, --limit, --sap-version and --out from the command line."""
    parser = argparse.ArgumentParser(description="Sample a year from the EPC bulk export.")
    parser.add_argument("--year", default="2026", help="certificate year (default 2026)")
    parser.add_argument("--limit", type=int, default=100, help="certs to keep")
    parser.add_argument("--sap-version", default="10.2", help="SAP version filter")
    parser.add_argument("--out", type=Path, default=_DEFAULT_OUT, help="dump directory")
    return parser.parse_args()


def _iter_lines(stream, chunk_size: int = _CHUNK):
    """Yield the newline-separated lines of a binary stream as decoded text.

    The data is split on newline *bytes* before decoding. A multi-byte UTF-8
    character that straddles a read boundary is therefore reassembled instead
    of being decoded as two broken halves. The last line is yielded even when
    the stream does not end with a newline. Undecodable bytes become U+FFFD
    rather than raising.

    Args:
        stream: any object with ``read(n) -> bytes`` (e.g. an open zip member).
        chunk_size: number of bytes requested per ``read`` call.

    Yields:
        Each line decoded as UTF-8, without its trailing newline.
    """
    pending = b""
    while chunk := stream.read(chunk_size):
        *complete, pending = (pending + chunk).split(b"\n")
        for raw in complete:
            yield raw.decode("utf-8", errors="replace")
    if pending:
        yield pending.decode("utf-8", errors="replace")


def _sample_certs(lines, limit: int, sap_version: str, out: Path) -> tuple[int, int, int]:
    """Write the first ``limit`` certs matching ``sap_version`` into ``out``.

    Each matching record's document is written to ``out/<cert_number>.json``.
    Consumption of ``lines`` stops as soon as ``limit`` certs are written, and
    nothing is consumed at all when ``limit`` is not positive. This is what keeps
    the download to a prefix.

    Args:
        lines: iterable of raw bulk-export lines (e.g. from ``_iter_lines``).
        limit: maximum number of certificates to write.
        sap_version: SAP version passed to ``is_sap_version``.
        out: existing directory to write JSON files into.

    Returns:
        ``(written, scanned, skipped_version)``: files written, lines that
        ``parse_bulk_line`` accepted, and records rejected by the version filter.
    """
    written = scanned = skipped_version = 0
    if limit <= 0:
        return written, scanned, skipped_version
    for line in lines:
        parsed = parse_bulk_line(line)
        if parsed is None:
            continue
        scanned += 1
        cert_number, document = parsed
        if not is_sap_version(document, sap_version):
            skipped_version += 1
            continue
        (out / f"{cert_number}.json").write_text(json.dumps(document), encoding="utf-8")
        written += 1
        if written >= limit:
            break
    return written, scanned, skipped_version


def main() -> int:
    """Stream one year's member of the bulk ZIP and dump matching certs to disk.

    Returns:
        Exit status: 0 on success, 1 if ``certificates-<year>.json`` is not in
        the archive, 2 if ``OPEN_EPC_API_TOKEN`` is not set.

    Raises:
        SystemExit: from ``_fresh_s3_object`` when the bulk endpoint or the S3
            range probe does not behave as expected.
    """
    args = _parse_args()
    load_dotenv(_REPO_ROOT / "backend" / ".env")
    token = os.environ.get("OPEN_EPC_API_TOKEN")
    if not token:
        print("OPEN_EPC_API_TOKEN is not set (backend/.env) — cannot fetch", file=sys.stderr)
        return 2

    out: Path = args.out
    out.mkdir(parents=True, exist_ok=True)
    member = f"certificates-{args.year}.json"
    print(f"resolving bulk archive for {member} (SAP {args.sap_version}, first {args.limit})...")
    s3_url, total = _fresh_s3_object(token)
    print(f"archive {total / 1e9:.2f} GB — streaming {member} (range requests, early stop)\n")

    range_file = RangeFile(s3_url, total)
    try:
        with zipfile.ZipFile(range_file) as archive:
            try:
                info = archive.getinfo(member)
            except KeyError:
                print(f"{member} is not in the bulk archive — check --year", file=sys.stderr)
                return 1
            with archive.open(info) as stream:
                written, scanned, skipped_version = _sample_certs(
                    _iter_lines(stream, _CHUNK), args.limit, args.sap_version, out
                )
        # Capture the transfer counter before the range file is closed.
        transferred = range_file.bytes_read
    finally:
        range_file.close()

    print(
        f"wrote {written} certs ({scanned} scanned, {skipped_version} non-SAP-{args.sap_version}) "
        f"-> {out.resolve()}\n"
        f"transferred ~{transferred / 1e6:.0f} MB (early stop; full archive is {total / 1e9:.1f} GB)"
    )
    print(f"\nnow run: python -m scripts.run_property_report {out}")
    return 0


if __name__ == "__main__":
    sys.exit(main())