diff --git a/backend/SearchEpc.py b/backend/SearchEpc.py index 087710b8..543fa6b8 100644 --- a/backend/SearchEpc.py +++ b/backend/SearchEpc.py @@ -398,7 +398,81 @@ class SearchEpc: return mode_value - def estimate_epc(self, property_type, built_form): + def fetch_nearby_epcs( + self, initial_postcode: str, + lmks_to_drop: list[str] | None = None, + built_form: str = "", + property_type: str = "" + ): + """ + Fetches and processes EPC data for a given initial postcode, applying successive trimming + to the postcode and filtering the data until a non-empty result set is found. + + The function queries the EPC API with the provided postcode, and if no data is found or + if the data doesn't meet certain criteria, it progressively shortens the postcode by + removing the last character and retries the query. This process continues until a valid + set of EPC data is obtained or the postcode is exhausted. + + Additional filtering is applied to the obtained EPC data based on 'lmk-key', 'built-form', + and 'property-type'. The data is also processed to extract and numerically interpret house + numbers, calculate house number distances, and apply weights based on these distances. + + :param initial_postcode: The initial full postcode for the EPC data query. + :param lmks_to_drop: List of 'lmk-key' values to be excluded from the EPC data. + :param built_form: The 'built-form' value to be used for filtering the EPC data. + :param property_type: The 'property-type' value to be used for filtering the EPC data. 
+ :return: + """ + postcode = initial_postcode + while postcode: + # Fetch data from EPC API + epc_response = self.get_epc(params={"postcode": postcode}, size=100) + + if epc_response["status"] == 200: + epc_data = pd.DataFrame(self.data["rows"]) + + if lmks_to_drop is not None: + epc_data = epc_data[~epc_data["lmk-key"].isin(lmks_to_drop)] + + if not epc_data.empty: + # Further processing of the EPC data + epc_data = epc_data.sort_values("lodgement-datetime", ascending=False).groupby("uprn").head(1) + epc_data["house_number"] = epc_data["address"].apply(lambda add1: self.get_house_number(add1)) + epc_data["numeric_house_number"] = epc_data["house_number"].apply( + lambda house_num: self.extract_numeric_housenumber_part(house_num) + ) + epc_data["house_number_distance"] = abs( + epc_data["numeric_house_number"] - self.numeric_house_number + ) + epc_data["weight"] = 1 / epc_data["house_number_distance"] + + epc_built_form = self._estimate_str(key="built-form", estimation_data=epc_data) + epc_property_type = self._estimate_str(key="property-type", estimation_data=epc_data) + + if built_form == "Semi-Detached" and epc_built_form in ["End-Terraced", "Mid-Terraced"]: + estimation_built_form = "End-Terraced" + elif built_form == "": + estimation_built_form = epc_built_form + else: + estimation_built_form = built_form + + estimation_property_type = epc_property_type if property_type == "" else property_type + + epc_data = epc_data[ + (epc_data["built-form"] == estimation_built_form) & ( + epc_data["property-type"] == estimation_property_type) + ] + + if not epc_data.empty: + return epc_data # Return the filtered data if it's not empty + + # Shorten the postcode by one character for the next iteration + postcode = postcode[:-1] + + # If loop finishes without a valid response, raise an exception + raise Exception("Unable to find postcode data after trimming - investigate me") + + def estimate_epc(self, property_type, built_form, lmks_to_drop=None): """ For a property that 
does not have an EPC, we retrieve the EPC data for the closest properties and estimate the EPC for the property in question. @@ -409,52 +483,20 @@ class SearchEpc: the ordnance survey api :param built_form: This is the built form of the property we are estimating, that can be retrieved from the ordnance survey api + :param lmks_to_drop: This is a list of LMK keys that should be dropped from the estimation process. This + is used as an override for testing, to drop EPCs for the property we are testing :return: """ # From the ordnance survey data, we want to determine the property type and then use only similar property - # types for the estimation - - # We firstly get the first 100 properties for the postcode, from the EPC api - epc_reponse = self.get_epc(params={"postcode": self.postcode}, size=100) - if epc_reponse["status"] != 200: - raise Exception("Unable to find postcode data - investigate me") - - epc_data = pd.DataFrame(self.data["rows"]) - - # We now get the newest EPC per uprn - epc_data = epc_data.sort_values("lodgement-datetime", ascending=False).groupby("uprn").head(1) - - # For each record, parse the house number. 
We'll use this to identify the closest properties - epc_data["house_number"] = epc_data["address"].apply(lambda add1: self.get_house_number(add1)) - - # We convert the house number fo a purely numeric format - This numeric house number will be used as - # a distance weight when estimating the EPC - epc_data["numeric_house_number"] = epc_data["house_number"].apply( - lambda house_num: self.extract_numeric_housenumber_part(house_num) + # types for the estimation process + epc_data = self.fetch_nearby_epcs( + initial_postcode=self.postcode, + lmks_to_drop=lmks_to_drop, + built_form=built_form, + property_type=property_type ) - - epc_data["house_number_distance"] = abs(epc_data["numeric_house_number"] - self.numeric_house_number) - epc_data["weight"] = 1 / epc_data["house_number_distance"] - - epc_built_form = self._get_epc_mode(col="built-form", epc_data=epc_data) - epc_property_type = self._get_epc_mode(col="property-type", epc_data=epc_data) - - # We check if the EPC built form is one of the terraced values. If the os_built_form is semi-detached, - # then we set it to be end terraced - if built_form == "Semi-Detached" and epc_built_form in ["End-Terraced", "Mid-Terraced"]: - estimation_built_form = "End-Terraced" - elif built_form == "": - estimation_built_form = epc_built_form - else: - estimation_built_form = built_form - - estimation_property_type = epc_property_type if property_type == "" else property_type - - # We filter the EPC data on just the property types we want to use - epc_data = epc_data[ - (epc_data["built-form"] == estimation_built_form) & (epc_data["property-type"] == estimation_property_type) - ] + epc_data['lodgement-datetime'] = pd.to_datetime(epc_data['lodgement-datetime']) # For each attribute, we need to determine the datatype and use an appropriate method # to estimate. 
"""Sample lodged EPC certificates and measure how closely ``SearchEpc.estimate_epc`` reproduces them."""
import os
from pathlib import Path
from random import sample

import pandas as pd

# NOTE(review): this resolves to a "backend" folder next to this file - confirm that is where the
# .env file actually lives.
ENV_FILE = Path(__file__).parent / "backend" / ".env"

DATA_DIRECTORY = Path(__file__).parent / "local_data" / "all-domestic-certificates"
# Number of distinct UPRNs tested per directory, and number of directories tested per run
DIR_SAMPLE_SIZE = 50
N_DIRECTORIES = 25


def check_numeric_performance(estimated_value, actual_value):
    """
    Return the relative error of an estimated numeric value against the actual value.

    :param estimated_value: The value produced by the estimation process (may be null).
    :param actual_value: The value recorded on the real EPC (may be null).
    :return: ``None`` if there is no actual value to compare against; ``1`` if the estimate is
        missing; otherwise ``abs(estimated - actual) / abs(actual)``. When the actual value is 0
        the relative error is undefined, so ``0`` is returned for an exact match and ``1`` (the
        same penalty as a missing estimate) for anything else.
    """
    # If we don't have anything to compare against, return None
    if pd.isnull(actual_value):
        return None

    if pd.isnull(estimated_value):
        return 1

    if actual_value == 0:
        # Dividing by the actual value is impossible here
        return 0 if estimated_value == 0 else 1

    # Normalise by the magnitude so that negative actual values do not yield a negative error
    return abs(estimated_value - actual_value) / abs(actual_value)


def app():
    """
    Test the EPC estimation process against a random sample of lodged certificates.

    For a sample of certificate directories, and a sample of UPRNs within each, the newest
    certificate per UPRN is hidden from the estimator (all of its LMK keys are passed as
    ``lmks_to_drop``), the EPC is estimated, and the estimate is compared with the real record.

    :return: DataFrame with one row per tested UPRN and the columns 'uprn',
        'numeric_performance' (mean relative error over the numeric attributes, lower is better)
        and 'categorical_performance' (share of matching string attributes, higher is better).
        A metric is ``None`` when no attribute could be compared.
    :raises ValueError: If ``vartypes`` contains a type that is neither numeric nor string.
    """
    # Imported here so that importing this module does not require the API client stack and
    # does not load the .env file or configure logging as an import side effect.
    from dotenv import load_dotenv
    from tqdm import tqdm

    from backend.SearchEpc import SearchEpc, vartypes
    from utils.logger import setup_logger

    logger = setup_logger()

    # The .env file has to be loaded BEFORE the token is read, otherwise a token that is only
    # defined in the .env file is never picked up.
    load_dotenv(ENV_FILE)
    epc_auth_token = os.getenv("EPC_AUTH_TOKEN")

    numerical_keys = [key for key, vartype in vartypes.items() if vartype in ("float", "Int64")]
    str_keys = [key for key, vartype in vartypes.items() if vartype == "str"]
    # Make sure we have not missed any keys
    if len(numerical_keys) + len(str_keys) != len(vartypes):
        raise ValueError("Not all vartypes have been accounted for")

    directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
    # Sample WITHOUT replacement so no directory is tested twice; cap at what is available
    directory_sample = sample(directories, min(N_DIRECTORIES, len(directories)))

    results = []

    for directory in tqdm(directory_sample):
        df = pd.read_csv(directory / "certificates.csv", low_memory=False)
        # Drop missing UPRNs before the string conversion; afterwards they would be the literal
        # string "<NA>" and no longer detectable as null.
        df = df[df["UPRN"].notna()].copy()
        df["UPRN"] = df["UPRN"].astype("Int64").astype("str")

        unique_uprns = df["UPRN"].unique().tolist()
        uprn_sample = sample(unique_uprns, min(DIR_SAMPLE_SIZE, len(unique_uprns)))
        df_sample = df[df["UPRN"].isin(uprn_sample)]
        # Take the record with the newest LODGEMENT_DATETIME by uprn
        df_sample = df_sample.sort_values("LODGEMENT_DATETIME", ascending=False).drop_duplicates("UPRN")
        # Convert the columns to lower case and replace underscores with hyphens, the same as the api
        df_sample.columns = df_sample.columns.str.lower().str.replace("_", "-")

        # For each epc, we test the estimation process
        for _, epc_row in df_sample.iterrows():
            epc = epc_row.to_dict()

            # Get all EPCs for this uprn and make sure they get dropped from the estimate_epc function
            lmks_to_drop = df.loc[df["UPRN"] == epc["uprn"], "LMK_KEY"].tolist()
            searcher = SearchEpc(epc["address1"], epc["postcode"], auth_token=epc_auth_token, os_api_key="")
            searcher.uprn = epc["uprn"]

            estimated_epc = searcher.estimate_epc(
                property_type=epc["property-type"], built_form=epc["built-form"], lmks_to_drop=lmks_to_drop
            )

            # Compare the estimated and original numeric values, ignoring attributes without an
            # actual value (check_numeric_performance returns None for those)
            numeric_errors = [check_numeric_performance(estimated_epc[key], epc[key]) for key in numerical_keys]
            numeric_errors = [error for error in numeric_errors if error is not None]
            numeric_performance = sum(numeric_errors) / len(numeric_errors) if numeric_errors else None

            # Categorical performance: 1 for a match, 0 for a miss. As for the numeric
            # attributes, attributes without an actual value are not scored.
            categorical_hits = [
                1 if estimated_epc[key] == epc[key] else 0 for key in str_keys if not pd.isnull(epc[key])
            ]
            categorical_performance = sum(categorical_hits) / len(categorical_hits) if categorical_hits else None

            results.append(
                {
                    "uprn": epc["uprn"],
                    "numeric_performance": numeric_performance,
                    "categorical_performance": categorical_performance,
                }
            )

    results_df = pd.DataFrame(
        results, columns=["uprn", "numeric_performance", "categorical_performance"]
    )
    if not results_df.empty:
        mean_numeric = pd.to_numeric(results_df["numeric_performance"], errors="coerce").mean()
        mean_categorical = pd.to_numeric(results_df["categorical_performance"], errors="coerce").mean()
        # NOTE(review): assumes setup_logger() returns an object with a logging-style .info() - confirm
        logger.info(
            f"Tested {len(results_df)} EPCs: mean numeric error {mean_numeric}, "
            f"mean categorical accuracy {mean_categorical}"
        )

    return results_df


if __name__ == "__main__":
    app()