Model/infrastructure/epc_client/epc_client_service.py
Khalim Conn-Kowlessar caee4de2f4 feat(ingestion): relocate EpcClientService to infrastructure + SolarRepo (#1133)
Move the EpcClientService package (client + _retry + exceptions + tests) from
the dying backend/ tree to infrastructure/epc_client/ as the New-EPC-API Fetcher;
update the two callers (address2UPRN, a script). All 14 client tests pass.

Add SolarRepository port + SolarPostgresRepository persisting Google Solar
building insights as JSONB (solar_building_insights table), one row per Property.
The EPC repo half of this slice already landed in #1129. pyright strict clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-30 19:45:26 +00:00

124 lines
4.4 KiB
Python

# Spec: https://raw.githubusercontent.com/communitiesuk/epb-data-warehouse/main/api/api.yml
from __future__ import annotations
from typing import Any, Optional
import httpx
from infrastructure.epc_client.exceptions import (
EpcApiError,
EpcNotFoundError,
EpcRateLimitError,
)
from infrastructure.epc_client._retry import call_with_retry
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
from datatypes.epc.search import EpcSearchResult
class EpcClientService:
BASE_URL = "https://api.get-energy-performance-data.communities.gov.uk"
REQUEST_TIMEOUT = 10.0
def __init__(self, auth_token: str) -> None:
self._headers = {
"Authorization": f"Bearer {auth_token}",
"Accept": "application/json",
}
@staticmethod
def _parse_retry_after(resp: httpx.Response) -> Optional[float]:
header = resp.headers.get("Retry-After")
if header is None:
return None
try:
return float(header)
except (TypeError, ValueError):
return None
def get_by_certificate_number(self, cert_num: str) -> EpcPropertyData:
raw = call_with_retry(lambda: self._fetch_certificate(cert_num))
return EpcPropertyDataMapper.from_api_response(raw)
def get_by_uprn(self, uprn: int) -> Optional[EpcPropertyData]:
results = call_with_retry(lambda: self._search(uprn=uprn))
if not results:
return None
latest = max(results, key=lambda r: r.registration_date)
return self.get_by_certificate_number(latest.certificate_number)
@staticmethod
def _normalise_postcode(postcode: str) -> str:
"""Return the postcode with all spaces removed and uppercased."""
return postcode.replace(" ", "").upper()
def search_by_postcode(self, postcode: str) -> list[EpcSearchResult]:
normalised = self._normalise_postcode(postcode)
return call_with_retry(lambda: self._search(postcode=normalised))
# ------------------------------------------------------------------
# Private helperEpcRateLimpolarss
# ------------------------------------------------------------------
def _fetch_certificate(self, cert_num: str) -> dict[str, Any]:
resp = httpx.get(
f"{self.BASE_URL}/api/certificate",
params={"certificate_number": cert_num},
headers=self._headers,
timeout=self.REQUEST_TIMEOUT,
)
if resp.status_code == 404:
raise EpcNotFoundError(cert_num)
if resp.status_code == 429:
raise EpcRateLimitError(
"Rate limited by EPC API",
retry_after=self._parse_retry_after(resp),
)
if not resp.is_success:
raise EpcApiError(f"EPC API error {resp.status_code}: {resp.text}")
return resp.json()["data"]
def _search(
self,
postcode: Optional[str] = None,
uprn: Optional[int] = None,
) -> list[EpcSearchResult]:
params: dict[str, str | int] = {}
if postcode:
params["postcode"] = postcode
if uprn is not None:
params["uprn"] = uprn
resp = httpx.get(
f"{self.BASE_URL}/api/domestic/search",
params=params,
headers=self._headers,
timeout=self.REQUEST_TIMEOUT,
)
if resp.status_code == 404:
return []
if resp.status_code == 429:
raise EpcRateLimitError(
"Rate limited by EPC API",
retry_after=self._parse_retry_after(resp),
)
if not resp.is_success:
raise EpcApiError(f"EPC API error {resp.status_code}: {resp.text}")
rows = resp.json().get("data", [])
return [self._parse_search_result(r) for r in rows]
@staticmethod
def _parse_search_result(row: dict[str, Any]) -> EpcSearchResult:
return EpcSearchResult(
certificate_number=row["certificateNumber"],
address_line_1=row["addressLine1"],
address_line_2=row.get("addressLine2"),
address_line_3=row.get("addressLine3"),
address_line_4=row.get("addressLine4"),
postcode=row["postcode"],
post_town=row["postTown"],
uprn=row.get("uprn"),
current_energy_efficiency_band=row["currentEnergyEfficiencyBand"],
registration_date=row["registrationDate"],
)