Model/backend/app/db/functions/epc_functions.py
2026-04-09 16:01:33 +01:00

229 lines
7.7 KiB
Python

from typing import List
from datetime import datetime, timedelta, timezone
from sqlalchemy.exc import SQLAlchemyError
from backend.app.db.models.epc import EpcStore
from sqlmodel import Session
from sqlalchemy.dialects.postgresql import insert
class EpcStoreService:
"""
Service layer for EPC data lookup and persistence.
"""
FRESHNESS_DAYS = 30 # Upgraded to 30 days
# status labels
FRESH = "fresh"
EXPIRED = "expired"
MISSING = "missing"
@classmethod
def get_epc_for_uprn(cls, session: Session, uprn: int):
"""
Query EPC data for a given UPRN and return a dict describing:
- epc_api: only if within last 30 days
- epc_page: only if epc_api exists
- status: 'fresh', 'expired', or 'missing'
"""
record = session.query(EpcStore).filter(EpcStore.uprn == uprn).first()
if not record:
return {"status": cls.MISSING, "epc_api": None, "epc_page": None}
if not record.epc_api_created_at:
# API data missing → treat as missing even if page data exists
return {"status": cls.MISSING, "epc_api": None, "epc_page": None}
# check freshness
cutoff = datetime.now(timezone.utc) - timedelta(days=EpcStoreService.FRESHNESS_DAYS)
if record.epc_api_created_at.date() < cutoff.date():
return {"status": cls.EXPIRED, "epc_api": None, "epc_page": None}
# Fresh API → include page only if present
return {
"status": cls.FRESH,
"epc_api": record.epc_api,
"epc_page": record.epc_page if record.epc_page else None,
"epc_page_rrn": record.epc_page_rrn,
"epc_api_created_at": record.epc_api_created_at,
"epc_page_created_at": record.epc_page_created_at,
}
@classmethod
def get_epcs_for_uprns(cls, session: Session, uprns: List[int]) -> dict[int, dict]:
"""
Given a list of uprns, return a dict mapping each uprn to its EPC data status and content.
:param session:
:param uprns:
:return:
"""
if not uprns:
return {}
cutoff = datetime.now(timezone.utc) - timedelta(days=cls.FRESHNESS_DAYS)
records = (
session.query(EpcStore)
.filter(EpcStore.uprn.in_(uprns))
.all()
)
result: dict[int, dict] = {}
for record in records:
if not record.epc_api_created_at:
result[record.uprn] = {
"status": cls.MISSING,
"epc_api": None,
"epc_page": None,
"epc_page_rrn": None,
"epc_api_created_at": None,
"epc_page_created_at": None,
}
continue
if record.epc_api_created_at.date() < cutoff.date():
# We only expose epc_page when epc_api is fresh.
result[record.uprn] = {
"status": cls.EXPIRED,
"epc_api": None,
"epc_page": None,
"epc_page_rrn": None,
"epc_api_created_at": None,
"epc_page_created_at": None,
}
continue
result[record.uprn] = {
"status": cls.FRESH,
"epc_api": record.epc_api,
"epc_page": record.epc_page,
"epc_page_rrn": record.epc_page_rrn,
"epc_api_created_at": record.epc_api_created_at,
"epc_page_created_at": record.epc_page_created_at,
}
# For the uprns not found in records, mark them as missing
requested = set(uprns)
found = set(result.keys())
missing = requested - found
for uprn in missing:
result[uprn] = {
"status": cls.MISSING,
"epc_api": None,
"epc_page": None,
"epc_page_rrn": None,
"epc_api_created_at": None,
"epc_page_created_at": None,
}
return result
@classmethod
def check_insert_needed(cls, epc_cache, epc_estimated, uprn):
"""
Check if an insert is needed based on existing data.
:return:
"""
no_existing_epc_cache = epc_cache.get("epc_api") is None
existing_cache_expired = (
epc_cache.get("status") == cls.EXPIRED
)
needs_insert = bool((no_existing_epc_cache or existing_cache_expired) and not epc_estimated and uprn)
return needs_insert
@staticmethod
def upsert_epc_data(
session: Session,
uprn: int,
epc_api: dict | None,
epc_page: str | None,
epc_page_rrn: str | None,
epc_api_created_at: datetime | None = None,
epc_page_created_at: datetime | None = None,
):
"""
Insert or update EPC data for a UPRN.
Rules:
- If record exists → update it
- If record does not exist → create new
"""
try:
record = session.query(EpcStore).filter(EpcStore.uprn == uprn).first()
if record:
# update path
if epc_api is not None:
record.epc_api = epc_api
if epc_api_created_at is None:
epc_api_created_at = datetime.now(timezone.utc)
record.epc_api_created_at = epc_api_created_at
# update page data only if BOTH:
# 1) the caller passed page data
# 2) epc_api is not None (page only allowed when API exists)
if epc_page is not None and epc_api is not None:
record.epc_page = epc_page
record.epc_page_rrn = epc_page_rrn
if epc_page_created_at is None:
epc_page_created_at = datetime.now(timezone.utc)
record.epc_page_created_at = epc_page_created_at
else:
# insert path
record = EpcStore(
uprn=uprn,
epc_api=epc_api,
epc_api_created_at=epc_api_created_at,
epc_page=epc_page if epc_api is not None else None,
epc_page_rrn=epc_page_rrn if epc_api is not None else None,
epc_page_created_at=epc_page_created_at if epc_api is not None else None,
)
session.add(record)
return record
except SQLAlchemyError as e:
raise e
@classmethod
def bulk_upsert_epc_data(cls, session: Session, rows_to_insert: list[dict]):
if not rows_to_insert:
return
now = datetime.now(timezone.utc)
values = [
{
"uprn": row["uprn"],
"epc_api": row["epc_api"],
"epc_api_created_at": now,
"epc_page": row["epc_page"],
"epc_page_rrn": row["epc_page_rrn"],
"epc_page_created_at": now if row["epc_page"] else None,
}
for row in rows_to_insert
]
insert_stmt = insert(EpcStore).values(values)
stmt = insert_stmt.on_conflict_do_update(
index_elements=["uprn"],
set_={
"epc_api": insert_stmt.excluded.epc_api,
"epc_api_created_at": insert_stmt.excluded.epc_api_created_at,
"epc_page": insert_stmt.excluded.epc_page,
"epc_page_rrn": insert_stmt.excluded.epc_page_rrn,
"epc_page_created_at": insert_stmt.excluded.epc_page_created_at,
},
)
session.execute(stmt)
session.commit()