Merge pull request #268 from Hestia-Homes/interpolate-epc

Interpolate epc
This commit is contained in:
KhalimCK 2024-01-04 14:57:29 +00:00 committed by GitHub
commit 7af6be355e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 895 additions and 104 deletions

2
.idea/Model.iml generated
View file

@ -7,7 +7,7 @@
<sourceFolder url="file://$MODULE_DIR$/open_uprn" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/recommendations" isTestSource="false" />
</content>
<orderEntry type="jdk" jdkName="Python 3.10 (model_data)" jdkType="Python SDK" />
<orderEntry type="jdk" jdkName="Python 3.10 (backend)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyNamespacePackagesService">

2
.idea/misc.xml generated
View file

@ -3,7 +3,7 @@
<component name="Black">
<option name="sdkName" value="Python 3.10 (backend)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (model_data)" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (backend)" project-jdk-type="Python SDK" />
<component name="PythonCompatibilityInspectionAdvertiser">
<option name="version" value="3" />
</component>

7
backend/DbClient.py Normal file
View file

@ -0,0 +1,7 @@
class DbClient:
    def __init__(self):
        """
        Placeholder constructor for the class that handles interaction with the database.

        No state is created yet; the class exists so database access has a single home.
        """

105
backend/OrdnanceSurvey.py Normal file
View file

@ -0,0 +1,105 @@
from functools import lru_cache
import urllib.parse
import requests
from utils.logger import setup_logger
logger = setup_logger()
class OrdnanceSuveyClient:
    """
    Client for the Ordnance Survey Places "find" endpoint.

    Looks up a free-text address, keeps the best match, and derives an EPC-compatible
    property type / built form plus a cleaned address and postcode from it.
    """

    # Seconds to wait for the Places API, so a stalled connection cannot hang the caller forever
    REQUEST_TIMEOUT = 10

    # OS classification code -> EPC-compatible values. Only the residential codes of interest are mapped.
    # The built form spellings must match the ones the EPC data uses ("Detached", "Semi-Detached").
    CLASSIFICATION_MAP = {
        # In the OS api, "RD" is a "Dwelling" however this is not valid property type in the EPC database
        'RD': {},
        'RD02': {'property_type': 'House', 'built_form': 'Detached'},
        'RD03': {'property_type': 'House', 'built_form': 'Semi-Detached'},
        'RD04': {'property_type': 'House', 'built_form': 'Mid-Terrace'},
        'RD06': {'property_type': 'Flat'},
    }

    def __init__(self, address, postcode, api_key):
        """
        This class is tasked with interaction with the ordnance survey API.

        :param address: The address for the property to search for
        :param postcode: The postcode for the property to search for
        :param api_key: The Ordnance Survey API key used to authenticate requests
        """
        self.address = address
        self.postcode = postcode
        self.full_address = ", ".join([self.address, self.postcode])
        self.api_key = api_key

        self.results = None
        self.most_relevant_result = None
        self.property_type = None
        self.built_form = None

        # This will be postcode and address, as returned by the ordnance survey
        self.address_os = None
        self.postcode_os = None

        # Per-instance memo of the places lookup. This replaces an lru_cache on the method, which
        # would hold a reference to every client instance for the lifetime of the cache.
        self._places_response = None

    def set_places_address(self):
        """
        Given a response from the places api, this function will set the address and postcode of the property.

        :raises ValueError: if no places result has been stored yet
        """
        if self.most_relevant_result is None:
            raise ValueError("No results found - run get_places_api first")

        raw_address = self.most_relevant_result["ADDRESS"]
        raw_postcode = self.most_relevant_result["POSTCODE"]

        # We strip out the postcode from the address as this is already stored separately,
        # then drop the trailing comma that leaves behind
        address = raw_address.replace(raw_postcode, "").strip()
        address = address.rstrip(",").strip()

        self.address_os = address.title()
        self.postcode_os = raw_postcode.upper()

    def get_places_api(self):
        """
        Query the Ordnance Survey places api for this address, at most once per instance.

        The first outcome (success or failure status) is remembered and returned on later calls.

        :return: dict with a "status" key; 200 means a best match was found and stored on the instance
        :raises ValueError: if no API key was supplied
        """
        if not self.api_key:
            raise ValueError("Ordnance Survey API key not specified")

        if self._places_response is None:
            self._places_response = self._request_places_api()
        return self._places_response

    def _request_places_api(self):
        """Perform the HTTP request and populate the instance from the best match."""
        encoded_address_query = urllib.parse.quote(self.full_address)
        url = (f"https://api.os.uk/search/places/v1/find?query={encoded_address_query}&key="
               f"{self.api_key}")

        response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
        if response.status_code != 200:
            logger.info("Could not find any results for the provided address and postcode")
            return {"status": response.status_code}

        # A successful response can still contain no matches; report that as "no data"
        # instead of failing on a missing key or an empty list
        self.results = response.json().get("results") or []
        if not self.results:
            logger.info("Could not find any results for the provided address and postcode")
            return {"status": 204}

        # Extract some details about the best match
        self.most_relevant_result = self.results[0]["DPA"]
        self.parse_classification_code(self.most_relevant_result["CLASSIFICATION_CODE"])
        self.set_places_address()
        return {"status": response.status_code}

    def parse_classification_code(self, classification_code: str):
        """
        This function will convert the classification code, returned by the OS places api, to a property type that is
        compatible with the EPC database.

        The various classifications can be found here:
        https://osdatahub.os.uk/docs/places/technicalSpecification
        Under LPI Output, CLASSIFICATION_CODE is described, and a link is provided to the full table of classifications

        For these purposes, we do not need the full classification as this includes non-residential properties. We only
        parse the ones of interest to us. Unmapped codes leave both attributes as empty strings.

        :param classification_code: the CLASSIFICATION_CODE value of a places result
        :return: None, the result is stored in self.property_type and self.built_form
        """
        mapped = self.CLASSIFICATION_MAP.get(classification_code, {})
        self.property_type = mapped.get("property_type", "")
        self.built_form = mapped.get("built_form", "")

View file

@ -18,7 +18,6 @@ from recommendations.recommendation_utils import (
)
ENVIRONMENT = os.environ.get('ENVIRONMENT', 'dev')
EPC_AUTH_TOKEN = os.environ.get('EPC_AUTH_TOKEN')
DATA_BUCKET = os.environ.get('DATA_BUCKET', 'retrofit-data-dev' if ENVIRONMENT == 'dev' else None)
logger = setup_logger()
@ -49,16 +48,18 @@ class Property(Definitions):
spatial = None
def __init__(self, id, postcode, address1, epc_client=None, data=None):
def __init__(self, id, address, postcode, data=None, old_data=None, full_sap_epc=None):
self.id = id
self.address = address
self.postcode = postcode
self.address1 = address1
self.data = data
self.old_data = None
self.old_data = old_data
self.full_sap_epc = full_sap_epc
self.property_dimensions = None
self.uprn = None
self.full_sap_epc = None
self.uprn = None if data is None else int(data["uprn"])
self.in_conservation_area, self.is_listed, self.is_heritage = None, None, None
self.restricted_measures = False
self.year_built = None
@ -92,47 +93,6 @@ class Property(Definitions):
self.current_adjusted_energy = None
self.expected_adjusted_energy = None
if epc_client:
self.epc_client = epc_client
else:
self.epc_client = EpcClient(auth_token=EPC_AUTH_TOKEN)
def search_address_epc(self):
    """
    Look the property up in the EPC database by address line 1 and postcode and store the newest record.

    Does nothing when ``self.data`` is already populated. Otherwise sets ``self.full_sap_epc``,
    ``self.old_data`` (when several EPCs exist), ``self.data`` and, when present, ``self.uprn``.
    """
    if self.data:
        return

    # This will fail if a property does not have an EPC - this has been documented as a case to handle
    response = self.epc_client.domestic.search(params={"address": self.address1, "postcode": self.postcode})
    rows = response["rows"]

    # A "new dwelling" transaction indicates a full SAP EPC; keep the first one, or the empty list if none
    full_sap_matches = [row for row in rows if row["transaction-type"] == "new dwelling"]
    self.full_sap_epc = full_sap_matches[0] if full_sap_matches else full_sap_matches

    if len(rows) > 1:
        latest_lodgement = max(row["lodgement-datetime"] for row in rows)
        newest_rows = [row for row in rows if row["lodgement-datetime"] == latest_lodgement]
        if len(newest_rows) > 1:
            raise Exception("More than one result found for this address - investigate me")

        # Older EPCs are kept in case they hold information that is missing from the newest one
        newest_key = newest_rows[0]["lmk-key"]
        self.old_data = [epc for epc in rows if epc["lmk-key"] != newest_key]
        response["rows"] = newest_rows

    self.data = response["rows"][0]

    # A missing UPRN is only logged for now; the intention is to resolve it later
    # through the Ordnance Survey places API
    if self.data["uprn"]:
        self.uprn = int(self.data["uprn"])
    else:
        logger.warning("We do not have a UPRN for this property")
def set_energy(self):
"""
Extracts and formats data about the home's energy and co2 consumption
@ -282,6 +242,7 @@ class Property(Definitions):
if self.data["property-type"] == "Flat":
self.data["built-form"] = "Semi-Detached"
self.set_year_built()
self.set_energy()
self.set_ventilation()
self.set_solar_pv()
@ -498,7 +459,7 @@ class Property(Definitions):
"""
Utility function for usage in the lambda, for preparing the _rating fields
"""
return rating_lookup[field].value if field not in cls.DATA_ANOMALY_MATCHES else None
return rating_lookup[field].value if (field not in cls.DATA_ANOMALY_MATCHES) and (field is not None) else None
def get_property_details_epc(self, portfolio_id: int, rating_lookup):
@ -539,6 +500,7 @@ class Property(Definitions):
"primary_energy_consumption": self.energy["primary_energy_consumption"],
"co2_emissions": self.energy["co2_emissions"],
"adjusted_energy_consumption": self.current_adjusted_energy,
"estimated": self.data.get("estimated", False)
}
return property_details_epc

View file

@ -1,12 +1,114 @@
import os
import time
import re
import usaddress
import pandas as pd
import numpy as np
from epc_api.client import EpcClient
from backend.OrdnanceSurvey import OrdnanceSuveyClient
from BaseUtility import Definitions
from utils.logger import setup_logger
from typing import List
from fuzzywuzzy import process
logger = setup_logger()
# Target dtype for every EPC field that gets estimated. The dtype selects the estimation method:
# "Int64" and "float" fields are weighted averages, "str" fields take the highest-weighted value.
# Commented-out keys are deliberately excluded from estimation.
vartypes = {
    'low-energy-fixed-light-count': "Int64",
    # 'address': 'str',
    # 'uprn-source': 'str',
    'floor-height': 'float',
    'heating-cost-potential': 'float',
    'unheated-corridor-length': 'float',
    'hot-water-cost-potential': 'float',
    'construction-age-band': 'str',
    'potential-energy-rating': 'str',
    'mainheat-energy-eff': 'str',
    'windows-env-eff': 'str',
    'lighting-energy-eff': 'str',
    'environment-impact-potential': "Int64",
    'glazed-type': 'str',
    'heating-cost-current': 'float',
    'address3': 'str',
    'mainheatcont-description': 'str',
    'sheating-energy-eff': 'str',
    'property-type': 'str',
    'local-authority-label': 'str',
    'fixed-lighting-outlets-count': "Int64",
    'energy-tariff': 'str',
    'mechanical-ventilation': 'str',
    # A cost is numeric like every other *-cost-* field, so it is averaged rather than treated as a label
    'hot-water-cost-current': 'float',
    'county': 'str',
    'postcode': 'str',
    'solar-water-heating-flag': 'str',
    'constituency': 'str',
    'co2-emissions-potential': 'float',
    'number-heated-rooms': 'float',
    'floor-description': 'str',
    'energy-consumption-potential': 'float',
    'local-authority': 'str',
    'built-form': 'str',
    'number-open-fireplaces': "Int64",
    'windows-description': 'str',
    'glazed-area': 'str',
    # 'inspection-date': str,
    'mains-gas-flag': 'str',
    'co2-emiss-curr-per-floor-area': 'float',
    'address1': 'str',
    'heat-loss-corridor': 'str',
    'flat-storey-count': "Int64",
    'constituency-label': 'str',
    'roof-energy-eff': 'str',
    'total-floor-area': 'float',
    'building-reference-number': 'str',
    'environment-impact-current': 'float',
    'co2-emissions-current': 'float',
    'roof-description': 'str',
    'floor-energy-eff': 'str',
    'number-habitable-rooms': 'float',
    'address2': 'str',
    'hot-water-env-eff': 'str',
    'posttown': 'str',
    'mainheatc-energy-eff': 'str',
    'main-fuel': 'str',
    'lighting-env-eff': 'str',
    'windows-energy-eff': 'str',
    'floor-env-eff': 'str',
    'sheating-env-eff': 'str',
    'lighting-description': 'str',
    'roof-env-eff': 'str',
    'walls-energy-eff': 'str',
    'photo-supply': 'float',
    'lighting-cost-potential': 'float',
    'mainheat-env-eff': 'str',
    'multi-glaze-proportion': 'float',
    'main-heating-controls': 'str',
    # 'lodgement-datetime',
    'flat-top-storey': 'str',
    'current-energy-rating': 'str',
    'secondheat-description': 'str',
    'walls-env-eff': 'str',
    'transaction-type': 'str',
    # 'uprn': "Int64",
    'current-energy-efficiency': 'float',
    'energy-consumption-current': 'float',
    'mainheat-description': 'str',
    'lighting-cost-current': 'float',
    # 'lodgement-date',
    'extension-count': "Int64",
    'mainheatc-env-eff': 'str',
    'lmk-key': 'str',
    'wind-turbine-count': "Int64",
    'tenure': 'str',
    'floor-level': 'str',
    'potential-energy-efficiency': "Int64",
    'hot-water-energy-eff': 'str',
    'low-energy-lighting': 'float',
    'walls-description': 'str',
    'hotwater-description': 'str'
}
class SearchEpc:
"""
@ -38,9 +140,9 @@ class SearchEpc:
self,
address1: str,
postcode: str,
address2: str = None,
address3: str = None,
address4: str = None,
auth_token: str,
os_api_key: str,
full_address: str | None = None,
max_retries: int = None,
uprn: [int, None] = None,
size=None,
@ -50,9 +152,7 @@ class SearchEpc:
but can be used to find the epc for the home, if address1 and postcode are insufficient
:param address1: string, propery's address line 1
:param postcode: string, propery's postcode
:param address2: string, optional, propery's address line 2
:param address3: string, optional, propery's address line 3
:param address4: string, optional, propery's address line 4
:param full_address: string, optional parameter, the full address of the property
:param max_retries: int, optional, number of retries to make when searching the api
:param uprn: int, optional, the uprn of the property
:param size: int, optional, the number of results to return. If not provided, defaults to 25 which is the api's
@ -61,46 +161,102 @@ class SearchEpc:
self.address1 = address1
self.postcode = postcode
self.address2 = address2
self.address3 = address3
self.address4 = address4
self.full_address = full_address
self.uprn = uprn
self.house_number = self.get_house_number(self.address1)
self.numeric_house_number = self.extract_numeric_housenumber_part(self.house_number)
self.max_retries = max_retries if max_retries is not None else self.MAX_RETRIES
self.client = EpcClient(auth_token=os.getenv("EPC_AUTH_TOKEN"))
self.client = EpcClient(auth_token=auth_token)
self.ordnance_survey_client = OrdnanceSuveyClient(
address=self.address1, postcode=self.postcode, api_key=os_api_key
)
self.data = None
self.newest_epc = None
self.older_epcs = None
self.full_sap_epc = None
# These are the address and postcode values, which we store in the database
self.address_clean = None
self.postcode_clean = None
self.size = size if size is not None else 25
def search(self):
@classmethod
def get_house_number(cls, address: str) -> str | None:
    """
    Extract the house number from a free-text address.

    The usaddress parser is tried first. Because it isn't optimal for addresses with prefixes
    such as 'Flat', a regular expression is used as a fallback when it finds no number.

    :param address: the address to parse
    :return: the house number as a string, or None when none could be found
    """
    tagged_numbers = [token for token, label in usaddress.parse(address) if label == "AddressNumber"]
    if tagged_numbers:
        # Remove trailing commas that the parser keeps attached to the token
        return tagged_numbers[0].replace(",", "")

    # Look for 'Flat' or 'Apartment' followed by a number, or just a number at the beginning
    fallback = re.search(r'(?i)(?:flat|apartment)\s*(\d+)|^\s*(\d+)', address)
    if not fallback:
        return None
    # Only one of the two alternatives can have matched; return whichever group is populated
    return next(group for group in fallback.groups() if group is not None)
@staticmethod
def extract_numeric_housenumber_part(house_number: str | None) -> int | None:
# Regular expression to find the first occurrence of one or more digits
if house_number is None:
return None
match = re.search(r'\d+', house_number)
if match:
return int(match.group())
else:
return None
def get_epc(self, params=None, size=None):
# Get the EPC data with retries
size = size if size is not None else self.size
if params is None:
if self.uprn:
params = {"uprn": self.uprn}
else:
params = {"address": self.address1, "postcode": self.postcode}
for retry in range(self.max_retries):
try:
if self.uprn:
if "uprn" in params:
# We use the direct call method inside, since we need to implement uprn as a valid
# parameter for the search function
url = os.path.join(self.client.domestic.host, "search")
response = self.client.domestic.call(method="get", url=url, params={"uprn": self.uprn})
response = self.client.domestic.call(method="get", url=url, params=params)
else:
response = self.client.domestic.search(
params={"address": self.address1, "postcode": self.postcode}, size=self.size
)
response = self.client.domestic.search(params=params, size=size)
if response:
self.data = response
return self.SUCCESS
if retry > 0:
print("Failed previous attempt but retry successful")
logger.info("Failed previous attempt but retry successful")
# If we got nothing, final try
if not response:
# TODO: Make a call to OS uprn service and get the address' uprn, just in case there is an
# issue with how we are searching the api
return {
"status": 204,
"message": "no data",
@ -162,7 +318,24 @@ class SearchEpc:
return rows
def retrieve(self, property_type=None, address=None):
@staticmethod
def format_address(newest_epc):
    """
    Build the address and postcode values that are stored in the database from an EPC record.

    The postcode is removed from the address, a trailing comma is dropped, the address is
    title-cased and the postcode upper-cased.

    :param newest_epc: EPC record holding "postcode" and "address"
    :return: tuple of (address, postcode)
    """
    raw_postcode = newest_epc["postcode"]
    without_postcode = newest_epc["address"].replace(raw_postcode, "").strip()
    cleaned_address = without_postcode.rstrip(",").strip().title()
    return cleaned_address, raw_postcode.upper()
def extract_epc_data(self, property_type=None, address=None):
"""
Given a successful search, this method will format the data and return it
@ -188,7 +361,16 @@ class SearchEpc:
# Finally, we identify the newest epc and the rest, and then return
newest_epc, older_epcs = self.filter_newest_epc(list_of_epcs=rows)
return newest_epc, older_epcs, full_sap_epc
# Retrieve postcode and address
address_epc, postcode_epc = self.format_address(newest_epc=newest_epc)
# Ge the uprn from the newest record for this home
uprns = {r["uprn"] for r in rows if r["uprn"]}
if len(uprns) != 1:
raise ValueError("Multiple UPRNs found - investigate me")
uprn = uprns.pop()
return newest_epc, older_epcs, full_sap_epc, address_epc, postcode_epc, uprn
@staticmethod
def filter_newest_epc(list_of_epcs: List):
@ -208,3 +390,311 @@ class SearchEpc:
older_epcs = [epc for epc in list_of_epcs if epc["lmk-key"] != newest_response[0]["lmk-key"]]
return newest_response[0], older_epcs
@staticmethod
def _get_epc_mode(col: str, epc_data: pd.DataFrame):
"""
Simple method to extract the mode value from the EPC data
:param col: name of the column to take the mode of
:param epc_data: pandas dataframe of epc data
"""
mode_value = epc_data[[col]].mode(dropna=True)
if len(mode_value) != 1:
raise NotImplementedError("TODO: Handle multiple modes")
mode_value = mode_value.iloc[0][col]
return mode_value
def fetch_nearby_epcs(
self, initial_postcode: str,
lmks_to_drop: list[str] | None = None,
built_form: str = "",
property_type: str = ""
):
"""
Fetch EPC records for homes near a property and narrow them down to comparable homes.

The EPC API is queried through self.get_epc with the postcode (and the property type, when one
is given). If no usable records remain after filtering, the last character of the postcode is
removed and the query is repeated, until records are found or the postcode is exhausted.

For each query that returns data:
- records whose 'lmk-key' is in lmks_to_drop are removed
- only the most recently lodged record per 'uprn' is kept
- house numbers are parsed from 'address' and a 'weight' column is added: 1 / sqrt(distance + 1)
on the numeric house number distance to this property, or 1 for every record when this property
has no numeric house number, no distances are available, or the postcode has been trimmed
- the records are filtered on the property type and built form, which are estimated from the
data itself when not supplied

:param initial_postcode: The initial full postcode for the EPC data query.
:param lmks_to_drop: List of 'lmk-key' values to be excluded from the EPC data.
:param built_form: The 'built-form' value to be used for filtering the EPC data.
:param property_type: The 'property-type' value to be used for filtering the EPC data.
:return: pandas dataframe of the filtered EPC records, including the 'weight' column
:raises Exception: when no records are found for any trimmed version of the postcode
"""
# Translates the property type labels used here into the lower-case values sent to the EPC api.
# NOTE(review): a property_type outside these keys raises KeyError below - confirm callers only pass these
property_type_api_map = {
"Bungalow": "bungalow",
"Flat": "flat",
"House": "house",
"Maisonette": "maisonette",
"Park home": "park home",
}
postcode = initial_postcode
while postcode:
# Fetch data from EPC API
params = {"postcode": postcode}
if property_type:
params["property-type"] = property_type_api_map[property_type]
# We cap the request at 100 homes of the relevant type, so not to pull in too many irrelevant homes
epc_response = self.get_epc(params=params, size=100)
if epc_response["status"] == 200:
epc_data = pd.DataFrame(self.data["rows"])
if lmks_to_drop is not None:
epc_data = epc_data[~epc_data["lmk-key"].isin(lmks_to_drop)]
if not epc_data.empty:
# Further processing of the EPC data
epc_data['lodgement-datetime'] = pd.to_datetime(epc_data['lodgement-datetime'])
# Newest first, then the first row of each uprn group, i.e. the latest EPC per home
epc_data = epc_data.sort_values("lodgement-datetime", ascending=False).groupby("uprn").head(1)
epc_data["house_number"] = epc_data["address"].apply(lambda add1: self.get_house_number(add1))
epc_data["numeric_house_number"] = epc_data["house_number"].apply(
lambda house_num: self.extract_numeric_housenumber_part(house_num)
)
if self.numeric_house_number is None:
# If we don't have a house number, we treat all weights as equal
epc_data["weight"] = 1
else:
epc_data["house_number_distance"] = abs(
epc_data["numeric_house_number"] - self.numeric_house_number
)
# # We add 1, just in case we have a 0 weight (e.g. comparing house number 7a to 7b, or 9A to 9)
# epc_data["weight"] = 1 / (epc_data["house_number_distance"] + 1)
# # If we have a home without a house number, fill that weight with average
# epc_data["weight"] = epc_data["weight"].fillna(epc_data["weight"].mean())
# # Finally, we might not have any house numbers whatsoever so everything could be
# # missing, so we fill with 1
# epc_data["weight"] = epc_data["weight"].fillna(1)
# TODO: Testing
# If the postcode is different from the initial postcode, it doesn't make sense to have
# any weightings
if all(pd.isnull(epc_data["house_number_distance"])) or (postcode != initial_postcode):
epc_data["weight"] = 1
else:
# The + 1 keeps the weight finite when the distance is 0 (e.g. house number 7a compared to 7b)
epc_data["weight"] = 1 / np.sqrt(epc_data["house_number_distance"] + 1)
# Homes without a usable house number get the average weight
epc_data["weight"] = epc_data["weight"].fillna(epc_data["weight"].mean())
# Use the supplied property type, or the highest-weighted one in the data when none was given
estimation_property_type = self._estimate_str(
key="property-type", estimation_data=epc_data
) if property_type == "" else property_type
# The built form is estimated from homes of that property type only
epc_built_form = self._estimate_str(
key="built-form",
estimation_data=epc_data[epc_data["property-type"] == estimation_property_type]
)
# A supplied "Semi-Detached" is overridden with "End-Terraced" when the neighbours look terraced
if built_form == "Semi-Detached" and epc_built_form in ["End-Terraced", "Mid-Terraced"]:
estimation_built_form = "End-Terraced"
elif (built_form == "") or (pd.isnull(built_form)):
estimation_built_form = epc_built_form
else:
estimation_built_form = built_form
# We handle some edge cases experienced with maisonettes - if built form is detached, just filter
# on maisonette
# We also add some additional logic for Park homes, because they are far less common than other
# property types
is_maisonette_with_bad_built_form = (estimation_property_type == "Maisonette") & (
estimation_built_form in ["Detached", "Semi-Detached"]
)
is_park_home_without_built_form = (estimation_property_type == "Park home") & (
sum(epc_data["built-form"] == estimation_built_form) == 0
)
has_missing_built_form = not estimation_built_form
# In any of these cases the built form is unreliable, so only the property type is used as a filter
if is_maisonette_with_bad_built_form or is_park_home_without_built_form or has_missing_built_form:
epc_data = epc_data[epc_data["property-type"] == estimation_property_type]
else:
epc_data = epc_data[
(epc_data["built-form"] == estimation_built_form) & (
epc_data["property-type"] == estimation_property_type)
]
if not epc_data.empty:
return epc_data # Return the filtered data if it's not empty
# Shorten the postcode by one character for the next iteration
postcode = postcode[:-1].rstrip()
# If loop finishes without a valid response, raise an exception
raise Exception("Unable to find postcode data after trimming - investigate me")
def estimate_epc(self, property_type, built_form, lmks_to_drop=None):
    """
    Estimate an EPC record for a property that has none, from the EPCs of the closest properties.

    Every field listed in ``vartypes`` is estimated with the method matching its dtype; fields with
    no usable values are set to None. The record is then completed with a weighted lodgement
    datetime, this property's postcode, uprn and full address, and an "estimated" flag.

    Note - do we have postcodes with just a single address? We would need to use a different approach
    to find the closest homes

    :param property_type: property type of the property being estimated, which can be retrieved from
        the ordnance survey api
    :param built_form: built form of the property being estimated, which can be retrieved from
        the ordnance survey api
    :param lmks_to_drop: LMK keys to exclude from the estimation. This is used as an override for
        testing, to drop EPCs for the property under test
    :return: dict holding the estimated EPC record
    """
    # Only similar homes are used, so the property type and built form drive the neighbour selection
    neighbours = self.fetch_nearby_epcs(
        initial_postcode=self.postcode,
        lmks_to_drop=lmks_to_drop,
        built_form=built_form,
        property_type=property_type,
    )

    # One estimation method per supported dtype
    estimators = {
        "Int64": self._estimate_int,
        "float": self._estimate_float,
        "str": self._estimate_str,
    }

    estimate = {}
    for field, dtype in vartypes.items():
        # Treat nulls and empty strings alike, as missing values
        neighbours[field] = np.where(pd.isnull(neighbours[field]), None, neighbours[field])
        neighbours[field] = np.where(neighbours[field] == "", None, neighbours[field])

        usable = neighbours[[field, "weight", "lodgement-datetime"]].copy()
        usable = usable[usable[field].notna()]
        usable = usable[~usable[field].isin(Definitions.DATA_ANOMALY_MATCHES)]

        # Integers go through float first, which avoids the error
        # "invalid literal for int() with base 10: '1.0'"
        source = usable[field].astype(float) if dtype == "Int64" else usable[field]
        usable[field] = source.astype(dtype)

        if usable.empty:
            estimate[field] = None
            continue

        estimator = estimators.get(dtype)
        if estimator is None:
            raise NotImplementedError("estimation method not implemented for type")
        estimate[field] = estimator(usable, field)

    # The lodgement datetime is a weighted average too; the date is derived from it
    lodged_at = self.calculate_weighted_lodgement_datetime(epc_data=neighbours)
    estimate.update({
        "lodgement-datetime": lodged_at,
        "lodgement-date": lodged_at.strftime("%Y-%m-%d"),
        "postcode": self.postcode,
        "uprn": self.uprn,
        "address": self.full_address,
        # Indicate that this epc was estimated
        "estimated": True,
    })
    return estimate
@staticmethod
def calculate_weighted_lodgement_datetime(epc_data):
    """
    Return the weighted mean of the 'lodgement-datetime' column, using the 'weight' column.

    The mean is computed on offsets from the earliest date rather than on raw nanosecond
    timestamps. Summing raw int64 timestamps overflows after only a handful of rows when the
    weights are integers (e.g. when every weight is 1), which silently yields a wrong date.

    :param epc_data: pandas dataframe with 'lodgement-datetime' and 'weight' columns
    :return: pandas Timestamp of the weighted mean (NaT when there is nothing to average)
    """
    lodgement_dates = pd.to_datetime(epc_data['lodgement-datetime'])
    weights = epc_data['weight'].astype(float)

    # Offsets in seconds from the earliest lodgement keep the numbers small and exact enough
    earliest = lodgement_dates.min()
    offsets_seconds = (lodgement_dates - earliest).dt.total_seconds()

    weighted_offset = (offsets_seconds * weights).sum() / weights.sum()
    return earliest + pd.to_timedelta(weighted_offset, unit="s")
@staticmethod
def _estimate_int(estimation_data, key):
return round(np.average(a=estimation_data[key], weights=estimation_data["weight"]))
@staticmethod
def _estimate_float(estimation_data, key):
return round(np.average(a=estimation_data[key], weights=estimation_data["weight"]), 2)
@staticmethod
def _estimate_str(estimation_data, key):
agg = estimation_data.groupby(key)["weight"].sum().reset_index()
agg = agg[agg["weight"] == agg["weight"].max()]
if agg.shape[0] != 1:
# If we have multiple modes, we take the more recent data on average
recent_grouped = estimation_data[
estimation_data[key].isin(agg[key].values)
].groupby(key)["lodgement-datetime"].mean()
newest_group = recent_grouped.idxmax()
return newest_group
return agg[key].values[0]
def find_property(self):
    """
    Identify the property and populate its EPC data, address, postcode and UPRN on this instance.

    The lookup escalates through these stages, stopping at the first that yields EPC data:
    1. search the EPC api with the current search parameters
    2. resolve the address through the Ordnance Survey api to obtain a UPRN
    3. search the EPC api again, now by UPRN. The first search may have found nothing purely
       because of how the address was formatted, so this is a final check for existing EPC data
    4. estimate the EPC from the surrounding properties

    :raises Exception: when the Ordnance Survey api cannot find the property either
    """
    # Step 1: use the epc api to find the property and uprn
    if self.get_epc()["status"] == 200:
        (
            self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.uprn
        ) = self.extract_epc_data(address=self.full_address)
        return

    # Step 2: If we don't have an EPC, we use the ordnance survey api to find the uprn
    os_client = self.ordnance_survey_client
    if os_client.get_places_api()["status"] != 200:
        # Investigate this if it happens
        raise Exception("Unable to find property - investigate me")

    # Step 3: Now that we have a uprn, do another check against the epc api, this time searching with the uprn
    best_match = os_client.most_relevant_result
    self.uprn = best_match["UPRN"]
    if self.get_epc()["status"] == 200:
        (
            self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.uprn
        ) = self.extract_epc_data()
        return

    # Step 4: If we still don't have an EPC, we estimate the EPC data.
    # The full address must be set first because the estimated record stores it
    self.full_address = best_match["ADDRESS"]
    self.newest_epc = self.estimate_epc(
        property_type=os_client.property_type,
        built_form=os_client.built_form,
    )
    self.older_epcs = []
    self.full_sap_epc = {}

    # Finally, set a standardised address 1 and postcode
    self.address_clean = os_client.address_os
    self.postcode_clean = os_client.postcode_os

View file

@ -13,6 +13,7 @@ class Settings(BaseSettings):
HEAT_PREDICTIONS_BUCKET: str
PLAN_TRIGGER_BUCKET: str
EPC_AUTH_TOKEN: str
ORDNANCE_SURVEY_API_KEY: str
DB_HOST: str
DB_PASSWORD: str
DB_USERNAME: str

View file

@ -11,7 +11,7 @@ from backend.app.db.models.portfolio import (
from sqlalchemy.orm.exc import NoResultFound
def create_property(session: Session, portfolio_id: int, address: str, postcode: str) -> (int, bool):
def create_property(session: Session, portfolio_id: int, address: str, postcode: str, uprn: str) -> (int, bool):
"""
This function will create a record for the property in the database if it does not exist.
If it does exist, it will just update the updated_at field.
@ -25,7 +25,7 @@ def create_property(session: Session, portfolio_id: int, address: str, postcode:
try:
# Attempt to fetch the existing property
existing_property = session.query(PropertyModel).filter_by(
address=address, postcode=postcode, portfolio_id=portfolio_id
uprn=uprn, portfolio_id=portfolio_id
).one()
# Update the 'updated_at' field
@ -43,6 +43,7 @@ def create_property(session: Session, portfolio_id: int, address: str, postcode:
address=address,
postcode=postcode,
portfolio_id=portfolio_id,
uprn=uprn,
creation_status=PropertyCreationStatus.LOADING,
status=PortfolioStatus.ASSESSMENT.value,
has_pre_condition_report=False,

View file

@ -153,6 +153,7 @@ class PropertyDetailsEpcModel(Base):
primary_energy_consumption = Column(Float)
co2_emissions = Column(Float)
adjusted_energy_consumption = Column(Float)
estimated = Column(Boolean, default=False)
class PropertyDetailsSpatial(Base):

View file

@ -2,7 +2,7 @@ from datetime import datetime
import numpy as np
import pandas as pd
from epc_api.client import EpcClient
from backend.SearchEpc import SearchEpc
from fastapi import APIRouter, Depends
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.orm import sessionmaker
@ -59,7 +59,6 @@ async def trigger_plan(body: PlanTriggerRequest):
try:
session.begin()
logger.info("Getting the inputs")
epc_client = EpcClient(auth_token=get_settings().EPC_AUTH_TOKEN)
plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
uprn_filenames = read_dataframe_from_s3_parquet(
@ -72,16 +71,21 @@ async def trigger_plan(body: PlanTriggerRequest):
input_properties = []
for config in plan_input:
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
# TODO: implement validation. We should also standardise postcode and address in some fashion as
# a postcode of abcdef would be considered different to ABCDEF
epc_searcher = SearchEpc(
address1=config["address"],
postcode=config["postcode"],
auth_token=get_settings().EPC_AUTH_TOKEN,
os_api_key=get_settings().ORDNANCE_SURVEY_API_KEY
)
epc_searcher.find_property()
# Create a record in db
property_id, is_new = create_property(
session, portfolio_id=body.portfolio_id, address=config['address'], postcode=config['postcode']
session, body.portfolio_id, epc_searcher.address_clean, epc_searcher.postcode_clean, epc_searcher.uprn
)
# if a new record was not created, we don't produce recommendations
if not is_new:
continue
# TODO: Need to add heat demand target
create_property_targets(
session,
@ -93,20 +97,20 @@ async def trigger_plan(body: PlanTriggerRequest):
input_properties.append(
Property(
postcode=config['postcode'],
address1=config['address'],
epc_client=epc_client,
id=property_id
id=property_id,
address=epc_searcher.address_clean,
postcode=epc_searcher.postcode_clean,
data=epc_searcher.newest_epc,
old_data=epc_searcher.older_epcs,
full_sap_epc=epc_searcher.full_sap_epc,
)
)
if not input_properties:
return Response(status_code=204)
logger.info("Getting EPC, and spatial data")
logger.info("Getting spatial data")
for p in input_properties:
p.search_address_epc()
p.set_year_built()
p.get_spatial_data(uprn_filenames)
# The materials data could be cached or local so we don't need to make
@ -146,9 +150,6 @@ async def trigger_plan(body: PlanTriggerRequest):
# Finally, we'll prepare data for predicting the impact on SAP
data_processor = DataProcessor(None, newdata=True)
data_processor.insert_data(pd.DataFrame([p.get_model_data()]))
# TODO: Temp
if data_processor.data["UPRN"].values[0] == "":
data_processor.data["UPRN"] = 0
data_processor.pre_process()
@ -510,11 +511,6 @@ async def trigger_plan(body: PlanTriggerRequest):
update_or_create_property_spatial_details(session, p.uprn, p.spatial)
# TODO: TEMP
if p.data["uprn"] == "":
print("Get rid of me!")
p.data["uprn"] = 0
property_data = p.get_full_property_data()
update_property_data(
session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data
@ -562,7 +558,7 @@ async def trigger_plan(body: PlanTriggerRequest):
# recommendation from being default to not default, we'll need to re-run this process to re-calculate the
# portfolio-level impact
total_valuation_increase = sum(property_valuation_increases)
total_valuation_increase = sum([v for v in property_valuation_increases if v is not None])
labour_days = round(max(
[sum(r["labour_days"] for r in rec_group if r["default"]) for p_id, rec_group in recommendations.items()]
))

View file

@ -35,4 +35,5 @@ mip==1.15.0
boto3==1.28.3
pandas==1.5.3
pyarrow==12.0.1
textblob
textblob
usaddress==0.5.10

View file

@ -637,13 +637,6 @@ def app():
file_key="sap_change_model/dataset_test.parquet",
)
z = dataset[dataset["CONSTITUENCY"].isin(["E14000707", "E14000909"])]
z["CONSTITUENCY"].value_counts()
z[z["CONSTITUENCY"] == "E14000909"]["UPRN"].sample(1)
self.data[self.data["UPRN"] == "100030549358"]
if __name__ == "__main__":
app()

View file

@ -0,0 +1,190 @@
from pathlib import Path
from random import choices, sample
import os
import pandas as pd
from tqdm import tqdm
from dotenv import load_dotenv
from utils.logger import setup_logger
from backend.SearchEpc import SearchEpc, vartypes
from BaseUtility import Definitions
from etl.epc.settings import BUILT_FORM_REMAP
# Location of the .env file holding local secrets (e.g. the EPC API token)
ENV_FILE = Path(__file__).parent / "backend" / ".env"
logger = setup_logger()

# Root folder containing one sub-directory per bulk EPC download, each with a certificates.csv
DATA_DIRECTORY = Path(__file__).parent / "local_data" / "all-domestic-certificates"
# Number of unique UPRNs sampled from each directory
DIR_SAMPLE_SIZE = 500
# Number of directories sampled from DATA_DIRECTORY
N_DIRECTORIES = 50

# The .env file must be loaded BEFORE the environment is read, otherwise a token that is only defined in
# the .env file is silently read as None
load_dotenv(ENV_FILE)
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")

# String-typed EPC fields that are excluded from the categorical comparison in app()
CATETORICALS_TO_IGNORE = [
    "postcode", "constituency", "local-authority", "built-form", "property-type", "address1", "constituency-label",
    "building-reference-number", "address2", "posttown", "transaction-type", "lmk-key", "address3",
    "local-authority-label", "county",
]
def check_numeric_performance(estimated_value, actual_value):
    """
    Score how far an estimated numeric value is from the actual value, as a relative error.

    :param estimated_value: The estimated value (may be null)
    :param actual_value: The value to compare against (may be null)
    :return: None if there is no actual value to compare against, 1 if there is no estimate or the actual value
        is zero while the estimate is not, 0 if both are zero, otherwise |estimated - actual| / |actual|
    """
    # If we don't have anything to compare against, return None
    if pd.isnull(actual_value):
        return None

    # A missing estimate is treated as a complete miss
    if pd.isnull(estimated_value):
        return 1

    # Relative error is undefined for a zero actual value, so score it as a perfect hit or a complete miss
    if actual_value == 0:
        return 0 if estimated_value == 0 else 1

    # Divide by the magnitude of the actual value so a negative actual value cannot produce a negative error
    return abs(estimated_value - actual_value) / abs(actual_value)
def app():
    """
    This script is used to test the EPC estimation process.

    It samples directories under DATA_DIRECTORY and, within each, samples UPRNs from certificates.csv. For the
    newest certificate of every sampled UPRN it calls SearchEpc.estimate_epc (excluding that UPRN's own
    certificates via lmks_to_drop) and compares the estimate with the real certificate field by field. The
    per-property scores are then aggregated (median) overall, by tenure, by property type and by
    property type & built form, and the aggregates are logged.
    """
    numerical_vartypes = {key: value for key, value in vartypes.items() if value in ["float", "Int64"]}
    str_var_types = {key: value for key, value in vartypes.items() if value == "str"}

    # Make sure we haven't missed any keys
    if len(numerical_vartypes) + len(str_var_types) != len(vartypes):
        raise ValueError("Not all vartypes have been accounted for")

    # Drop some keys that aren't important
    for k in CATETORICALS_TO_IGNORE:
        str_var_types.pop(k, None)

    directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
    # Sample without replacement so the same directory is never evaluated twice, and cap the sample size so
    # we don't fail when fewer than N_DIRECTORIES directories are available
    directory_sample = sample(directories, min(N_DIRECTORIES, len(directories)))

    results = []
    for directory in tqdm(directory_sample):
        filepath = directory / "certificates.csv"
        df = pd.read_csv(filepath, low_memory=False)

        # Missing UPRNs must be removed BEFORE casting to str: after the cast they become the literal string
        # "<NA>", which is not null and would be sampled as if it was a real UPRN
        df = df[df["UPRN"].notna()].copy()
        df["UPRN"] = df["UPRN"].astype("Int64").astype("str")

        # Cap the sample size, as random.sample raises if asked for more items than are available
        unique_uprns = df["UPRN"].unique().tolist()
        uprn_sample = sample(unique_uprns, min(DIR_SAMPLE_SIZE, len(unique_uprns)))
        df_sample = df[df["UPRN"].isin(uprn_sample)]
        # Take the record with the newest LODGEMENT_DATETIME by uprn
        df_sample = df_sample.sort_values("LODGEMENT_DATETIME", ascending=False).drop_duplicates("UPRN")

        # Convert the columns to lower case and replace underscores with hyphens, the same as the api
        df_sample.columns = df_sample.columns.str.lower().str.replace("_", "-")

        # For each epc, we test the estimation process
        for _, epc in df_sample.iterrows():
            epc = epc.to_dict()
            address1 = epc["address1"]
            postcode = epc["postcode"]

            # Get all EPCs for this uprn and we make sure they get dropped from the estimate_epc function
            epcs_for_uprn = df[df["UPRN"] == epc["uprn"]]
            lmks_to_drop = epcs_for_uprn["LMK_KEY"].tolist()

            searcher = SearchEpc(address1, postcode, auth_token=EPC_AUTH_TOKEN, os_api_key="")
            searcher.uprn = epc["uprn"]

            # Perform the same remapping for built-form as in the Property class for this test, in case we get (e.g.)
            # Enclosed End-Terrace
            built_form = BUILT_FORM_REMAP.get(epc["built-form"], epc["built-form"])
            if (epc["property-type"] == "Maisonette" and built_form == "Detached") or (
                built_form in Definitions.DATA_ANOMALY_MATCHES
            ):
                built_form = ""

            estimated_epc = searcher.estimate_epc(
                property_type=epc["property-type"], built_form=built_form, lmks_to_drop=lmks_to_drop
            )

            # We now compare the difference between the estimated and original
            # TODO: We can convert windows and lighting to numeric versions and estimate how close we are
            numeric_performance = {
                key: check_numeric_performance(estimated_epc[key], epc[key]) for key in numerical_vartypes
            }
            # Remove Nones
            numeric_performance = {key: value for key, value in numeric_performance.items() if value is not None}

            # Get an average. If there was nothing to compare against, record NaN (ignored by the medians
            # below) rather than dividing by zero
            if numeric_performance:
                numeric_success = 1 - sum(numeric_performance.values()) / len(numeric_performance)
            else:
                numeric_success = float("nan")

            # categorical performance
            categorical_performance = {
                key: 0 if estimated_epc[key] != epc[key] else 1 for key in str_var_types
            }

            # Get an average
            if categorical_performance:
                categorical_success = sum(categorical_performance.values()) / len(categorical_performance)
            else:
                categorical_success = float("nan")

            results.append(
                {
                    "uprn": epc["uprn"],
                    "numeric_success": numeric_success,
                    "categorical_success": categorical_success,
                    "property_type": epc["property-type"],
                    "built_form": epc["built-form"],
                    "tenure": epc["tenure"],
                }
            )

    # Without any results there are no columns to aggregate on
    if not results:
        logger.info("No EPCs were sampled, nothing to report")
        return

    # Make sure the aggregate tables are not truncated when they are logged
    pd.set_option('display.max_rows', 500)
    pd.set_option('display.max_columns', 500)
    pd.set_option('display.width', 1000)

    # Get aggregate performance figures
    results_df = pd.DataFrame(results)
    results_df["tenure"] = results_df["tenure"].replace("Rented (social)", "rental (social)")

    avg_numeric_succes = results_df["numeric_success"].median()
    avg_categorical_sucess = results_df["categorical_success"].median()
    logger.info("Median numeric success: %s", avg_numeric_succes)
    logger.info("Median categorical success: %s", avg_categorical_sucess)
    # With 20 nearest homes
    # 0.7718100840549558
    # 0.5116279069767442
    # 100 nearest homes
    # 0.7859617377809409
    # 0.5348837209302325

    # Group by tenure
    by_tenure = results_df.groupby("tenure").agg(
        {"numeric_success": "median", "categorical_success": "median", "uprn": "count"}
    )
    logger.info("By tenure:\n%s", by_tenure)
    # With 20 nearest homes
    # numeric_success categorical_success uprn
    # tenure
    # NO DATA! 0.847840 0.581395 278
    # Not defined - use in the case of a new dwelling... 0.930282 0.651163 617
    # Owner-occupied 0.770330 0.511628 2588
    # Rented (private) 0.791885 0.558140 1232
    # owner-occupied 0.741088 0.488372 10912
    # rental (private) 0.749064 0.488372 3252
    # rental (social) 0.822109 0.581395 3878
    # unknown 0.895840 0.627907 1820
    # 100 nearest homes
    # tenure
    # NO DATA! 0.899566 0.604651 233
    # Not defined - use in the case of a new dwelling... 0.927518 0.674419 608
    # Owner-occupied 0.777026 0.511628 3167
    # Rented (private) 0.805646 0.534884 1316
    # owner-occupied 0.762180 0.488372 10835
    # rental (private) 0.760503 0.511628 3181
    # rental (social) 0.830057 0.604651 3705
    # unknown 0.899948 0.627907 1571

    # By property type - we also want to see how many properties we have for each property type
    by_property_type = results_df.groupby("property_type").agg(
        {"numeric_success": "median", "categorical_success": "median", "uprn": "count"}
    )
    logger.info("By property type:\n%s", by_property_type)

    # By property_type & built form
    by_property_type_built_form = results_df.groupby(["property_type", "built_form"]).agg(
        {"numeric_success": "median", "categorical_success": "median", "uprn": "count"}
    )
    logger.info("By property type & built form:\n%s", by_property_type_built_form)

View file

@ -0,0 +1,42 @@
"""
This script will create an input csv for the recommendation engine and upload it to S3, which can be used for
testing
"""
import pandas as pd
from utils.s3 import save_csv_to_s3
# Identifiers of the user and portfolio the test input belongs to. Both are used by app() to build the S3 key
# of the uploaded file, and PORTFOLIO_ID is also sent in the printed request body
USER_ID = 8
PORTFOLIO_ID = 57
def app():
    """
    Build a small test input (four addresses sharing one postcode), upload it to S3 as a csv and print the
    request body that points at the uploaded file
    :return:
    """
    addresses = ("21 Butler House", "22 Butler House", "23 Butler House", "24 Butler House")
    rows = [{"address": address, "postcode": "E2 0PN", "Notes": None} for address in addresses]
    test_file = pd.DataFrame(rows)

    # The S3 key is namespaced by user and portfolio
    filename = f"{USER_ID}/{PORTFOLIO_ID}/no_epc.csv"

    # Request body referencing the file we are about to upload
    body = {
        "portfolio_id": str(PORTFOLIO_ID),
        "housing_type": "Social",
        "goal": "Increase EPC",
        "goal_value": "A",
        "trigger_file_path": filename,
    }

    # Store the data in s3
    save_csv_to_s3(dataframe=test_file, bucket_name="retrofit-plan-inputs-dev", file_name=filename)

    print(body)

View file

@ -155,6 +155,8 @@ class Recommendations:
# For the moment, we cap the number of SAP points that can be achieved by ventilation at 2
rec["sap_points"] = min(rec["sap_points"], VentilationRecommendations.SAP_LIMIT)
# Round to 2 decimal places
rec["sap_points"] = round(rec["sap_points"], 2)
rec["co2_equivalent_savings"] = float(property_instance.data["co2-emissions-current"]) - new_carbon
# Energy consumption current is per meter squared, so we need to multiply by the floor area to get