import os
import time
import re
from urllib.parse import urlencode
import usaddress
import pandas as pd
import numpy as np
from epc_api.client import EpcClient
from backend.OrdnanceSurvey import OrdnanceSuveyClient
from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes
from etl.epc_clean.epc_attributes.FloorAttributes import FloorAttributes
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
from BaseUtility import Definitions
from utils.logger import setup_logger
from typing import List
from fuzzywuzzy import process
from backend.app.utils import sap_to_epc

logger = setup_logger()

# Maps each EPC attribute that estimate_epc() estimates to the dtype it is cast to before
# estimation. The dtype also selects the estimator: "Int64" -> weighted rounded mean,
# "float" -> weighted mean, "str" -> weighted mode. Commented-out keys are not estimated.
vartypes = {
    'low-energy-fixed-light-count': "Int64",
    # 'address': 'str',
    # 'uprn-source': 'str',
    'floor-height': 'float',
    'heating-cost-potential': 'float',
    'unheated-corridor-length': 'float',
    'hot-water-cost-potential': 'float',
    'construction-age-band': 'str',
    'potential-energy-rating': 'str',
    'mainheat-energy-eff': 'str',
    'windows-env-eff': 'str',
    'lighting-energy-eff': 'str',
    'environment-impact-potential': "Int64",
    'glazed-type': 'str',
    'heating-cost-current': 'float',
    # 'address3': 'str',
    'mainheatcont-description': 'str',
    'sheating-energy-eff': 'str',
    'property-type': 'str',
    'local-authority-label': 'str',
    'fixed-lighting-outlets-count': "Int64",
    'energy-tariff': 'str',
    'mechanical-ventilation': 'str',
    'hot-water-cost-current': 'str',
    'county': 'str',
    # 'postcode': 'str',
    'solar-water-heating-flag': 'str',
    'constituency': 'str',
    'co2-emissions-potential': 'float',
    'number-heated-rooms': 'float',
    'floor-description': 'str',
    'energy-consumption-potential': 'float',
    'local-authority': 'str',
    'built-form': 'str',
    'number-open-fireplaces': "Int64",
    'windows-description': 'str',
    'glazed-area': 'str',
    # 'inspection-date': str,
    'mains-gas-flag': 'str',
    'co2-emiss-curr-per-floor-area': 'float',
    # 'address1': 'str',
    'heat-loss-corridor': 'str',
    'flat-storey-count': "Int64",
    'constituency-label': 'str',
    'roof-energy-eff': 'str',
    'total-floor-area': 'float',
    'building-reference-number': 'str',
    'environment-impact-current': 'float',
    'co2-emissions-current': 'float',
    'roof-description': 'str',
    'floor-energy-eff': 'str',
    'number-habitable-rooms': 'float',
    # 'address2': 'str',
    'hot-water-env-eff': 'str',
    'posttown': 'str',
    'mainheatc-energy-eff': 'str',
    'main-fuel': 'str',
    'lighting-env-eff': 'str',
    'windows-energy-eff': 'str',
    'floor-env-eff': 'str',
    'sheating-env-eff': 'str',
    'lighting-description': 'str',
    'roof-env-eff': 'str',
    'walls-energy-eff': 'str',
    'photo-supply': 'float',
    'lighting-cost-potential': 'float',
    'mainheat-env-eff': 'str',
    'multi-glaze-proportion': 'float',
    'main-heating-controls': 'str',
    # 'lodgement-datetime',
    'flat-top-storey': 'str',
    'current-energy-rating': 'str',
    'secondheat-description': 'str',
    'walls-env-eff': 'str',
    'transaction-type': 'str',
    # 'uprn': "Int64",
    'current-energy-efficiency': 'Int64',
    'energy-consumption-current': 'float',
    'mainheat-description': 'str',
    'lighting-cost-current': 'float',
    # 'lodgement-date',
    'extension-count': "Int64",
    'mainheatc-env-eff': 'str',
    # 'lmk-key': 'str',
    'wind-turbine-count': "Int64",
    'tenure': 'str',
    'floor-level': 'str',
    'potential-energy-efficiency': "Int64",
    'hot-water-energy-eff': 'str',
    'low-energy-lighting': 'float',
    'walls-description': 'str',
    'hotwater-description': 'str',
}


class SearchEpc:
    """
    Given address information about a home, this class is responsible for retrieving the EPC data
    associated to the property.

    For a home, we might have address lines 1, 2, 3 and 4, as well as a postcode. Often, simply
    searching the EPC database with address line 1 and postcode will be enough to find the property,
    but there are some cases where this is not true and we might need to utilise other combinations
    about the home to find the property.
    """

    # If we create the uprn based on a hash, we mark it as simulated
    UPRN_SOURCE_SIMULATED = "SIMULATED"
    MAX_RETRIES = 5

    # Status dictionaries returned by the search methods
    SUCCESS = {
        "status": 200,
        "message": "success",
        "error": None
    }
    NODATA = {
        "status": 204,
        "message": "no data",
        "error": None
    }

    def __init__(
        self,
        address1: str,
        postcode: str,
        auth_token: str,
        os_api_key: str,
        full_address: str | None = None,
        max_retries: int | None = None,
        uprn: int | None = None,
        size=None,
        property_type=None,
        fast=False,
    ):
        """
        Address line 1 and postcode are mandatory fields. The other values are optional but can be
        used to find the epc for the home, if address1 and postcode are insufficient.

        :param address1: string, property's address line 1
        :param postcode: string, property's postcode
        :param auth_token: string, token passed to the EpcClient
        :param os_api_key: string, api key passed to the OrdnanceSuveyClient
        :param full_address: string, optional parameter, the full address of the property
        :param max_retries: int, optional, number of attempts to make when searching the api.
            Defaults to MAX_RETRIES
        :param uprn: int, optional, the uprn of the property
        :param size: int, optional, the number of results to return. If not provided, defaults to
            25 which is the api's default
        :param property_type: str, optional, the property type of the property, if known before hand
        :param fast: bool, when True extract_epc_data() returns only the newest EPC and skips the
            address formatting
        """
        self.address1 = address1
        self.postcode = postcode
        self.full_address = full_address
        self.uprn = uprn

        self.house_number = self.get_house_number(self.address1)
        self.numeric_house_number = self.extract_numeric_housenumber_part(self.house_number)

        self.max_retries = max_retries if max_retries is not None else self.MAX_RETRIES
        self.client = EpcClient(auth_token=auth_token)
        self.ordnance_survey_client = OrdnanceSuveyClient(
            address=self.address1, postcode=self.postcode, api_key=os_api_key
        )

        self.data = None
        self.newest_epc = None
        self.older_epcs = None
        self.full_sap_epc = None
        self.metadata = None

        # These are the address and postcode values, which we store in the database
        self.address_clean = None
        self.postcode_clean = None

        self.size = size if size is not None else 25
        self.property_type = property_type
        self.fast = fast

    @staticmethod
    def get_house_number(address: str, postcode=None) -> str | None:
        """
        Extract the primary house or flat number from an address.

        A regex is tried first (a number following "flat"/"apartment", or a number at the start of
        the address); if it does not match, the usaddress library is used to parse the address.

        :param address: the address string to parse
        :param postcode: optional postcode; parsed occupancy identifiers equal to either of its
            first two space-separated parts are ignored
        :return: the house/flat number as a string (may be alphanumeric), or None if not found
        :raises Exception: if parsing the address fails
        """
        try:
            # Regex to catch house numbers including alphanumeric ones
            pattern = r'(?i)(?:flat|apartment)\s*(\d+\w*)|^\s*(\d+\w*)'
            match = re.search(pattern, address)
            if match:
                return next(g for g in match.groups() if g is not None)

            parsed = usaddress.parse(address)

            # Slicing (rather than indexing [0] and [1]) keeps this safe for postcodes that
            # contain no space
            postcode_parts = postcode.split(" ")[:2] if postcode is not None else []

            # First, try to get the 'OccupancyIdentifier'. This assumes the first identifier that
            # is not part of the postcode is the primary number
            for part, type_ in parsed:
                if type_ == 'OccupancyIdentifier':
                    if part in postcode_parts:
                        continue
                    return part

            # Fallback to 'AddressNumber' if no 'OccupancyIdentifier' is found
            address_number = next((part for part, type_ in parsed if type_ == 'AddressNumber'), None)
            if address_number:
                return address_number.replace(",", "")  # Remove any trailing commas
        except Exception as e:
            raise Exception(f"Error parsing address: {e}") from e

        return None

    @staticmethod
    def extract_numeric_housenumber_part(house_number: str | None) -> int | None:
        """
        Return the first run of digits in a house number as an int (e.g. "7a" -> 7).

        :param house_number: house number string, or None
        :return: the numeric part, or None if house_number is None or contains no digits
        """
        if house_number is None:
            return None

        match = re.search(r'\d+', house_number)
        return int(match.group()) if match else None

    def _get_epc(self, params, size):
        """
        To be called by get_epc() - not for external usage.

        Calls the domestic search endpoint, retrying on exceptions (with a 3 second pause) up to
        self.max_retries attempts. On a non-empty response, self.data is set to the response.

        :param params: dict of query parameters for the search
        :param size: number of results to request; omitted from the url if falsy
        :return: dict with keys "response" (the api response, or {} on error) and "msg" (a status
            dict with keys "status", "message" and "error")
        """
        # Build the url with plain string handling; os.path.join would use the OS path separator
        url = self.client.domestic.host.rstrip("/") + "/search"
        if size:
            url += "?" + urlencode({"size": size})

        for retry in range(self.max_retries):
            try:
                response = self.client.domestic.call(method="get", url=url, params=params)
            except Exception as e:
                if retry < self.max_retries - 1:
                    # If not the last retry, wait for 3 seconds before retrying
                    time.sleep(3)
                    continue
                # It was the last retry, so we give up and report the error
                return {
                    "response": {},
                    "msg": {
                        "status": 500,
                        "message": "Could not retrieve EPC data",
                        "error": str(e)
                    }
                }

            if retry > 0:
                logger.info("Failed previous attempt but retry successful")

            if response:
                self.data = response
                return {"response": response, "msg": self.SUCCESS}

            # The call worked but we got nothing back
            return {"response": response, "msg": self.NODATA}

        # Only reachable when max_retries < 1, i.e. no attempt was made at all
        return {
            "response": {},
            "msg": {
                "status": 500,
                "message": "Could not retrieve EPC data",
                "error": "no attempts were made (max_retries < 1)"
            }
        }

    def get_epc(self, params=None, size=None):
        """
        Search the EPC api and store the result on self.data.

        If params is given, a single search is made with those parameters. Otherwise a search by
        uprn (when self.uprn is set) is combined with a search by address line 1 and postcode; the
        rows of both are merged and de-duplicated on lmk-key.

        :param params: optional dict of query parameters to search with directly
        :param size: optional number of results to request, defaults to self.size
        :return: status dict with keys "status", "message" and "error"
        """
        size = size if size is not None else self.size

        if params:
            output = self._get_epc(params=params, size=size)
            if output["msg"]["status"] == 200:
                self.data = output["response"]
            return output["msg"]

        uprn_params = {"uprn": self.uprn} if self.uprn else {}
        address_params = {"address": self.address1, "postcode": self.postcode}

        # We attempt the search with uprn params
        data = {"rows": []}
        if uprn_params:
            api_response = self._get_epc(params=uprn_params, size=size)
            if api_response["msg"]["status"] == 200:
                data["rows"].extend(api_response["response"]["rows"])

        # We then search by address too. We find that properties are sometimes listed under the
        # wrong UPRN
        api_response = self._get_epc(params=address_params, size=size)
        if api_response["msg"]["status"] == 200:
            # We update the data with the correct uprn
            if self.uprn:
                for row in api_response["response"]["rows"]:
                    row["uprn"] = self.uprn
            data["rows"].extend(api_response["response"]["rows"])

        # We now de-dupe on lmk-key to avoid duplicates, keeping the first occurrence
        seen = set()
        unique_rows = []
        for row in data["rows"]:
            if row["lmk-key"] not in seen:
                seen.add(row["lmk-key"])
                unique_rows.append(row)
        data["rows"] = unique_rows

        if data["rows"]:
            # Either search found rows: expose the merged result and report success, even if the
            # address search itself failed
            self.data = data
            return self.SUCCESS

        return api_response["msg"]

    def filter_rows(self, rows, property_type=None, address=None):
        """
        Given the results from the EPC api, attempts to reduce the number of rows to those of a
        single property.

        This method should not be used when property_type and address are both not None (the
        property type filter takes precedence and the address is then ignored).

        :param rows: list of EPC row dicts
        :param property_type: optional property type to filter on
        :param address: optional full address, fuzzy matched against the rows' addresses
        :return: the filtered rows, or the original rows if no filter applies or the filter would
            remove everything
        """
        if not rows:
            return rows

        if (property_type is None) and (address is None):
            return rows

        uprns = {r["uprn"] for r in rows}
        unique_property_types = {r["property-type"] for r in rows}

        # We allow for variation in property type across flats/maisonettes
        if (len(uprns) == 1) and (
            (len(unique_property_types) == 1) or unique_property_types == {"Flat", "Maisonette"}
        ):
            return rows

        if property_type is not None:
            # We can do a filter on the property type
            rows_filtered = [r for r in rows if r["property-type"] == property_type]
            return rows_filtered if rows_filtered else rows

        # We can do a filter on the address
        # We check if the full address contains the postcode and if it does, remove
        if self.postcode in address:
            address = address.replace(self.postcode, "").strip().rstrip(",")

        # We check if post town is included in the address
        if any(r["posttown"].lower() in address.lower() for r in rows):
            best_match = process.extractOne(
                address, [", ".join([r["address"], r["posttown"]]) for r in rows], score_cutoff=0
            )
            rows_filtered = [r for r in rows if ", ".join([r["address"], r["posttown"]]) == best_match[0]]
        else:
            best_match = process.extractOne(address, [r["address"] for r in rows], score_cutoff=0)
            # Get the UPRN for the best match
            best_match_uprn = {r["uprn"] for r in rows if r["address"] == best_match[0]}.pop()
            # Keep the rows that match on address, or share the uprn of the best match
            rows_filtered = [
                r for r in rows if (r["address"] == best_match[0]) or (r["uprn"] == best_match_uprn)
            ]

        return rows_filtered if rows_filtered else rows

    @staticmethod
    def format_address(newest_epc):
        """
        Format address and postcode for storage in the database.

        :param newest_epc: EPC row dict with "address" and "postcode" keys
        :return: tuple of (title-cased address with the postcode and trailing comma removed,
            upper-cased postcode)
        """
        postcode = newest_epc["postcode"]
        address = newest_epc["address"]

        address = address.replace(postcode, "").strip()
        address = address.rstrip(",").strip()
        address = address.title()
        postcode = postcode.upper()

        return address, postcode

    def extract_epc_data(self, address=None):
        """
        Given a successful search, this method will format the data and return it.

        :param address: optional full address used to filter the rows down to a single property
        :return: tuple of (newest_epc, older_epcs, full_sap_epc, address, postcode, uprn). When
            self.fast is set, the tuple is (newest_epc, [], {}, "", "", None)
        :raises ValueError: if no search has been run, or if the rows contain multiple UPRNs
            across more than one address
        """
        if self.data is None:
            raise ValueError("data is missing, run search first")

        rows = self.data["rows"]

        # We perform some checks on the rows. We should only have 1 uprn so if we have multiple,
        # we'll need to filter down the property further
        rows = self.filter_rows(rows, property_type=self.property_type, address=None)
        rows = self.filter_rows(rows, property_type=None, address=address)

        # We now check for a full sap epc
        full_sap_epc = [r for r in rows if r["transaction-type"] == "new dwelling"]
        full_sap_epc = full_sap_epc[0] if full_sap_epc else {}

        # We identify the newest epc and the rest
        newest_epc, older_epcs = self.filter_newest_epc(list_of_epcs=rows)

        # Get the uprn for this home
        uprns = {r["uprn"] for r in rows if r["uprn"]}

        # We can sometimes have no uprn for a property
        if (len(uprns) == 0) and len(rows) > 0:
            logger.warning("Found data but missing uprn")
        elif len(uprns) != 1:
            # There is a possibility that we have multiple UPRNs for a single property, which is
            # an error
            addresses = {r["address"] for r in rows}
            if len(addresses) == 1:
                # Take the uprn from the most recent
                uprns = {newest_epc["uprn"]}
            else:
                raise ValueError("Multiple UPRNs found - investigate me")

        if uprns:
            uprn = uprns.pop()
        else:
            newest_epc["uprn-source"] = self.UPRN_SOURCE_SIMULATED
            # NOTE(review): hash() of a str is salted per process unless PYTHONHASHSEED is fixed,
            # so this simulated uprn is not stable across runs - confirm this is acceptable
            uprn = hash(self.address1 + self.postcode)

        if self.fast:
            return newest_epc, [], {}, "", "", None

        # Retrieve postcode and address
        address_epc, postcode_epc = self.format_address(newest_epc=newest_epc)

        return newest_epc, older_epcs, full_sap_epc, address_epc, postcode_epc, uprn

    @staticmethod
    def filter_newest_epc(list_of_epcs: List):
        """
        Split a list of EPCs into the most recently lodged EPC and the rest.

        :param list_of_epcs: list of EPC row dicts with "lodgement-datetime" and "lmk-key" keys
        :return: tuple of (newest epc, list of the other epcs); ({}, []) for an empty list
        """
        if not list_of_epcs:
            return {}, []

        newest_datetime = max(epc["lodgement-datetime"] for epc in list_of_epcs)

        # It is possible (but rare, and likely an error on EPC lodgement) that we have multiple
        # EPCs that were lodged at the exact same time. In this case, we will take the first one
        newest_epc = next(epc for epc in list_of_epcs if epc["lodgement-datetime"] == newest_datetime)
        older_epcs = [epc for epc in list_of_epcs if epc["lmk-key"] != newest_epc["lmk-key"]]

        return newest_epc, older_epcs

    @staticmethod
    def _get_epc_mode(col: str, epc_data: pd.DataFrame):
        """
        Simple method to extract the mode value from the EPC data.

        :param col: name of the column to take the mode of
        :param epc_data: pandas dataframe of epc data
        :return: the single mode of the column
        :raises NotImplementedError: if the column does not have exactly one mode
        """
        mode_value = epc_data[[col]].mode(dropna=True)
        if len(mode_value) != 1:
            raise NotImplementedError("TODO: Handle multiple modes")

        return mode_value.iloc[0][col]

    def fetch_nearby_epcs(
        self,
        initial_postcode: str,
        lmks_to_drop: list[str] | None = None,
        built_form: str = "",
        property_type: str = "",
        exclude_old: bool = False
    ):
        """
        Fetches and processes EPC data for a given initial postcode, applying successive trimming
        to the postcode and filtering the data until a non-empty result set is found.

        The function queries the EPC API with the provided postcode, and if no data is found or if
        the data doesn't meet certain criteria, it progressively shortens the postcode by removing
        the last character and retries the query. This process continues until a valid set of EPC
        data is obtained or the postcode is exhausted.

        Additional filtering is applied to the obtained EPC data based on 'lmk-key', 'built-form',
        and 'property-type'. The data is also processed to extract and numerically interpret house
        numbers, calculate house number distances, and apply weights based on these distances.

        :param initial_postcode: The initial full postcode for the EPC data query.
        :param lmks_to_drop: List of 'lmk-key' values to be excluded from the EPC data.
        :param built_form: The 'built-form' value to be used for filtering the EPC data. If empty,
            it is estimated from the data.
        :param property_type: The 'property-type' value to be used for filtering the EPC data. If
            empty or None, it is estimated from the data.
        :param exclude_old: Flag to exclude EPC data older than 10 years.
        :return: dataframe with the newest EPC per uprn for the matching homes, including a
            "weight" column
        :raises Exception: if no data is found after trimming the postcode down to nothing
        """
        property_type_api_map = {
            "Bungalow": "bungalow",
            "Flat": "flat",
            "House": "house",
            "Maisonette": "maisonette",
            "Park home": "park home",
        }

        postcode = initial_postcode
        while postcode:
            # Fetch data from EPC API
            params = {"postcode": postcode}
            if property_type:
                params["property-type"] = property_type_api_map[property_type]

            # We request up to 100 homes of the relevant type, so not to pull in too many
            # irrelevant homes
            epc_response = self.get_epc(params=params, size=100)

            if epc_response["status"] == 200:
                epc_data = self._prepare_nearby_epcs(
                    rows=self.data["rows"],
                    postcode=postcode,
                    initial_postcode=initial_postcode,
                    lmks_to_drop=lmks_to_drop,
                    built_form=built_form,
                    property_type=property_type,
                    exclude_old=exclude_old,
                )
                if epc_data is not None and not epc_data.empty:
                    return epc_data  # Return the filtered data if it's not empty

            # Shorten the postcode by one character for the next iteration
            postcode = postcode[:-1].rstrip()

        # If loop finishes without a valid response, raise an exception
        raise Exception("Unable to find postcode data after trimming - investigate me")

    def _prepare_nearby_epcs(
        self, rows, postcode, initial_postcode, lmks_to_drop, built_form, property_type, exclude_old
    ):
        """
        Filter and weight the rows of a single postcode search, for fetch_nearby_epcs().

        :return: the filtered dataframe (possibly empty), or None if nothing is left after the
            lmk-key / age filters
        """
        epc_data = pd.DataFrame(rows)

        if lmks_to_drop is not None:
            epc_data = epc_data[~epc_data["lmk-key"].isin(lmks_to_drop)]

        try:
            epc_data['lodgement-datetime'] = pd.to_datetime(
                epc_data['lodgement-datetime'], format='%Y-%m-%d %H:%M:%S', errors='coerce'
            )
        except Exception as e:
            logger.error("Problem formatting lodgement-datime, appling fallback: " + str(e))
            epc_data['lodgement-datetime'] = pd.to_datetime(epc_data['lodgement-datetime'], errors='coerce')

        if exclude_old:
            # Exclude EPC data older than 10 years
            epc_data = epc_data[
                epc_data["lodgement-datetime"] > (pd.Timestamp.now() - pd.DateOffset(years=10))
            ]

        if epc_data.empty:
            return None

        # Keep only the newest EPC per uprn. We copy so the column assignments below act on an
        # independent frame rather than on a slice
        epc_data = epc_data.sort_values("lodgement-datetime", ascending=False).groupby("uprn").head(1).copy()

        epc_data["house_number"] = epc_data["address"].apply(self.get_house_number)
        # to_numeric turns missing house numbers into NaN, also when every value is missing
        epc_data["numeric_house_number"] = pd.to_numeric(
            epc_data["house_number"].apply(self.extract_numeric_housenumber_part), errors="coerce"
        )

        if self.numeric_house_number is None:
            # If we don't have a house number, we treat all weights as equal
            epc_data["weight"] = 1
        else:
            epc_data["house_number_distance"] = abs(
                epc_data["numeric_house_number"] - self.numeric_house_number
            )
            # TODO: Testing
            # If the postcode is different from the initial postcode, it doesn't make sense to
            # have any weightings
            if all(pd.isnull(epc_data["house_number_distance"])) or (postcode != initial_postcode):
                epc_data["weight"] = 1
            else:
                # We add 1 to avoid dividing by zero (e.g. comparing house number 7a to 7b)
                epc_data["weight"] = 1 / np.sqrt(epc_data["house_number_distance"] + 1)
                # If we have a home without a house number, fill that weight with the average
                epc_data["weight"] = epc_data["weight"].fillna(epc_data["weight"].mean())

        # An empty or missing property type is estimated from the data
        if property_type:
            estimation_property_type = property_type
        else:
            estimation_property_type = self._estimate_str(key="property-type", estimation_data=epc_data)

        epc_built_form = self._estimate_str(
            key="built-form",
            estimation_data=epc_data[epc_data["property-type"] == estimation_property_type]
        )

        if built_form == "Semi-Detached" and epc_built_form in ["End-Terraced", "Mid-Terraced"]:
            estimation_built_form = "End-Terraced"
        elif (built_form == "") or (pd.isnull(built_form)):
            estimation_built_form = epc_built_form
        else:
            estimation_built_form = built_form

        # We handle some edge cases experienced with maisonettes - if built form is detached, just
        # filter on maisonette.
        # We also add some additional logic for Park homes, because they are far less common than
        # other property types
        is_maisonette_with_bad_built_form = (estimation_property_type == "Maisonette") and (
            estimation_built_form in ["Detached", "Semi-Detached"]
        )
        is_park_home_without_built_form = (estimation_property_type == "Park home") and (
            sum(epc_data["built-form"] == estimation_built_form) == 0
        )
        has_missing_built_form = not estimation_built_form

        has_property_type = epc_data["property-type"] == estimation_property_type
        if is_maisonette_with_bad_built_form or is_park_home_without_built_form or has_missing_built_form:
            return epc_data[has_property_type]

        return epc_data[(epc_data["built-form"] == estimation_built_form) & has_property_type]

    def estimate_epc(self, property_type, built_form, lmks_to_drop=None, exclude_old=False):
        """
        For a property that does not have an EPC, we retrieve the EPC data for the closest
        properties and estimate the EPC for the property in question.

        Note - do we have postcodes with just a single address? We would need to use a different
        approach to find the closest homes.

        :param property_type: This is the property type of the property we are estimating, that
            can be retrieved from the ordnance survey api
        :param built_form: This is the built form of the property we are estimating, that can be
            retrieved from the ordnance survey api
        :param lmks_to_drop: This is a list of LMK keys that should be dropped from the estimation
            process. This is used as an override for testing, to drop EPCs for the property we
            are testing
        :param exclude_old: Used to drop any expired EPCs (more than 10 years old)
        :return: dict of estimated EPC attributes, flagged with "estimated": True
        """
        # We use only similar property types for the estimation process
        epc_data = self.fetch_nearby_epcs(
            initial_postcode=self.postcode,
            lmks_to_drop=lmks_to_drop,
            built_form=built_form,
            property_type=property_type,
            exclude_old=exclude_old
        )

        # If we have missing lodgement date, we fill it with inspection-date. The inspection date
        # is parsed first so the column stays a datetime column
        epc_data["lodgement-datetime"] = epc_data["lodgement-datetime"].fillna(
            pd.to_datetime(epc_data["inspection-date"], errors="coerce")
        )
        # If we still have missing dates, we set it to the mean of the non NA dates
        epc_data["lodgement-datetime"] = epc_data["lodgement-datetime"].fillna(
            epc_data["lodgement-datetime"].mean()
        )

        # For each attribute, we need to determine the datatype and use an appropriate method to
        # estimate
        estimated_epc = {}
        for key, vartype in vartypes.items():
            # Normalise missing values and empty strings to None
            epc_data[key] = np.where(pd.isnull(epc_data[key]), None, epc_data[key])
            epc_data[key] = np.where(epc_data[key] == "", None, epc_data[key])

            estimation_data = epc_data[[key, "weight", "lodgement-datetime"]].copy()
            estimation_data = estimation_data[~pd.isnull(estimation_data[key])]
            estimation_data = estimation_data[~estimation_data[key].isin(Definitions.DATA_ANOMALY_MATCHES)]

            if vartype == "Int64":
                # We have some edge cases where we get the error
                # "invalid literal for int() with base 10: '1.0'" so we go via float
                estimation_data[key] = estimation_data[key].astype(float).astype(vartype)
            else:
                estimation_data[key] = estimation_data[key].astype(vartype)

            if estimation_data.shape[0] == 0:
                estimated_epc[key] = None
                continue

            if key == "floor-height":
                # We specifically handle this, to avoid extreme values: if any homes are at most
                # 3.5m, we only use those
                is_plausible = estimation_data["floor-height"].astype(float) <= 3.5
                if is_plausible.any():
                    estimation_data = estimation_data[is_plausible]

            if vartype == "Int64":
                estimated_value = self._estimate_int(estimation_data, key)
            elif vartype == "float":
                estimated_value = self._estimate_float(estimation_data, key)
            elif vartype == "str":
                estimated_value = self._estimate_str(estimation_data, key)
            else:
                raise NotImplementedError("estimation method not implemented for type")

            estimated_epc[key] = estimated_value

        # Insert an estimated lodgement datetime, with a weighted average
        estimated_epc["lodgement-datetime"] = self.calculate_weighted_lodgement_datetime(epc_data=epc_data)

        # Extract lodgement date. It is possible that there is still no lodgement date, so we need
        # to handle this
        if pd.isnull(estimated_epc["lodgement-datetime"]):
            estimated_epc["lodgement-date"] = None
        else:
            estimated_epc["lodgement-date"] = estimated_epc["lodgement-datetime"].strftime("%Y-%m-%d")

        estimated_epc["current-energy-rating"] = sap_to_epc(estimated_epc["current-energy-efficiency"])

        # Convert the cost current and potential variables - to string integers. A cost with no
        # data stays None; going via float also handles string values such as "85.0"
        for variable in [
            "heating-cost-current", "hot-water-cost-current", "lighting-cost-current",
            "heating-cost-potential", "hot-water-cost-potential", "lighting-cost-potential"
        ]:
            cost = estimated_epc[variable]
            estimated_epc[variable] = None if cost is None else str(int(float(cost)))

        # This is a string
        estimated_epc["low-energy-fixed-light-count"] = (
            str(estimated_epc["low-energy-fixed-light-count"])
            if estimated_epc["low-energy-fixed-light-count"] else ""
        )

        # This is an int
        estimated_epc["photo-supply"] = (
            int(np.round(estimated_epc["photo-supply"]))
            if estimated_epc["photo-supply"] else estimated_epc["photo-supply"]
        )

        estimated_epc["postcode"] = self.postcode
        if not self.uprn:
            # Update self.uprn too
            # NOTE(review): hash() of a str is salted per process unless PYTHONHASHSEED is fixed,
            # so this simulated uprn is not stable across runs - confirm this is acceptable
            self.uprn = hash(self.address1 + self.postcode)
        estimated_epc["uprn"] = self.uprn
        estimated_epc["address"] = self.full_address

        # Indicate that this epc was estimated
        estimated_epc["estimated"] = True

        return estimated_epc

    @staticmethod
    def calculate_weighted_lodgement_datetime(epc_data):
        """
        Calculate the weighted mean of the 'lodgement-datetime' column, using the 'weight' column.

        Rows without a lodgement datetime are ignored. The mean is computed on float second
        offsets from the earliest date, which avoids overflowing int64 nanosecond sums.

        :param epc_data: dataframe with 'lodgement-datetime' and 'weight' columns
        :return: the weighted mean as a pandas Timestamp, or NaT if there are no usable dates or
            the weights do not sum to a positive number
        """
        dates = pd.to_datetime(epc_data['lodgement-datetime'])
        has_date = dates.notna()
        if not has_date.any():
            return pd.NaT

        dates = dates[has_date]
        weights = epc_data['weight'][has_date].astype(float)

        total_weights = weights.sum()
        if not total_weights > 0:
            return pd.NaT

        earliest = dates.min()
        offset_seconds = (dates - earliest).dt.total_seconds()
        weighted_mean_offset = (offset_seconds * weights).sum() / total_weights

        return earliest + pd.Timedelta(seconds=weighted_mean_offset)

    @staticmethod
    def _estimate_int(estimation_data, key):
        """Return the weighted average of column `key`, rounded to the nearest integer."""
        return round(np.average(a=estimation_data[key], weights=estimation_data["weight"]))

    @staticmethod
    def _estimate_float(estimation_data, key):
        """Return the weighted average of column `key`, rounded to 2 decimal places."""
        return round(np.average(a=estimation_data[key], weights=estimation_data["weight"]), 2)

    @staticmethod
    def _estimate_str(estimation_data, key):
        """
        Return the weighted mode of column `key`, i.e. the value with the largest total weight.

        :param estimation_data: dataframe with `key`, "weight" and "lodgement-datetime" columns
        :param key: name of the column to estimate
        :return: the value with the largest summed weight. Ties are broken by taking the value
            whose rows have the most recent mean lodgement datetime. None if there are no values
        """
        agg = estimation_data.groupby(key)["weight"].sum().reset_index()
        if agg.empty:
            return None

        agg = agg[agg["weight"] == agg["weight"].max()]
        if agg.shape[0] == 1:
            return agg[key].values[0]

        # If we have multiple modes, we take the more recent data on average
        tied = estimation_data[estimation_data[key].isin(agg[key].values)]
        return tied.groupby(key)["lodgement-datetime"].mean().idxmax()

    def find_property(self, skip_os=False):
        """
        This method will attempt to identify a property.

        It will, at first, use the EPC api to try and find the EPC for the property and the
        associated UPRN. If this fails, it will use the Ordnance Survey API to find the UPRN of the
        address. Because no result may have been provided by the EPC api because of formatting
        issues with the address, if the ordnance survey api is used and the uprn retrieved, the EPC
        api is queried again with the UPRN, just as a final check to see if there is any EPC data.
        If there is no EPC data, the epc data will be estimated based on the surrounding properties.

        The results are stored on the instance (newest_epc, older_epcs, full_sap_epc,
        address_clean, postcode_clean, uprn).

        :param skip_os: if True, the Ordnance Survey api is not called. An estimate is made only
            if the ordnance survey client already has a property type
        :raises Exception: if the Ordnance Survey api does not return a 200 status
        """
        # Step 1: use the epc api to find the property and uprn
        response = self.get_epc()
        if response["status"] == 200:
            (
                self.newest_epc,
                self.older_epcs,
                self.full_sap_epc,
                self.address_clean,
                self.postcode_clean,
                self.uprn
            ) = self.extract_epc_data(address=self.full_address)
            return

        # Step 2: If we don't have an EPC, we use the ordnance survey api to find the uprn
        os_client = self.ordnance_survey_client
        if skip_os:
            if os_client.property_type is not None:
                # We can try and estimate
                self.newest_epc = self.estimate_epc(
                    property_type=os_client.property_type,
                    built_form=os_client.built_form
                )
                self.older_epcs = []
                self.full_sap_epc = {}

                # Finally, set a standardised address 1 and postcode
                self.address_clean = os_client.address_os if os_client.address_os else self.address1
                self.postcode_clean = os_client.postcode_os if os_client.postcode_os else self.postcode
            return

        os_response = os_client.get_places_api()
        if os_response["status"] != 200:
            # Investigate this if it happens
            raise Exception("Unable to find property - investigate me")

        # Step 3: Now that we have a uprn, do another check against the epc api, this time
        # searching with the uprn
        self.uprn = os_client.most_relevant_result["UPRN"]
        response = self.get_epc()
        if response["status"] == 200:
            (
                self.newest_epc,
                self.older_epcs,
                self.full_sap_epc,
                self.address_clean,
                self.postcode_clean,
                self.uprn
            ) = self.extract_epc_data()
            return

        # Step 4: If we still don't have an EPC, we estimate the EPC data
        self.full_address = os_client.most_relevant_result["ADDRESS"]
        self.newest_epc = self.estimate_epc(
            property_type=os_client.property_type,
            built_form=os_client.built_form
        )
        self.older_epcs = []
        self.full_sap_epc = {}

        # Finally, set a standardised address 1 and postcode
        self.address_clean = os_client.address_os
        self.postcode_clean = os_client.postcode_os
        return

    def check_attribute_variations(self):
        """
        Check whether the wall, roof or floor type of the property differs between its EPCs.

        Each description is processed with its attribute cleaner for every EPC (older and newest);
        a type has "varied" if any of its flags takes more than one value across the EPCs.

        :return: dict with boolean keys "has_wall_type_ever_varied", "has_roof_type_ever_varied"
            and "has_floor_type_ever_varied"
        """
        attribute_map = {
            "walls-description": {
                "cleaner": WallAttributes,
                "attribute": [
                    "is_cavity_wall",
                    "is_solid_brick",
                    "is_system_built",
                    "is_timber_frame",
                    "is_granite_or_whinstone",
                    "is_cob",
                    "is_sandstone_or_limestone",
                    "is_park_home"
                ],
                "name": "has_wall_type_ever_varied"
            },
            "roof-description": {
                "cleaner": RoofAttributes,
                "attribute": [
                    "is_flat",
                    "is_pitched",
                    "is_roof_room",
                    "is_thatched",
                    "has_dwelling_above"
                ],
                "name": "has_roof_type_ever_varied"
            },
            "floor-description": {
                "cleaner": FloorAttributes,
                "attribute": [
                    "is_to_unheated_space",
                    "is_to_external_air",
                    "is_suspended",
                    "is_solid",
                ],
                "name": "has_floor_type_ever_varied"
            }
        }

        attribute_variations = {}
        for attribute, attribute_objs in attribute_map.items():
            cleaner = attribute_objs["cleaner"]
            type_timeline = pd.DataFrame(
                [cleaner(epc[attribute]).process() for epc in self.older_epcs]
                + [cleaner(self.newest_epc[attribute]).process()]
            )

            # For each col in attribute_objs["attribute"] we check if the timeline has ever
            # varied, i.e. has gone from true to false
            attribute_variations[attribute_objs["name"]] = any(
                type_timeline[col].nunique() > 1 for col in attribute_objs["attribute"]
            )

        return attribute_variations

    def identify_flat_floor(self):
        """
        Classify a flat as "top", "mid" or "ground" floor from the newest EPC's roof and floor
        descriptions.

        :return: "top" if there is no dwelling above, "mid" if there is a dwelling above and
            another property below, otherwise "ground"
        """
        # If there is no dwelling above, it is a top floor flat
        processed_roof = RoofAttributes(self.newest_epc["roof-description"]).process()
        if not processed_roof["has_dwelling_above"]:
            return "top"

        # We know that there is a dwelling above. If there's also a dwelling below, it is a mid
        # floor flat
        processed_floor = FloorAttributes(self.newest_epc["floor-description"]).process()
        if processed_floor["another_property_below"]:
            return "mid"

        # Otherwise ground floor
        return "ground"

    def get_metadata(self):
        """
        Compute metadata about the property from its EPCs and store it on self.metadata.

        The metadata contains the days since the newest EPC was lodged, whether the SAP score has
        ever decreased between consecutive EPCs, the floor of the property if it is a flat, and
        the wall/roof/floor variation flags.

        :raises ValueError: if no EPC data is available (run find_property first)
        """
        if self.newest_epc is None:
            raise ValueError("No EPC data available")

        # We check if the property has ever been downgraded on SAP. The older EPCs are put in
        # chronological order first, as differencing only makes sense on an ordered timeline
        chronological_epcs = sorted(self.older_epcs, key=lambda epc: epc.get("lodgement-datetime") or "")
        sap_timeline = [int(epc["current-energy-efficiency"]) for epc in chronological_epcs] + [
            int(self.newest_epc["current-energy-efficiency"])
        ]
        # We check if there has ever been a decrease by differencing
        has_sap_ever_downgraded = bool(any(np.diff(sap_timeline) < 0))

        # We check if the wall, roof and floor types have ever varied over time
        attribute_varations = self.check_attribute_variations()

        # If the property is a flat, we distinguish between top, mid, ground floor
        floor = None
        if self.newest_epc["property-type"] == "Flat":
            floor = self.identify_flat_floor()

        self.metadata = {
            "days_since_last_epc": (pd.Timestamp.now() - pd.Timestamp(self.newest_epc["lodgement-date"])).days,
            "has_sap_ever_downgraded": has_sap_ever_downgraded,
            "floor": floor,
            **attribute_varations
        }