mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
1077 lines
46 KiB
Python
1077 lines
46 KiB
Python
import os
|
|
import time
|
|
import re
|
|
|
|
from urllib.parse import urlencode
|
|
import usaddress
|
|
import pandas as pd
|
|
import numpy as np
|
|
from epc_api.client import EpcClient
|
|
from backend.OrdnanceSurvey import OrdnanceSuveyClient
|
|
from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes
|
|
from etl.epc_clean.epc_attributes.FloorAttributes import FloorAttributes
|
|
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
|
|
from BaseUtility import Definitions
|
|
from utils.logger import setup_logger
|
|
from typing import List
|
|
from thefuzz import process
|
|
from backend.app.utils import sap_to_epc
|
|
|
|
logger = setup_logger()
|
|
|
|
# Maps each EPC field used during estimation to the dtype it is cast to in
# SearchEpc.estimate_epc. The dtype also selects the estimator:
#   "Int64" -> weighted average rounded to an integer
#   "float" -> weighted average rounded to 2 decimal places
#   "str"   -> weighted mode
# Commented-out keys are fields that are deliberately not estimated.
# Iteration order is preserved from the original definition.
vartypes = {
    'low-energy-fixed-light-count': "Int64",
    # 'address': 'str',
    # 'uprn-source': 'str',
    'floor-height': 'float',
    'heating-cost-potential': 'float',
    'unheated-corridor-length': 'float',
    'hot-water-cost-potential': 'float',
    'construction-age-band': 'str',
    'potential-energy-rating': 'str',
    'mainheat-energy-eff': 'str',
    'windows-env-eff': 'str',
    'lighting-energy-eff': 'str',
    'environment-impact-potential': "Int64",
    'glazed-type': 'str',
    'heating-cost-current': 'float',
    # 'address3': 'str',
    'mainheatcont-description': 'str',
    'sheating-energy-eff': 'str',
    'property-type': 'str',
    'local-authority-label': 'str',
    'fixed-lighting-outlets-count': "Int64",
    'energy-tariff': 'str',
    'mechanical-ventilation': 'str',
    # Was 'str', unlike the other five cost fields. estimate_epc converts all six
    # cost fields with int(), which needs a numeric estimate rather than a string mode.
    'hot-water-cost-current': 'float',
    'county': 'str',
    # 'postcode': 'str',
    'solar-water-heating-flag': 'str',
    'constituency': 'str',
    'co2-emissions-potential': 'float',
    'number-heated-rooms': 'float',
    'floor-description': 'str',
    'energy-consumption-potential': 'float',
    'local-authority': 'str',
    'built-form': 'str',
    'number-open-fireplaces': "Int64",
    'windows-description': 'str',
    'glazed-area': 'str',
    # 'inspection-date': str,
    'mains-gas-flag': 'str',
    'co2-emiss-curr-per-floor-area': 'float',
    # 'address1': 'str',
    'heat-loss-corridor': 'str',
    'flat-storey-count': "Int64",
    'constituency-label': 'str',
    'roof-energy-eff': 'str',
    'total-floor-area': 'float',
    'building-reference-number': 'str',
    'environment-impact-current': 'float',
    'co2-emissions-current': 'float',
    'roof-description': 'str',
    'floor-energy-eff': 'str',
    'number-habitable-rooms': 'float',
    # 'address2': 'str',
    'hot-water-env-eff': 'str',
    'posttown': 'str',
    'mainheatc-energy-eff': 'str',
    'main-fuel': 'str',
    'lighting-env-eff': 'str',
    'windows-energy-eff': 'str',
    'floor-env-eff': 'str',
    'sheating-env-eff': 'str',
    'lighting-description': 'str',
    'roof-env-eff': 'str',
    'walls-energy-eff': 'str',
    'photo-supply': 'float',
    'lighting-cost-potential': 'float',
    'mainheat-env-eff': 'str',
    'multi-glaze-proportion': 'float',
    'main-heating-controls': 'str',
    # 'lodgement-datetime',
    'flat-top-storey': 'str',
    'current-energy-rating': 'str',
    'secondheat-description': 'str',
    'walls-env-eff': 'str',
    'transaction-type': 'str',
    # 'uprn': "Int64",
    'current-energy-efficiency': 'Int64',
    'energy-consumption-current': 'float',
    'mainheat-description': 'str',
    'lighting-cost-current': 'float',
    # 'lodgement-date',
    'extension-count': "Int64",
    'mainheatc-env-eff': 'str',
    # 'lmk-key': 'str',
    'wind-turbine-count': "Int64",
    'tenure': 'str',
    'floor-level': 'str',
    'potential-energy-efficiency': "Int64",
    'hot-water-energy-eff': 'str',
    'low-energy-lighting': 'float',
    'walls-description': 'str',
    'hotwater-description': 'str',
}
|
|
|
|
|
|
class SearchEpc:
|
|
"""
|
|
Given address information about a home, this class is responsible for retrieving the EPC data associated
|
|
to the property.
|
|
|
|
For a home, we might have address lines 1, 2, 3 and 4, as well as a postcode.
|
|
|
|
Often, simply searching the EPC database with address line 1 and postcode will be enough to find
|
|
the property, but there are some cases where this is not true and we might need to utilise other
|
|
combinations about the home to find the property
|
|
"""
|
|
|
|
# If we create the uprn based on a hash, we mark it as simulated
|
|
UPRN_SOURCE_SIMULATED = "SIMULATED"
|
|
|
|
MAX_RETRIES = 5
|
|
|
|
SUCCESS = {
|
|
"status": 200,
|
|
"message": "success",
|
|
"error": None
|
|
}
|
|
|
|
NODATA = {
|
|
"status": 204,
|
|
"message": "no data",
|
|
"error": None
|
|
}
|
|
|
|
def __init__(
    self,
    address1: str,
    postcode: str,
    auth_token: str,
    os_api_key: str,
    full_address: str | None = None,
    max_retries: int | None = None,
    uprn: int | None = None,
    size=None,
    property_type=None,
    fast=False,
    heating_system: str | None = None,
    associated_uprns: List[int] | None = None
):
    """
    Address line 1 and postcode are mandatory fields. The other arguments are optional
    but can be used to find the epc for the home, if address1 and postcode are insufficient

    If you wish to run a strict property type search, please run set_strict_property_type_search()

    :param address1: string, property's address line 1
    :param postcode: string, property's postcode
    :param auth_token: string, token passed to the EPC api client
    :param os_api_key: string, api key passed to the Ordnance Survey client
    :param full_address: string, optional parameter, the full address of the property. Falls back to address1
    :param max_retries: int, optional, number of retries to make when searching the api. Falls back to
        MAX_RETRIES
    :param uprn: int, optional, the uprn of the property
    :param size: int, optional, the number of results to return. If not provided, defaults to 25 which is the api's
        default
    :param property_type: str, optional, the property type of the property, if known before hand
    :param fast: bool, optional, if true, the extract_epc_data method will skip some processing to return
        results faster
    :param heating_system: str, optional, the heating system of the property, if known before hand
    :param associated_uprns: list of int, optional, list of associated uprns for the property. E.g. other
        units in a block of flats
    """

    self.address1 = address1
    self.postcode = postcode
    self.full_address = full_address if full_address is not None else self.address1
    self.uprn = uprn
    # Both of these are derived from address line 1 only
    self.house_number = self.get_house_number(self.address1)
    self.numeric_house_number = self.extract_numeric_housenumber_part(self.house_number)
    self.associated_uprns = associated_uprns if associated_uprns is not None else []

    # property attributes
    self.heating_system = heating_system

    self.max_retries = max_retries if max_retries is not None else self.MAX_RETRIES

    self.client = EpcClient(auth_token=auth_token)
    self.ordnance_survey_client = OrdnanceSuveyClient(
        address=self.address1, postcode=self.postcode, api_key=os_api_key
    )

    # Search state, initialised empty here
    self.data = None
    self.newest_epc = None
    self.older_epcs = None
    self.full_sap_epc = None
    self.metadata = None
    self.strict_property_type_search = False

    # These are the address and postcode values, which we store in the database
    self.address_clean = None
    self.postcode_clean = None

    self.size = size if size is not None else 25

    self.property_type = property_type
    self.fast = fast
|
|
|
|
def set_strict_property_type_search(self) -> None:
    """
    Switch on strict property type searching.

    Once enabled, get_epc() adds the known property type to its address search parameters
    (only when a property type was supplied).
    :return: None
    """
    self.strict_property_type_search = True
|
|
|
|
@staticmethod
|
|
def get_house_number(address: str, postcode=None) -> str | None:
|
|
"""
|
|
This method uses the usaddress library to parse an address and extract the primary house or flat number.
|
|
"""
|
|
|
|
try:
|
|
# Updated regex to catch house numbers including alphanumeric ones
|
|
pattern = r'(?i)(?:flat|apartment|room)\s*(\d+\w*)|^\s*(\d+\w*)'
|
|
match1 = re.search(pattern, address)
|
|
if match1:
|
|
return next(g for g in match1.groups() if g is not None)
|
|
|
|
pattern2 = r'(?i)(flat|apartment|room)\s*([a-zA-Z]?\d+[a-zA-Z]?)'
|
|
match2 = re.search(pattern2, address)
|
|
if match2:
|
|
return match2.group(2)
|
|
|
|
parsed = usaddress.parse(address)
|
|
# First, try to get the 'OccupancyIdentifier' if 'OccupancyType' is detected
|
|
for part, type_ in parsed:
|
|
if type_ == 'OccupancyIdentifier':
|
|
if postcode is not None:
|
|
if part == postcode.split(" ")[0]:
|
|
continue
|
|
if part == postcode.split(" ")[1]:
|
|
continue
|
|
return part.rstrip(",")
|
|
# This assumes the first 'OccupancyIdentifier' after 'OccupancyType' is the primary
|
|
# number
|
|
|
|
# Fallback to 'AddressNumber' if no 'OccupancyIdentifier' is found
|
|
address_number = next((part for part, type_ in parsed if type_ == 'AddressNumber'), None)
|
|
if address_number:
|
|
return address_number.replace(",", "") # Remove any trailing commas
|
|
|
|
except Exception as e:
|
|
raise Exception(f"Error parsing address: {e}")
|
|
|
|
return None
|
|
|
|
@staticmethod
|
|
def extract_numeric_housenumber_part(house_number: str | None) -> int | None:
|
|
# Regular expression to find the first occurrence of one or more digits
|
|
|
|
if house_number is None:
|
|
return None
|
|
|
|
match = re.search(r'\d+', house_number)
|
|
|
|
if match:
|
|
return int(match.group())
|
|
else:
|
|
return None
|
|
|
|
def _get_epc(self, params, size):
|
|
"""
|
|
To be called by get_epc() - not for external usage
|
|
"""
|
|
|
|
url = os.path.join(self.client.domestic.host, "search")
|
|
if size:
|
|
url += "?" + urlencode({k: v for k, v in {"size": size}.items() if v})
|
|
|
|
for retry in range(self.max_retries):
|
|
try:
|
|
response = self.client.domestic.call(method="get", url=url, params=params)
|
|
if response:
|
|
self.data = response
|
|
return {
|
|
"response": response,
|
|
"msg": self.SUCCESS
|
|
}
|
|
|
|
if retry > 0:
|
|
logger.info("Failed previous attempt but retry successful")
|
|
# If we got nothing, final try
|
|
if not response:
|
|
return {
|
|
"response": response,
|
|
"msg": self.NODATA
|
|
}
|
|
|
|
except Exception as e:
|
|
if retry < self.max_retries - 1:
|
|
# If not the last retry, wait for 3 seconds before retrying
|
|
time.sleep(3)
|
|
else:
|
|
# If it's the last retry, we continue
|
|
return {
|
|
"response": {},
|
|
"msg": {
|
|
"status": 500,
|
|
"message": "Could not retrieve EPC data",
|
|
"error": str(e)
|
|
}
|
|
}
|
|
|
|
def get_epc(self, params=None, size=None):
    """
    Search the EPC api and store the resulting rows on self.data.

    When params is given, a single search is made with exactly those parameters. Otherwise a search by
    uprn (if known) is followed by a search by address/postcode, and the two sets of rows are merged and
    de-duplicated on "lmk-key".

    :param params: optional dict of search parameters to use as-is
    :param size: optional number of results to request, defaults to self.size
    :return: the status message dict of the search (self.SUCCESS whenever any rows were found)
    :raises ValueError: if params is empty and no uprn, address1 or postcode is set
    """
    if size is None:
        size = self.size

    # Explicit parameters: one pass-through query
    if params:
        result = self._get_epc(params=params, size=size)
        if result["msg"]["status"] == 200:
            self.data = result["response"]
        return result["msg"]

    if not (self.uprn or self.address1 or self.postcode):
        raise ValueError("No search parameters provided")

    by_uprn = {"uprn": self.uprn} if self.uprn else {}

    by_address = {}
    if self.address1:
        by_address["address"] = self.address1
    if self.postcode:
        by_address["postcode"] = self.postcode
    if self.strict_property_type_search and self.property_type:
        by_address["property-type"] = self.property_type.lower()

    collected = []
    api_response = {}

    # First search: by uprn
    if by_uprn:
        api_response = self._get_epc(params=by_uprn, size=size)
        if api_response["msg"]["status"] == 200:
            collected.extend(api_response["response"]["rows"])

    # Second search: by address. This always runs when address details are available, because
    # properties are sometimes listed under the wrong UPRN
    if by_address:
        api_response = self._get_epc(params=by_address, size=size)
        if api_response["msg"]["status"] == 200:
            address_rows = api_response["response"]["rows"]
            if self.uprn:
                # Rows with a null uprn are given the known uprn
                for row in address_rows:
                    if pd.isnull(row["uprn"]):
                        row["uprn"] = self.uprn

            collected.extend(address_rows)

    # De-dupe on lmk-key, keeping the first occurrence and the original order
    first_by_key = {}
    for row in collected:
        first_by_key.setdefault(row["lmk-key"], row)

    # Overwrite the data
    self.data = {"rows": list(first_by_key.values())}

    if self.data["rows"]:
        api_response["msg"] = self.SUCCESS

    return api_response["msg"]
|
|
|
|
def filter_rows(self, rows, property_type=None, address=None):
    """
    Given the results from the EPC api, attempts to reduce the number of rows.

    This method should not be used when property_type and address are both not None: when a
    property type is given, the address is ignored.

    :param rows: list of EPC row dicts
    :param property_type: optional property type to keep. Flats and maisonettes are treated as
        interchangeable in some cases (see below)
    :param address: optional address; rows are fuzzy matched against it
    :return: the filtered rows, or the input rows unchanged when there is nothing to filter on, when
        rows is empty, or when filtering would leave nothing
    """
    # Nothing to filter. Empty rows are returned here because the fuzzy matcher has no
    # candidates to choose from, which previously caused a TypeError
    if not rows or (property_type is None and address is None):
        return rows

    if property_type is not None:
        uprns = {r["uprn"] for r in rows}
        unique_property_types = {r["property-type"] for r in rows}

        is_just_a_house = len(unique_property_types) == 1 and bool(
            unique_property_types & {"House", "Bungalow"}
        )

        # We allow for variation in property type across flats/maisonettes
        # If we know that we have a flat/maisonette, we allow for both property types
        # Make sure we have not JUST a house
        if property_type in ("Flat", "Maisonette") and not is_just_a_house:
            single_property = len(uprns) == 1 and len(unique_property_types) == 1
            if single_property or unique_property_types == {"Flat", "Maisonette"}:
                return rows

        rows_filtered = [r for r in rows if r["property-type"] == property_type]
        return rows_filtered or rows

    # From here on we filter on the address only

    def newest_only(candidates):
        # Keep only the rows sharing the latest lodgement datetime
        latest = max(r["lodgement-datetime"] for r in candidates)
        return [r for r in candidates if r["lodgement-datetime"] == latest]

    # We check if the full address contains the postcode and if it does, remove it
    if self.postcode is not None and self.postcode in address:
        address = address.replace(self.postcode, "").strip().rstrip(",")

    address_lower = address.lower()

    # We check if post town is included in the address
    if any(r["posttown"].lower() in address_lower for r in rows):
        with_town = [", ".join([r["address"], r["posttown"]]) for r in rows]
        best_match1 = process.extractOne(address, with_town, score_cutoff=0)
        best_match2 = process.extractOne(address, [r["address"] for r in rows], score_cutoff=0)

        # Pick the largest score
        if best_match1[1] == best_match2[1]:
            # If they're the same, we work under the assumption that the addresses are the same and
            # take whichever has the newest EPC
            rows_filtered = newest_only([
                r for r, label in zip(rows, with_town)
                if label == best_match1[0] or r["address"] == best_match2[0]
            ])
        elif best_match1[1] > best_match2[1]:
            rows_filtered = [r for r, label in zip(rows, with_town) if label == best_match1[0]]
        else:
            rows_filtered = [r for r in rows if r["address"] == best_match2[0]]

        # If we have multiple, we filter on newest lodgement date
        # NOTE(review): the indentation of this step was lost in the source this was rebuilt from. It is
        # applied after all three branches here; confirm it was not meant for the last branch only
        if len(rows_filtered) > 1:
            rows_filtered = newest_only(rows_filtered)

    else:
        best_match = process.extractOne(address, [r["address"] for r in rows], score_cutoff=0)
        # Get the UPRN for the best match
        best_match_uprn = {r["uprn"] for r in rows if r["address"] == best_match[0]}.pop()
        # Keep everything lodged under the matched address or its uprn
        rows_filtered = [
            r for r in rows if (r["address"] == best_match[0]) or (r["uprn"] == best_match_uprn)
        ]

    return rows_filtered or rows
|
|
|
|
@staticmethod
|
|
def format_address(newest_epc):
    """
    Format address and postcode for storage in the database

    The postcode is removed from the address, surrounding whitespace and trailing commas are
    stripped and the address is title-cased. The postcode is upper-cased.

    :param newest_epc: EPC row dict with "address" and "postcode" keys
    :return: tuple of (formatted address, formatted postcode)
    """
    raw_postcode = newest_epc["postcode"]

    without_postcode = newest_epc["address"].replace(raw_postcode, "").strip()
    tidy_address = without_postcode.rstrip(",").strip().title()

    return tidy_address, raw_postcode.upper()
|
|
|
|
def extract_epc_data(self, address=None):
    """
    Given a successful search, this method will format the data and return it

    :param address: optional address used to narrow the rows down to a single property
    :return: tuple of (newest_epc, older_epcs, full_sap_epc, address_epc, postcode_epc, uprn). When
        self.fast is set, only newest_epc is populated and the rest are empty placeholders
    :raises ValueError: if no search has been run, if the search returned no rows, or if the rows
        cover several addresses with different UPRNs
    """

    if self.data is None:
        raise ValueError("data is missing, run search first")

    rows = self.data["rows"]

    # Without rows there is nothing to extract. This used to surface as the unrelated
    # "Multiple UPRNs found" error (or a TypeError from the address matching)
    if not rows:
        raise ValueError("No EPC rows found for this property, nothing to extract")

    # We should only have 1 uprn, so we narrow the rows down: first on property type, then on address
    rows = self.filter_rows(rows, property_type=self.property_type, address=None)
    rows = self.filter_rows(rows, property_type=None, address=address)

    # We now check for a full sap epc (the first "new dwelling" row, if any)
    full_sap_epc = next((r for r in rows if r["transaction-type"] == "new dwelling"), {})

    # Finally, we identify the newest epc and the rest
    newest_epc, older_epcs = self.filter_newest_epc(list_of_epcs=rows)

    # Get the uprn for this home
    uprns = {r["uprn"] for r in rows if r["uprn"]}
    if not uprns:
        # We can sometimes have no uprn for a property
        logger.warning("Found data but missing uprn")
    elif len(uprns) > 1:
        # There is a possibility that we have multiple UPRNs for a single property, which is an error
        if len({r["address"] for r in rows}) == 1:
            # Take the uprn from the most recent
            uprns = {newest_epc["uprn"]}
        else:
            raise ValueError("Multiple UPRNs found - investigate me")

    if uprns:
        uprn = uprns.pop()
    else:
        # If we create the uprn based on a hash, we mark it as simulated
        newest_epc["uprn-source"] = self.UPRN_SOURCE_SIMULATED
        # NOTE(review): hash() of a str is randomised per interpreter process unless PYTHONHASHSEED is
        # fixed, so this value is not stable between runs. Left as is to stay in step with estimate_epc
        uprn = hash(self.address1 + self.postcode)

    if self.fast:
        return newest_epc, [], {}, "", "", None

    # Retrieve postcode and address
    address_epc, postcode_epc = self.format_address(newest_epc=newest_epc)

    return newest_epc, older_epcs, full_sap_epc, address_epc, postcode_epc, uprn
|
|
|
|
@staticmethod
|
|
def filter_newest_epc(list_of_epcs: List):
    """
    Split a list of EPC rows into the most recently lodged one and all the others.

    :param list_of_epcs: list of EPC row dicts with "lodgement-datetime" and "lmk-key" keys
    :return: tuple of (newest epc, list of older epcs), or ({}, []) for an empty list
    """
    if not list_of_epcs:
        return {}, []

    # It is possible (but rare, and likely an error on EPC lodgement) that we have multiple EPCs that
    # were lodged at the exact same time. max() returns the first of them, which is the one we take.
    # The maximum is found in a single pass rather than once per row
    newest = max(list_of_epcs, key=lambda epc: epc["lodgement-datetime"])

    older_epcs = [epc for epc in list_of_epcs if epc["lmk-key"] != newest["lmk-key"]]

    return newest, older_epcs
|
|
|
|
@staticmethod
|
|
def _get_epc_mode(col: str, epc_data: pd.DataFrame):
|
|
"""
|
|
Simple method to extract the mode value from the EPC data
|
|
:param col: name of the column to take the mode of
|
|
:param epc_data: pandas dataframe of epc data
|
|
"""
|
|
|
|
mode_value = epc_data[[col]].mode(dropna=True)
|
|
if len(mode_value) != 1:
|
|
raise NotImplementedError("TODO: Handle multiple modes")
|
|
mode_value = mode_value.iloc[0][col]
|
|
|
|
return mode_value
|
|
|
|
def fetch_nearby_epcs(
|
|
self, initial_postcode: str,
|
|
lmks_to_drop: list[str] | None = None,
|
|
built_form: str = "",
|
|
property_type: str = "",
|
|
exclude_old: bool = False,
|
|
heating_system: [str, None] = None,
|
|
associated_uprns: [List[int] | None] = None
|
|
):
|
|
"""
|
|
Fetches and processes EPC data for a given initial postcode, applying successive trimming
|
|
to the postcode and filtering the data until a non-empty result set is found.
|
|
|
|
The function queries the EPC API with the provided postcode, and if no data is found or
|
|
if the data doesn't meet certain criteria, it progressively shortens the postcode by
|
|
removing the last character and retries the query. This process continues until a valid
|
|
set of EPC data is obtained or the postcode is exhausted.
|
|
|
|
Additional filtering is applied to the obtained EPC data based on 'lmk-key', 'built-form',
|
|
and 'property-type'. The data is also processed to extract and numerically interpret house
|
|
numbers, calculate house number distances, and apply weights based on these distances.
|
|
|
|
:param initial_postcode: The initial full postcode for the EPC data query.
|
|
:param lmks_to_drop: List of 'lmk-key' values to be excluded from the EPC data.
|
|
:param built_form: The 'built-form' value to be used for filtering the EPC data.
|
|
:param property_type: The 'property-type' value to be used for filtering the EPC data.
|
|
:param exclude_old: Flag to exclude EPC data older than 10 years.
|
|
:param heating_system: Optional heating system type for additional filtering.
|
|
:param associated_uprns: Optional list of associated UPRNs for additional filtering.
|
|
:return:
|
|
"""
|
|
|
|
associated_uprns_to_apply = [] if associated_uprns is None else associated_uprns.copy()
|
|
|
|
property_type_api_map = {
|
|
"Bungalow": "bungalow",
|
|
"Flat": "flat",
|
|
"House": "house",
|
|
"Maisonette": "maisonette",
|
|
"Park home": "park home",
|
|
}
|
|
|
|
postcode = initial_postcode
|
|
while postcode:
|
|
# Fetch data from EPC API
|
|
params = {"postcode": postcode}
|
|
if property_type:
|
|
params["property-type"] = property_type_api_map[property_type]
|
|
|
|
# We take the 20 nearest homes of the relevant type, so not to pull in too many irrelevant homes
|
|
epc_response = self.get_epc(params=params, size=100)
|
|
|
|
if epc_response["status"] == 200:
|
|
epc_data = pd.DataFrame(self.data["rows"])
|
|
|
|
if lmks_to_drop is not None:
|
|
epc_data = epc_data[~epc_data["lmk-key"].isin(lmks_to_drop)]
|
|
|
|
try:
|
|
epc_data['lodgement-datetime'] = pd.to_datetime(
|
|
epc_data['lodgement-datetime'], format='%Y-%m-%d %H:%M:%S', errors='coerce'
|
|
)
|
|
except Exception as e:
|
|
logger.error("Problem formatting lodgement-datime, appling fallback: " + str(e))
|
|
epc_data['lodgement-datetime'] = pd.to_datetime(epc_data['lodgement-datetime'], errors='coerce')
|
|
|
|
if exclude_old:
|
|
# Exclude EPC data older than 10 years
|
|
epc_data = epc_data[
|
|
epc_data["lodgement-datetime"] > (pd.Timestamp.now() - pd.DateOffset(years=10))
|
|
]
|
|
|
|
if not epc_data.empty:
|
|
# Further processing of the EPC data
|
|
|
|
epc_data = epc_data.sort_values("lodgement-datetime", ascending=False).groupby("uprn").head(1)
|
|
epc_data["house_number"] = epc_data["address"].apply(lambda add1: self.get_house_number(add1))
|
|
epc_data["numeric_house_number"] = epc_data["house_number"].apply(
|
|
lambda house_num: self.extract_numeric_housenumber_part(house_num)
|
|
)
|
|
|
|
if self.numeric_house_number is None:
|
|
# If we don't have a house number, we treat all weights as equal
|
|
epc_data["weight"] = 1
|
|
else:
|
|
epc_data["house_number_distance"] = abs(
|
|
epc_data["numeric_house_number"] - self.numeric_house_number
|
|
)
|
|
# # We add 1, just in case we have a 0 weight (e.g. comparing house number 7a to 7b, or 9A to 9)
|
|
# epc_data["weight"] = 1 / (epc_data["house_number_distance"] + 1)
|
|
# # If we have a home without a house number, fill that weight with average
|
|
# epc_data["weight"] = epc_data["weight"].fillna(epc_data["weight"].mean())
|
|
# # Finally, we might not have any house numbers whatsoever so everything could be
|
|
# # missing, so we fill with 1
|
|
# epc_data["weight"] = epc_data["weight"].fillna(1)
|
|
# TODO: Testing
|
|
# If the postcode is different from the initial postcode, it doesn't make sense to have
|
|
# any weightings
|
|
if all(pd.isnull(epc_data["house_number_distance"])) or (postcode != initial_postcode):
|
|
epc_data["weight"] = 1
|
|
else:
|
|
epc_data["weight"] = 1 / np.sqrt(epc_data["house_number_distance"] + 1)
|
|
epc_data["weight"] = epc_data["weight"].fillna(epc_data["weight"].mean())
|
|
|
|
estimation_property_type = self._estimate_str(
|
|
key="property-type", estimation_data=epc_data
|
|
) if property_type == "" else property_type
|
|
|
|
epc_built_form = self._estimate_str(
|
|
key="built-form",
|
|
estimation_data=epc_data[epc_data["property-type"] == estimation_property_type]
|
|
)
|
|
|
|
if built_form == "Semi-Detached" and epc_built_form in ["End-Terraced", "Mid-Terraced"]:
|
|
estimation_built_form = "End-Terraced"
|
|
elif (built_form == "") or (pd.isnull(built_form)):
|
|
estimation_built_form = epc_built_form
|
|
else:
|
|
estimation_built_form = built_form
|
|
|
|
# We handle some edge cases experiences with maisonettes - if built form is detatched, just filter
|
|
# on maisonette
|
|
# We also add some additional logic for Park homes, because they are far less common than other
|
|
# property types
|
|
|
|
is_maisonette_with_bad_built_form = (estimation_property_type == "Maisonette") & (
|
|
estimation_built_form in ["Detached", "Semi-Detached"]
|
|
)
|
|
|
|
is_park_home_without_built_form = (estimation_property_type == "Park home") & (
|
|
sum(epc_data["built-form"] == estimation_built_form) == 0
|
|
)
|
|
|
|
has_missing_built_form = not estimation_built_form
|
|
|
|
# If we have associated UPRNS, we just filter as such, otherwise
|
|
# we filter with built form and property type
|
|
if any(str(x) in epc_data["uprn"].astype(str).values for x in associated_uprns_to_apply):
|
|
# We check at least one UPRN is in the data
|
|
epc_data = epc_data[epc_data["uprn"].isin(associated_uprns_to_apply)]
|
|
# After we run this, we empty associated_uprns_to_apply.
|
|
# That ensures we don't keep re-applying this filter if we shorten the postcode again
|
|
# since we'll keep ending up in the same results
|
|
associated_uprns_to_apply = []
|
|
elif is_maisonette_with_bad_built_form or is_park_home_without_built_form or has_missing_built_form:
|
|
epc_data = epc_data[epc_data["property-type"] == estimation_property_type]
|
|
else:
|
|
epc_data = epc_data[
|
|
(epc_data["built-form"] == estimation_built_form) & (
|
|
epc_data["property-type"] == estimation_property_type)
|
|
]
|
|
|
|
if heating_system is not None:
|
|
epc_data = epc_data[
|
|
epc_data["mainheat-description"] == heating_system
|
|
]
|
|
|
|
if not epc_data.empty:
|
|
return epc_data # Return the filtered data if it's not empty
|
|
|
|
# Shorten the postcode by one character for the next iteration
|
|
postcode = postcode[:-1].rstrip()
|
|
|
|
# If loop finishes without a valid response, raise an exception
|
|
raise Exception("Unable to find postcode data after trimming - investigate me")
|
|
|
|
def estimate_epc(
    self, property_type, built_form, lmks_to_drop=None, exclude_old=False, heating_system=None,
    associated_uprns=None
):
    """
    For a property that does not have an EPC, we retrieve the EPC data for the closest properties
    and estimate the EPC for the property in question.

    Note - do we have postcodes with just a single address? We would need to use a different approach
    to find the closest homes

    :param property_type: This is the property type of the property we are estimating, that can be retrieved from
        the ordnance survey api
    :param built_form: This is the built form of the property we are estimating, that can be retrieved from
        the ordnance survey api
    :param lmks_to_drop: This is a list of LMK keys that should be dropped from the estimation process. This
        is used as an override for testing, to drop EPCs for the property we are testing
    :param exclude_old: Used to drop any expired EPCs (more than 10 years old)
    :param heating_system: The heating system of the property we are estimating, if known. Will aim to filter EPCs
        to matching heating systems
    :param associated_uprns: List of associated UPRNs for the property. E.g. other units in a block of flats
    :return: dict of estimated EPC fields, flagged with "estimated": True. Sets self.uprn if it was unset
    """

    # Only similar nearby properties are used for the estimation process
    epc_data = self.fetch_nearby_epcs(
        initial_postcode=self.postcode,
        lmks_to_drop=lmks_to_drop,
        built_form=built_form,
        property_type=property_type,
        exclude_old=exclude_old,
        heating_system=heating_system,
        associated_uprns=associated_uprns
    )

    # A property that doesn't have an EPC is not going to be a new build, so we avoid comparing it to
    # new builds whenever older homes are available
    # TODO - this is experimental
    newer_age_bands = [
        "England and Wales: 1996-2002", "England and Wales: 2003-2006", "England and Wales: 2007-2011",
        "England and Wales: 2012 onwards"
    ]

    is_older = ~epc_data["construction-age-band"].isin(newer_age_bands)
    # Always work on a copy, so the column assignments below never write into a slice of another frame
    epc_data = (epc_data[is_older] if is_older.any() else epc_data).copy()

    # If we have missing lodgement date, we fill it with inspection-date (parsed to datetimes first, so
    # that the column keeps its datetime dtype)
    inspection_dates = pd.to_datetime(epc_data["inspection-date"], errors="coerce")
    epc_data["lodgement-datetime"] = epc_data["lodgement-datetime"].fillna(inspection_dates)
    # If we still have missing dates, we set it to the mean of the non NA dates
    epc_data["lodgement-datetime"] = epc_data["lodgement-datetime"].fillna(epc_data["lodgement-datetime"].mean())

    # For each attribute, we need to determine the datatype and use an appropriate method
    # to estimate.
    estimated_epc = {}
    for key, vartype in vartypes.items():
        # Treat nulls and empty strings alike as missing
        epc_data[key] = np.where(pd.isnull(epc_data[key]), None, epc_data[key])
        epc_data[key] = np.where(epc_data[key] == "", None, epc_data[key])
        estimation_data = epc_data[[key, "weight", "lodgement-datetime"]].copy()
        estimation_data = estimation_data[~pd.isnull(estimation_data[key])]
        estimation_data = estimation_data[~estimation_data[key].isin(Definitions.DATA_ANOMALY_MATCHES)]

        if vartype == "Int64":
            # We have some edge cases where we get the error "invalid literal for int() with base 10: '1.0'"
            # so we go via float
            estimation_data[key] = estimation_data[key].astype(float).astype(vartype)
        else:
            estimation_data[key] = estimation_data[key].astype(vartype)

        if estimation_data.shape[0] == 0:
            estimated_epc[key] = None
            continue

        if key == "floor-height":
            # We specifically handle this, to avoid extreme values: if any rows are 3.5m or less,
            # only those rows are used
            is_plausible = estimation_data["floor-height"].astype(float) <= 3.5
            if is_plausible.any():
                estimation_data = estimation_data[is_plausible]

        if vartype == "Int64":
            estimated_value = self._estimate_int(estimation_data, key)
        elif vartype == "float":
            estimated_value = self._estimate_float(estimation_data, key)
        elif vartype == "str":
            estimated_value = self._estimate_str(estimation_data, key)
        else:
            raise NotImplementedError("estimation method not implemented for type")

        estimated_epc[key] = estimated_value

    # Insert an estimated lodgement datetime, with a weighted average
    estimated_epc["lodgement-datetime"] = self.calculate_weighted_lodgement_datetime(epc_data=epc_data)
    # It is possible that there is still no lodgement date, so we need to handle this
    if pd.isnull(estimated_epc["lodgement-datetime"]):
        estimated_epc["lodgement-date"] = None
    else:
        estimated_epc["lodgement-date"] = estimated_epc["lodgement-datetime"].strftime("%Y-%m-%d")

    estimated_epc["current-energy-rating"] = sap_to_epc(estimated_epc["current-energy-efficiency"])

    # Convert the cost current and potential variables - to string integers
    for variable in ["heating-cost-current", "hot-water-cost-current", "lighting-cost-current",
                     "heating-cost-potential", "hot-water-cost-potential", "lighting-cost-potential"]:
        value = estimated_epc[variable]
        # A cost with no data stays None. Going via float also accepts numeric strings such as "95.0"
        estimated_epc[variable] = None if value is None else str(int(float(value)))

    # This is a string
    estimated_epc["low-energy-fixed-light-count"] = (
        str(estimated_epc["low-energy-fixed-light-count"]) if estimated_epc["low-energy-fixed-light-count"] else ""
    )
    # This is an int
    estimated_epc["photo-supply"] = (
        int(np.round(estimated_epc["photo-supply"])) if estimated_epc["photo-supply"] else estimated_epc[
            "photo-supply"]
    )

    # Recomputed from the two estimates, replacing the value estimated in the loop above.
    # Left as None when either input is missing or the floor area is 0
    co2_current = estimated_epc["co2-emissions-current"]
    floor_area = estimated_epc["total-floor-area"]
    estimated_epc["co2-emiss-curr-per-floor-area"] = (
        co2_current / floor_area if (co2_current is not None and floor_area) else None
    )

    estimated_epc["postcode"] = self.postcode
    if not self.uprn:
        # Update self.uprn too
        # NOTE(review): hash() of a str is randomised per interpreter process unless PYTHONHASHSEED is
        # fixed, so this value is not stable between runs. Left as is to stay in step with
        # extract_epc_data
        self.uprn = hash(self.address1 + self.postcode)

    estimated_epc["uprn"] = self.uprn
    estimated_epc["address"] = self.full_address
    # Indicate that this epc was estimated
    estimated_epc["estimated"] = True

    return estimated_epc
|
|
|
|
@staticmethod
def calculate_weighted_lodgement_datetime(epc_data):
    """
    Return the weight-averaged lodgement datetime of a set of EPC records.

    epc_data is a DataFrame with a 'lodgement-datetime' column (anything pd.to_datetime accepts) and a
    numeric 'weight' column.

    Returns a pd.Timestamp holding the weighted mean. If no row has both a date and a weight, or the usable
    weights sum to zero, pd.NaT is returned so that callers can test the result with pd.isnull.
    """
    lodged = pd.to_datetime(epc_data["lodgement-datetime"])
    weights = epc_data["weight"]

    # Rows without a date cannot contribute to the mean. Left in place, a NaT would be turned into the
    # smallest int64 by an integer conversion and silently drag the average to a meaningless value.
    usable = lodged.notna() & weights.notna()
    lodged = lodged[usable]
    weights = weights[usable]

    total_weight = weights.sum()
    if lodged.empty or total_weight == 0:
        return pd.NaT

    # Average the offsets from the earliest date rather than raw epoch integers: this avoids the
    # deprecated Series.view('int64') and does not depend on the datetime resolution of the column.
    origin = lodged.min()
    offsets_in_seconds = (lodged - origin).dt.total_seconds()
    mean_offset = (offsets_in_seconds * weights).sum() / total_weight

    return origin + pd.to_timedelta(mean_offset, unit="s")
|
|
|
|
@staticmethod
|
|
def _estimate_int(estimation_data, key):
|
|
return round(np.average(a=estimation_data[key], weights=estimation_data["weight"]))
|
|
|
|
@staticmethod
|
|
def _estimate_float(estimation_data, key):
|
|
return round(np.average(a=estimation_data[key], weights=estimation_data["weight"]), 2)
|
|
|
|
@staticmethod
|
|
def _estimate_str(estimation_data, key):
|
|
agg = estimation_data.groupby(key)["weight"].sum().reset_index()
|
|
agg = agg[agg["weight"] == agg["weight"].max()]
|
|
if agg.shape[0] != 1:
|
|
# If we have multiple modes, we take the more recent data on average
|
|
recent_grouped = estimation_data[
|
|
estimation_data[key].isin(agg[key].values)
|
|
].groupby(key)["lodgement-datetime"].mean()
|
|
|
|
newest_group = recent_grouped.idxmax()
|
|
return newest_group
|
|
|
|
return agg[key].values[0]
|
|
|
|
def find_property(self, skip_os=False):
|
|
"""
|
|
This method will attempt to identify a property. It will, at first, use the EPC api to try and
|
|
find the EPC for the property and the associated UPRN. If this fails, it will use the Ordnance Survey API to
|
|
find the UPRN of the address.
|
|
|
|
Because no result may have been provided by the EPC api because of formatting issues with the address,
|
|
if the ordnance survey api is used and the uprn retrieved, the EPC api is queried again with the UPRN, just
|
|
as a final check to see if there is any EPC data.
|
|
|
|
If there is no EPC data, the epc data will be estimated based on the surrounding properties

skip_os: when truthy and the first EPC lookup did not return status 200, an EPC is estimated from the
property_type / built_form already held by the Ordnance Survey client (provided property_type is set).

Nothing is returned. The results are stored on the instance: newest_epc, older_epcs, full_sap_epc,
address_clean, postcode_clean and uprn.

Raises Exception if the Ordnance Survey places api does not respond with status 200.
"""
|
|
|
|
# Step 1: use the epc api to find the property and uprn
|
|
response = self.get_epc()
|
|
|
|
if response["status"] == 200:
|
|
(
|
|
self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.uprn
|
|
) = self.extract_epc_data(address=self.full_address)
|
|
return
|
|
|
|
# Step 2: If we don't have an EPC, we use the ordnance survey api to find the uprn
|
|
# When skip_os is set, the estimate relies on the property_type / built_form already stored on the
# ordnance survey client
if skip_os:
|
|
if self.ordnance_survey_client.property_type is not None:
|
|
# We can try and estimate
|
|
estimated_epc = self.estimate_epc(
|
|
property_type=self.ordnance_survey_client.property_type,
|
|
built_form=self.ordnance_survey_client.built_form,
|
|
heating_system=self.heating_system,
|
|
associated_uprns=self.associated_uprns
|
|
)
|
|
self.newest_epc = estimated_epc
|
|
self.older_epcs = []
|
|
self.full_sap_epc = {}
|
|
|
|
# Finally, set a standardised address 1 and postcode
|
|
# Fall back to the address1 / postcode held on this instance when the ordnance survey values are empty
self.address_clean = (
|
|
self.ordnance_survey_client.address_os if self.ordnance_survey_client.address_os else self.address1
|
|
)
|
|
self.postcode_clean = (
|
|
self.ordnance_survey_client.postcode_os if self.ordnance_survey_client.postcode_os else
|
|
self.postcode
|
|
)
|
|
return
|
|
|
|
os_response = self.ordnance_survey_client.get_places_api()
|
|
|
|
if os_response["status"] != 200:
|
|
# Investigate this if it happens
|
|
raise Exception("Unable to find property - investigate me")
|
|
|
|
# Step 3: Now that we have a uprn, do another check against the epc api, this time searching with the uprn
|
|
self.uprn = self.ordnance_survey_client.most_relevant_result["UPRN"]
|
|
response = self.get_epc()
|
|
if response["status"] == 200:
|
|
(
|
|
self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.uprn
|
|
# NOTE(review): unlike Step 1, extract_epc_data is called here without an address argument - confirm
) = self.extract_epc_data()
|
|
return
|
|
|
|
# Step 4: If we still don't have an EPC, we estimate the EPC data
|
|
self.full_address = self.ordnance_survey_client.most_relevant_result["ADDRESS"]
|
|
# NOTE(review): unlike the skip_os estimate above, this call does not pass heating_system or
# associated_uprns - confirm this is intended
estimated_epc = self.estimate_epc(
|
|
property_type=self.ordnance_survey_client.property_type,
|
|
built_form=self.ordnance_survey_client.built_form
|
|
)
|
|
self.newest_epc = estimated_epc
|
|
self.older_epcs = []
|
|
self.full_sap_epc = {}
|
|
|
|
# Finally, set a standardised address 1 and postcode
|
|
self.address_clean = self.ordnance_survey_client.address_os
|
|
self.postcode_clean = self.ordnance_survey_client.postcode_os
|
|
return
|
|
|
|
def check_attribute_variations(self):
    """
    Check whether the wall, roof or floor type of the property has differed between its EPCs.

    For each of the walls, roof and floor descriptions, every EPC in self.older_epcs plus self.newest_epc
    is run through the matching attribute cleaner. A type is reported as varied when any of the listed
    flags takes more than one distinct value across those EPCs.

    Returns a dict with the boolean keys has_wall_type_ever_varied, has_roof_type_ever_varied and
    has_floor_type_ever_varied.
    """
    attribute_map = {
        "walls-description": {
            "cleaner": WallAttributes,
            "attribute": [
                "is_cavity_wall", "is_solid_brick", "is_system_built", "is_timber_frame",
                "is_granite_or_whinstone", "is_cob", "is_sandstone_or_limestone", "is_park_home"
            ],
            "name": "has_wall_type_ever_varied"
        },
        "roof-description": {
            "cleaner": RoofAttributes,
            "attribute": [
                "is_flat", "is_pitched", "is_roof_room", "is_thatched", "has_dwelling_above"
            ],
            "name": "has_roof_type_ever_varied"
        },
        "floor-description": {
            "cleaner": FloorAttributes,
            # NOTE(review): "is_to_external_air" used to be listed twice here; the redundant entry has been
            # dropped. Confirm whether a different FloorAttributes flag was meant in its place.
            "attribute": [
                "is_to_unheated_space", "is_to_external_air", "is_suspended", "is_solid",
            ],
            "name": "has_floor_type_ever_varied"
        }
    }

    attribute_variations = {}
    for description_field, spec in attribute_map.items():
        cleaner = spec["cleaner"]

        # One row of processed flags per EPC: the older EPCs first, the newest EPC last
        descriptions = [epc[description_field] for epc in self.older_epcs]
        descriptions.append(self.newest_epc[description_field])
        type_timeline = pd.DataFrame([cleaner(description).process() for description in descriptions])

        # The type has varied as soon as one flag holds more than one distinct value over the timeline
        attribute_variations[spec["name"]] = any(
            type_timeline[col].nunique() > 1 for col in spec["attribute"]
        )

    return attribute_variations
|
|
|
|
def identify_flat_floor(self):
    """
    Classify a flat as "top", "mid" or "ground" floor from the roof and floor descriptions of the newest EPC.
    """
    roof = RoofAttributes(self.newest_epc["roof-description"]).process()
    if roof["has_dwelling_above"]:
        # There is a dwelling above, so the floor description decides between mid and ground floor
        floor = FloorAttributes(self.newest_epc["floor-description"]).process()
        return "mid" if floor["another_property_below"] else "ground"

    # No dwelling above means this is a top floor flat
    return "top"
|
|
|
|
def get_metadata(self):
    """
    Build self.metadata from the EPC history of the property.

    The metadata holds the number of days since the newest EPC was lodged, whether the SAP score ever
    dropped from one EPC to the next, the floor position when the property is a flat (None otherwise) and
    the flags produced by check_attribute_variations.

    Raises ValueError if there is no newest EPC.
    """
    if self.newest_epc is None:
        raise ValueError("No EPC data available")

    # SAP scores of the older EPCs followed by the newest one
    sap_timeline = [int(epc["current-energy-efficiency"]) for epc in self.older_epcs]
    sap_timeline.append(int(self.newest_epc["current-energy-efficiency"]))

    # A downgrade is any step in the timeline where the score is lower than the one before it
    has_sap_ever_downgraded = any(
        following < previous for previous, following in zip(sap_timeline, sap_timeline[1:])
    )

    # Wall / roof / floor type variations over time
    attribute_varations = self.check_attribute_variations()

    # Only flats get a floor position (top, mid or ground)
    if self.newest_epc["property-type"] == "Flat":
        floor = self.identify_flat_floor()
    else:
        floor = None

    time_since_lodgement = pd.Timestamp.now() - pd.Timestamp(self.newest_epc["lodgement-date"])

    self.metadata = {
        "days_since_last_epc": time_since_lodgement.days,
        "has_sap_ever_downgraded": has_sap_ever_downgraded,
        "floor": floor,
        **attribute_varations
    }
|