expanding search method to find epcs if there are none in the postcode

This commit is contained in:
Khalim Conn-Kowlessar 2024-01-02 17:54:14 +00:00
parent 68458f1b91
commit e46a3d0a19
2 changed files with 200 additions and 44 deletions

View file

@ -398,7 +398,81 @@ class SearchEpc:
return mode_value
def estimate_epc(self, property_type, built_form):
def fetch_nearby_epcs(
self, initial_postcode: str,
lmks_to_drop: list[str] | None = None,
built_form: str = "",
property_type: str = ""
):
"""
Fetches and processes EPC data for a given initial postcode, applying successive trimming
to the postcode and filtering the data until a non-empty result set is found.
The function queries the EPC API with the provided postcode, and if no data is found or
if the data doesn't meet certain criteria, it progressively shortens the postcode by
removing the last character and retries the query. This process continues until a valid
set of EPC data is obtained or the postcode is exhausted.
Additional filtering is applied to the obtained EPC data based on 'lmk-key', 'built-form',
and 'property-type'. The data is also processed to extract and numerically interpret house
numbers, calculate house number distances, and apply weights based on these distances.
:param initial_postcode: The initial full postcode for the EPC data query.
:param lmks_to_drop: List of 'lmk-key' values to be excluded from the EPC data.
:param built_form: The 'built-form' value to be used for filtering the EPC data.
:param property_type: The 'property-type' value to be used for filtering the EPC data.
:return:
"""
postcode = initial_postcode
while postcode:
# Fetch data from EPC API
epc_response = self.get_epc(params={"postcode": postcode}, size=100)
if epc_response["status"] == 200:
epc_data = pd.DataFrame(self.data["rows"])
if lmks_to_drop is not None:
epc_data = epc_data[~epc_data["lmk-key"].isin(lmks_to_drop)]
if not epc_data.empty:
# Further processing of the EPC data
epc_data = epc_data.sort_values("lodgement-datetime", ascending=False).groupby("uprn").head(1)
epc_data["house_number"] = epc_data["address"].apply(lambda add1: self.get_house_number(add1))
epc_data["numeric_house_number"] = epc_data["house_number"].apply(
lambda house_num: self.extract_numeric_housenumber_part(house_num)
)
epc_data["house_number_distance"] = abs(
epc_data["numeric_house_number"] - self.numeric_house_number
)
epc_data["weight"] = 1 / epc_data["house_number_distance"]
epc_built_form = self._estimate_str(key="built-form", estimation_data=epc_data)
epc_property_type = self._estimate_str(key="property-type", estimation_data=epc_data)
if built_form == "Semi-Detached" and epc_built_form in ["End-Terraced", "Mid-Terraced"]:
estimation_built_form = "End-Terraced"
elif built_form == "":
estimation_built_form = epc_built_form
else:
estimation_built_form = built_form
estimation_property_type = epc_property_type if property_type == "" else property_type
epc_data = epc_data[
(epc_data["built-form"] == estimation_built_form) & (
epc_data["property-type"] == estimation_property_type)
]
if not epc_data.empty:
return epc_data # Return the filtered data if it's not empty
# Shorten the postcode by one character for the next iteration
postcode = postcode[:-1]
# If loop finishes without a valid response, raise an exception
raise Exception("Unable to find postcode data after trimming - investigate me")
def estimate_epc(self, property_type, built_form, lmks_to_drop=None):
"""
For a property that does not have an EPC, we retrieve the EPC data for the closest properties
and estimate the EPC for the property in question.
@ -409,52 +483,20 @@ class SearchEpc:
the ordnance survey api
:param built_form: This is the built form of the property we are estimating, that can be retrieved from
the ordnance survey api
:param lmks_to_drop: This is a list of LMK keys that should be dropped from the estimation process. This
is used as an override for testing, to drop EPCs for the property we are testing
:return:
"""
# From the ordnance survey data, we want to determine the property type and then use only similar property
# types for the estimation
# We firstly get the first 100 properties for the postcode, from the EPC api
epc_reponse = self.get_epc(params={"postcode": self.postcode}, size=100)
if epc_reponse["status"] != 200:
raise Exception("Unable to find postcode data - investigate me")
epc_data = pd.DataFrame(self.data["rows"])
# We now get the newest EPC per uprn
epc_data = epc_data.sort_values("lodgement-datetime", ascending=False).groupby("uprn").head(1)
# For each record, parse the house number. We'll use this to identify the closest properties
epc_data["house_number"] = epc_data["address"].apply(lambda add1: self.get_house_number(add1))
# We convert the house number to a purely numeric format - This numeric house number will be used as
# a distance weight when estimating the EPC
epc_data["numeric_house_number"] = epc_data["house_number"].apply(
lambda house_num: self.extract_numeric_housenumber_part(house_num)
# types for the estimation process
epc_data = self.fetch_nearby_epcs(
initial_postcode=self.postcode,
lmks_to_drop=lmks_to_drop,
built_form=built_form,
property_type=property_type
)
epc_data["house_number_distance"] = abs(epc_data["numeric_house_number"] - self.numeric_house_number)
epc_data["weight"] = 1 / epc_data["house_number_distance"]
epc_built_form = self._get_epc_mode(col="built-form", epc_data=epc_data)
epc_property_type = self._get_epc_mode(col="property-type", epc_data=epc_data)
# We check if the EPC built form is one of the terraced values. If the os_built_form is semi-detached,
# then we set it to be end terraced
if built_form == "Semi-Detached" and epc_built_form in ["End-Terraced", "Mid-Terraced"]:
estimation_built_form = "End-Terraced"
elif built_form == "":
estimation_built_form = epc_built_form
else:
estimation_built_form = built_form
estimation_property_type = epc_property_type if property_type == "" else property_type
# We filter the EPC data on just the property types we want to use
epc_data = epc_data[
(epc_data["built-form"] == estimation_built_form) & (epc_data["property-type"] == estimation_property_type)
]
epc_data['lodgement-datetime'] = pd.to_datetime(epc_data['lodgement-datetime'])
# For each attribute, we need to determine the datatype and use an appropriate method
# to estimate.
@ -463,7 +505,7 @@ class SearchEpc:
epc_data[key] = np.where(pd.isnull(epc_data[key]), None, epc_data[key])
epc_data[key] = np.where(epc_data[key] == "", None, epc_data[key])
epc_data[key] = epc_data[key].astype(vartype)
estimation_data = epc_data[[key, "weight"]]
estimation_data = epc_data[[key, "weight", "lodgement-datetime"]]
estimation_data = estimation_data[~pd.isnull(estimation_data[key])]
if estimation_data.shape[0] == 0:
@ -501,7 +543,13 @@ class SearchEpc:
agg = estimation_data.groupby(key)["weight"].sum().reset_index()
agg = agg[agg["weight"] == agg["weight"].max()]
if agg.shape[0] != 1:
raise NotImplementedError("implement me")
# If we have multiple modes, we take the more recent data on average
recent_grouped = estimation_data[
estimation_data[key].isin(agg[key].values)
].groupby(key)["lodgement-datetime"].mean()
newest_group = recent_grouped.idxmax()
return newest_group
return agg[key].values[0]

View file

@ -0,0 +1,108 @@
from pathlib import Path
from random import choices, sample
import os
import pandas as pd
from tqdm import tqdm
from dotenv import load_dotenv
from utils.logger import setup_logger
from backend.SearchEpc import SearchEpc, vartypes
# Location of the .env file holding the credentials used by this script
ENV_FILE = Path(__file__).parent / "backend" / ".env"
logger = setup_logger()
# Directory containing one sub-directory per area, each with a certificates.csv file
DATA_DIRECTORY = Path(__file__).parent / "local_data" / "all-domestic-certificates"
# Number of UPRNs sampled from each directory
DIR_SAMPLE_SIZE = 50
# Number of directories sampled from DATA_DIRECTORY
N_DIRECTORIES = 25
# The .env file must be loaded before the environment is read, otherwise a token that is
# only defined in the .env file would be read as None
load_dotenv(ENV_FILE)
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
def check_numeric_performance(estimated_value, actual_value):
    """
    Score a numeric estimate as its relative error against the actual value.

    :param estimated_value: The estimated value, may be null
    :param actual_value: The actual value we compare against, may be null
    :return: None if there is no actual value to compare against, 1 if the estimate is missing,
        otherwise the absolute difference divided by the magnitude of the actual value. When
        the actual value is 0 the relative error is undefined, so an exact match scores 0 and
        anything else scores 1
    """
    # If we don't have anything to compare against, return None
    if pd.isnull(actual_value):
        return None

    # A missing estimate is scored as a complete miss
    if pd.isnull(estimated_value):
        return 1

    # We cannot divide by zero, so an exact match is a perfect score and anything else
    # is scored the same as a missing estimate
    if actual_value == 0:
        return 0 if estimated_value == 0 else 1

    # Divide by the magnitude so that negative actual values don't produce a negative error
    return abs(estimated_value - actual_value) / abs(actual_value)
def app():
    """
    This script is used to test the EPC estimation process.

    We sample directories from DATA_DIRECTORY and, within each, sample UPRNs from the
    certificates.csv file. For the newest certificate of each sampled UPRN we run
    SearchEpc.estimate_epc, dropping every certificate of that UPRN from the estimation data,
    and compare the estimate against the actual certificate. One entry per UPRN is appended
    to the results, holding the average numeric and categorical performance.

    :raises ValueError: If a variable type in vartypes is neither numeric nor a string
    """
    numerical_vartypes = {key: value for key, value in vartypes.items() if value in ["float", "Int64"]}
    str_var_types = {key: value for key, value in vartypes.items() if value == "str"}

    # Make sure we haven't missed any keys
    if len(numerical_vartypes) + len(str_var_types) != len(vartypes):
        raise ValueError("Not all vartypes have been accounted for")

    directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
    # Sample without replacement so that the same directory is never evaluated twice, and
    # cap the sample size at the number of directories available
    directory_sample = sample(directories, min(N_DIRECTORIES, len(directories)))

    results = []
    for directory in tqdm(directory_sample):
        filepath = directory / "certificates.csv"
        df = pd.read_csv(filepath, low_memory=False)

        # Drop records without a UPRN before the conversion to string, since a missing value
        # is no longer null once it has been converted
        df = df[~pd.isnull(df["UPRN"])].copy()
        df["UPRN"] = df["UPRN"].astype("Int64").astype("str")

        # Cap the sample size at the number of UPRNs available in this directory
        uprns = df["UPRN"].unique().tolist()
        uprn_sample = sample(uprns, min(DIR_SAMPLE_SIZE, len(uprns)))
        df_sample = df[df["UPRN"].isin(uprn_sample)]

        # Take the record with the newest LODGEMENT_DATETIME by uprn
        df_sample = df_sample.sort_values("LODGEMENT_DATETIME", ascending=False).drop_duplicates("UPRN")

        # Convert the columns to lower case and replace underscores with hyphens, the same as the api
        df_sample.columns = df_sample.columns.str.lower().str.replace("_", "-")

        # For each epc, we test the estimation process
        for _, epc in df_sample.iterrows():
            epc = epc.to_dict()
            address1 = epc["address1"]
            postcode = epc["postcode"]

            # Get all EPCs for this uprn and we make sure they get dropped from the estimate_epc function
            epcs_for_uprn = df[df["UPRN"] == epc["uprn"]]
            lmks_to_drop = epcs_for_uprn["LMK_KEY"].tolist()

            searcher = SearchEpc(address1, postcode, auth_token=EPC_AUTH_TOKEN, os_api_key="")
            searcher.uprn = epc["uprn"]

            estimated_epc = searcher.estimate_epc(
                property_type=epc["property-type"], built_form=epc["built-form"], lmks_to_drop=lmks_to_drop
            )

            # We now compare the difference between the estimated and original
            numeric_scores = [
                check_numeric_performance(estimated_epc[key], epc[key]) for key in numerical_vartypes
            ]
            # Remove Nones
            numeric_scores = [score for score in numeric_scores if score is not None]
            # Get an average - None if there was nothing to compare against
            numeric_performance = sum(numeric_scores) / len(numeric_scores) if numeric_scores else None

            # categorical performance
            categorical_scores = [0 if estimated_epc[key] != epc[key] else 1 for key in str_var_types]
            # Get an average - None if there are no categorical variables
            categorical_performance = (
                sum(categorical_scores) / len(categorical_scores) if categorical_scores else None
            )

            results.append(
                {
                    "uprn": epc["uprn"],
                    "numeric_performance": numeric_performance,
                    "categorical_performance": categorical_performance
                }
            )