Merge pull request #579 from Hestia-Homes/eco-eligiblity-bug

Edge case handling after portfolio build with >3k properties
This commit is contained in:
KhalimCK 2025-12-01 05:40:45 +08:00 committed by GitHub
commit bafb501187
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 136 additions and 145 deletions

View file

@ -208,6 +208,7 @@ class SearchEpc:
# These are the address and postcode values, which we store in the database
self.address_clean = None
self.postcode_clean = None
self.address_postal_town = None
self.size = size if size is not None else 25
@ -490,7 +491,11 @@ class SearchEpc:
postcode = postcode.upper()
return address, postcode
# We also return a "postal town variant - useful for edge cases when fetching from find my EPC
address_postal_town = ", ".join(
[newest_epc["address1"], newest_epc["address2"], newest_epc["posttown"]]).strip().title()
return address, postcode, address_postal_town
def extract_epc_data(self, address=None):
@ -545,9 +550,9 @@ class SearchEpc:
return newest_epc, [], {}, "", "", None
# Retrieve postcode and address
address_epc, postcode_epc = self.format_address(newest_epc=newest_epc)
address_epc, postcode_epc, address_postal_town = self.format_address(newest_epc=newest_epc)
return newest_epc, older_epcs, full_sap_epc, address_epc, postcode_epc, uprn
return newest_epc, older_epcs, full_sap_epc, address_epc, postcode_epc, uprn, address_postal_town
@staticmethod
def filter_newest_epc(list_of_epcs: List):
@ -970,7 +975,8 @@ class SearchEpc:
if response["status"] == 200:
(
self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.uprn
self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.uprn,
self.address_postal_town
) = self.extract_epc_data(address=self.full_address)
# Before we return, we check if we need to overwrite a SAP05 EPC
@ -1032,7 +1038,8 @@ class SearchEpc:
response = self.get_epc()
if response["status"] == 200:
(
self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.uprn
self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.uprn,
self.address_postal_town
) = self.extract_epc_data()
return

View file

@ -704,7 +704,7 @@ class GoogleSolarApi:
# We set the target rating to EPC C, which is the typical EPC rating we would expect the
# property to achieve post retrofit of just the fabric
"energy_consumption": cls.estimate_new_consumption(
current_energy_efficiency=p.data["current-energy-efficiency"],
current_energy_efficiency=min(p.data["current-energy-efficiency"], 100),
target_efficiency="69",
current_consumption=p.estimate_electrical_consumption(
assumed_ashp_efficiency=assumptions.AVERAGE_ASHP_EFFICIENCY, exclusions=body.exclusions
@ -723,7 +723,7 @@ class GoogleSolarApi:
# We set the target rating to EPC C, which is the typical EPC rating we would expect the
# property to achieve post retrofit of just the fabric
"energy_consumption": cls.estimate_new_consumption(
current_energy_efficiency=p.data["current-energy-efficiency"],
current_energy_efficiency=min(p.data["current-energy-efficiency"], 100),
target_efficiency="69",
current_consumption=p.estimate_electrical_consumption(
assumed_ashp_efficiency=assumptions.AVERAGE_ASHP_EFFICIENCY, exclusions=body.exclusions

View file

@ -90,6 +90,7 @@ DESCRIPTIONS_TO_FUEL_TYPES = {
"Oil range cooker, no cylinder thermostat": {"fuel": "Oil", "cop": 0.85},
"Air source heat pump, Warm air, electric": {"fuel": "Electricity", "cop": AVERAGE_ASHP_EFFICIENCY / 100},
"Boiler and underfloor heating, electric": {"fuel": "Electricity", "cop": 1},
"Community scheme with CHP, mains gas": {"fuel": "Natural Gas", "cop": 0.85},
}
# These are the measure types where if there is a ventilation recommendation, we force the inclusion of it

View file

@ -1,5 +1,5 @@
from tqdm import tqdm
from sqlalchemy import insert, delete
from sqlalchemy import insert, delete, text
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
from backend.app.db.models.recommendations import (
@ -170,72 +170,39 @@ def upload_recommendations(session: Session, recommendations_to_upload, property
return False
# def clear_portfolio(session: Session, portfolio_id: int):
# # Fetch all property IDs associated with the given portfolio
# property_ids = session.query(PropertyModel.id).filter(PropertyModel.portfolio_id == portfolio_id).all()
# property_ids = [p.id for p in property_ids]
#
# # Fetch all recommendation IDs associated with the properties
# recommendation_ids = session.query(Recommendation.id).filter(Recommendation.property_id.in_(property_ids)).all()
# recommendation_ids = [r.id for r in recommendation_ids]
#
# # Fetch all plan IDs associated with the portfolio
# plan_ids = session.query(Plan.id).filter(Plan.portfolio_id == portfolio_id).all()
# plan_ids = [p.id for p in plan_ids]
#
# # Delete all entries from RecommendationMaterials for these recommendations
# session.execute(
# delete(RecommendationMaterials).where(RecommendationMaterials.recommendation_id.in_(recommendation_ids))
# )
#
# # Delete all entries from PlanRecommendations that reference plans in the portfolio
# session.execute(delete(PlanRecommendations).where(PlanRecommendations.plan_id.in_(
# session.query(Plan.id).filter(Plan.portfolio_id == portfolio_id).subquery().as_scalar()
# )))
#
# # Delete FundingPackageMeasures → FundingPackage → Plan
# session.execute(
# delete(FundingPackageMeasures).where(FundingPackageMeasures.funding_package_id.in_(
# session.query(FundingPackage.id).filter(FundingPackage.plan_id.in_(plan_ids))
# ))
# )
# session.execute(
# delete(FundingPackage).where(FundingPackage.plan_id.in_(plan_ids))
# )
#
# # Delete all Plans associated with the portfolio
# session.execute(delete(Plan).where(Plan.portfolio_id == portfolio_id))
#
# # Delete all Scenarios associated with the portfolio
# session.execute(delete(Scenario).where(Scenario.portfolio_id == portfolio_id))
#
# # Delete all Recommendations associated with the properties
# session.execute(delete(Recommendation).where(Recommendation.property_id.in_(property_ids)))
#
# session.execute(
# delete(InspectionModel)
# .where(InspectionModel.property_id.in_(
# session.query(PropertyModel.id).filter(PropertyModel.portfolio_id == portfolio_id)
# ))
# .execution_options(synchronize_session=False)
# )
#
# # Now, delete the PropertyModels and related details
# # Delete PropertyTargetsModel, PropertyDetailsMeter, PropertyDetailsEpcModel, and PropertyModel
# session.execute(delete(PropertyTargetsModel).where(PropertyTargetsModel.portfolio_id == portfolio_id))
# # session.execute(delete(PropertyDetailsMeter).where(PropertyDetailsMeter.uprn.in_(property_ids)))
# session.execute(delete(PropertyDetailsEpcModel).where(PropertyDetailsEpcModel.portfolio_id == portfolio_id))
# session.execute(delete(PropertyModel).where(PropertyModel.portfolio_id == portfolio_id))
#
# # Commit the changes
# session.commit()
def chunked(iterable, size=100):
for i in range(0, len(iterable), size):
yield iterable[i:i + size]
def fast_delete_recommendations(session, chunk):
values = ",".join(f"({pid})" for pid in chunk)
sql = text(f"""
WITH ids(property_id) AS (
VALUES {values}
)
DELETE FROM recommendation r
USING ids
WHERE r.property_id = ids.property_id;
""")
session.execute(sql)
# Note; we may be able to go even faster like this:
# def delete_with_temp_table(session, chunk):
# session.execute(text("CREATE TEMP TABLE tmp_ids (id bigint) ON COMMIT DROP;"))
#
# insert_sql = "INSERT INTO tmp_ids (id) VALUES " + ",".join(f"({i})" for i in chunk)
# session.execute(text(insert_sql))
#
# session.execute(text("""
# DELETE FROM recommendation r
# USING tmp_ids t
# WHERE r.property_id = t.id;
# """))
def clear_portfolio(session: Session, portfolio_id: int, batch_size=100):
# --------------------------
# Collect IDs up-front
@ -313,14 +280,11 @@ def clear_portfolio(session: Session, portfolio_id: int, batch_size=100):
tqdm.write("Deleting Scenarios…")
session.execute(delete(Scenario).where(Scenario.portfolio_id == portfolio_id))
# Recommendations
# Recommendations - fast delete
for chunk in tqdm(chunked(property_ids, batch_size),
total=(len(property_ids) // batch_size) + 1,
desc="Deleting Recommendations"):
session.execute(
delete(Recommendation)
.where(Recommendation.property_id.in_(chunk))
)
fast_delete_recommendations(session, chunk)
# Inspections
for chunk in tqdm(chunked(property_ids, batch_size),

View file

@ -472,8 +472,6 @@ async def model_engine(body: PlanTriggerRequest):
created_at = datetime.now().isoformat()
start_ms = int(time.time() * 1000)
# TODO: if the measure is already installed, it should actually be the very first phase
try:
session.begin()
logger.info("Getting the inputs")
@ -691,7 +689,8 @@ async def model_engine(body: PlanTriggerRequest):
epc_page=epc_page,
rrn=rrn,
cleaned_address=epc_searcher.address_clean,
config_address=config["address"]
config_address=config["address"],
address_postal_town=epc_searcher.address_postal_town
)
)
@ -1042,38 +1041,47 @@ async def model_engine(body: PlanTriggerRequest):
work_package=eco_packages[p.id][2]
)
# If the solution isn't eligible, we can't really consider it
solutions = solutions[
(solutions["is_eligible"] & (solutions["scheme"] != "none")) | (solutions["scheme"] == "none")
]
if solutions["meets_upgrade_target"].any():
# If we have a solution that meets the upgrade target, we select that one
optimal_solution = solutions[solutions["meets_upgrade_target"]].iloc[0]
# if handle the empty case
if solutions.empty:
scheme = "none"
funded_measures, solution = [], []
(
project_funding, total_uplift, full_project_score, partial_project_score, uplift_project_score
) = 0, 0, 0, 0, 0
else:
# Pick the cheapest
optimal_solution = solutions.iloc[0]
# This is the list of measures that we will recommend
scheme = optimal_solution["scheme"]
# If the solution isn't eligible, we can't really consider it
solutions = solutions[
(solutions["is_eligible"] & (solutions["scheme"] != "none")) | (solutions["scheme"] == "none")
]
# We create this full list of selected measures, which is used in the next section for setting
# default measures
solution = deepcopy(optimal_solution["items"]) + deepcopy(optimal_solution["unfunded_items"])
funded_measures = deepcopy(optimal_solution["items"]) if scheme != "none" else []
if solutions["meets_upgrade_target"].any():
# If we have a solution that meets the upgrade target, we select that one
optimal_solution = solutions[solutions["meets_upgrade_target"]].iloc[0]
else:
# Pick the cheapest
optimal_solution = solutions.iloc[0]
# This is the total amount of funding that the project will produce (EXCLUDING uplifts) (£)
project_funding = optimal_solution["full_project_funding"] if scheme == "eco4" else \
optimal_solution["partial_project_funding"]
# This is the total amount of funding associated to the uplift (£)
total_uplift = optimal_solution["total_uplift"]
# This is the funding scheme selected
# This is the full project ABS
full_project_score = optimal_solution["project_score"]
# This is the partial project ABS
partial_project_score = optimal_solution["partial_project_score"]
# This is the uplift score ABS
uplift_project_score = optimal_solution["total_uplift_score"]
# This is the list of measures that we will recommend
scheme = optimal_solution["scheme"]
# We create this full list of selected measures, which is used in the next section for setting
# default measures
solution = deepcopy(optimal_solution["items"]) + deepcopy(optimal_solution["unfunded_items"])
funded_measures = deepcopy(optimal_solution["items"]) if scheme != "none" else []
# This is the total amount of funding that the project will produce (EXCLUDING uplifts) (£)
project_funding = optimal_solution["full_project_funding"] if scheme == "eco4" else \
optimal_solution["partial_project_funding"]
# This is the total amount of funding associated to the uplift (£)
total_uplift = optimal_solution["total_uplift"]
# This is the funding scheme selected
# This is the full project ABS
full_project_score = optimal_solution["project_score"]
# This is the partial project ABS
partial_project_score = optimal_solution["partial_project_score"]
# This is the uplift score ABS
uplift_project_score = optimal_solution["total_uplift_score"]
else:
# We optimise and then we determine eligibility for funding, based on the measures selected
optimiser = (

View file

@ -26,21 +26,21 @@ class AnnualBillSavings:
AVERAGE_ELECTRICITY_CONSUMPTION = 2700
AVERAGE_GAS_CONSUMPTION = 11500
# Latest price cap figures from Ofgem are for April 2024
# Latest price cap figures from Ofgem are for Jan 2026 to March 2026
# https://www.ofgem.gov.uk/energy-price-cap
ELECTRICITY_PRICE_CAP = 0.2573
GAS_PRICE_CAP = 0.0633
# This is the most recent export payment figure, at 9.28p/kWh
ELECTRICITY_PRICE_CAP = 0.2769
GAS_PRICE_CAP = 0.593
# This is the most recent export payment figure, at 13p/kWh - Updated Nov 2025
# Smart export guarantee rates can be found here:
# https://www.sunsave.energy/solar-panels-advice/exporting-to-the-grid/best-seg-rates
ELECTRICITY_EXPORT_PAYMENT = 0.0928
ELECTRICITY_EXPORT_PAYMENT = 0.13
# This is a weighted mean of the price caps, using the consumption figures above as weights
PRICE_FACTOR = 0.09549999999999999
# Daily standard charge, based on average across England, Scotland and Wales, and includes VAT
DAILY_STANDARD_CHARGE_GAS = 0.2982
DAILY_STANDARD_CHARGE_ELECTRICITY = 0.5137
DAILY_STANDARD_CHARGE_GAS = 0.3509
DAILY_STANDARD_CHARGE_ELECTRICITY = 0.5475
# Based on https://www.nottenergy.com/advice-and-tools/project-energy-cost-comparison
# For July 2024. These quotes are based on the east midlands region, so we

View file

@ -48,6 +48,12 @@ class FloorAttributes(Definitions):
"crog, inswleiddio cyfyngedig (rhagdybiaeth)": "suspended, limited insulation (assumed)",
}
REMAP = {
# Have only seen this once - though perhaps need to investigate older EPCs in the production of EPC clean.
# When looking at a newer EPC, which had been re-assessed as another dwelling below
"above unheated space or full exposed": "(another dwelling below)",
}
def __init__(self, description: str):
self.description: str = description.lower()
@ -62,6 +68,10 @@ class FloorAttributes(Definitions):
# Try and perform a translation, incase it's in welsh
self.translate_welsh_text()
# Remap known issues
if self.description in self.REMAP:
self.description = self.REMAP[self.description]
# We handle seemind occurances of mixed translations
self.description = handle_mixed_translation(self.description)

View file

@ -375,6 +375,12 @@ clean_floor_cases = [
'thermal_transmittance_unit': 'w/m-¦k', 'is_assumed': False,
'is_to_unheated_space': False, 'is_to_external_air': False, 'is_suspended': False, 'is_solid': False,
'another_property_below': False, 'insulation_thickness': None
},
{
# This example gets remapped to another dwelling below
"description": "Above unheated space or full exposed",
'thermal_transmittance': 0, 'thermal_transmittance_unit': 'w/m-¦k', 'is_assumed': False,
'is_to_unheated_space': False, 'is_to_external_air': False, 'is_suspended': False, 'is_solid': False,
'another_property_below': True, 'insulation_thickness': None
}
]

View file

@ -22,7 +22,7 @@ class RetrieveFindMyEpc:
'Chrome/111.0.0.0 Safari/537.36'
}
def __init__(self, address: str, postcode: str, rrn: str = None):
def __init__(self, address: str, postcode: str, rrn: str = None, address_postal_town: str = ""):
"""
This class is tasked with retrieving the latest EPC data from the find my epc website
:param address: The address of the property
@ -36,6 +36,10 @@ class RetrieveFindMyEpc:
self.address_cleaned = self.address.replace(",", "").replace(" ", "").lower()
self.walls = []
self.address_postal_town = address_postal_town
if self.address_postal_town:
self.address_postal_town = self.address_postal_town.replace(",", "").replace(" ", "").lower()
@staticmethod
def extract_low_carbon_sources(soup):
# Find the section header
@ -363,7 +367,12 @@ class RetrieveFindMyEpc:
extracted_address.replace(",", "").replace(" ", "").lower()
)
if not extracted_address_cleaned.startswith(self.address_cleaned):
no_primary_match = not extracted_address_cleaned.startswith(self.address_cleaned)
no_backup_match = True if not self.address_postal_town else not (
extracted_address_cleaned.startswith(self.address_postal_town)
)
if no_primary_match and no_backup_match:
continue
# If the address is a match, we can extract the data
@ -394,7 +403,9 @@ class RetrieveFindMyEpc:
return chosen_epc, epc_certificate
def retrieve_newest_find_my_epc_data(self, sap_2012_date=None, return_page=False, epc_page_source=None, rrn=None):
def retrieve_newest_find_my_epc_data(
self, sap_2012_date=None, return_page=False, epc_page_source=None, rrn=None
):
"""
For a post code and address, we pull out all the required data from the find my epc website
"""
@ -725,37 +736,13 @@ class RetrieveFindMyEpc:
return formatted_recommendations
@classmethod
def get_from_epc(cls, epc, epc_page_source=None, rrn=None):
def get_from_epc(cls, epc, epc_page_source=None, rrn=None, address_postal_town=None):
if epc_page_source is not None and rrn is None:
raise ValueError("rrn must be provided if epc_page_source is provided")
# Attempt both methods:
try:
searcher = cls(address=epc["address"], postcode=epc["postcode"])
find_epc_data = searcher.retrieve_newest_find_my_epc_data(epc_page_source=epc_page_source, rrn=rrn)
except Exception as e:
logger.error(f"Error retrieving find my epc data: {e}")
# We try two backup approaches. The first is to trim the final section off the end of the address
address1 = ",".join(epc["address"].split(",")[:-1])
try:
searcher = cls(address=address1, postcode=epc["postcode"])
find_epc_data = searcher.retrieve_newest_find_my_epc_data(epc_page_source=epc_page_source, rrn=rrn)
logger.info("Successfully retrieved find my epc data using trimmed address")
except Exception as e2:
logger.error(f"Error retrieving find my epc data using trimmed address: {e2}")
# Attempt final approach
if epc["address1"] == epc["address"]:
# There's no benefit of using the same address, so we split on comma
address1 = epc["address"].split(",")[0]
else:
address1 = epc["address1"]
# We attempt with the backup add
searcher = cls(address=address1, postcode=epc["postcode"])
find_epc_data = searcher.retrieve_newest_find_my_epc_data(epc_page_source=epc_page_source, rrn=rrn)
logger.info("Successfully retrieved find my epc data using backup address")
searcher = cls(address=epc["address"], postcode=epc["postcode"], address_postal_town=address_postal_town)
find_epc_data = searcher.retrieve_newest_find_my_epc_data(epc_page_source=epc_page_source, rrn=rrn)
non_invasive_recommendations = {
"uprn": epc["uprn"],
@ -782,7 +769,7 @@ class RetrieveFindMyEpc:
@classmethod
def get_from_epc_with_fallback(
cls, epc, epc_page, rrn, cleaned_address=None, config_address=None
cls, epc, epc_page, rrn, cleaned_address=None, config_address=None, address_postal_town=None
):
"""
Attempt get_from_epc with:
@ -814,7 +801,7 @@ class RetrieveFindMyEpc:
last_error = None
for idx, attempt in enumerate(attempts, start=1):
try:
return cls.get_from_epc(attempt, epc_page, rrn=rrn)
return cls.get_from_epc(attempt, epc_page, rrn=rrn, address_postal_town=address_postal_town)
except Exception as e:
last_error = e
logger.error(f"Attempt {idx} failed: {e}")

View file

@ -502,6 +502,10 @@ def optimise_with_funding_paths(
solutions = pd.DataFrame(solutions)
if solutions.empty:
# We return a blank dataframe
return solutions
# Given the scheme, we now check if the packages are eligible. If they *are* eligible, but they don't meet the
# final upgrade target, we then look to perform a final optimisation pass to meet the target gain.
solutions["meets_upgrade_target"] = solutions["total_gain"] >= target_gain - 0.1
@ -779,6 +783,10 @@ def run_optimizer(input_measures, budget=None, sub_target_gain=None, allow_slack
Thin wrapper over your optimisers.
Returns: list[dict] selected_options
"""
if not input_measures:
return None, 0.0, 0.0
if budget is not None:
opt = GainOptimiser(
input_measures, max_cost=budget, max_gain=(sub_target_gain or float("inf")),