diff --git a/backend/Property.py b/backend/Property.py index ae79f250..bc5660e8 100644 --- a/backend/Property.py +++ b/backend/Property.py @@ -889,6 +889,8 @@ class Property: "current_energy_demand": self.current_energy_consumption, "current_energy_demand_heating_hotwater": self.current_energy_consumption_heating_hotwater, "estimated": self.data.get("estimated", False), + # We indicate if we've overwritten a SAP 05 EPC + "sap_05_overwritten": self.data.get("sap_05_overwritten", False), **self.current_energy_bill } diff --git a/backend/SearchEpc.py b/backend/SearchEpc.py index c47e82c4..61c9cc30 100644 --- a/backend/SearchEpc.py +++ b/backend/SearchEpc.py @@ -199,7 +199,7 @@ class SearchEpc: ) self.data = None - self.newest_epc = None + self.newest_epc = {} self.older_epcs = None self.full_sap_epc = None self.metadata = None @@ -214,6 +214,9 @@ class SearchEpc: self.property_type = property_type self.fast = fast + # By default, this is set to false. This flag indicates whether we should overwrite SAP 2005 entries. + self.overwrite_sap05 = False + def set_strict_property_type_search(self): """ This method sets the strict property type search flag to True.
When this flag is set, the search will @@ -531,6 +534,9 @@ class SearchEpc: if uprns: uprn = uprns.pop() + # Convert to int + if not pd.isnull(uprn): + uprn = int(uprn) else: newest_epc["uprn-source"] = self.UPRN_SOURCE_SIMULATED uprn = hash(self.address1 + self.postcode) @@ -649,6 +655,11 @@ class SearchEpc: epc_data["lodgement-datetime"] > (pd.Timestamp.now() - pd.DateOffset(years=10)) ] + # Regardless of whether or not we exclude old, we drop any SAP05 entries, which will be problematic + # if we include them + if not epc_data.empty: + epc_data = epc_data[~epc_data["mainheat-description"].str.lower().str.contains("sap05:")] + if not epc_data.empty: # Further processing of the EPC data @@ -694,6 +705,18 @@ class SearchEpc: estimation_built_form = "End-Terraced" elif (built_form == "") or (pd.isnull(built_form)): estimation_built_form = epc_built_form + elif built_form == "Enclosed Mid-Terrace": + # We check if we have any enclosed and if not, we fall back to mid-terrace + if sum(epc_data["built-form"] == "Enclosed Mid-Terrace") > 0: + estimation_built_form = "Enclosed Mid-Terrace" + else: + estimation_built_form = "Mid-Terrace" + elif built_form == "Enclosed End-Terrace": + # An enclosed end terrace has two external facing walls so we fall back to mid-terrace + if sum(epc_data["built-form"] == "Enclosed End-Terrace") > 0: + estimation_built_form = "Enclosed Mid-Terrace" + else: + estimation_built_form = "Mid-Terrace" else: estimation_built_form = built_form @@ -917,7 +940,7 @@ class SearchEpc: return agg[key].values[0] - def find_property(self, skip_os=False, api_data=None): + def find_property(self, skip_os=False, api_data=None, overwrite_sap05=False): """ This method will attempt to identify a property. It will, at first, use the EPC api to try and find the EPC for the property and the associated UPRN.
If this fails, it will use the Ordnance Survey API to @@ -931,6 +954,10 @@ class SearchEpc: :param skip_os: If True, the ordnance survey api will be skipped and only the EPC api will be used :param api_data: If provided, this data will be used instead of querying the EPC api + :param overwrite_sap05: For extremely old, SAP05 EPCs, we may wish to overwrite them with an estimated EPC. + This is because the SAP05 EPCs will have missing information such as the main heating + will be described as SAP05:Main-Heating, which isn't particularly useful for the + purpose of providing recommendations. """ # Step 1: use the epc api to find the property and uprn @@ -944,8 +971,22 @@ class SearchEpc: ( self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.uprn ) = self.extract_epc_data(address=self.full_address) + + # Before we return, we check if we need to overwrite a SAP05 EPC + # If we don't have SAP05 in the heating description and overwrite_sap05 is False, we return + is_sap_o5 = "SAP05:" in self.newest_epc.get("mainheat-description", "") + if ( + (not is_sap_o5) and (not overwrite_sap05) and (response["status"] == 200) + ): return + # By default, we don't exclude old but we will do, when we are estimating to overwrite a SAP05 EPC + lmks_to_drop, exclude_old = [], False + if is_sap_o5: + self.overwrite_sap05 = True + lmks_to_drop = [self.newest_epc["lmk-key"]] + exclude_old = True + # Step 2: If we don't have an EPC, we use the ordnance survey api to find the uprn if skip_os: if self.ordnance_survey_client.property_type is not None: @@ -954,10 +995,18 @@ class SearchEpc: property_type=self.ordnance_survey_client.property_type, built_form=self.ordnance_survey_client.built_form, heating_system=self.heating_system, - associated_uprns=self.associated_uprns + associated_uprns=self.associated_uprns, + lmks_to_drop=lmks_to_drop, + exclude_old=exclude_old ) + + if self.overwrite_sap05: + # We keep a record of the fact that we have
performed a SAP05 overwrite + estimated_epc["sap_05_overwritten"] = True + + # If we have overwritten a SAP05 EPC, we need to update older_epcs too + self.older_epcs = [] if not self.overwrite_sap05 else [self.newest_epc.copy()] self.newest_epc = estimated_epc - self.older_epcs = [] self.full_sap_epc = {} # Finally, set a standardised address 1 and postcode @@ -1000,6 +1049,20 @@ class SearchEpc: self.postcode_clean = self.ordnance_survey_client.postcode_os return + def set_uprn_source(self, file_format): + """ + Utility function to set the uprn source based on the file format. Only works for domna_asset_lists + and this is very much a placeholder until we standardise our input data formats + :param file_format: + :return: + """ + + if not self.newest_epc: + raise ValueError("No EPC data available to set UPRN source - run find_property first") + + if self.newest_epc.get("estimated") and file_format == "domna_asset_list" and (self.newest_epc["uprn"] < 0): + self.newest_epc["uprn-source"] = self.UPRN_SOURCE_SIMULATED + def check_attribute_variations(self): attribute_map = { "walls-description": { @@ -1057,7 +1120,7 @@ class SearchEpc: return "ground" def get_metadata(self): - if self.newest_epc is None: + if not self.newest_epc: raise ValueError("No EPC data available") # We check if the property has ever been downgraded on SAP diff --git a/backend/app/db/models/materials.py b/backend/app/db/models/materials.py index 99759438..8a524491 100644 --- a/backend/app/db/models/materials.py +++ b/backend/app/db/models/materials.py @@ -45,7 +45,10 @@ class MaterialType(enum.Enum): solar_pv = "solar_pv" solar_battery = "solar_battery" scaffolding = "scaffolding" + # Heating systems high_heat_retention_storage_heaters = "high_heat_retention_storage_heaters" + air_soruce_heat_pump = "air_soruce_heat_pump" + boiler_upgrade = "boiler_upgrade" sealing_fireplace = "sealing_fireplace" roomstat_programmer_trvs = "roomstat_programmer_trvs" time_temperature_zone_control =
"time_temperature_zone_control" diff --git a/backend/app/db/models/portfolio.py b/backend/app/db/models/portfolio.py index fbe9661b..7fec8c14 100644 --- a/backend/app/db/models/portfolio.py +++ b/backend/app/db/models/portfolio.py @@ -175,6 +175,7 @@ class PropertyDetailsEpcModel(Base): current_energy_demand = Column(Float) current_energy_demand_heating_hotwater = Column(Float) estimated = Column(Boolean, default=False) + sap_05_overwritten = Column(Boolean, default=False) # Include estimates for energy bills, across the different types of energy heating_cost_current = Column(Float) hot_water_cost_current = Column(Float) diff --git a/backend/engine/engine.py b/backend/engine/engine.py index 217be3c3..c0261e57 100644 --- a/backend/engine/engine.py +++ b/backend/engine/engine.py @@ -637,12 +637,8 @@ async def model_engine(body: PlanTriggerRequest): epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None) # For the moment, our OS API access is unavailable, so we skip and interpolate - epc_searcher.find_property(skip_os=True, api_data=epc_api_data) - - if epc_searcher.newest_epc.get("estimated") and body.file_format == "domna_asset_list" and ( - epc_searcher.newest_epc["uprn"] < 0 - ): - epc_searcher.newest_epc["uprn-source"] = epc_searcher.UPRN_SOURCE_SIMULATED + epc_searcher.find_property(skip_os=True, api_data=epc_api_data, overwrite_sap05=True) + epc_searcher.set_uprn_source(file_format=body.file_format) # We check for an energy assessment we have performed on this property: energy_assessment = db_funcs.energy_assessment_functions.get_latest_assessment_by_uprn( @@ -658,12 +654,6 @@ async def model_engine(body: PlanTriggerRequest): if not is_new and not body.multi_plan: continue - if epc_searcher.newest_epc is None: - raise ValueError( - "No EPCs found for this property and did not estimate - likely need to provide a" - "property type and built form" - ) - if is_new: db_funcs.property_functions.create_property_targets( session, @@ 
-690,35 +680,20 @@ async def model_engine(body: PlanTriggerRequest): uprn=epc_searcher.uprn, ) # Pull this out as it may get overwritten - property_non_invasive_recommendations = req_data.non_invasive_recommendations - patch = req_data.patch + property_non_invasive_recommendations, patch = req_data.non_invasive_recommendations, req_data.patch # if we have a remote assment data type, we pull the additional data and include it epc_page_source = {} if (body.event_type == "remote_assessment") and not (epc_searcher.newest_epc.get("estimated")): - try: - property_non_invasive_recommendations, patch, epc_page_source = RetrieveFindMyEpc.get_from_epc( - epc_searcher.newest_epc, epc_page, rrn=rrn + property_non_invasive_recommendations, patch, epc_page_source = ( + RetrieveFindMyEpc.get_from_epc_with_fallback( + epc=epc_searcher.newest_epc, + epc_page=epc_page, + rrn=rrn, + cleaned_address=epc_searcher.address_clean, + config_address=config["address"] ) - except Exception as e: - logger.error(f"Failed to retrieve without cleaning address {e}") - try: - epc_to_use = deepcopy(epc_searcher.newest_epc) - for k in ["address", "address1"]: - epc_to_use[k] = epc_searcher.address_clean - property_non_invasive_recommendations, patch, epc_page_source = RetrieveFindMyEpc.get_from_epc( - epc_to_use, epc_page, rrn=rrn - ) - except Exception as e: - # Final attempt - logger.error(f"Failed to retrieve without cleaning address {e}") - epc_to_use = deepcopy(epc_searcher.newest_epc) - for k in ["address", "address1"]: - epc_to_use[k] = config["address"] - property_non_invasive_recommendations, patch, epc_page_source = RetrieveFindMyEpc.get_from_epc( - epc_to_use, epc_page, rrn=rrn - ) - # If we have a property type, this means when we pull the epc data, we might need to make a patch + ) epc_records = patch_epc(patch, epc_records) @@ -1190,6 +1165,8 @@ async def model_engine(body: PlanTriggerRequest): # when we have buildings, we tweak our solar PV recommendations as if one unit needs it, we 
apply it to all # of them # TODO: We can probably do better and optimise at the building level - this is temp + # Idea: - optimise all measures except solar at the unit level. Then, test with and without solar for + # all units at the same time logger.info("Adjusting solar PV recommendations for buildings") building_ids = set([p.building_id for p in input_properties if p.building_id is not None]) diff --git a/etl/find_my_epc/RetrieveFindMyEpc.py b/etl/find_my_epc/RetrieveFindMyEpc.py index 519c3e52..e28af4f5 100644 --- a/etl/find_my_epc/RetrieveFindMyEpc.py +++ b/etl/find_my_epc/RetrieveFindMyEpc.py @@ -1,7 +1,8 @@ import time import re -import pandas as pd import requests +import pandas as pd +from copy import deepcopy from bs4 import BeautifulSoup from datetime import datetime @@ -697,6 +698,7 @@ class RetrieveFindMyEpc: "Increase loft insulation to 250mm": ["loft_insulation"], "Solar photovoltaics panels, 25% of roof area": ["solar_pv"], 'Air or ground source heat pump': ["air_source_heat_pump"], + "Add PV Battery": ["solar_pv_battery"], } survey = True @@ -777,3 +779,44 @@ class RetrieveFindMyEpc: } return non_invasive_recommendations, patch, page_source + + @classmethod + def get_from_epc_with_fallback( + cls, epc, epc_page, rrn, cleaned_address=None, config_address=None + ): + """ + Attempt get_from_epc with: + 1) Original EPC + 2) EPC with cleaned address + 3) EPC with configured address + in that order. 
+ """ + + # The data we'll use to attempt retrieval + # 1) Original + attempts = [epc] + + # 2) Cleaned + if cleaned_address: + modified = deepcopy(epc) + for k in ["address", "address1"]: + modified[k] = cleaned_address + attempts.append(modified) + + # 3) Config address fallback + if config_address: + modified = deepcopy(epc) + for k in ["address", "address1"]: + modified[k] = config_address + attempts.append(modified) + + # Iterate attempts + last_error = None + for idx, attempt in enumerate(attempts, start=1): + try: + return cls.get_from_epc(attempt, epc_page, rrn=rrn) + except Exception as e: + last_error = e + logger.error(f"Attempt {idx} failed: {e}") + + raise RuntimeError(f"All EPC retrieval attempts failed: {last_error}")