from typing import List from datetime import datetime, timedelta, timezone from sqlalchemy.exc import SQLAlchemyError from backend.app.db.models.epc import EpcStore from sqlmodel import Session from sqlalchemy.dialects.postgresql import insert class EpcStoreService: """ Service layer for EPC data lookup and persistence. """ FRESHNESS_DAYS = 14 # Upgraded to 14 days # status labels FRESH = "fresh" EXPIRED = "expired" MISSING = "missing" @classmethod def get_epc_for_uprn(cls, session: Session, uprn: int): """ Query EPC data for a given UPRN and return a dict describing: - epc_api: only if within last 21 days - epc_page: only if epc_api exists - status: 'fresh', 'expired', or 'missing' """ record = session.query(EpcStore).filter(EpcStore.uprn == uprn).first() if not record: return {"status": cls.MISSING, "epc_api": None, "epc_page": None} if not record.epc_api_created_at: # API data missing → treat as missing even if page data exists return {"status": cls.MISSING, "epc_api": None, "epc_page": None} # check freshness cutoff = datetime.now(timezone.utc) - timedelta(days=EpcStoreService.FRESHNESS_DAYS) if record.epc_api_created_at.date() < cutoff.date(): return {"status": cls.EXPIRED, "epc_api": None, "epc_page": None} # Fresh API → include page only if present return { "status": cls.FRESH, "epc_api": record.epc_api, "epc_page": record.epc_page if record.epc_page else None, "epc_page_rrn": record.epc_page_rrn, "epc_api_created_at": record.epc_api_created_at, "epc_page_created_at": record.epc_page_created_at, } @classmethod def get_epcs_for_uprns(cls, session: Session, uprns: List[int]) -> dict[int, dict]: """ Given a list of uprns, return a dict mapping each uprn to its EPC data status and content. :param session: :param uprns: :return: """ if not uprns: return {} cutoff = datetime.now(timezone.utc) - timedelta(days=cls.FRESHNESS_DAYS) records = ( session.query(EpcStore) .filter(EpcStore.uprn.in_(uprns)) .all() ) result: dict[int, dict] = {} for record in records: if not record.epc_api_created_at: result[record.uprn] = { "status": cls.MISSING, "epc_api": None, "epc_page": None, "epc_page_rrn": None, "epc_api_created_at": None, "epc_page_created_at": None, } continue if record.epc_api_created_at.date() < cutoff.date(): # We only expose epc_page when epc_api is fresh. result[record.uprn] = { "status": cls.EXPIRED, "epc_api": None, "epc_page": None, "epc_page_rrn": None, "epc_api_created_at": None, "epc_page_created_at": None, } continue result[record.uprn] = { "status": cls.FRESH, "epc_api": record.epc_api, "epc_page": record.epc_page, "epc_page_rrn": record.epc_page_rrn, "epc_api_created_at": record.epc_api_created_at, "epc_page_created_at": record.epc_page_created_at, } # For the uprns not found in records, mark them as missing requested = set(uprns) found = set(result.keys()) missing = requested - found for uprn in missing: result[uprn] = { "status": cls.MISSING, "epc_api": None, "epc_page": None, "epc_page_rrn": None, "epc_api_created_at": None, "epc_page_created_at": None, } return result @classmethod def check_insert_needed(cls, epc_cache, epc_estimated, uprn): """ Check if an insert is needed based on existing data. :return: """ no_existing_epc_cache = epc_cache.get("epc_api") is None existing_cache_expired = ( epc_cache.get("status") == cls.EXPIRED ) needs_insert = bool((no_existing_epc_cache or existing_cache_expired) and not epc_estimated and uprn) return needs_insert @staticmethod def upsert_epc_data( session: Session, uprn: int, epc_api: dict | None, epc_page: str | None, epc_page_rrn: str | None, epc_api_created_at: datetime | None = None, epc_page_created_at: datetime | None = None, ): """ Insert or update EPC data for a UPRN. Rules: - If record exists → update it - If record does not exist → create new """ try: record = session.query(EpcStore).filter(EpcStore.uprn == uprn).first() if record: # update path if epc_api is not None: record.epc_api = epc_api if epc_api_created_at is None: epc_api_created_at = datetime.now(timezone.utc) record.epc_api_created_at = epc_api_created_at # update page data only if BOTH: # 1) the caller passed page data # 2) epc_api is not None (page only allowed when API exists) if epc_page is not None and epc_api is not None: record.epc_page = epc_page record.epc_page_rrn = epc_page_rrn if epc_page_created_at is None: epc_page_created_at = datetime.now(timezone.utc) record.epc_page_created_at = epc_page_created_at else: # insert path record = EpcStore( uprn=uprn, epc_api=epc_api, epc_api_created_at=epc_api_created_at, epc_page=epc_page if epc_api is not None else None, epc_page_rrn=epc_page_rrn if epc_api is not None else None, epc_page_created_at=epc_page_created_at if epc_api is not None else None, ) session.add(record) return record except SQLAlchemyError as e: raise e @classmethod def bulk_upsert_epc_data(cls, session: Session, rows_to_insert: list[dict]): if not rows_to_insert: return now = datetime.now(timezone.utc) values = [ { "uprn": row["uprn"], "epc_api": row["epc_api"], "epc_api_created_at": now, "epc_page": row["epc_page"], "epc_page_rrn": row["epc_page_rrn"], "epc_page_created_at": now if row["epc_page"] else None, } for row in rows_to_insert ] insert_stmt = insert(EpcStore).values(values) stmt = insert_stmt.on_conflict_do_update( index_elements=["uprn"], set_={ "epc_api": insert_stmt.excluded.epc_api, "epc_api_created_at": insert_stmt.excluded.epc_api_created_at, "epc_page": insert_stmt.excluded.epc_page, "epc_page_rrn": insert_stmt.excluded.epc_page_rrn, "epc_page_created_at": insert_stmt.excluded.epc_page_created_at, }, ) session.execute(stmt) session.commit()