Model/backend/SearchEpc.py
Khalim Conn-Kowlessar 852420a8fa handling more cases
2025-11-30 20:12:50 +00:00

1156 lines
50 KiB
Python

import os
import time
import re
from urllib.parse import urlencode
import usaddress
import pandas as pd
import numpy as np
from epc_api.client import EpcClient
from backend.OrdnanceSurvey import OrdnanceSuveyClient
from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes
from etl.epc_clean.epc_attributes.FloorAttributes import FloorAttributes
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
from BaseUtility import Definitions
from utils.logger import setup_logger
from typing import List
from thefuzz import process
from backend.app.utils import sap_to_epc
logger = setup_logger()
# Target dtype for every EPC attribute that estimate_epc() estimates from neighbouring properties.
# The dtype also selects the estimator that is used:
#   "Int64" -> weighted average rounded to an integer (_estimate_int)
#   "float" -> weighted average rounded to 2 d.p.     (_estimate_float)
#   "str"   -> weighted mode                           (_estimate_str)
# Keys that are commented out are deliberately not estimated.
# Insertion order is preserved because estimate_epc() iterates this mapping.
vartypes = {
    "low-energy-fixed-light-count": "Int64",
    # "address": "str",
    # "uprn-source": "str",
    "floor-height": "float",
    "heating-cost-potential": "float",
    "unheated-corridor-length": "float",
    "hot-water-cost-potential": "float",
    "construction-age-band": "str",
    "potential-energy-rating": "str",
    "mainheat-energy-eff": "str",
    "windows-env-eff": "str",
    "lighting-energy-eff": "str",
    "environment-impact-potential": "Int64",
    "glazed-type": "str",
    "heating-cost-current": "float",
    # "address3": "str",
    "mainheatcont-description": "str",
    "sheating-energy-eff": "str",
    "property-type": "str",
    "local-authority-label": "str",
    "fixed-lighting-outlets-count": "Int64",
    "energy-tariff": "str",
    "mechanical-ventilation": "str",
    # NOTE(review): every other *-cost-* field is "float"; "str" here means this one is estimated
    # with the weighted mode rather than a weighted average - confirm this is intentional.
    "hot-water-cost-current": "str",
    "county": "str",
    # "postcode": "str",
    "solar-water-heating-flag": "str",
    "constituency": "str",
    "co2-emissions-potential": "float",
    "number-heated-rooms": "float",
    "floor-description": "str",
    "energy-consumption-potential": "float",
    "local-authority": "str",
    "built-form": "str",
    "number-open-fireplaces": "Int64",
    "windows-description": "str",
    "glazed-area": "str",
    # "inspection-date": str,
    "mains-gas-flag": "str",
    "co2-emiss-curr-per-floor-area": "float",
    # "address1": "str",
    "heat-loss-corridor": "str",
    "flat-storey-count": "Int64",
    "constituency-label": "str",
    "roof-energy-eff": "str",
    "total-floor-area": "float",
    "building-reference-number": "str",
    "environment-impact-current": "float",
    "co2-emissions-current": "float",
    "roof-description": "str",
    "floor-energy-eff": "str",
    "number-habitable-rooms": "float",
    # "address2": "str",
    "hot-water-env-eff": "str",
    "posttown": "str",
    "mainheatc-energy-eff": "str",
    "main-fuel": "str",
    "lighting-env-eff": "str",
    "windows-energy-eff": "str",
    "floor-env-eff": "str",
    "sheating-env-eff": "str",
    "lighting-description": "str",
    "roof-env-eff": "str",
    "walls-energy-eff": "str",
    "photo-supply": "float",
    "lighting-cost-potential": "float",
    "mainheat-env-eff": "str",
    "multi-glaze-proportion": "float",
    "main-heating-controls": "str",
    # "lodgement-datetime",
    "flat-top-storey": "str",
    "current-energy-rating": "str",
    "secondheat-description": "str",
    "walls-env-eff": "str",
    "transaction-type": "str",
    # "uprn": "Int64",
    "current-energy-efficiency": "Int64",
    "energy-consumption-current": "float",
    "mainheat-description": "str",
    "lighting-cost-current": "float",
    # "lodgement-date",
    "extension-count": "Int64",
    "mainheatc-env-eff": "str",
    # "lmk-key": "str",
    "wind-turbine-count": "Int64",
    "tenure": "str",
    "floor-level": "str",
    "potential-energy-efficiency": "Int64",
    "hot-water-energy-eff": "str",
    "low-energy-lighting": "float",
    "walls-description": "str",
    "hotwater-description": "str",
}
class SearchEpc:
"""
Given address information about a home, this class is responsible for retrieving the EPC data associated
to the property.
For a home, we might have address lines 1, 2, 3 and 4, as well as a postcode.
Often, simply searching the EPC database with address line 1 and postcode will be enough to find
the property, but there are some cases where this is not true and we might need to utilise other
combinations about the home to find the property
"""
# If we create the uprn based on a hash, we mark it as simulated
UPRN_SOURCE_SIMULATED = "SIMULATED"
MAX_RETRIES = 5
SUCCESS = {
"status": 200,
"message": "success",
"error": None
}
NODATA = {
"status": 204,
"message": "no data",
"error": None
}
def __init__(
    self,
    address1: str,
    postcode: str,
    auth_token: str,
    os_api_key: str,
    full_address: str | None = None,
    max_retries: int | None = None,
    uprn: int | None = None,
    size: int | None = None,
    property_type: str | None = None,
    fast: bool = False,
    heating_system: str | None = None,
    associated_uprns: list[int] | None = None,
):
    """
    Address line 1 and postcode are mandatory fields. The other address information is optional
    but can be used to find the epc for the home, if address1 and postcode are insufficient
    If you wish to run a strict property type search, please run set_strict_property_type_search()
    :param address1: string, property's address line 1
    :param postcode: string, property's postcode
    :param auth_token: string, token passed to the EPC api client
    :param os_api_key: string, api key passed to the Ordnance Survey client
    :param full_address: string, optional parameter, the full address of the property. Falls back to
        address1 when not provided
    :param max_retries: int, optional, number of retries to make when searching the api. Falls back to
        MAX_RETRIES when not provided
    :param uprn: int, optional, the uprn of the property
    :param size: int, optional, the number of results to return. If not provided, defaults to 25 which is the api's
        default
    :param property_type: str, optional, the property type of the property, if known before hand
    :param fast: bool, optional, if true, the extract_epc_data method will skip some processing to return
        results faster
    :param heating_system: str, optional, the heating system of the property, if known before hand
    :param associated_uprns: list of int, optional, list of associated uprns for the property. E.g. other
        units in a block of flats
    """
    self.address1 = address1
    self.postcode = postcode
    self.full_address = full_address if full_address is not None else self.address1
    self.uprn = uprn
    # House number as written (e.g. "12a") and its leading numeric part (e.g. 12)
    self.house_number = self.get_house_number(self.address1)
    self.numeric_house_number = self.extract_numeric_housenumber_part(self.house_number)
    self.associated_uprns = associated_uprns if associated_uprns is not None else []

    # property attributes
    self.heating_system = heating_system

    self.max_retries = max_retries if max_retries is not None else self.MAX_RETRIES

    # API clients
    self.client = EpcClient(auth_token=auth_token)
    self.ordnance_survey_client = OrdnanceSuveyClient(
        address=self.address1, postcode=self.postcode, api_key=os_api_key
    )

    # Search results, populated by the search / extraction methods
    self.data = None
    self.newest_epc = {}
    self.older_epcs = None
    self.full_sap_epc = None
    self.metadata = None

    self.strict_property_type_search = False

    # These are the address and postcode values, which we store in the database
    self.address_clean = None
    self.postcode_clean = None
    self.address_postal_town = None

    self.size = size if size is not None else 25
    self.property_type = property_type
    self.fast = fast

    # By default, this is set to false. This flag indicates whether we should overwrite SAP 2005 entries.
    self.overwrite_sap05 = False
def set_strict_property_type_search(self):
    """
    Switch on strict property type searching.

    Once enabled, address based searches also send the known property type to the EPC api, so only
    results of that property type come back.
    :return: None
    """
    self.strict_property_type_search = True
@staticmethod
def get_house_number(address: str, postcode=None) -> str | None:
"""
This method uses the usaddress library to parse an address and extract the primary house or flat number.
"""
try:
# Updated regex to catch house numbers including alphanumeric ones
pattern = r'(?i)(?:flat|apartment|room)\s*(\d+\w*)|^\s*(\d+\w*)'
match1 = re.search(pattern, address)
if match1:
return next(g for g in match1.groups() if g is not None)
pattern2 = r'(?i)(flat|apartment|room)\s*([a-zA-Z]?\d+[a-zA-Z]?)'
match2 = re.search(pattern2, address)
if match2:
return match2.group(2)
parsed = usaddress.parse(address)
# First, try to get the 'OccupancyIdentifier' if 'OccupancyType' is detected
for part, type_ in parsed:
if type_ == 'OccupancyIdentifier':
if postcode is not None:
if part == postcode.split(" ")[0]:
continue
if part == postcode.split(" ")[1]:
continue
return part.rstrip(",")
# This assumes the first 'OccupancyIdentifier' after 'OccupancyType' is the primary
# number
# Fallback to 'AddressNumber' if no 'OccupancyIdentifier' is found
address_number = next((part for part, type_ in parsed if type_ == 'AddressNumber'), None)
if address_number:
return address_number.replace(",", "") # Remove any trailing commas
except Exception as e:
raise Exception(f"Error parsing address: {e}")
return None
@staticmethod
def extract_numeric_housenumber_part(house_number: str | None) -> int | None:
# Regular expression to find the first occurrence of one or more digits
if house_number is None:
return None
match = re.search(r'\d+', house_number)
if match:
return int(match.group())
else:
return None
def _get_epc(self, params, size):
"""
To be called by get_epc() - not for external usage
"""
url = os.path.join(self.client.domestic.host, "search")
if size:
url += "?" + urlencode({k: v for k, v in {"size": size}.items() if v})
for retry in range(self.max_retries):
try:
response = self.client.domestic.call(method="get", url=url, params=params)
if response:
self.data = response
return {
"response": response,
"msg": self.SUCCESS
}
if retry > 0:
logger.info("Failed previous attempt but retry successful")
# If we got nothing, final try
if not response:
return {
"response": response,
"msg": self.NODATA
}
except Exception as e:
if retry < self.max_retries - 1:
# If not the last retry, wait for 3 seconds before retrying
time.sleep(3)
else:
# If it's the last retry, we continue
return {
"response": {},
"msg": {
"status": 500,
"message": "Could not retrieve EPC data",
"error": str(e)
}
}
def get_epc(self, params=None, size=None):
    """
    Search the EPC api and store the rows found on self.data.

    With explicit params a single query is made. Without params, a uprn query (when a uprn is known)
    and an address/postcode query are both made and their rows are merged.
    :param params: optional dict of query parameters to send as-is
    :param size: optional number of results to request, defaults to self.size
    :return: the "msg" dict describing the outcome of the search
    :raises ValueError: if no params are given and uprn, address1 and postcode are all empty
    """
    if size is None:
        size = self.size

    # Explicit parameters: one pass-through query
    if params:
        result = self._get_epc(params=params, size=size)
        if result["msg"]["status"] == 200:
            self.data = result["response"]
        return result["msg"]

    if not (self.uprn or self.address1 or self.postcode):
        raise ValueError("No search parameters provided")

    address_query = {}
    if self.address1:
        address_query["address"] = self.address1
    if self.postcode:
        address_query["postcode"] = self.postcode
    if self.strict_property_type_search and self.property_type:
        address_query["property-type"] = self.property_type.lower()

    collected = []
    last_result = {}

    # First query: by uprn
    if self.uprn:
        last_result = self._get_epc(params={"uprn": self.uprn}, size=size)
        if last_result["msg"]["status"] == 200:
            collected += last_result["response"]["rows"]

    # Second query: by address. Properties are sometimes listed under the wrong UPRN, so this runs
    # as well and the results are merged
    if address_query:
        last_result = self._get_epc(params=address_query, size=size)
        if last_result["msg"]["status"] == 200:
            found = last_result["response"]["rows"]
            if self.uprn:
                # Rows with a null uprn are given the uprn we searched with
                for row in found:
                    if pd.isnull(row["uprn"]):
                        row["uprn"] = self.uprn
            collected += found

    # De-dupe on lmk-key, keeping the first occurrence of each key
    first_by_lmk = {}
    for row in collected:
        first_by_lmk.setdefault(row["lmk-key"], row)

    # Overwrite the data
    self.data = {"rows": list(first_by_lmk.values())}
    if self.data["rows"]:
        # Either query finding rows counts as a success, whatever the last query returned
        last_result["msg"] = self.SUCCESS
    return last_result["msg"]
def filter_rows(self, rows, property_type=None, address=None):
"""
Reduce the rows returned by the EPC api, using either a property type or an address.

This method should not be used when property_type and address are both not None: the property
type branch returns first, so the address would never be used.
:param rows: list of EPC row dicts. The keys read here are "uprn", "property-type", "posttown",
"address" and "lodgement-datetime"
:param property_type: property type to keep, compared against each row's "property-type"
:param address: address text that is fuzzy matched against each row's "address"
:return: the filtered rows, or the unfiltered rows when nothing is left after filtering (or when
both property_type and address are None)
"""
# Given the results from the EPC api, attempts to reduce the number of rows
uprns = {r["uprn"] for r in rows}
# Nothing to filter on
if (property_type is None) and (address is None):
return rows
unique_property_types = {r["property-type"] for r in rows}
# True when every row shares one property type and that type is House or Bungalow
is_just_a_house = (len(unique_property_types) == 1) & (
("House" in unique_property_types) | ("Bungalow" in unique_property_types)
)
# We allow for variation in property type across flats/maisonettes
# If we know that we have a flat/maisonette, we allow for both property types
# Make sure we have not JUST a house, or not JUST a flat/maisonette
if property_type in ["Flat", "Maisonette"] and not is_just_a_house:
# Keep everything when there is a single uprn with a single property type, or when the rows are
# exactly a mix of Flat and Maisonette
if (((len(uprns) == 1) and ((len(unique_property_types) == 1)
) or unique_property_types == {"Flat", "Maisonette"})):
return rows
if property_type is not None:
# We can do a filter on the property type
rows_filtered = [r for r in rows if r["property-type"] == property_type]
if rows_filtered:
return rows_filtered
return rows
if address is not None:
# We can do a filter on the address
# We check if the full address contains the postcode and if it does, remove
if self.postcode in address:
address = address.replace(self.postcode, "").strip().rstrip(",")
# We check if post town is included in the address
# NOTE(review): r["posttown"].lower() assumes every row has a string posttown - confirm
if any([r["posttown"].lower() in address.lower() for r in rows]):
# Fuzzy match against "address, posttown" and against the bare address. The results are indexed
# below as [0] for the matched string and [1] for its score
best_match1 = process.extractOne(
address, [", ".join([r["address"], r["posttown"]]) for r in rows], score_cutoff=0
)
best_match2 = process.extractOne(
address, [", ".join([r["address"]]) for r in rows], score_cutoff=0
)
# Pick the largest score
if best_match1[1] == best_match2[1]:
# if they're the same, we'll work under the assumption that the addresses are the same and we'll
# take whichever has the newest EPC
rows_filtered = [
r for r in rows
if (", ".join([r["address"], r["posttown"]]) == best_match1[0]) or
(r["address"] == best_match2[0])
]
rows_filtered = [
r for r in rows_filtered
if r["lodgement-datetime"] == max([x["lodgement-datetime"] for x in rows_filtered])
]
elif best_match1[1] > best_match2[1]:
# Keep the rows whose "address, posttown" is the best match
rows_filtered = [r for r in rows if ", ".join([r["address"], r["posttown"]]) == best_match1[0]]
else:
# Keep the rows whose bare address is the best match
rows_filtered = [r for r in rows if r["address"] == best_match2[0]]
# If we have multiple, we filter on newest lodgement date
if len(rows_filtered) > 1:
rows_filtered = [
r for r in rows_filtered
if r["lodgement-datetime"] == max([x["lodgement-datetime"] for x in rows_filtered])
]
else:
# NOTE(review): best_match is indexed without a None check, so this presumably relies on rows
# being non-empty - confirm against callers
best_match = process.extractOne(address, [r["address"] for r in rows], score_cutoff=0)
# Get the UPRN for the best match
best_match_uprn = {r["uprn"] for r in rows if r["address"] == best_match[0]}.pop()
# Keep the best matching address plus every other row that shares its uprn
rows_filtered = [
r for r in rows if (r["address"] == best_match[0]) or (r["uprn"] == best_match_uprn)
]
if rows_filtered:
return rows_filtered
return rows
# NOTE(review): this looks unreachable, because the both-None case returns early above
raise ValueError("property type and address cannot both be None, at least one must be provided")
@staticmethod
def format_address(newest_epc):
    """
    Build the address and postcode values that are stored in the database.
    :param newest_epc: EPC dict; the "postcode", "address", "address1", "address2" and "posttown"
        keys are read
    :return: tuple of (title-cased address with the postcode and trailing comma removed,
        upper-cased postcode, title-cased "address1, address2, posttown" variant)
    """
    raw_postcode = newest_epc["postcode"]

    # Drop the postcode from the address, tidy the trailing comma / whitespace, then title-case
    cleaned_address = (
        newest_epc["address"].replace(raw_postcode, "").strip().rstrip(",").strip().title()
    )

    # We also return a "postal town" variant - useful for edge cases when fetching from find my EPC
    town_variant = ", ".join(
        newest_epc[field] for field in ("address1", "address2", "posttown")
    ).strip().title()

    return cleaned_address, raw_postcode.upper(), town_variant
def extract_epc_data(self, address=None):
    """
    Given a successful search, this method will format the data and return it
    :param address: optional address used to narrow the rows down to a single property
    :return: in fast mode the 6-tuple (newest_epc, [], {}, "", "", None); otherwise the 7-tuple
        (newest_epc, older_epcs, full_sap_epc, address, postcode, uprn, address_postal_town)
    :raises ValueError: if no search has been run, or if the remaining rows hold more than one
        uprn across more than one address
    """
    if self.data is None:
        raise ValueError("data is missing, run search first")

    # We should end up with a single property, so narrow the rows down: first on the known
    # property type, then on the address
    candidates = self.filter_rows(self.data["rows"], property_type=self.property_type, address=None)
    candidates = self.filter_rows(candidates, property_type=None, address=address)

    # A full SAP EPC is one lodged for a new dwelling; take the first one if there is any
    new_dwelling_epcs = [row for row in candidates if row["transaction-type"] == "new dwelling"]
    full_sap_epc = new_dwelling_epcs[0] if new_dwelling_epcs else {}

    newest_epc, older_epcs = self.filter_newest_epc(list_of_epcs=candidates)

    # Work out the uprn for this home - it can sometimes be missing altogether
    distinct_uprns = {row["uprn"] for row in candidates if row["uprn"]}
    if not distinct_uprns and len(candidates) > 0:
        logger.warning("Found data but missing uprn")
    elif len(distinct_uprns) != 1:
        # Several uprns are only tolerated when every row has the same address, in which case the
        # uprn of the most recent EPC wins
        if len({row["address"] for row in candidates}) != 1:
            raise ValueError("Multiple UPRNs found - investigate me")
        distinct_uprns = {newest_epc["uprn"]}

    if distinct_uprns:
        uprn = distinct_uprns.pop()
        if not pd.isnull(uprn):
            uprn = int(uprn)
    else:
        # No uprn at all: simulate one and mark it as such
        # NOTE(review): hash() of a str is salted per process unless PYTHONHASHSEED is fixed, so
        # this value is presumably not stable between runs - confirm
        newest_epc["uprn-source"] = self.UPRN_SOURCE_SIMULATED
        uprn = hash(self.address1 + self.postcode)

    if self.fast:
        # NOTE(review): 6 values here versus 7 below, while find_property unpacks 7 - confirm that
        # fast mode is only used by callers expecting 6
        return newest_epc, [], {}, "", "", None

    # Retrieve postcode and address
    address_epc, postcode_epc, address_postal_town = self.format_address(newest_epc=newest_epc)
    return newest_epc, older_epcs, full_sap_epc, address_epc, postcode_epc, uprn, address_postal_town
@staticmethod
def filter_newest_epc(list_of_epcs: List):
    """
    Split a list of EPCs into the most recently lodged one and all of the others.
    :param list_of_epcs: list of EPC dicts, each with "lodgement-datetime" and "lmk-key" keys
    :return: tuple of (newest EPC dict, list of the remaining EPCs). ({}, []) when there is no newest
        EPC to return, e.g. for an empty list
    """
    if not list_of_epcs:
        return {}, []

    # Work out the newest lodgement once, rather than once per EPC
    latest_lodgement = max(epc["lodgement-datetime"] for epc in list_of_epcs)

    # It is possible (but rare, and likely an error on EPC lodgement) that we have multiple EPCs that
    # were lodged at the exact same time. In this case, we will take the first one
    newest_epc = next(
        (epc for epc in list_of_epcs if epc["lodgement-datetime"] == latest_lodgement), None
    )
    if newest_epc is None:
        return {}, []

    older_epcs = [epc for epc in list_of_epcs if epc["lmk-key"] != newest_epc["lmk-key"]]
    return newest_epc, older_epcs
@staticmethod
def _get_epc_mode(col: str, epc_data: pd.DataFrame):
"""
Simple method to extract the mode value from the EPC data
:param col: name of the column to take the mode of
:param epc_data: pandas dataframe of epc data
"""
mode_value = epc_data[[col]].mode(dropna=True)
if len(mode_value) != 1:
raise NotImplementedError("TODO: Handle multiple modes")
mode_value = mode_value.iloc[0][col]
return mode_value
def fetch_nearby_epcs(
self, initial_postcode: str,
lmks_to_drop: list[str] | None = None,
built_form: str = "",
property_type: str = "",
exclude_old: bool = False,
heating_system: [str, None] = None,
associated_uprns: [List[int] | None] = None
):
"""
Fetches and processes EPC data for a given initial postcode, applying successive trimming
to the postcode and filtering the data until a non-empty result set is found.
The function queries the EPC API with the provided postcode, and if no data is found or
if the data doesn't meet certain criteria, it progressively shortens the postcode by
removing the last character and retries the query. This process continues until a valid
set of EPC data is obtained or the postcode is exhausted.
Additional filtering is applied to the obtained EPC data based on 'lmk-key', 'built-form',
and 'property-type'. The data is also processed to extract and numerically interpret house
numbers, calculate house number distances, and apply weights based on these distances.
:param initial_postcode: The initial full postcode for the EPC data query.
:param lmks_to_drop: List of 'lmk-key' values to be excluded from the EPC data.
:param built_form: The 'built-form' value to be used for filtering the EPC data.
:param property_type: The 'property-type' value to be used for filtering the EPC data.
:param exclude_old: Flag to exclude EPC data older than 10 years.
:param heating_system: Optional heating system; when given, only rows whose 'mainheat-description'
equals it exactly are kept.
:param associated_uprns: Optional list of associated UPRNs. When at least one of them is present in
the data, the data is filtered to those UPRNs instead of by built form and property type.
:return: pandas DataFrame holding the newest EPC per uprn that survived the filters, with
'house_number', 'numeric_house_number' and 'weight' columns added
:raises Exception: if the postcode is trimmed down to nothing without finding any data
"""
# Work on a copy, because the list is emptied once it has been applied
associated_uprns_to_apply = [] if associated_uprns is None else associated_uprns.copy()
# Maps the property types used by the callers onto the values sent to the EPC api
property_type_api_map = {
"Bungalow": "bungalow",
"Flat": "flat",
"House": "house",
"Maisonette": "maisonette",
"Park home": "park home",
}
postcode = initial_postcode
while postcode:
# Fetch data from EPC API
params = {"postcode": postcode}
if property_type:
# NOTE(review): a property_type that is not in the map raises a KeyError here - confirm that
# callers only pass the five mapped values
params["property-type"] = property_type_api_map[property_type]
# We request up to 100 homes of the relevant type, so not to pull in too many irrelevant homes
# NOTE(review): this comment previously said "the 20 nearest homes" but size is 100 - confirm
epc_response = self.get_epc(params=params, size=100)
if epc_response["status"] == 200:
epc_data = pd.DataFrame(self.data["rows"])
if lmks_to_drop is not None:
epc_data = epc_data[~epc_data["lmk-key"].isin(lmks_to_drop)]
try:
epc_data['lodgement-datetime'] = pd.to_datetime(
epc_data['lodgement-datetime'], format='%Y-%m-%d %H:%M:%S', errors='coerce'
)
except Exception as e:
logger.error("Problem formatting lodgement-datime, appling fallback: " + str(e))
epc_data['lodgement-datetime'] = pd.to_datetime(epc_data['lodgement-datetime'], errors='coerce')
if exclude_old:
# Exclude EPC data older than 10 years
epc_data = epc_data[
epc_data["lodgement-datetime"] > (pd.Timestamp.now() - pd.DateOffset(years=10))
]
# Regardless of whether or not we exclude old, we drop any SAP05 entries, which will be problematic
# if we include them
if not epc_data.empty:
epc_data = epc_data[~epc_data["mainheat-description"].str.lower().str.contains("sap05:")]
if not epc_data.empty:
# Further processing of the EPC data
# Keep only the newest EPC for each uprn
epc_data = epc_data.sort_values("lodgement-datetime", ascending=False).groupby("uprn").head(1)
epc_data["house_number"] = epc_data["address"].apply(lambda add1: self.get_house_number(add1))
epc_data["numeric_house_number"] = epc_data["house_number"].apply(
lambda house_num: self.extract_numeric_housenumber_part(house_num)
)
if self.numeric_house_number is None:
# If we don't have a house number, we treat all weights as equal
epc_data["weight"] = 1
else:
epc_data["house_number_distance"] = abs(
epc_data["numeric_house_number"] - self.numeric_house_number
)
# # We add 1, just in case we have a 0 weight (e.g. comparing house number 7a to 7b, or 9A to 9)
# epc_data["weight"] = 1 / (epc_data["house_number_distance"] + 1)
# # If we have a home without a house number, fill that weight with average
# epc_data["weight"] = epc_data["weight"].fillna(epc_data["weight"].mean())
# # Finally, we might not have any house numbers whatsoever so everything could be
# # missing, so we fill with 1
# epc_data["weight"] = epc_data["weight"].fillna(1)
# TODO: Testing
# If the postcode is different from the initial postcode, it doesn't make sense to have
# any weightings
if all(pd.isnull(epc_data["house_number_distance"])) or (postcode != initial_postcode):
epc_data["weight"] = 1
else:
# Closer house numbers get a larger weight; the + 1 avoids dividing by zero at distance 0
epc_data["weight"] = 1 / np.sqrt(epc_data["house_number_distance"] + 1)
# Homes without a house number get the average weight
epc_data["weight"] = epc_data["weight"].fillna(epc_data["weight"].mean())
# Use the caller's property type when given, otherwise the weighted most common one in the data
estimation_property_type = self._estimate_str(
key="property-type", estimation_data=epc_data
) if property_type == "" else property_type
# Weighted most common built form among the rows of that property type
epc_built_form = self._estimate_str(
key="built-form",
estimation_data=epc_data[epc_data["property-type"] == estimation_property_type]
)
# NOTE(review): this branch compares and assigns "End-Terraced" / "Mid-Terraced", while the
# branches below use "Mid-Terrace" - confirm which spelling the EPC data actually uses
if built_form == "Semi-Detached" and epc_built_form in ["End-Terraced", "Mid-Terraced"]:
estimation_built_form = "End-Terraced"
elif (built_form == "") or (pd.isnull(built_form)):
estimation_built_form = epc_built_form
elif built_form == "Enclosed Mid-Terrace":
# We check if we have any enclosed and if not, we fall back to mid-terrace
if sum(epc_data["built-form"] == "Enclosed Mid-Terrace") > 0:
estimation_built_form = "Enclosed Mid-Terrace"
else:
estimation_built_form = "Mid-Terrace"
elif built_form == "Enclosed End-Terrace":
# An enclosed end terrace has two external facing walls so we fall back to mid-terrace
# NOTE(review): when "Enclosed End-Terrace" rows exist, this still selects
# "Enclosed Mid-Terrace" - confirm that this is intended
if sum(epc_data["built-form"] == "Enclosed End-Terrace") > 0:
estimation_built_form = "Enclosed Mid-Terrace"
else:
estimation_built_form = "Mid-Terrace"
else:
estimation_built_form = built_form
# We handle some edge cases experienced with maisonettes - if built form is detached, just filter
# on maisonette
# We also add some additional logic for Park homes, because they are far less common than other
# property types
is_maisonette_with_bad_built_form = (estimation_property_type == "Maisonette") & (
estimation_built_form in ["Detached", "Semi-Detached"]
)
is_park_home_without_built_form = (estimation_property_type == "Park home") & (
sum(epc_data["built-form"] == estimation_built_form) == 0
)
has_missing_built_form = not estimation_built_form
# If we have associated UPRNS, we just filter as such, otherwise
# we filter with built form and property type
# The presence check compares uprns as strings
if any(str(x) in epc_data["uprn"].astype(str).values for x in associated_uprns_to_apply):
# We check at least one UPRN is in the data
# NOTE(review): the check above compares as strings but isin() below does not - confirm that
# the uprn column and associated_uprns share a type
epc_data = epc_data[epc_data["uprn"].isin(associated_uprns_to_apply)]
# After we run this, we empty associated_uprns_to_apply.
# That ensures we don't keep re-applying this filter if we shorten the postcode again
# since we'll keep ending up in the same results
associated_uprns_to_apply = []
elif is_maisonette_with_bad_built_form or is_park_home_without_built_form or has_missing_built_form:
epc_data = epc_data[epc_data["property-type"] == estimation_property_type]
else:
epc_data = epc_data[
(epc_data["built-form"] == estimation_built_form) & (
epc_data["property-type"] == estimation_property_type)
]
if heating_system is not None:
epc_data = epc_data[
epc_data["mainheat-description"] == heating_system
]
if not epc_data.empty:
return epc_data # Return the filtered data if it's not empty
# Shorten the postcode by one character for the next iteration
postcode = postcode[:-1].rstrip()
# If loop finishes without a valid response, raise an exception
raise Exception("Unable to find postcode data after trimming - investigate me")
def estimate_epc(
self, property_type, built_form, lmks_to_drop=None, exclude_old=False, heating_system=None,
associated_uprns=None
):
"""
For a property that does not have an EPC, we retrieve the EPC data for the closest properties
and estimate the EPC for the property in question.
Each attribute listed in vartypes is estimated from the neighbouring EPCs: a weighted average for
"Int64" and "float" attributes and a weighted mode for "str" attributes.
Note - do we have postcodes with just a single address? We would need to use a different approach
to find the closest homes
:param property_type: This is the property type of the property we are estimating, that can be retrieved from
the ordnance survey api
:param built_form: This is the built form of the property we are estimating, that can be retrieved from
the ordnance survey api
:param lmks_to_drop: This is a list of LMK keys that should be dropped from the estimation process. This
is used as an override for testing, to drop EPCs for the property we are testing
:param exclude_old: Used to drop any expired EPCs (more than 10 years old)
:param heating_system: The heating system of the property we are estimating, if known. Will aim to filter EPCs
to matching heating systems
:param associated_uprns: List of associated UPRNs for the property. E.g. other units in a block of flats
:return: dict holding the estimated EPC, keyed by EPC attribute name, plus "lodgement-datetime",
"lodgement-date", "postcode", "uprn", "address" and "estimated" (always True)
"""
# From the ordnance survey data, we want to determine the property type and then use only similar property
# types for the estimation process
epc_data = self.fetch_nearby_epcs(
initial_postcode=self.postcode,
lmks_to_drop=lmks_to_drop,
built_form=built_form,
property_type=property_type,
exclude_old=exclude_old,
heating_system=heating_system,
associated_uprns=associated_uprns
)
# Check if it's a new build EPC. A property that doesn't have an EPC is not going to be a new build
# so we avoid comparing it to new builds
# TODO - this is experimental - if we have the year the property was built, we should utilise that
# here
newer_age_bands = [
"England and Wales: 1996-2002", "England and Wales: 2003-2006", "England and Wales: 2007-2011",
"England and Wales: 2012 onwards"
]
# We only drop the newer age bands when at least one row with an older age band would remain
if (~epc_data["construction-age-band"].isin(newer_age_bands)).sum():
# We have some older age bands, so we keep only those and drop the newer ones
epc_data = epc_data[~epc_data["construction-age-band"].isin(newer_age_bands)].copy()
# If we have missing lodgement date, we fill it with inspection-date
epc_data["lodgement-datetime"] = epc_data["lodgement-datetime"].fillna(epc_data["inspection-date"])
# If we still have missing dates, we set it to the mean of the non NA dates
epc_data["lodgement-datetime"] = epc_data["lodgement-datetime"].fillna(epc_data["lodgement-datetime"].mean())
# For each attribute, we need to determine the datatype and use an appropriate method
# to estimate.
estimated_epc = {}
for key, vartype in vartypes.items():
# Normalise missing values and empty strings to None
epc_data[key] = np.where(pd.isnull(epc_data[key]), None, epc_data[key])
epc_data[key] = np.where(epc_data[key] == "", None, epc_data[key])
estimation_data = epc_data[[key, "weight", "lodgement-datetime"]].copy()
# Only estimate from rows that have a usable value for this attribute
estimation_data = estimation_data[~pd.isnull(estimation_data[key])]
estimation_data = estimation_data[~estimation_data[key].isin(Definitions.DATA_ANOMALY_MATCHES)]
if vartype == "Int64":
# We have some edge cases where we get the error "invalid literal for int() with base 10: '1.0'"
# so this handles this
estimation_data[key] = estimation_data[key].astype(float).astype(vartype)
else:
estimation_data[key] = estimation_data[key].astype(vartype)
# Nothing left to estimate from
if estimation_data.shape[0] == 0:
estimated_epc[key] = None
continue
if key == "floor-height":
# We specifically handle this, to avoid extreme values
# We check if we have any rows at or below 3.5m
if estimation_data[estimation_data["floor-height"].astype(float) <= 3.5].shape[0] > 0:
# Perform the filter
estimation_data = estimation_data[estimation_data["floor-height"].astype(float) <= 3.5]
if vartype == "Int64":
estimated_value = self._estimate_int(estimation_data, key)
elif vartype == "float":
estimated_value = self._estimate_float(estimation_data, key)
elif vartype == "str":
estimated_value = self._estimate_str(estimation_data, key)
else:
raise NotImplementedError("estimation method not implemented for type")
estimated_epc[key] = estimated_value
# Insert an estimated lodgement datetime, with a weighted average
estimated_epc["lodgement-datetime"] = self.calculate_weighted_lodgement_datetime(epc_data=epc_data)
# Extract lodgement date
# It is possible that there is still no lodgement date, so we need to handle this
if pd.isnull(estimated_epc["lodgement-datetime"]):
estimated_epc["lodgement-date"] = None
else:
estimated_epc["lodgement-date"] = estimated_epc["lodgement-datetime"].strftime("%Y-%m-%d")
# NOTE(review): current-energy-efficiency is None when no neighbour had a usable value - confirm
# that sap_to_epc copes with None
estimated_epc["current-energy-rating"] = sap_to_epc(estimated_epc["current-energy-efficiency"])
# Convert the cost current and potential variables - to string integers
# NOTE(review): int() raises if any of these was estimated as None (no usable data) - confirm
# whether that can happen in practice
for variable in ["heating-cost-current", "hot-water-cost-current", "lighting-cost-current",
"heating-cost-potential", "hot-water-cost-potential", "lighting-cost-potential"]:
estimated_epc[variable] = str(int(estimated_epc[variable]))
# This is a string
estimated_epc["low-energy-fixed-light-count"] = (
str(estimated_epc["low-energy-fixed-light-count"]) if estimated_epc["low-energy-fixed-light-count"] else ""
)
# This is an int
estimated_epc["photo-supply"] = (
int(np.round(estimated_epc["photo-supply"])) if estimated_epc["photo-supply"] else estimated_epc[
"photo-supply"]
)
# Recomputed from the estimated emissions and floor area, replacing the value estimated in the loop
# NOTE(review): no guard for a None or zero total-floor-area - confirm
estimated_epc["co2-emiss-curr-per-floor-area"] = (
estimated_epc["co2-emissions-current"] / estimated_epc["total-floor-area"]
)
estimated_epc["postcode"] = self.postcode
if not self.uprn:
# Update self.uprn too
# NOTE(review): hash() of a str is salted per process unless PYTHONHASHSEED is fixed, so this
# simulated uprn is presumably not stable between runs - confirm
self.uprn = hash(self.address1 + self.postcode)
estimated_epc["uprn"] = self.uprn
estimated_epc["address"] = self.full_address
# Indicate that this epc was estimated
estimated_epc["estimated"] = True
return estimated_epc
@staticmethod
def calculate_weighted_lodgement_datetime(epc_data):
    """
    Calculate the weighted average of the lodgement datetimes.

    The average is taken over the offsets from the earliest date rather than over raw nanosecond
    timestamps. Multiplying int64 nanosecond timestamps by integer weights and summing them
    overflows int64 after a handful of rows, and float arithmetic at that magnitude loses precision.
    :param epc_data: pandas dataframe with 'lodgement-datetime' and 'weight' columns
    :return: pandas Timestamp of the weighted mean, or NaT when there is no row with both a date and
        a weight, or when the weights sum to zero
    """
    dates = pd.to_datetime(epc_data['lodgement-datetime'])
    weights = epc_data['weight'].astype(float)

    # Rows with a missing date or weight cannot contribute to the average
    usable = dates.notna() & weights.notna()
    dates = dates[usable]
    weights = weights[usable]

    total_weights = weights.sum()
    if dates.empty or total_weights == 0:
        return pd.NaT

    # Weighted mean of the offsets (in seconds) from the earliest date
    earliest = dates.min()
    offsets_in_seconds = (dates - earliest).dt.total_seconds()
    weighted_offset = float((offsets_in_seconds * weights).sum() / total_weights)

    return earliest + pd.to_timedelta(weighted_offset, unit="s")
@staticmethod
def _estimate_int(estimation_data, key):
return round(np.average(a=estimation_data[key], weights=estimation_data["weight"]))
@staticmethod
def _estimate_float(estimation_data, key):
return round(np.average(a=estimation_data[key], weights=estimation_data["weight"]), 2)
@staticmethod
def _estimate_str(estimation_data, key):
    """
    Estimate a categorical attribute as the value of `key` carrying the largest total weight.
    When several values tie on total weight, the one whose rows have the latest mean
    'lodgement-datetime' is chosen.
    :param estimation_data: DataFrame with the `key` column plus "weight" and "lodgement-datetime"
    :param key: name of the attribute to estimate
    :return: the winning value of `key`
    """
    weight_per_value = estimation_data.groupby(key)["weight"].sum().reset_index()
    heaviest = weight_per_value["weight"].max()
    candidates = weight_per_value[weight_per_value["weight"] == heaviest]

    # A single clear winner: nothing more to decide
    if candidates.shape[0] == 1:
        return candidates[key].values[0]

    # Tie-break: prefer the value backed by the more recent data on average
    tied_rows = estimation_data[estimation_data[key].isin(candidates[key].values)]
    mean_lodgement = tied_rows.groupby(key)["lodgement-datetime"].mean()
    return mean_lodgement.idxmax()
def find_property(self, skip_os=False, api_data=None, overwrite_sap05=False):
"""
Identify the property and populate the EPC-related attributes on this instance.

This method will attempt to identify a property. It will, at first, use the EPC api to try and
find the EPC for the property and the associated UPRN. If this fails, it will use the Ordnance Survey API to
find the UPRN of the address.
Because the EPC api may have returned no result purely due to formatting issues with the address,
if the Ordnance Survey api is used and the UPRN retrieved, the EPC api is queried again with the UPRN, just
as a final check to see if there is any EPC data.
If there is no EPC data, the EPC data will be estimated based on the surrounding properties.

Nothing is returned; every exit is a bare return. Results are written to the instance
(newest_epc, older_epcs, full_sap_epc, address_clean, postcode_clean and, where an EPC is
extracted, uprn and address_postal_town).

:param skip_os: If True, the Ordnance Survey api will be skipped and only the EPC api will be used
:param api_data: If provided (truthy), this data is stored on self.data and used instead of querying the
EPC api; the lookup is then treated as a successful (status 200) response
:param overwrite_sap05: For extremely old, SAP05 EPCs, we may wish to overwrite them with an estimated EPC.
This is because the SAP05 EPCs will have missing information such as the main heating
being described as SAP05:Main-Heating, which isn't particularly useful for the
purpose of providing recommendations.
:raises Exception: if the Ordnance Survey places api response does not have status 200
"""
# NOTE(review): leading whitespace is not visible in the reviewed copy of this method, so the
# nesting of the statements below could not be confirmed. Comments that depend on nesting are
# hedged accordingly.
# Step 1: use the epc api to find the property and uprn
if api_data:
self.data = api_data
response = {"status": 200}
else:
response = self.get_epc()
if response["status"] == 200:
(
self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.uprn,
self.address_postal_town
) = self.extract_epc_data(address=self.full_address)
# Before we return, we check if we need to overwrite a SAP05 EPC
# If we don't have SAP05 in the heating description, or overwrite_sap05 is False, we return
# NOTE(review): if is_sap_o5 is only assigned when the status is 200, the later `if is_sap_o5:`
# would fail with an unbound local whenever the EPC lookup does not succeed - confirm the nesting.
is_sap_o5 = "SAP05:" in self.newest_epc.get("mainheat-description", "")
# good_data re-checks the response status in addition to the SAP05 marker
good_data = not is_sap_o5 and (response["status"] == 200)
if good_data or not overwrite_sap05:
# If the data is fine, or we're preventing SAP05 overwrites, we just exit here
return
# By default, we don't exclude old EPCs but we will do when we are estimating to overwrite a SAP05 EPC
lmks_to_drop, exclude_old = [], False
if is_sap_o5:
self.overwrite_sap05 = True
# The SAP05 certificate itself is passed to the estimation as an lmk-key to drop
lmks_to_drop = [self.newest_epc["lmk-key"]]
exclude_old = True
# Step 2: If we don't have an EPC, we use the ordnance survey api to find the uprn
if skip_os:
# The Ordnance Survey api is not called here; only attributes already held by the client are read
if self.ordnance_survey_client.property_type is not None:
# We can try and estimate
estimated_epc = self.estimate_epc(
property_type=self.ordnance_survey_client.property_type,
built_form=self.ordnance_survey_client.built_form,
heating_system=self.heating_system,
associated_uprns=self.associated_uprns,
lmks_to_drop=lmks_to_drop,
exclude_old=exclude_old
)
# NOTE(review): self.overwrite_sap05 is only assigned in this method when a SAP05 EPC was found;
# presumably it is initialised elsewhere (e.g. in __init__) - confirm.
if self.overwrite_sap05:
# We keep a record of the fact that we have performed a SAP05 overwrite
estimated_epc["sap_05_overwritten"] = True
# If we have overwritten a SAP05 EPC, we need to update older_epcs too
# NOTE(review): this replaces older_epcs with just a copy of the overwritten EPC (or an empty list),
# so any older EPCs extracted in step 1 are dropped - confirm this is intended.
self.older_epcs = [] if not self.overwrite_sap05 else [self.newest_epc.copy()]
self.newest_epc = estimated_epc
self.full_sap_epc = {}
# Finally, set a standardised address 1 and postcode, falling back to the supplied values
# when the Ordnance Survey client has none
self.address_clean = (
self.ordnance_survey_client.address_os if self.ordnance_survey_client.address_os else self.address1
)
self.postcode_clean = (
self.ordnance_survey_client.postcode_os if self.ordnance_survey_client.postcode_os else
self.postcode
)
return
os_response = self.ordnance_survey_client.get_places_api()
if os_response["status"] != 200:
# Investigate this if it happens
raise Exception("Unable to find property - investigate me")
# Step 3: Now that we have a uprn, do another check against the epc api, this time searching with the uprn
self.uprn = self.ordnance_survey_client.most_relevant_result["UPRN"]
response = self.get_epc()
if response["status"] == 200:
(
self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.uprn,
self.address_postal_town
) = self.extract_epc_data()
return
# Step 4: If we still don't have an EPC, we estimate the EPC data
self.full_address = self.ordnance_survey_client.most_relevant_result["ADDRESS"]
# NOTE(review): unlike the skip_os branch, this call does not pass lmks_to_drop / exclude_old (nor
# heating_system / associated_uprns) - confirm whether that is deliberate.
estimated_epc = self.estimate_epc(
property_type=self.ordnance_survey_client.property_type,
built_form=self.ordnance_survey_client.built_form
)
self.newest_epc = estimated_epc
self.older_epcs = []
self.full_sap_epc = {}
# Finally, set a standardised address 1 and postcode
self.address_clean = self.ordnance_survey_client.address_os
self.postcode_clean = self.ordnance_survey_client.postcode_os
return
def set_uprn_source(self, file_format):
    """
    Flag the UPRN of the newest EPC as simulated where applicable.

    The "uprn-source" entry of the newest EPC is set to self.UPRN_SOURCE_SIMULATED only when the
    EPC is marked as estimated, the file format is "domna_asset_list" and the stored UPRN is
    negative. This is placeholder logic until the input data formats are standardised.
    :param file_format: name of the input file format the property came from
    :return: None
    :raises ValueError: if there is no EPC data yet (find_property has not been run)
    """
    epc = self.newest_epc
    if not epc:
        raise ValueError("No EPC data available to set UPRN source - run find_property first")

    # Only estimated EPCs coming from a domna asset list are candidates
    if not epc.get("estimated"):
        return
    if file_format != "domna_asset_list":
        return

    if epc["uprn"] < 0:
        epc["uprn-source"] = self.UPRN_SOURCE_SIMULATED
def check_attribute_variations(self):
    """
    Check whether the wall, roof or floor type has ever differed between the property's EPCs.

    For each of the wall, roof and floor descriptions, every EPC (the older ones followed by the
    newest) is run through the matching attribute cleaner. A type is reported as varied when
    any of its tracked columns takes more than one distinct value across those EPCs.
    :return: dict with the boolean entries "has_wall_type_ever_varied",
             "has_roof_type_ever_varied" and "has_floor_type_ever_varied"
    """
    attribute_map = {
        "walls-description": {
            "cleaner": WallAttributes,
            "attribute": [
                "is_cavity_wall", "is_solid_brick", "is_system_built", "is_timber_frame",
                "is_granite_or_whinstone", "is_cob", "is_sandstone_or_limestone", "is_park_home"
            ],
            "name": "has_wall_type_ever_varied"
        },
        "roof-description": {
            "cleaner": RoofAttributes,
            "attribute": [
                "is_flat", "is_pitched", "is_roof_room", "is_thatched", "has_dwelling_above"
            ],
            "name": "has_roof_type_ever_varied"
        },
        "floor-description": {
            "cleaner": FloorAttributes,
            # NOTE(review): "is_to_external_air" used to be listed twice here; the duplicate has been
            # dropped since it only repeated the same check - confirm no other flag was intended.
            "attribute": [
                "is_to_unheated_space", "is_to_external_air", "is_suspended", "is_solid",
            ],
            "name": "has_floor_type_ever_varied"
        }
    }
    # Older EPCs first, newest last - the same timeline is used for every description
    epc_timeline = [*self.older_epcs, self.newest_epc]

    attribute_variations = {}
    for description_key, spec in attribute_map.items():
        cleaner = spec["cleaner"]
        type_timeline = pd.DataFrame(
            [cleaner(epc[description_key]).process() for epc in epc_timeline]
        )
        # The type has varied as soon as one tracked column holds more than one distinct value
        attribute_variations[spec["name"]] = any(
            type_timeline[col].nunique() > 1 for col in spec["attribute"]
        )
    return attribute_variations
def identify_flat_floor(self):
    """
    Work out which floor a flat is on from the roof and floor descriptions of the newest EPC.
    :return: "top" when there is no dwelling above, "mid" when there is a dwelling above and
             another property below, otherwise "ground"
    """
    epc = self.newest_epc
    roof = RoofAttributes(epc["roof-description"]).process()
    if roof["has_dwelling_above"]:
        # Something sits above us, so the floor description decides between mid and ground
        floor = FloorAttributes(epc["floor-description"]).process()
        return "mid" if floor["another_property_below"] else "ground"
    # Nothing above: this is the top floor
    return "top"
def get_metadata(self):
if not self.newest_epc:
raise ValueError("No EPC data available")
# We check if the property has ever been downgraded on SAP
has_sap_ever_downgraded = False
sap_timeline = [int(epc["current-energy-efficiency"]) for epc in self.older_epcs] + [
int(self.newest_epc["current-energy-efficiency"])
]
# We check if there has ever been a decrease by differencing
has_sap_ever_downgraded = any(np.diff(sap_timeline) < 0)
# We check if the wall type has ever varied over time
attribute_varations = self.check_attribute_variations()
# If the property is a flat, we distinguish between top, mid, ground floor
floor = None
if self.newest_epc["property-type"] == "Flat":
floor = self.identify_flat_floor()
self.metadata = {
"days_since_last_epc": (pd.Timestamp.now() - pd.Timestamp(self.newest_epc["lodgement-date"])).days,
"has_sap_ever_downgraded": has_sap_ever_downgraded,
"floor": floor,
**attribute_varations
}