implemented EPC caching logic

This commit is contained in:
Khalim Conn-Kowlessar 2025-11-25 18:22:20 +00:00
parent 7fde580b37
commit 7640baec02
5 changed files with 223 additions and 20 deletions

View file

@ -917,7 +917,7 @@ class SearchEpc:
return agg[key].values[0]
def find_property(self, skip_os=False):
def find_property(self, skip_os=False, api_data=None):
"""
This method will attempt to identify a property. It will, at first, use the EPC api to try and
find the EPC for the property and the associated UPRN. If this fails, it will use the Ordnance Survey API to
@ -928,10 +928,17 @@ class SearchEpc:
as a final check to see if there is any EPC data.
If there is no EPC data, the epc data will be estimated based on the surrounding properties
:param skip_os: If True, the ordnance survey api will be skipped and only the EPC api will be used
:param api_data: If provided, this data will be used instead of querying the EPC api
"""
# Step 1: use the epc api to find the property and uprn
response = self.get_epc()
if api_data:
self.data = api_data
response = {"status": 200}
else:
response = self.get_epc()
if response["status"] == 200:
(

View file

@ -0,0 +1,12 @@
from .epc_functions import *
from .address_functions import *
from .portfolio_functions import *
from .energy_assessment_functions import *
from .property_functions import *
from .recommendations_functions import *
from .solar_functions import *
from .funding_functions import *
from .materials_functions import *
from .inspections_functions import *
from .non_intrusive_surveys import *
from .whlg_functions import *

View file

@ -0,0 +1,125 @@
from datetime import datetime, timedelta, timezone
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
from backend.app.db.models.epc import EpcStore
class EpcStoreService:
"""
Service layer for EPC data lookup and persistence.
"""
FRESHNESS_DAYS = 30
# status labels
FRESH = "fresh"
EXPIRED = "expired"
MISSING = "missing"
@classmethod
def get_epc_for_uprn(cls, session: Session, uprn: int):
"""
Query EPC data for a given UPRN and return a dict describing:
- epc_api: only if within last 30 days
- epc_page: only if epc_api exists
- status: 'fresh', 'expired', or 'missing'
"""
record = session.query(EpcStore).filter(EpcStore.uprn == uprn).first()
if not record:
return {"status": cls.MISSING, "epc_api": None, "epc_page": None}
if not record.epc_api_created_at:
# API data missing → treat as missing even if page data exists
return {"status": cls.MISSING, "epc_api": None, "epc_page": None}
# check freshness
cutoff = datetime.now(timezone.utc) - timedelta(days=EpcStoreService.FRESHNESS_DAYS)
if record.epc_api_created_at.date() < cutoff.date():
return {"status": cls.EXPIRED, "epc_api": None, "epc_page": None}
# Fresh API → include page only if present
return {
"status": cls.FRESH,
"epc_api": record.epc_api,
"epc_page": record.epc_page if record.epc_page else None,
"epc_page_rrn": record.epc_page_rrn,
"epc_api_created_at": record.epc_api_created_at,
"epc_page_created_at": record.epc_page_created_at,
}
@classmethod
def check_insert_needed(cls, epc_cache, epc_estimated, uprn):
"""
Check if an insert is needed based on existing data.
:return:
"""
no_existing_epc_cache = epc_cache.get("epc_api") is None
existing_cache_expired = (
epc_cache.get("status") == cls.EXPIRED
)
needs_insert = bool((no_existing_epc_cache or existing_cache_expired) and not epc_estimated and uprn)
return needs_insert
@staticmethod
def upsert_epc_data(
session: Session,
uprn: int,
epc_api: dict | None,
epc_page: str | None,
epc_page_rrn: str | None,
epc_api_created_at: datetime | None = None,
epc_page_created_at: datetime | None = None,
):
"""
Insert or update EPC data for a UPRN.
Rules:
- If record exists update it
- If record does not exist create new
"""
try:
record = session.query(EpcStore).filter(EpcStore.uprn == uprn).first()
if record:
# update path
if epc_api is not None:
record.epc_api = epc_api
if epc_api_created_at is None:
epc_api_created_at = datetime.now(timezone.utc)
record.epc_api_created_at = epc_api_created_at
# update page data only if BOTH:
# 1) the caller passed page data
# 2) epc_api is not None (page only allowed when API exists)
if epc_page is not None and epc_api is not None:
record.epc_page = epc_page
record.epc_page_rrn = epc_page_rrn
if epc_page_created_at is None:
epc_page_created_at = datetime.now(timezone.utc)
record.epc_page_created_at = epc_page_created_at
else:
# insert path
record = EpcStore(
uprn=uprn,
epc_api=epc_api,
epc_api_created_at=epc_api_created_at,
epc_page=epc_page if epc_api is not None else None,
epc_page_rrn=epc_page_rrn if epc_api is not None else None,
epc_page_created_at=epc_page_created_at if epc_api is not None else None,
)
session.add(record)
session.flush()
session.commit()
return record
except SQLAlchemyError as e:
session.rollback()
raise e

View file

@ -5,6 +5,7 @@ from datetime import datetime
from tqdm import tqdm
import pandas as pd
import numpy as np
from etl.epc.Record import EPCRecord
from backend.SearchEpc import SearchEpc
from sqlalchemy.exc import IntegrityError, OperationalError
@ -24,7 +25,7 @@ from backend.app.db.functions.recommendations_functions import (
)
from backend.app.db.functions.funding_functions import upload_funding
from backend.app.db.functions.energy_assessment_functions import get_latest_assessment_by_uprn
from backend.app.db.functions.address_functions import get_associated_uprns
import backend.app.db.functions as db_funcs
from backend.app.db.models.portfolio import rating_lookup
from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES
@ -527,6 +528,14 @@ async def model_engine(body: PlanTriggerRequest):
if uprn:
uprn = int(float(uprn))
epc_api_data, epc_page, rrn, epc_cache = None, None, None, {}
if uprn:
# if we have a UPRN, we check if we already have EPC data associated with this UPRN
epc_cache = db_funcs.epc_functions.EpcStoreService.get_epc_for_uprn(session, uprn)
if epc_cache["status"] == db_funcs.epc_functions.EpcStoreService.FRESH:
epc_api_data, epc_page, rrn = epc_cache["epc_api"], epc_cache["epc_page"], epc_cache["epc_page_rrn"]
address1 = config.get("address", None)
# Handle domna address list format
if pd.isnull(address1) and body.file_format == "domna_asset_list":
@ -540,7 +549,9 @@ async def model_engine(body: PlanTriggerRequest):
if (body.event_type == "remote_assessment") and config.get("property_type") == "Flat":
# We're running a remote assessment for a flat - we go and grab the associated
# UPRNS for other units in the same building
associated_uprns = get_associated_uprns(session, postcode=config["postcode"], uprn=uprn)
associated_uprns = db_funcs.address_functions.get_associated_uprns(
session, postcode=config["postcode"], uprn=uprn
)
epc_searcher = SearchEpc(
address1=address1,
@ -555,7 +566,9 @@ async def model_engine(body: PlanTriggerRequest):
epc_searcher.ordnance_survey_client.built_form = config.get("built_form", None)
epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None)
# For the moment, our OS API access is unavailable, so we skip and interpolate
epc_searcher.find_property(skip_os=True)
epc_searcher.find_property(skip_os=True, api_data=epc_api_data)
if epc_searcher.newest_epc.get("estimated") and body.file_format == "domna_asset_list" and (
epc_searcher.newest_epc["uprn"] < 0
):
@ -609,18 +622,19 @@ async def model_engine(body: PlanTriggerRequest):
patch = req_data.patch
# If this is a remote assessment, we pull the additional find-my-epc data and include it
epc_page_source = {}
if (body.event_type == "remote_assessment") and not (epc_searcher.newest_epc.get("estimated")):
logger.info("Retrieving find my epc data")
try:
property_non_invasive_recommendations, patch = RetrieveFindMyEpc.get_from_epc(
epc_searcher.newest_epc
property_non_invasive_recommendations, patch, epc_page_source = RetrieveFindMyEpc.get_from_epc(
epc_searcher.newest_epc, epc_page, rrn=rrn
)
except Exception as e:
logger.error(f"Failed to retrieve without cleaning address {e}")
for k in ["address", "address1"]:
epc_searcher.newest_epc[k] = epc_searcher.address_clean
property_non_invasive_recommendations, patch = RetrieveFindMyEpc.get_from_epc(
epc_searcher.newest_epc
property_non_invasive_recommendations, patch, epc_page_source = RetrieveFindMyEpc.get_from_epc(
epc_searcher.newest_epc, epc_page, rrn=rrn
)
# If we have a property type, this means when we pull the epc data, we might need to make a patch
@ -657,6 +671,24 @@ async def model_engine(body: PlanTriggerRequest):
)
)
# If we have:
# 1) No EPC API data
# 2) A real EPC
# 3) A UPRN (meaning that a UPRN could be fetched against that property)
# We store this data
if db_funcs.epc_functions.EpcStoreService.check_insert_needed(
epc_cache, epc_searcher.newest_epc.get("estimated"), epc_searcher.uprn
):
# We store the EPC data we have found for this property
db_funcs.epc_functions.EpcStoreService.upsert_epc_data(
session=session,
uprn=epc_searcher.uprn,
epc_api=epc_searcher.data,
epc_page=epc_page_source.get("page_source"),
epc_page_rrn=epc_page_source.get("rrn"),
)
if not input_properties:
return Response(status_code=204)

View file

@ -371,9 +371,12 @@ class RetrieveFindMyEpc:
return all_find_my_epc_data
def retrieve_newest_find_my_epc_data(self, sap_2012_date=None, return_page=False):
def _find_epc_page(self):
"""
For a post code and address, we pull out all the required data from the find my epc website
This function is used to find the EPC page source for a given address and postcode.
It is done by fetching the page, associating to the postcode and then matching the
addresses on the page to the address we have been given.
:return:
"""
postcode_input = self.postcode.replace(" ", "+")
@ -428,8 +431,22 @@ class RetrieveFindMyEpc:
chosen_epc = self.BASE_ENERGY_URL + extracted_table[0]['extracted_address_url']
epc_certificate = chosen_epc.split('/')[-1]
address_response = requests.get(chosen_epc, headers=self.HEADERS)
address_res = BeautifulSoup(address_response.text, features="html.parser")
return chosen_epc, epc_certificate
def retrieve_newest_find_my_epc_data(self, sap_2012_date=None, return_page=False, epc_page_source=None, rrn=None):
"""
For a post code and address, we pull out all the required data from the find my epc website
"""
if epc_page_source is None:
chosen_epc, rrn = self._find_epc_page()
address_response = requests.get(chosen_epc, headers=self.HEADERS)
epc_page_source = address_response.text
address_res = BeautifulSoup(address_response.text, features="html.parser")
else:
if rrn is None:
raise ValueError("rrn must be provided if epc_page_source is provided")
address_res = BeautifulSoup(epc_page_source, features="html.parser")
# Key data we want to retrieve:
# 1) Rating
@ -565,7 +582,7 @@ class RetrieveFindMyEpc:
epc_data = self.extract_epc_data(address_res)
resulting_data = {
'epc_certificate': epc_certificate,
'epc_certificate': rrn,
'current_epc_rating': current_rating.split(' ')[-6],
'current_epc_efficiency': current_sap,
'potential_epc_rating': potential_rating.split(' ')[-6],
@ -576,11 +593,12 @@ class RetrieveFindMyEpc:
"epc_data": epc_data,
**assessment_data,
**low_carbon_energy_sources,
"page_source": epc_page_source,
}
if return_page:
# We return the page text as well, which can be parsed again later
return resulting_data, postcode_response.text
return resulting_data, epc_page_source
return resulting_data
@ -722,11 +740,15 @@ class RetrieveFindMyEpc:
return formatted_recommendations
@classmethod
def get_from_epc(cls, epc):
def get_from_epc(cls, epc, epc_page_source=None, rrn=None):
if epc_page_source is not None and rrn is None:
raise ValueError("rrn must be provided if epc_page_source is provided")
# Attempt both methods:
try:
searcher = cls(address=epc["address"], postcode=epc["postcode"])
find_epc_data = searcher.retrieve_newest_find_my_epc_data()
find_epc_data = searcher.retrieve_newest_find_my_epc_data(epc_page_source=epc_page_source, rrn=rrn)
except Exception as e:
logger.error(f"Error retrieving find my epc data: {e}")
@ -734,7 +756,7 @@ class RetrieveFindMyEpc:
address1 = ",".join(epc["address"].split(",")[:-1])
try:
searcher = cls(address=address1, postcode=epc["postcode"])
find_epc_data = searcher.retrieve_newest_find_my_epc_data()
find_epc_data = searcher.retrieve_newest_find_my_epc_data(epc_page_source=epc_page_source, rrn=rrn)
logger.info("Successfully retrieved find my epc data using trimmed address")
except Exception as e2:
logger.error(f"Error retrieving find my epc data using trimmed address: {e2}")
@ -747,7 +769,7 @@ class RetrieveFindMyEpc:
address1 = epc["address1"]
# We attempt with the backup address
searcher = cls(address=address1, postcode=epc["postcode"])
find_epc_data = searcher.retrieve_newest_find_my_epc_data()
find_epc_data = searcher.retrieve_newest_find_my_epc_data(epc_page_source=epc_page_source, rrn=rrn)
logger.info("Successfully retrieved find my epc data using backup address")
non_invasive_recommendations = {
@ -766,4 +788,9 @@ class RetrieveFindMyEpc:
**find_epc_data["epc_data"],
}
return non_invasive_recommendations, patch
page_source = {
"rrn": find_epc_data["epc_certificate"],
"page_source": find_epc_data["page_source"]
}
return non_invasive_recommendations, patch, page_source