import os import time import re from urllib.parse import urlencode import usaddress import pandas as pd import numpy as np from epc_api.client import EpcClient from backend.OrdnanceSurvey import OrdnanceSuveyClient from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes from etl.epc_clean.epc_attributes.FloorAttributes import FloorAttributes from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes from BaseUtility import Definitions from utils.logger import setup_logger from typing import List from thefuzz import process from backend.app.utils import sap_to_epc logger = setup_logger() vartypes = { 'low-energy-fixed-light-count': "Int64", # 'address': 'str', # 'uprn-source': 'str', 'floor-height': 'float', 'heating-cost-potential': 'float', 'unheated-corridor-length': 'float', 'hot-water-cost-potential': 'float', 'construction-age-band': 'str', 'potential-energy-rating': 'str', 'mainheat-energy-eff': 'str', 'windows-env-eff': 'str', 'lighting-energy-eff': 'str', 'environment-impact-potential': "Int64", 'glazed-type': 'str', 'heating-cost-current': 'float', # 'address3': 'str', 'mainheatcont-description': 'str', 'sheating-energy-eff': 'str', 'property-type': 'str', 'local-authority-label': 'str', 'fixed-lighting-outlets-count': "Int64", 'energy-tariff': 'str', 'mechanical-ventilation': 'str', 'hot-water-cost-current': 'str', 'county': 'str', # 'postcode': 'str', 'solar-water-heating-flag': 'str', 'constituency': 'str', 'co2-emissions-potential': 'float', 'number-heated-rooms': 'float', 'floor-description': 'str', 'energy-consumption-potential': 'float', 'local-authority': 'str', 'built-form': 'str', 'number-open-fireplaces': "Int64", 'windows-description': 'str', 'glazed-area': 'str', # 'inspection-date': str, 'mains-gas-flag': 'str', 'co2-emiss-curr-per-floor-area': 'float', # 'address1': 'str', 'heat-loss-corridor': 'str', 'flat-storey-count': "Int64", 'constituency-label': 'str', 'roof-energy-eff': 'str', 'total-floor-area': 'float', 'building-reference-number': 'str', 'environment-impact-current': 'float', 'co2-emissions-current': 'float', 'roof-description': 'str', 'floor-energy-eff': 'str', 'number-habitable-rooms': 'float', # 'address2': 'str', 'hot-water-env-eff': 'str', 'posttown': 'str', 'mainheatc-energy-eff': 'str', 'main-fuel': 'str', 'lighting-env-eff': 'str', 'windows-energy-eff': 'str', 'floor-env-eff': 'str', 'sheating-env-eff': 'str', 'lighting-description': 'str', 'roof-env-eff': 'str', 'walls-energy-eff': 'str', 'photo-supply': 'float', 'lighting-cost-potential': 'float', 'mainheat-env-eff': 'str', 'multi-glaze-proportion': 'float', 'main-heating-controls': 'str', # 'lodgement-datetime', 'flat-top-storey': 'str', 'current-energy-rating': 'str', 'secondheat-description': 'str', 'walls-env-eff': 'str', 'transaction-type': 'str', # 'uprn': "Int64", 'current-energy-efficiency': 'Int64', 'energy-consumption-current': 'float', 'mainheat-description': 'str', 'lighting-cost-current': 'float', # 'lodgement-date', 'extension-count': "Int64", 'mainheatc-env-eff': 'str', # 'lmk-key': 'str', 'wind-turbine-count': "Int64", 'tenure': 'str', 'floor-level': 'str', 'potential-energy-efficiency': "Int64", 'hot-water-energy-eff': 'str', 'low-energy-lighting': 'float', 'walls-description': 'str', 'hotwater-description': 'str' } class SearchEpc: """ Given address information about a home, this class is responsible for retrieving the EPC data associated to the property. For a home, we might have address lines 1, 2, 3 and 4, as well as a postcode. Often, simply searching the EPC database with address line 1 and postcode will be enough to find the property, but there are some cases where this is not true and we might need to utilise other combinations about the home to find the property """ # If we create the uprn based on a hash, we mark it as simulated UPRN_SOURCE_SIMULATED = "SIMULATED" MAX_RETRIES = 5 SUCCESS = { "status": 200, "message": "success", "error": None } NODATA = { "status": 204, "message": "no data", "error": None } # Keys that we check for missing values to determine if the EPC is incomplete CHECK_MISSING_KEYS = [ "lighting-cost-current", "heating-cost-current", "hot-water-cost-current", "energy-consumption-potential" ] def __init__( self, address1: str, postcode: str, auth_token: str, os_api_key: str, full_address: str | None = None, max_retries: int = None, uprn: [int, None] = None, size=None, property_type=None, fast=False, heating_system: [str, None] = None, associated_uprns: [List[int] | None] = None ): """ Address lines 1 and postcode are mandatory fields. The other address lines are optional but can be used to find the epc for the home, if address1 and postcode are insufficient If you wish to run a strict property type search, please run set_strict_property_type_search() :param address1: string, propery's address line 1 :param postcode: string, propery's postcode :param full_address: string, optional parameter, the full address of the property :param max_retries: int, optional, number of retries to make when searching the api :param uprn: int, optional, the uprn of the property :param size: int, optional, the number of results to return. If not provided, defaults to 25 which is the api's default :param property_type: str, optional, the property type of the property, if known before hand :param fast: bool, optional, if true, the extract_epc_data method will skip some processing to return results faster :param heating_system: str, optional, the heating system of the property, if known before hand :param associated_uprns: list of int, optional, list of associated uprns for the property. E.g. other units in a block of flats """ self.address1 = address1 self.postcode = postcode self.full_address = full_address if full_address is not None else self.address1 self.uprn = uprn self.house_number = self.get_house_number(self.address1) self.numeric_house_number = self.extract_numeric_housenumber_part(self.house_number) self.associated_uprns = associated_uprns if associated_uprns is not None else [] # property attributes self.heating_system = heating_system self.max_retries = max_retries if max_retries is not None else self.MAX_RETRIES self.client = EpcClient(auth_token=auth_token) self.ordnance_survey_client = OrdnanceSuveyClient( address=self.address1, postcode=self.postcode, api_key=os_api_key ) self.data = None self.newest_epc = {} self.older_epcs = None self.full_sap_epc = None self.metadata = None self.strict_property_type_search = False # These are the address and postcode values, which we store in the database self.address_clean = None self.postcode_clean = None self.address_postal_town = None self.size = size if size is not None else 25 self.property_type = property_type self.fast = fast # By default, this is set to false. This flag indicates whether we should overwrite SAP 2005 entires. self.overwrite_sap05 = False # Be default, this is set to false. This flag indicates whether we should take the existing EPC, but use # the estimated EPC to clean missings self.clean_missing_on_expired = False def set_strict_property_type_search(self): """ This method sets the strict property type search flag to True. When this flag is set, the search will only return results that match the specified property type. :return: """ self.strict_property_type_search = True @staticmethod def get_house_number(address: str, postcode=None) -> str | None: """ This method uses the usaddress library to parse an address and extract the primary house or flat number. """ try: # Updated regex to catch house numbers including alphanumeric ones pattern = r'(?i)(?:flat|apartment|room)\s*(\d+\w*)|^\s*(\d+\w*)' match1 = re.search(pattern, address) if match1: return next(g for g in match1.groups() if g is not None) pattern2 = r'(?i)(flat|apartment|room)\s*([a-zA-Z]?\d+[a-zA-Z]?)' match2 = re.search(pattern2, address) if match2: return match2.group(2) parsed = usaddress.parse(address) # First, try to get the 'OccupancyIdentifier' if 'OccupancyType' is detected for part, type_ in parsed: if type_ == 'OccupancyIdentifier': if postcode is not None: if part == postcode.split(" ")[0]: continue if part == postcode.split(" ")[1]: continue return part.rstrip(",") # This assumes the first 'OccupancyIdentifier' after 'OccupancyType' is the primary # number # Fallback to 'AddressNumber' if no 'OccupancyIdentifier' is found address_number = next((part for part, type_ in parsed if type_ == 'AddressNumber'), None) if address_number: return address_number.replace(",", "") # Remove any trailing commas except Exception as e: raise Exception(f"Error parsing address: {e}") return None @staticmethod def extract_numeric_housenumber_part(house_number: str | None) -> int | None: # Regular expression to find the first occurrence of one or more digits if house_number is None: return None match = re.search(r'\d+', house_number) if match: return int(match.group()) else: return None def _get_epc(self, params, size): """ To be called by get_epc() - not for external usage """ url = os.path.join(self.client.domestic.host, "search") if size: url += "?" + urlencode({k: v for k, v in {"size": size}.items() if v}) for retry in range(self.max_retries): try: response = self.client.domestic.call(method="get", url=url, params=params) if response: self.data = response return { "response": response, "msg": self.SUCCESS } if retry > 0: logger.info("Failed previous attempt but retry successful") # If we got nothing, final try if not response: return { "response": response, "msg": self.NODATA } except Exception as e: if retry < self.max_retries - 1: # If not the last retry, wait for 3 seconds before retrying time.sleep(3) else: # If it's the last retry, we continue return { "response": {}, "msg": { "status": 500, "message": "Could not retrieve EPC data", "error": str(e) } } def get_epc(self, params=None, size=None): # Get the EPC data with retries size = size if size is not None else self.size if params: output = self._get_epc(params=params, size=size) if output["msg"]["status"] == 200: self.data = output["response"] return output["msg"] if not self.uprn and not self.address1 and not self.postcode: raise ValueError("No search parameters provided") uprn_params = {"uprn": self.uprn} if self.uprn else {} address_params = {} if self.address1: address_params["address"] = self.address1 if self.postcode: address_params["postcode"] = self.postcode if self.strict_property_type_search and self.property_type: address_params["property-type"] = self.property_type.lower() # We attempt the search with uprn params data = {"rows": []} api_response = {} if uprn_params: api_response = self._get_epc(params=uprn_params, size=size) if api_response["msg"]["status"] == 200: data["rows"].extend(api_response["response"]["rows"]) # If we were unsuccessful, we then make a second attempt to fetch the data. We find that # properties are sometimes listed under the wrong UPRN if address_params: api_response = self._get_epc(params=address_params, size=size) if api_response["msg"]["status"] == 200: # We update the data with the correct uprn if self.uprn: for x in api_response["response"]["rows"]: if pd.isnull(x["uprn"]): x["uprn"] = self.uprn data["rows"].extend(api_response["response"]["rows"]) # We no de-dupe on lmk-key to avoid duplicates seen = set() data["rows"] = [ row for row in data["rows"] if row["lmk-key"] not in seen and not seen.add(row["lmk-key"]) ] # Overwrite the data self.data = data if data["rows"]: api_response["msg"] = self.SUCCESS return api_response["msg"] def filter_rows(self, rows, property_type=None, address=None): """ This method should not be used when property_type and address are both not None :param rows: :param property_type: :param address: :return: """ # Given the results from the EPC api, attempts to reduce the number of rows uprns = {r["uprn"] for r in rows} if (property_type is None) and (address is None): return rows unique_property_types = {r["property-type"] for r in rows} is_just_a_house = (len(unique_property_types) == 1) & ( ("House" in unique_property_types) | ("Bungalow" in unique_property_types) ) # We allow for variation in property type across flats/maisonettes # If we know that we have a flat/maisonette, we allow for both property types # Make sure we have not JUST a house, or not JUST a flat/maisonette if property_type in ["Flat", "Maisonette"] and not is_just_a_house: if (((len(uprns) == 1) and ((len(unique_property_types) == 1) ) or unique_property_types == {"Flat", "Maisonette"})): return rows if property_type is not None: # We can do a filter on the property type rows_filtered = [r for r in rows if r["property-type"] == property_type] if rows_filtered: return rows_filtered return rows if address is not None: # We can do a filter on the property type # We check if the full address contains the postcode and if it does, remove if self.postcode in address: address = address.replace(self.postcode, "").strip().rstrip(",") # We check if post town is included in the address if any([r["posttown"].lower() in address.lower() for r in rows]): best_match1 = process.extractOne( address, [", ".join([r["address"], r["posttown"]]) for r in rows], score_cutoff=0 ) best_match2 = process.extractOne( address, [", ".join([r["address"]]) for r in rows], score_cutoff=0 ) # Pick the largest score if best_match1[1] == best_match2[1]: # if thery're the same, we'll work under the assumption that the addresses are the same and we'll # take whichever has the newest EPC rows_filtered = [ r for r in rows if (", ".join([r["address"], r["posttown"]]) == best_match1[0]) or (r["address"] == best_match2[0]) ] rows_filtered = [ r for r in rows_filtered if r["lodgement-datetime"] == max([x["lodgement-datetime"] for x in rows_filtered]) ] elif best_match1[1] > best_match2[1]: # Get all of the scores - make sure we keep uprn rows_filtered = [ r for r in rows if ( (", ".join([r["address"], r["posttown"]]) == best_match1[0]) or (str(r["uprn"]) == str(self.uprn)) ) ] else: # Get all of the scores rows_filtered = [ r for r in rows if (r["address"] == best_match2[0]) or (str(r["uprn"]) == str(self.uprn)) ] # If we have multiple, we filter on newest lodgment date if len(rows_filtered) > 1: rows_filtered = [ r for r in rows_filtered if r["lodgement-datetime"] == max([x["lodgement-datetime"] for x in rows_filtered]) ] else: best_match = process.extractOne(address, [r["address"] for r in rows], score_cutoff=0) # Get the UPRN for the best match best_match_uprn = {r["uprn"] for r in rows if r["address"] == best_match[0]}.pop() # Get all of the scores rows_filtered = [ r for r in rows if (r["address"] == best_match[0]) or (r["uprn"] == best_match_uprn) ] if rows_filtered: return rows_filtered return rows raise ValueError("property type and address cannot both be None, at least one must be provided") @staticmethod def format_address(newest_epc): """ Format address and postcode for storage in the database """ postcode = newest_epc["postcode"] address = newest_epc["address"] # Format them address = address.replace(postcode, "").strip() address = address.rstrip(",").strip() address = address.title() postcode = postcode.upper() # We also return a "postal town variant - useful for edge cases when fetching from find my EPC address_postal_town = ", ".join( [newest_epc["address1"], newest_epc["address2"], newest_epc["posttown"]]).strip().title() return address, postcode, address_postal_town def extract_epc_data(self, address=None): """ Given a successful search, this method will format the data and return it :return: """ if self.data is None: raise ValueError("data is missing, run search first") rows = self.data["rows"] # We perform some checks on the rows # Firstly, we should only have 1 urpn so if we have multiple, we'll need to filter down the # property further rows = self.filter_rows(rows, property_type=self.property_type, address=None) rows = self.filter_rows(rows, property_type=None, address=address) # We now check for a full sap epc: full_sap_epc = [r for r in rows if r["transaction-type"] == "new dwelling"] full_sap_epc = full_sap_epc[0] if full_sap_epc else {} # Finally, we identify the newest epc and the rest, and then return newest_epc, older_epcs = self.filter_newest_epc(list_of_epcs=rows) # Ge the uprn from the newest record for this home uprns = {r["uprn"] for r in rows if r["uprn"]} # We can sometimes have no uprn for a property if (len(uprns) == 0) and len(rows) > 0: logger.warning("Found data but missing uprn") elif len(uprns) != 1: # There is a possibility that we have multiple UPRNs for a single property, which is an error addresses = {r["address"] for r in rows} if len(addresses) == 1: # Take the uprn from the most recent uprns = {newest_epc["uprn"]} else: raise ValueError("Multiple UPRNs found - investigate me") if uprns: uprn = uprns.pop() # Convert to int if not pd.isnull(uprn): uprn = int(uprn) else: newest_epc["uprn-source"] = self.UPRN_SOURCE_SIMULATED uprn = hash(self.address1 + self.postcode) if self.fast: return newest_epc, [], {}, "", "", None # Retrieve postcode and address address_epc, postcode_epc, address_postal_town = self.format_address(newest_epc=newest_epc) return newest_epc, older_epcs, full_sap_epc, address_epc, postcode_epc, uprn, address_postal_town @staticmethod def filter_newest_epc(list_of_epcs: List): newest_response = [ r for r in list_of_epcs if r["lodgement-datetime"] == max([x["lodgement-datetime"] for x in list_of_epcs]) ] if not newest_response: return {}, [] if len(newest_response) != 1: # It is possible (but rare, and likely an error on EPC lodgement) that we have multiple EPCs that # were lodged at the exact same time. In this case, we will take the first one newest_response = [newest_response[0]] older_epcs = [epc for epc in list_of_epcs if epc["lmk-key"] != newest_response[0]["lmk-key"]] return newest_response[0], older_epcs @staticmethod def _get_epc_mode(col: str, epc_data: pd.DataFrame): """ Simple method to extract the mode value from the EPC data :param col: name of the column to take the mode of :param epc_data: pandas dataframe of epc data """ mode_value = epc_data[[col]].mode(dropna=True) if len(mode_value) != 1: raise NotImplementedError("TODO: Handle multiple modes") mode_value = mode_value.iloc[0][col] return mode_value def fetch_nearby_epcs( self, initial_postcode: str, lmks_to_drop: list[str] | None = None, built_form: str = "", property_type: str = "", exclude_old: bool = False, heating_system: [str, None] = None, associated_uprns: [List[int] | None] = None ): """ Fetches and processes EPC data for a given initial postcode, applying successive trimming to the postcode and filtering the data until a non-empty result set is found. The function queries the EPC API with the provided postcode, and if no data is found or if the data doesn't meet certain criteria, it progressively shortens the postcode by removing the last character and retries the query. This process continues until a valid set of EPC data is obtained or the postcode is exhausted. Additional filtering is applied to the obtained EPC data based on 'lmk-key', 'built-form', and 'property-type'. The data is also processed to extract and numerically interpret house numbers, calculate house number distances, and apply weights based on these distances. :param initial_postcode: The initial full postcode for the EPC data query. :param lmks_to_drop: List of 'lmk-key' values to be excluded from the EPC data. :param built_form: The 'built-form' value to be used for filtering the EPC data. :param property_type: The 'property-type' value to be used for filtering the EPC data. :param exclude_old: Flag to exclude EPC data older than 10 years. :param heating_system: Optional heating system type for additional filtering. :param associated_uprns: Optional list of associated UPRNs for additional filtering. :return: """ associated_uprns_to_apply = [] if associated_uprns is None else associated_uprns.copy() property_type_api_map = { "Bungalow": "bungalow", "Flat": "flat", "House": "house", "Maisonette": "maisonette", "Park home": "park home", } postcode = initial_postcode while postcode: # Fetch data from EPC API params = {"postcode": postcode} if property_type: params["property-type"] = property_type_api_map[property_type] # We take the 20 nearest homes of the relevant type, so not to pull in too many irrelevant homes epc_response = self.get_epc(params=params, size=100) if epc_response["status"] == 200: epc_data = pd.DataFrame(self.data["rows"]) if lmks_to_drop is not None: epc_data = epc_data[~epc_data["lmk-key"].isin(lmks_to_drop)] try: epc_data['lodgement-datetime'] = pd.to_datetime( epc_data['lodgement-datetime'], format='%Y-%m-%d %H:%M:%S', errors='coerce' ) except Exception as e: logger.error("Problem formatting lodgement-datime, appling fallback: " + str(e)) epc_data['lodgement-datetime'] = pd.to_datetime(epc_data['lodgement-datetime'], errors='coerce') if exclude_old: # Exclude EPC data older than 10 years epc_data = epc_data[ epc_data["lodgement-datetime"] > (pd.Timestamp.now() - pd.DateOffset(years=10)) ] # Regardless of whether or not we exclude old, we drop any SAP05 entries, which will be problematic # if we include them if not epc_data.empty: epc_data = epc_data[~epc_data["mainheat-description"].str.lower().str.contains("sap05:")] if not epc_data.empty: # Further processing of the EPC data epc_data = epc_data.sort_values("lodgement-datetime", ascending=False).groupby("uprn").head(1) epc_data["house_number"] = epc_data["address"].apply(lambda add1: self.get_house_number(add1)) epc_data["numeric_house_number"] = epc_data["house_number"].apply( lambda house_num: self.extract_numeric_housenumber_part(house_num) ) if self.numeric_house_number is None: # If we don't have a house number, we treat all weights as equal epc_data["weight"] = 1 else: epc_data["house_number_distance"] = abs( epc_data["numeric_house_number"] - self.numeric_house_number ) # # We add 1, just in case we have a 0 weight (e.g. comparing house number 7a to 7b, or 9A to 9) # epc_data["weight"] = 1 / (epc_data["house_number_distance"] + 1) # # If we have a home without a house number, fill that weight with average # epc_data["weight"] = epc_data["weight"].fillna(epc_data["weight"].mean()) # # Finally, we might not have any house numbers whatsoever so everything could be # # missing, so we fill with 1 # epc_data["weight"] = epc_data["weight"].fillna(1) # TODO: Testing # If the postcode is different from the initial postcode, it doesn't make sense to have # any weightings if all(pd.isnull(epc_data["house_number_distance"])) or (postcode != initial_postcode): epc_data["weight"] = 1 else: epc_data["weight"] = 1 / np.sqrt(epc_data["house_number_distance"] + 1) epc_data["weight"] = epc_data["weight"].fillna(epc_data["weight"].mean()) estimation_property_type = self._estimate_str( key="property-type", estimation_data=epc_data ) if property_type == "" else property_type epc_built_form = self._estimate_str( key="built-form", estimation_data=epc_data[epc_data["property-type"] == estimation_property_type] ) if built_form == "Semi-Detached" and epc_built_form in ["End-Terraced", "Mid-Terraced"]: estimation_built_form = "End-Terraced" elif (built_form == "") or (pd.isnull(built_form)): estimation_built_form = epc_built_form elif built_form == "Enclosed Mid-Terrace": # We check if we have any enclosed and if not, we fall back to mid-terrace if sum(epc_data["built-form"] == "Enclosed Mid-Terrace") > 0: estimation_built_form = "Enclosed Mid-Terrace" else: estimation_built_form = "Mid-Terrace" elif built_form == "Enclosed End-Terrace": # An enclosed end terrace has three two external facing walls so we fall back to mid-terrace if sum(epc_data["built-form"] == "Enclosed End-Terrace") > 0: estimation_built_form = "Enclosed Mid-Terrace" else: estimation_built_form = "Mid-Terrace" else: estimation_built_form = built_form # We handle some edge cases experiences with maisonettes - if built form is detatched, just filter # on maisonette # We also add some additional logic for Park homes, because they are far less common than other # property types is_maisonette_with_bad_built_form = (estimation_property_type == "Maisonette") & ( estimation_built_form in ["Detached", "Semi-Detached"] ) is_park_home_without_built_form = (estimation_property_type == "Park home") & ( sum(epc_data["built-form"] == estimation_built_form) == 0 ) has_missing_built_form = not estimation_built_form # If we have associated UPRNS, we just filter as such, otherwise # we filter with built form and property type if any(str(x) in epc_data["uprn"].astype(str).values for x in associated_uprns_to_apply): # We check at least one UPRN is in the data epc_data = epc_data[epc_data["uprn"].isin(associated_uprns_to_apply)] # After we run this, we empty associated_uprns_to_apply. # That ensures we don't keep re-applying this filter if we shorten the postcode again # since we'll keep ending up in the same results associated_uprns_to_apply = [] elif is_maisonette_with_bad_built_form or is_park_home_without_built_form or has_missing_built_form: epc_data = epc_data[epc_data["property-type"] == estimation_property_type] else: epc_data = epc_data[ (epc_data["built-form"] == estimation_built_form) & ( epc_data["property-type"] == estimation_property_type) ] if heating_system is not None: epc_data = epc_data[ epc_data["mainheat-description"] == heating_system ] if not epc_data.empty: return epc_data # Return the filtered data if it's not empty # Shorten the postcode by one character for the next iteration postcode = postcode[:-1].rstrip() # If loop finishes without a valid response, raise an exception raise Exception("Unable to find postcode data after trimming - investigate me") def estimate_epc( self, property_type, built_form, lmks_to_drop=None, exclude_old=False, heating_system=None, associated_uprns=None ): """ For a property that does not have an EPC, we retrieve the EPC data for the closest properties and estimate the EPC for the property in question. Note - do we have postcodes with just a single address? We would need to use a different approach to find the closest homes :param property_type: This is the property type of the property we are estimating, that can be retrieved from the ordnance survey api :param built_form: This is the built form of the property we are estimating, that can be retrieved from the ordnance survey api :param lmks_to_drop: This is a list of LMK keys that should be dropped from the estimation process. This is used as an override for testing, to drop EPCs for the property we are testing :param exclude_old: Used to drop any expired EPCs (more than 10 years old) :param heating_system: The heating system of the property we are estimating, if known. Will aim to filter EPCs to matching heating systems :param associated_uprns: List of associated UPRNs for the property. E.g. other units in a block of flats :return: """ # From the ordnance survey data, we want to determine the property type and then use only similar property # types for the estimation process epc_data = self.fetch_nearby_epcs( initial_postcode=self.postcode, lmks_to_drop=lmks_to_drop, built_form=built_form, property_type=property_type, exclude_old=exclude_old, heating_system=heating_system, associated_uprns=associated_uprns ) # Check if it's a new build EPC. A property that doesn't have an EPC is not going to be a new build # so we avoid comparing it to new builds # TODO - this is experimental - if we have the year the property was built, we should utilise that # here newer_age_bands = [ "England and Wales: 1996-2002", "England and Wales: 2003-2006", "England and Wales: 2007-2011", "England and Wales: 2012 onwards" ] # We also remove EPCs that are for new dwellings if (~epc_data["construction-age-band"].isin(newer_age_bands)).sum(): # We have some older age bands, so we need to filter them out epc_data = epc_data[~epc_data["construction-age-band"].isin(newer_age_bands)].copy() # If we have missing lodgment date, we fill it with inspection-date epc_data["lodgement-datetime"] = epc_data["lodgement-datetime"].fillna(epc_data["inspection-date"]) # If we still have missing dates, we set it to the mean of the non NA dates epc_data["lodgement-datetime"] = epc_data["lodgement-datetime"].fillna(epc_data["lodgement-datetime"].mean()) # For each attribute, we need to determine the datatype and use an appropriate method # to estimate. estimated_epc = {} for key, vartype in vartypes.items(): epc_data[key] = np.where(pd.isnull(epc_data[key]), None, epc_data[key]) epc_data[key] = np.where(epc_data[key] == "", None, epc_data[key]) estimation_data = epc_data[[key, "weight", "lodgement-datetime"]].copy() estimation_data = estimation_data[~pd.isnull(estimation_data[key])] estimation_data = estimation_data[~estimation_data[key].isin(Definitions.DATA_ANOMALY_MATCHES)] if vartype == "Int64": # We have some edge cases where we get the error "invalid literal for int() with base 10: '1.0'" # so this handles this estimation_data[key] = estimation_data[key].astype(float).astype(vartype) else: estimation_data[key] = estimation_data[key].astype(vartype) if estimation_data.shape[0] == 0: estimated_epc[key] = None continue if key == "floor-height": # We speficially handle this, to avoid extreme values # We check if we have any rows less than 3.5m if estimation_data[estimation_data["floor-height"].astype(float) <= 3.5].shape[0] > 0: # Perform the filter estimation_data = estimation_data[estimation_data["floor-height"].astype(float) <= 3.5] if vartype == "Int64": estimated_value = self._estimate_int(estimation_data, key) elif vartype == "float": estimated_value = self._estimate_float(estimation_data, key) elif vartype == "str": estimated_value = self._estimate_str(estimation_data, key) else: raise NotImplementedError("estimation method not implemented for type") estimated_epc[key] = estimated_value # Insert an estimated lodgement datetime, with a weighted average estimated_epc["lodgement-datetime"] = self.calculate_weighted_lodgement_datetime(epc_data=epc_data) # Extract logement date # It is possible that there is still no lodgement date, so we need to handle this if pd.isnull(estimated_epc["lodgement-datetime"]): estimated_epc["lodgement-date"] = None else: estimated_epc["lodgement-date"] = estimated_epc["lodgement-datetime"].strftime("%Y-%m-%d") estimated_epc["current-energy-rating"] = sap_to_epc(estimated_epc["current-energy-efficiency"]) # Convert the cost current and potential variables - to string integers for variable in ["heating-cost-current", "hot-water-cost-current", "lighting-cost-current", "heating-cost-potential", "hot-water-cost-potential", "lighting-cost-potential"]: estimated_epc[variable] = str(int(estimated_epc[variable])) # This is a string estimated_epc["low-energy-fixed-light-count"] = ( str(estimated_epc["low-energy-fixed-light-count"]) if estimated_epc["low-energy-fixed-light-count"] else "" ) # This is an int estimated_epc["photo-supply"] = ( int(np.round(estimated_epc["photo-supply"])) if estimated_epc["photo-supply"] else estimated_epc[ "photo-supply"] ) estimated_epc["co2-emiss-curr-per-floor-area"] = ( estimated_epc["co2-emissions-current"] / estimated_epc["total-floor-area"] ) estimated_epc["postcode"] = self.postcode if not self.uprn: # Update self.uprn too self.uprn = hash(self.address1 + self.postcode) estimated_epc["uprn"] = self.uprn estimated_epc["address"] = self.full_address # Indicate that this epc was estimated estimated_epc["estimated"] = True return estimated_epc @staticmethod def calculate_weighted_lodgement_datetime(epc_data): numeric_dates = pd.to_datetime(epc_data['lodgement-datetime']).view('int64') # Calculate the weighted sum of dates weighted_sum = (numeric_dates * epc_data['weight']).sum() # Calculate the sum of weights total_weights = epc_data['weight'].sum() # Calculate the weighted mean in numeric format weighted_mean_numeric = weighted_sum / total_weights # Convert the numeric weighted mean back to datetime weighted_mean_datetime = pd.to_datetime(weighted_mean_numeric) return weighted_mean_datetime @staticmethod def _estimate_int(estimation_data, key): return round(np.average(a=estimation_data[key], weights=estimation_data["weight"])) @staticmethod def _estimate_float(estimation_data, key): return round(np.average(a=estimation_data[key], weights=estimation_data["weight"]), 2) @staticmethod def _estimate_str(estimation_data, key): agg = estimation_data.groupby(key)["weight"].sum().reset_index() agg = agg[agg["weight"] == agg["weight"].max()] if agg.shape[0] != 1: # If we have multiple modes, we take the more recent data on average recent_grouped = estimation_data[ estimation_data[key].isin(agg[key].values) ].groupby(key)["lodgement-datetime"].mean() newest_group = recent_grouped.idxmax() return newest_group return agg[key].values[0] def find_property(self, skip_os=False, api_data=None, overwrite_sap05=False): """ This method will attempt to identify a property. It will, at first, use the EPC api to try and find the EPC for the property and the associated UPRN. If this fails, it will use the Ordnance Survey API to find the UPRN of the address. Because no result may have been provided by the EPC api because of formatting issues with the address, if the ordnance survey api is used and the uprn retrieved, the EPC api is queried again with the UPRN, just as a final check to see if there is any EPC data. If there is no EPC data, the epc data will be estimated based on the surrounding properties :param skip_os: If True, the ordnance survey api will be skipped and only the EPC api will be used :param api_data: If provided, this data will be used instead of querying the EPC api :param overwrite_sap05: For extrememly old, SAP05 EPCs, we may wish to overwrite them with an estimated EPC. This is because the SAP05 EPCs will have missing information such as the main heating will be described as SAP05:Main-Heating, which isn't particularly useful for the purpose of providing recommendations. """ # Step 1: use the epc api to find the property and uprn if api_data: self.data = api_data response = {"status": 200} else: response = self.get_epc() if response["status"] == 200: ( self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.uprn, self.address_postal_town ) = self.extract_epc_data(address=self.full_address) # Before we return, we check if we need to overwrite a SAP05 EPC # ---- SAP 05 overwriting logic ---- is_sap_05 = "SAP05:" in self.newest_epc.get("mainheat-description", "") needs_sap_05_overwrite = is_sap_05 and (response["status"] == 200) and overwrite_sap05 # ---- Cleaning expired EPC logic ---- epc_is_expired = (pd.Timestamp.now() - pd.Timestamp( self.newest_epc.get("lodgement-date", pd.Timestamp.now()))).days > 3650 epc_has_missing_key_data = any([self.newest_epc.get(k) in [None, ""] for k in self.CHECK_MISSING_KEYS]) epc_needs_cleaning = epc_is_expired and epc_has_missing_key_data # ---- We don't have an epc ---- no_epc = response["status"] != 200 # If we don't have to overwrite SAP05, or we don't have missing data on an expired EPC, we return if not needs_sap_05_overwrite and not epc_needs_cleaning and not no_epc: # If the data is fine, or we're preventing SAP05 overwrites, we just exit here return # By default, we don't exclude old but we will do, when we are estimating to overwrite a SAP05 EPC lmks_to_drop, exclude_old = [], False if needs_sap_05_overwrite or epc_needs_cleaning: self.overwrite_sap05 = needs_sap_05_overwrite self.clean_missing_on_expired = epc_needs_cleaning lmks_to_drop = [self.newest_epc["lmk-key"]] exclude_old = True self.heating_system = ( self.newest_epc["mainheat-description"] if self.clean_missing_on_expired and self.heating_system is None else self.heating_system ) self.ordnance_survey_client.property_type = self.newest_epc["property-type"] self.ordnance_survey_client.built_form = self.newest_epc["built-form"] # Step 2: If we don't have an EPC, we use the ordnance survey api to find the uprn if skip_os: if self.ordnance_survey_client.property_type is not None: # We can try and estimate estimated_epc = self.estimate_epc( property_type=self.ordnance_survey_client.property_type, built_form=self.ordnance_survey_client.built_form, heating_system=self.heating_system, associated_uprns=self.associated_uprns, lmks_to_drop=lmks_to_drop, exclude_old=exclude_old ) # If we have overwritten a SAP05 EPC, we need to update older_epcs too if self.overwrite_sap05: # We keep a record of the fact that we have performed a SAP05 overwrite estimated_epc["sap_05_overwritten"] = True self.older_epcs = [self.newest_epc.copy()] self.newest_epc = estimated_epc elif self.clean_missing_on_expired: # We perform the cleaning for k in self.CHECK_MISSING_KEYS: if self.newest_epc[k] in ["", None]: self.newest_epc[k] = estimated_epc[k] self.newest_epc["estimated"] = True self.older_epcs = [] else: self.older_epcs = [] self.newest_epc = estimated_epc self.full_sap_epc = {} # Finally, set a standardised address 1 and postcode self.address_clean = ( self.ordnance_survey_client.address_os if self.ordnance_survey_client.address_os else self.address1 ) self.postcode_clean = ( self.ordnance_survey_client.postcode_os if self.ordnance_survey_client.postcode_os else self.postcode ) return os_response = self.ordnance_survey_client.get_places_api() if os_response["status"] != 200: # Investigate this if it happens raise Exception("Unable to find property - investigate me") # Step 3: Now that we have a urpn, do another check against the epc api, this time searching with the uprn self.uprn = self.ordnance_survey_client.most_relevant_result["UPRN"] response = self.get_epc() if response["status"] == 200: ( self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.uprn, self.address_postal_town ) = self.extract_epc_data() return # Step 4: If we still don't have an EPC, we estimate the EPC data self.full_address = self.ordnance_survey_client.most_relevant_result["ADDRESS"] estimated_epc = self.estimate_epc( property_type=self.ordnance_survey_client.property_type, built_form=self.ordnance_survey_client.built_form ) self.newest_epc = estimated_epc self.older_epcs = [] self.full_sap_epc = {} # Finally, set a standardised address 1 and postcode self.address_clean = self.ordnance_survey_client.address_os self.postcode_clean = self.ordnance_survey_client.postcode_os return def set_uprn_source(self, file_format): """ Utility function to set the uprn source based on the file format. Only works for domna_asset_lists and this is very much placeholder until we standardised our input data formats :param file_format: :return: """ if not self.newest_epc: raise ValueError("No EPC data available to set UPRN source - run find_property first") if (self.newest_epc.get("estimated") and (file_format == "domna_asset_list") and (float(self.newest_epc["uprn"]) < 0)): self.newest_epc["uprn-source"] = self.UPRN_SOURCE_SIMULATED def check_attribute_variations(self): attribute_map = { "walls-description": { "cleaner": WallAttributes, "attribute": [ "is_cavity_wall", "is_solid_brick", "is_system_built", "is_timber_frame", "is_granite_or_whinstone", "is_cob", "is_sandstone_or_limestone", "is_park_home" ], "name": "has_wall_type_ever_varied" }, "roof-description": { "cleaner": RoofAttributes, "attribute": [ "is_flat", "is_pitched", "is_roof_room", "is_thatched", "has_dwelling_above" ], "name": "has_roof_type_ever_varied" }, "floor-description": { "cleaner": FloorAttributes, "attribute": [ "is_to_unheated_space", "is_to_external_air", "is_suspended", "is_solid", "is_to_external_air", ], "name": "has_floor_type_ever_varied" } } attribute_variations = {} for attribute, attribute_objs in attribute_map.items(): attribute_variations[attribute_objs["name"]] = False cleaner = attribute_objs["cleaner"] type_timeline = pd.DataFrame([cleaner(epc[attribute]).process() for epc in self.older_epcs] + [ cleaner(self.newest_epc[attribute]).process() ]) # For eac col in attribute_objs["attribute"] we check if the timeline has ever varied, i.e has gone # from true to false for col in attribute_objs["attribute"]: if type_timeline[col].nunique() > 1: attribute_variations[attribute_objs["name"]] = True break return attribute_variations def identify_flat_floor(self): # If there is no dwelling above, it is a top floor flat processed_roof = RoofAttributes(self.newest_epc["roof-description"]).process() if not processed_roof["has_dwelling_above"]: return "top" # We know that there is a dwelling above. If there's also a drwelling below, it is a mid floor flat processed_floor = FloorAttributes(self.newest_epc["floor-description"]).process() if processed_floor["another_property_below"]: return "mid" # Otherwise ground floor return "ground" def get_metadata(self): if not self.newest_epc: raise ValueError("No EPC data available") # We check if the property has ever been downgraded on SAP has_sap_ever_downgraded = False sap_timeline = [int(epc["current-energy-efficiency"]) for epc in self.older_epcs] + [ int(self.newest_epc["current-energy-efficiency"]) ] # We check if there has ever been a decrease by differencing has_sap_ever_downgraded = any(np.diff(sap_timeline) < 0) # We check if the wall type has ever varied over time attribute_varations = self.check_attribute_variations() # If the property is a flat, we distinguish between top, mid, ground floor floor = None if self.newest_epc["property-type"] == "Flat": floor = self.identify_flat_floor() self.metadata = { "days_since_last_epc": (pd.Timestamp.now() - pd.Timestamp(self.newest_epc["lodgement-date"])).days, "has_sap_ever_downgraded": has_sap_ever_downgraded, "floor": floor, **attribute_varations }