From 7640baec02a1fe7084eb383811e4c26b695e06c3 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 25 Nov 2025 18:22:20 +0000 Subject: [PATCH] implemented EPC caching logic --- backend/SearchEpc.py | 11 +- backend/app/db/functions/__init__.py | 12 +++ backend/app/db/functions/epc_functions.py | 125 ++++++++++++++++++++++ backend/engine/engine.py | 46 ++++++-- etl/find_my_epc/RetrieveFindMyEpc.py | 49 +++++++-- 5 files changed, 223 insertions(+), 20 deletions(-) create mode 100644 backend/app/db/functions/__init__.py create mode 100644 backend/app/db/functions/epc_functions.py diff --git a/backend/SearchEpc.py b/backend/SearchEpc.py index eb2b0b23..c47e82c4 100644 --- a/backend/SearchEpc.py +++ b/backend/SearchEpc.py @@ -917,7 +917,7 @@ class SearchEpc: return agg[key].values[0] - def find_property(self, skip_os=False): + def find_property(self, skip_os=False, api_data=None): """ This method will attempt to identify a property. It will, at first, use the EPC api to try and find the EPC for the property and the associated UPRN. If this fails, it will use the Ordnance Survey API to @@ -928,10 +928,17 @@ class SearchEpc: as a final check to see if there is any EPC data. 
If there is no EPC data, the epc data will be estimated based on the surrounding properties + + :param skip_os: If True, the ordnance survey api will be skipped and only the EPC api will be used + :param api_data: If provided, this data will be used instead of querying the EPC api """ # Step 1: use the epc api to find the property and uprn - response = self.get_epc() + if api_data: + self.data = api_data + response = {"status": 200} + else: + response = self.get_epc() if response["status"] == 200: (
diff --git a/backend/app/db/functions/__init__.py b/backend/app/db/functions/__init__.py
new file mode 100644
index 00000000..0f239d6e
--- /dev/null
+++ b/backend/app/db/functions/__init__.py
@@ -0,0 +1,12 @@
+from .epc_functions import *
+from .address_functions import *
+from .portfolio_functions import *
+from .energy_assessment_functions import *
+from .property_functions import *
+from .recommendations_functions import *
+from .solar_functions import *
+from .funding_functions import *
+from .materials_functions import *
+from .inspections_functions import *
+from .non_intrusive_surveys import *
+from .whlg_functions import *
diff --git a/backend/app/db/functions/epc_functions.py b/backend/app/db/functions/epc_functions.py
new file mode 100644
index 00000000..4b675f1f
--- /dev/null
+++ b/backend/app/db/functions/epc_functions.py
@@ -0,0 +1,146 @@
+from datetime import datetime, timedelta, timezone
+from sqlalchemy.orm import Session
+from sqlalchemy.exc import SQLAlchemyError
+from backend.app.db.models.epc import EpcStore
+
+
+class EpcStoreService:
+    """
+    Service layer for EPC data lookup and persistence.
+
+    Reads and writes ``EpcStore`` rows keyed by UPRN. A row holds the EPC API
+    data and, optionally, the EPC page that was retrieved alongside it.
+    """
+
+    # Number of days for which stored EPC API data counts as fresh
+    FRESHNESS_DAYS = 30
+
+    # status labels
+    FRESH = "fresh"
+    EXPIRED = "expired"
+    MISSING = "missing"
+
+    @classmethod
+    def get_epc_for_uprn(cls, session: Session, uprn: int):
+        """
+        Look up the stored EPC data for a UPRN and report how usable it is.
+
+        :param session: SQLAlchemy session used for the lookup
+        :param uprn: UPRN of the property
+        :return: dict with "status", "epc_api" and "epc_page":
+            - 'missing': no record, or the record has no epc_api_created_at
+            - 'expired': the API data is older than FRESHNESS_DAYS days
+            - 'fresh': the API data and the page (None if absent) are returned,
+              plus "epc_page_rrn", "epc_api_created_at" and "epc_page_created_at"
+            For 'missing' and 'expired' both data values are None.
+        """
+
+        record = session.query(EpcStore).filter(EpcStore.uprn == uprn).first()
+
+        # No API data → treat as missing even if page data exists
+        if not record or not record.epc_api_created_at:
+            return {"status": cls.MISSING, "epc_api": None, "epc_page": None}
+
+        # Freshness is judged on calendar dates, so the time of day is ignored
+        cutoff = datetime.now(timezone.utc) - timedelta(days=cls.FRESHNESS_DAYS)
+
+        if record.epc_api_created_at.date() < cutoff.date():
+            return {"status": cls.EXPIRED, "epc_api": None, "epc_page": None}
+
+        # Fresh API → include page only if present
+        return {
+            "status": cls.FRESH,
+            "epc_api": record.epc_api,
+            "epc_page": record.epc_page if record.epc_page else None,
+            "epc_page_rrn": record.epc_page_rrn,
+            "epc_api_created_at": record.epc_api_created_at,
+            "epc_page_created_at": record.epc_page_created_at,
+        }
+
+    @classmethod
+    def check_insert_needed(cls, epc_cache, epc_estimated, uprn):
+        """
+        Decide whether newly retrieved EPC data should be written to the store.
+
+        :param epc_cache: result of get_epc_for_uprn, or {} when no lookup was made
+        :param epc_estimated: truthy when the EPC is an estimate
+        :param uprn: UPRN of the property, falsy when there is none
+        :return: True when the cache holds no API data or is expired, the EPC is
+            not an estimate and a UPRN is available
+        """
+        cache_unusable = epc_cache.get("epc_api") is None or epc_cache.get("status") == cls.EXPIRED
+
+        return bool(cache_unusable and not epc_estimated and uprn)
+
+    @staticmethod
+    def upsert_epc_data(
+        session: Session,
+        uprn: int,
+        epc_api: dict | None,
+        epc_page: str | None,
+        epc_page_rrn: str | None,
+        epc_api_created_at: datetime | None = None,
+        epc_page_created_at: datetime | None = None,
+    ):
+        """
+        Insert or update EPC data for a UPRN and commit the change.
+
+        Rules:
+        - If record exists → update it
+        - If record does not exist → create new
+        - Page data is only written when API data is passed in the same call
+        - A timestamp that is not passed defaults to the current UTC time when
+          the data it belongs to is written, for inserts as well as updates
+
+        :param session: SQLAlchemy session; committed on success, rolled back on error
+        :param uprn: UPRN the data belongs to
+        :param epc_api: EPC API data; None leaves an existing record's API data as is
+        :param epc_page: EPC page source; None leaves an existing record's page as is
+        :param epc_page_rrn: RRN associated with the page
+        :param epc_api_created_at: timestamp to store for the API data
+        :param epc_page_created_at: timestamp to store for the page data
+        :return: the inserted or updated EpcStore record
+        :raises SQLAlchemyError: re-raised after the session has been rolled back
+        """
+        has_api = epc_api is not None
+        # page data is only allowed when API data exists
+        has_page = has_api and epc_page is not None
+
+        now = datetime.now(timezone.utc)
+        if has_api and epc_api_created_at is None:
+            epc_api_created_at = now
+        if has_page and epc_page_created_at is None:
+            epc_page_created_at = now
+
+        try:
+            record = session.query(EpcStore).filter(EpcStore.uprn == uprn).first()
+
+            if record:
+                # update path: only overwrite what the caller passed
+                if has_api:
+                    record.epc_api = epc_api
+                    record.epc_api_created_at = epc_api_created_at
+                if has_page:
+                    record.epc_page = epc_page
+                    record.epc_page_rrn = epc_page_rrn
+                    record.epc_page_created_at = epc_page_created_at
+            else:
+                # insert path
+                record = EpcStore(
+                    uprn=uprn,
+                    epc_api=epc_api,
+                    epc_api_created_at=epc_api_created_at,
+                    epc_page=epc_page if has_api else None,
+                    epc_page_rrn=epc_page_rrn if has_api else None,
+                    epc_page_created_at=epc_page_created_at if has_api else None,
+                )
+                session.add(record)
+
+            # commit() flushes pending changes itself, no separate flush() needed
+            session.commit()
+
+            return record
+
+        except SQLAlchemyError:
+            session.rollback()
+            raise
diff --git a/backend/engine/engine.py b/backend/engine/engine.py index 6e90a297..ee415593 100644 --- a/backend/engine/engine.py +++ b/backend/engine/engine.py @@ -5,6 +5,7 @@ from datetime import datetime from tqdm import tqdm import pandas as pd import numpy as np + from etl.epc.Record import EPCRecord from backend.SearchEpc import SearchEpc from sqlalchemy.exc import IntegrityError, OperationalError @@ -24,7 +25,7 @@ from backend.app.db.functions.recommendations_functions import ( ) from backend.app.db.functions.funding_functions import upload_funding from backend.app.db.functions.energy_assessment_functions import get_latest_assessment_by_uprn -from backend.app.db.functions.address_functions import get_associated_uprns +import backend.app.db.functions 
as db_funcs from backend.app.db.models.portfolio import rating_lookup from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES @@ -527,6 +528,14 @@ async def model_engine(body: PlanTriggerRequest): if uprn: uprn = int(float(uprn)) + epc_api_data, epc_page, rrn, epc_cache = None, None, None, {} + if uprn: + # if we have a UPRN, we check if we already have EPC data associated with this UPRN + epc_cache = db_funcs.epc_functions.EpcStoreService.get_epc_for_uprn(session, uprn) + + if epc_cache["status"] == db_funcs.epc_functions.EpcStoreService.FRESH: + epc_api_data, epc_page, rrn = epc_cache["epc_api"], epc_cache["epc_page"], epc_cache["epc_page_rrn"] + address1 = config.get("address", None) # Handle domna address list format if pd.isnull(address1) and body.file_format == "domna_asset_list": @@ -540,7 +549,9 @@ async def model_engine(body: PlanTriggerRequest): if (body.event_type == "remote_assessment") and config.get("property_type") == "Flat": # We're running a remote assessment for a flat - we go and grab the associated # UPRNS for other units in the same building - associated_uprns = get_associated_uprns(session, postcode=config["postcode"], uprn=uprn) + associated_uprns = db_funcs.address_functions.get_associated_uprns( + session, postcode=config["postcode"], uprn=uprn + ) epc_searcher = SearchEpc( address1=address1, @@ -555,7 +566,9 @@ async def model_engine(body: PlanTriggerRequest): epc_searcher.ordnance_survey_client.built_form = config.get("built_form", None) epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None) # For the moment, our OS API access is unavailable, so we skip and interpolate - epc_searcher.find_property(skip_os=True) + + epc_searcher.find_property(skip_os=True, api_data=epc_api_data) + if epc_searcher.newest_epc.get("estimated") and body.file_format == "domna_asset_list" and ( epc_searcher.newest_epc["uprn"] < 0 ): @@ -609,18 +622,19 @@ async def 
model_engine(body: PlanTriggerRequest): patch = req_data.patch # if we have a remote assment data type, we pull the additional data and include it + epc_page_source = {} if (body.event_type == "remote_assessment") and not (epc_searcher.newest_epc.get("estimated")): logger.info("Retrieving find my epc data") try: - property_non_invasive_recommendations, patch = RetrieveFindMyEpc.get_from_epc( - epc_searcher.newest_epc + property_non_invasive_recommendations, patch, epc_page_source = RetrieveFindMyEpc.get_from_epc( + epc_searcher.newest_epc, epc_page, rrn=rrn ) except Exception as e: logger.error(f"Failed to retrieve without cleaning address {e}") for k in ["address", "address1"]: epc_searcher.newest_epc[k] = epc_searcher.address_clean - property_non_invasive_recommendations, patch = RetrieveFindMyEpc.get_from_epc( - epc_searcher.newest_epc + property_non_invasive_recommendations, patch, epc_page_source = RetrieveFindMyEpc.get_from_epc( + epc_searcher.newest_epc, epc_page, rrn=rrn ) # If we have a property type, this means when we pull the epc data, we might need to make a patch @@ -657,6 +671,24 @@ async def model_engine(body: PlanTriggerRequest): ) ) + # If we have: + # 1) No EPC API data + # 2) A real EPC + # 3) A UPRN (meaning that a UPRN could be fetched against that property) + # We store this data + + if db_funcs.epc_functions.EpcStoreService.check_insert_needed( + epc_cache, epc_searcher.newest_epc.get("estimated"), epc_searcher.uprn + ): + # We store the EPC data we have found for this property + db_funcs.epc_functions.EpcStoreService.upsert_epc_data( + session=session, + uprn=epc_searcher.uprn, + epc_api=epc_searcher.data, + epc_page=epc_page_source.get("page_source"), + epc_page_rrn=epc_page_source.get("rrn"), + ) + if not input_properties: return Response(status_code=204) diff --git a/etl/find_my_epc/RetrieveFindMyEpc.py b/etl/find_my_epc/RetrieveFindMyEpc.py index c57f9ca8..ae9e5ff7 100644 --- a/etl/find_my_epc/RetrieveFindMyEpc.py +++ 
b/etl/find_my_epc/RetrieveFindMyEpc.py @@ -371,9 +371,12 @@ class RetrieveFindMyEpc: return all_find_my_epc_data - def retrieve_newest_find_my_epc_data(self, sap_2012_date=None, return_page=False): + def _find_epc_page(self): """ - For a post code and address, we pull out all the required data from the find my epc website + This function is used to find the EPC page source for a given address and postcode. + It is done by fetching the page, associating to the postcode and then matching the + addresses on the page to the address we have been given. + :return: """ postcode_input = self.postcode.replace(" ", "+") @@ -428,8 +431,22 @@ class RetrieveFindMyEpc: chosen_epc = self.BASE_ENERGY_URL + extracted_table[0]['extracted_address_url'] epc_certificate = chosen_epc.split('/')[-1] - address_response = requests.get(chosen_epc, headers=self.HEADERS) - address_res = BeautifulSoup(address_response.text, features="html.parser") + return chosen_epc, epc_certificate + + def retrieve_newest_find_my_epc_data(self, sap_2012_date=None, return_page=False, epc_page_source=None, rrn=None): + """ + For a post code and address, we pull out all the required data from the find my epc website + """ + + if epc_page_source is None: + chosen_epc, rrn = self._find_epc_page() + address_response = requests.get(chosen_epc, headers=self.HEADERS) + epc_page_source = address_response.text + address_res = BeautifulSoup(address_response.text, features="html.parser") + else: + if rrn is None: + raise ValueError("rrn must be provided if epc_page_source is provided") + address_res = BeautifulSoup(epc_page_source, features="html.parser") # Key data we want to retrieve: # 1) Rating @@ -565,7 +582,7 @@ class RetrieveFindMyEpc: epc_data = self.extract_epc_data(address_res) resulting_data = { - 'epc_certificate': epc_certificate, + 'epc_certificate': rrn, 'current_epc_rating': current_rating.split(' ')[-6], 'current_epc_efficiency': current_sap, 'potential_epc_rating': potential_rating.split(' ')[-6], @@ 
-576,11 +593,12 @@ class RetrieveFindMyEpc: "epc_data": epc_data, **assessment_data, **low_carbon_energy_sources, + "page_source": epc_page_source, } if return_page: # We return the page text as well, which can be parsed again later - return resulting_data, postcode_response.text + return resulting_data, epc_page_source return resulting_data @@ -722,11 +740,15 @@ class RetrieveFindMyEpc: return formatted_recommendations @classmethod - def get_from_epc(cls, epc): + def get_from_epc(cls, epc, epc_page_source=None, rrn=None): + + if epc_page_source is not None and rrn is None: + raise ValueError("rrn must be provided if epc_page_source is provided") + # Attempt both methods: try: searcher = cls(address=epc["address"], postcode=epc["postcode"]) - find_epc_data = searcher.retrieve_newest_find_my_epc_data() + find_epc_data = searcher.retrieve_newest_find_my_epc_data(epc_page_source=epc_page_source, rrn=rrn) except Exception as e: logger.error(f"Error retrieving find my epc data: {e}") @@ -734,7 +756,7 @@ class RetrieveFindMyEpc: address1 = ",".join(epc["address"].split(",")[:-1]) try: searcher = cls(address=address1, postcode=epc["postcode"]) - find_epc_data = searcher.retrieve_newest_find_my_epc_data() + find_epc_data = searcher.retrieve_newest_find_my_epc_data(epc_page_source=epc_page_source, rrn=rrn) logger.info("Successfully retrieved find my epc data using trimmed address") except Exception as e2: logger.error(f"Error retrieving find my epc data using trimmed address: {e2}") @@ -747,7 +769,7 @@ class RetrieveFindMyEpc: address1 = epc["address1"] # We attempt with the backup add searcher = cls(address=address1, postcode=epc["postcode"]) - find_epc_data = searcher.retrieve_newest_find_my_epc_data() + find_epc_data = searcher.retrieve_newest_find_my_epc_data(epc_page_source=epc_page_source, rrn=rrn) logger.info("Successfully retrieved find my epc data using backup address") non_invasive_recommendations = { @@ -766,4 +788,9 @@ class RetrieveFindMyEpc: 
**find_epc_data["epc_data"], } - return non_invasive_recommendations, patch + page_source = { + "rrn": find_epc_data["epc_certificate"], + "page_source": find_epc_data["page_source"] + } + + return non_invasive_recommendations, patch, page_source