handling more cases

This commit is contained in:
Khalim Conn-Kowlessar 2025-11-30 20:12:50 +00:00
parent b2f6016e5b
commit 852420a8fa
6 changed files with 50 additions and 100 deletions

View file

@ -208,6 +208,7 @@ class SearchEpc:
# These are the address and postcode values, which we store in the database
self.address_clean = None
self.postcode_clean = None
self.address_postal_town = None
self.size = size if size is not None else 25
@ -490,7 +491,11 @@ class SearchEpc:
postcode = postcode.upper()
return address, postcode
# We also return a "postal town" variant - useful for edge cases when fetching from Find My EPC
address_postal_town = ", ".join(
[newest_epc["address1"], newest_epc["address2"], newest_epc["posttown"]]).strip().title()
return address, postcode, address_postal_town
def extract_epc_data(self, address=None):
@ -545,9 +550,9 @@ class SearchEpc:
return newest_epc, [], {}, "", "", None
# Retrieve postcode and address
address_epc, postcode_epc = self.format_address(newest_epc=newest_epc)
address_epc, postcode_epc, address_postal_town = self.format_address(newest_epc=newest_epc)
return newest_epc, older_epcs, full_sap_epc, address_epc, postcode_epc, uprn
return newest_epc, older_epcs, full_sap_epc, address_epc, postcode_epc, uprn, address_postal_town
@staticmethod
def filter_newest_epc(list_of_epcs: List):
@ -970,7 +975,8 @@ class SearchEpc:
if response["status"] == 200:
(
self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.uprn
self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.uprn,
self.address_postal_town
) = self.extract_epc_data(address=self.full_address)
# Before we return, we check if we need to overwrite a SAP05 EPC
@ -1032,7 +1038,8 @@ class SearchEpc:
response = self.get_epc()
if response["status"] == 200:
(
self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.uprn
self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.uprn,
self.address_postal_town
) = self.extract_epc_data()
return

View file

@ -170,67 +170,6 @@ def upload_recommendations(session: Session, recommendations_to_upload, property
return False
# def clear_portfolio(session: Session, portfolio_id: int):
# # Fetch all property IDs associated with the given portfolio
# property_ids = session.query(PropertyModel.id).filter(PropertyModel.portfolio_id == portfolio_id).all()
# property_ids = [p.id for p in property_ids]
#
# # Fetch all recommendation IDs associated with the properties
# recommendation_ids = session.query(Recommendation.id).filter(Recommendation.property_id.in_(property_ids)).all()
# recommendation_ids = [r.id for r in recommendation_ids]
#
# # Fetch all plan IDs associated with the portfolio
# plan_ids = session.query(Plan.id).filter(Plan.portfolio_id == portfolio_id).all()
# plan_ids = [p.id for p in plan_ids]
#
# # Delete all entries from RecommendationMaterials for these recommendations
# session.execute(
# delete(RecommendationMaterials).where(RecommendationMaterials.recommendation_id.in_(recommendation_ids))
# )
#
# # Delete all entries from PlanRecommendations that reference plans in the portfolio
# session.execute(delete(PlanRecommendations).where(PlanRecommendations.plan_id.in_(
# session.query(Plan.id).filter(Plan.portfolio_id == portfolio_id).subquery().as_scalar()
# )))
#
# # Delete FundingPackageMeasures → FundingPackage → Plan
# session.execute(
# delete(FundingPackageMeasures).where(FundingPackageMeasures.funding_package_id.in_(
# session.query(FundingPackage.id).filter(FundingPackage.plan_id.in_(plan_ids))
# ))
# )
# session.execute(
# delete(FundingPackage).where(FundingPackage.plan_id.in_(plan_ids))
# )
#
# # Delete all Plans associated with the portfolio
# session.execute(delete(Plan).where(Plan.portfolio_id == portfolio_id))
#
# # Delete all Scenarios associated with the portfolio
# session.execute(delete(Scenario).where(Scenario.portfolio_id == portfolio_id))
#
# # Delete all Recommendations associated with the properties
# session.execute(delete(Recommendation).where(Recommendation.property_id.in_(property_ids)))
#
# session.execute(
# delete(InspectionModel)
# .where(InspectionModel.property_id.in_(
# session.query(PropertyModel.id).filter(PropertyModel.portfolio_id == portfolio_id)
# ))
# .execution_options(synchronize_session=False)
# )
#
# # Now, delete the PropertyModels and related details
# # Delete PropertyTargetsModel, PropertyDetailsMeter, PropertyDetailsEpcModel, and PropertyModel
# session.execute(delete(PropertyTargetsModel).where(PropertyTargetsModel.portfolio_id == portfolio_id))
# # session.execute(delete(PropertyDetailsMeter).where(PropertyDetailsMeter.uprn.in_(property_ids)))
# session.execute(delete(PropertyDetailsEpcModel).where(PropertyDetailsEpcModel.portfolio_id == portfolio_id))
# session.execute(delete(PropertyModel).where(PropertyModel.portfolio_id == portfolio_id))
#
# # Commit the changes
# session.commit()
def chunked(iterable, size=100):
    """
    Yield successive chunks of at most ``size`` items from ``iterable``.

    Inputs that support ``len()`` are sliced, so each chunk has the same type
    as the input (list -> list, tuple -> tuple, str -> str). Inputs without a
    length (generators, iterators) are consumed lazily and yielded as lists.

    :param iterable: The collection or iterator to split up
    :param size: Maximum number of items per chunk, must be positive
    :raises ValueError: If size is not positive (raised on first iteration, as this is a generator)
    """
    # A negative step makes range() empty, which silently dropped every item; fail loudly instead
    if size <= 0:
        raise ValueError(f"size must be a positive integer, got {size!r}")

    try:
        total = len(iterable)
    except TypeError:
        # No len() available (e.g. a generator), so pull items off the iterator in batches
        from itertools import islice

        iterator = iter(iterable)
        batch = list(islice(iterator, size))
        while batch:
            yield batch
            batch = list(islice(iterator, size))
        return

    for start in range(0, total, size):
        yield iterable[start:start + size]

View file

@ -691,7 +691,8 @@ async def model_engine(body: PlanTriggerRequest):
epc_page=epc_page,
rrn=rrn,
cleaned_address=epc_searcher.address_clean,
config_address=config["address"]
config_address=config["address"],
address_postal_town=epc_searcher.address_postal_town
)
)

View file

@ -48,6 +48,12 @@ class FloorAttributes(Definitions):
"crog, inswleiddio cyfyngedig (rhagdybiaeth)": "suspended, limited insulation (assumed)",
}
REMAP = {
# Only seen once so far, though older EPCs may need investigating as part of producing the EPC clean.
# In the case seen, a newer EPC had been re-assessed as another dwelling below.
"above unheated space or full exposed": "(another dwelling below)",
}
def __init__(self, description: str):
self.description: str = description.lower()
@ -62,6 +68,10 @@ class FloorAttributes(Definitions):
# Try to perform a translation, in case it's in Welsh
self.translate_welsh_text()
# Remap known issues
if self.description in self.REMAP:
self.description = self.REMAP[self.description]
# We handle seeming occurrences of mixed translations
self.description = handle_mixed_translation(self.description)

View file

@ -375,6 +375,12 @@ clean_floor_cases = [
'thermal_transmittance_unit': 'w/m-¦k', 'is_assumed': False,
'is_to_unheated_space': False, 'is_to_external_air': False, 'is_suspended': False, 'is_solid': False,
'another_property_below': False, 'insulation_thickness': None
},
{
# This example gets remapped to another dwelling below
"description": "Above unheated space or full exposed",
'thermal_transmittance': 0, 'thermal_transmittance_unit': 'w/m-¦k', 'is_assumed': False,
'is_to_unheated_space': False, 'is_to_external_air': False, 'is_suspended': False, 'is_solid': False,
'another_property_below': True, 'insulation_thickness': None
}
]

View file

@ -22,7 +22,7 @@ class RetrieveFindMyEpc:
'Chrome/111.0.0.0 Safari/537.36'
}
def __init__(self, address: str, postcode: str, rrn: str = None):
def __init__(self, address: str, postcode: str, rrn: str = None, address_postal_town: str = ""):
"""
This class is tasked with retrieving the latest EPC data from the find my epc website
:param address: The address of the property
@ -36,6 +36,10 @@ class RetrieveFindMyEpc:
self.address_cleaned = self.address.replace(",", "").replace(" ", "").lower()
self.walls = []
self.address_postal_town = address_postal_town
if self.address_postal_town:
self.address_postal_town = self.address_postal_town.replace(",", "").replace(" ", "").lower()
@staticmethod
def extract_low_carbon_sources(soup):
# Find the section header
@ -363,7 +367,12 @@ class RetrieveFindMyEpc:
extracted_address.replace(",", "").replace(" ", "").lower()
)
if not extracted_address_cleaned.startswith(self.address_cleaned):
no_primary_match = not extracted_address_cleaned.startswith(self.address_cleaned)
no_backup_match = True if not self.address_postal_town else not (
extracted_address_cleaned.startswith(self.address_postal_town)
)
if no_primary_match and no_backup_match:
continue
# If the address is a match, we can extract the data
@ -394,7 +403,9 @@ class RetrieveFindMyEpc:
return chosen_epc, epc_certificate
def retrieve_newest_find_my_epc_data(self, sap_2012_date=None, return_page=False, epc_page_source=None, rrn=None):
def retrieve_newest_find_my_epc_data(
self, sap_2012_date=None, return_page=False, epc_page_source=None, rrn=None
):
"""
For a post code and address, we pull out all the required data from the find my epc website
"""
@ -725,37 +736,13 @@ class RetrieveFindMyEpc:
return formatted_recommendations
@classmethod
def get_from_epc(cls, epc, epc_page_source=None, rrn=None):
def get_from_epc(cls, epc, epc_page_source=None, rrn=None, address_postal_town=None):
if epc_page_source is not None and rrn is None:
raise ValueError("rrn must be provided if epc_page_source is provided")
# Attempt both methods:
try:
searcher = cls(address=epc["address"], postcode=epc["postcode"])
find_epc_data = searcher.retrieve_newest_find_my_epc_data(epc_page_source=epc_page_source, rrn=rrn)
except Exception as e:
logger.error(f"Error retrieving find my epc data: {e}")
# We try two backup approaches. The first is to trim the final section off the end of the address
address1 = ",".join(epc["address"].split(",")[:-1])
try:
searcher = cls(address=address1, postcode=epc["postcode"])
find_epc_data = searcher.retrieve_newest_find_my_epc_data(epc_page_source=epc_page_source, rrn=rrn)
logger.info("Successfully retrieved find my epc data using trimmed address")
except Exception as e2:
logger.error(f"Error retrieving find my epc data using trimmed address: {e2}")
# Attempt final approach
if epc["address1"] == epc["address"]:
# There's no benefit of using the same address, so we split on comma
address1 = epc["address"].split(",")[0]
else:
address1 = epc["address1"]
# We attempt with the backup add
searcher = cls(address=address1, postcode=epc["postcode"])
find_epc_data = searcher.retrieve_newest_find_my_epc_data(epc_page_source=epc_page_source, rrn=rrn)
logger.info("Successfully retrieved find my epc data using backup address")
searcher = cls(address=epc["address"], postcode=epc["postcode"], address_postal_town=address_postal_town)
find_epc_data = searcher.retrieve_newest_find_my_epc_data(epc_page_source=epc_page_source, rrn=rrn)
non_invasive_recommendations = {
"uprn": epc["uprn"],
@ -782,7 +769,7 @@ class RetrieveFindMyEpc:
@classmethod
def get_from_epc_with_fallback(
cls, epc, epc_page, rrn, cleaned_address=None, config_address=None
cls, epc, epc_page, rrn, cleaned_address=None, config_address=None, address_postal_town=None
):
"""
Attempt get_from_epc with:
@ -814,7 +801,7 @@ class RetrieveFindMyEpc:
last_error = None
for idx, attempt in enumerate(attempts, start=1):
try:
return cls.get_from_epc(attempt, epc_page, rrn=rrn)
return cls.get_from_epc(attempt, epc_page, rrn=rrn, address_postal_town=address_postal_town)
except Exception as e:
last_error = e
logger.error(f"Attempt {idx} failed: {e}")