diff --git a/.idea/Model.iml b/.idea/Model.iml
index b0f9c00d..4413bb06 100644
--- a/.idea/Model.iml
+++ b/.idea/Model.iml
@@ -7,7 +7,7 @@
-
+
diff --git a/.idea/misc.xml b/.idea/misc.xml
index 1122b380..6f308057 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -3,7 +3,7 @@
-
+
diff --git a/backend/DbClient.py b/backend/DbClient.py
new file mode 100644
index 00000000..2ee01349
--- /dev/null
+++ b/backend/DbClient.py
@@ -0,0 +1,7 @@
+class DbClient:
+ # Placeholder: no connection handling, configuration or query methods are defined yet.
+
+ def __init__(self):
+ """
+ This class handles interaction with the database.
+
+ The constructor currently takes no arguments and performs no work.
+ """
+ pass
diff --git a/backend/OrdnanceSurvey.py b/backend/OrdnanceSurvey.py
new file mode 100644
index 00000000..837e76bd
--- /dev/null
+++ b/backend/OrdnanceSurvey.py
@@ -0,0 +1,105 @@
+from functools import lru_cache
+import urllib.parse
+import requests
+from utils.logger import setup_logger
+
+logger = setup_logger()
+
+
+class OrdnanceSuveyClient:
+
+    def __init__(self, address, postcode, api_key):
+        """
+        This class is tasked with interaction with the Ordnance Survey Places API.
+
+        :param address: The address for the property to search for
+        :param postcode: The postcode for the property to search for
+        :param api_key: The Ordnance Survey API key, sent as the "key" query parameter
+        """
+
+        self.address = address
+        self.postcode = postcode
+        self.full_address = ", ".join([self.address, self.postcode])
+        self.api_key = api_key
+
+        self.results = None
+
+        self.most_relevant_result = None
+        self.property_type = None
+        self.built_form = None
+        # This will be postcode and address, as returned by the ordnance survey
+        self.address_os = None
+        self.postcode_os = None
+
+    def set_places_address(self):
+        """
+        Given a response from the places api, this function will set the address and postcode of the property
+
+        :raises ValueError: if no result has been stored yet by get_places_api
+        """
+
+        if self.most_relevant_result is None:
+            raise ValueError("No results found - run get_places_api first")
+
+        self.address_os = self.most_relevant_result["ADDRESS"]
+        self.postcode_os = self.most_relevant_result["POSTCODE"]
+        # We strip out the postcode from the address as this is already stored separately
+        self.address_os = self.address_os.replace(self.postcode_os, "").strip()
+        # Remove trailing comma
+        self.address_os = self.address_os.rstrip(",").strip()
+        # Convert to title case
+        self.address_os = self.address_os.title()
+        # Make sure postcode is upper case
+        self.postcode_os = self.postcode_os.upper()
+
+    def get_places_api(self):
+        """
+        Query the Ordnance Survey Places "find" endpoint for the full address and store the best match.
+
+        On a successful response with at least one result this populates ``results``,
+        ``most_relevant_result``, ``property_type``, ``built_form``, ``address_os`` and ``postcode_os``.
+
+        This method is deliberately not wrapped in ``functools.lru_cache``: on an instance method the cache
+        is keyed on ``self``, so it is never shared between clients, and it keeps every client (and its
+        API key) alive for as long as the cache entry exists.
+
+        :return: dict with a single "status" key - the HTTP status code of the response, or 204 when the
+            request succeeded but contained no results
+        :raises ValueError: if no API key was provided
+        """
+
+        if not self.api_key:
+            raise ValueError("Ordnance Survey API key not specified")
+
+        encoded_address_query = urllib.parse.quote(self.full_address)
+        url = (f"https://api.os.uk/search/places/v1/find?query={encoded_address_query}&key="
+               f"{self.api_key}")
+        response = requests.get(url)
+        if response.status_code != 200:
+            logger.info("Could not find any results for the provided address and postcode")
+            return {"status": response.status_code}
+
+        # A successful search without matches may omit "results" (or return it empty); report "no data"
+        # rather than failing with a KeyError/IndexError
+        results = response.json().get('results')
+        if not results:
+            logger.info("Could not find any results for the provided address and postcode")
+            return {"status": 204}
+
+        self.results = results
+
+        # Extract some details about the best match
+        self.most_relevant_result = self.results[0]["DPA"]
+
+        self.parse_classification_code(self.most_relevant_result["CLASSIFICATION_CODE"])
+        self.set_places_address()
+
+        return {"status": response.status_code}
+
+    def parse_classification_code(self, classification_code: str):
+        """
+        This function will convert the classification code, returned by the OS places api, to a property type that is
+        compatible with the EPC database.
+
+        The various classifications can be found here:
+        https://osdatahub.os.uk/docs/places/technicalSpecification
+
+        Under LPI Output, CLASSIFICATION_CODE is described, and a link is provided to the full table of classifications
+        For these purposes, we do not need the full classification as this includes non-residential properties. We only
+        parse the ones of interest to us
+
+        :param classification_code: the CLASSIFICATION_CODE value of an OS Places result
+        :return: None - ``property_type`` and ``built_form`` are set on the instance, each falling back to ""
+            when the code (or that attribute for the code) is not mapped
+        """
+
+        value_map = {
+            # In the OS api, "RD" is a "Dwelling" however this is not valid property type in the EPC database
+            'RD': {},
+            # Spellings must match the built-form values compared elsewhere ("Detached", "Semi-Detached")
+            'RD02': {'property_type': 'House', 'built_form': 'Detached'},
+            'RD03': {'property_type': 'House', 'built_form': 'Semi-Detached'},
+            'RD04': {'property_type': 'House', 'built_form': 'Mid-Terrace'},
+            'RD06': {'property_type': 'Flat'},
+        }
+
+        mapped = value_map.get(classification_code, {})
+        self.property_type = mapped.get("property_type", "")
+        self.built_form = mapped.get("built_form", "")
diff --git a/backend/Property.py b/backend/Property.py
index 41696c37..be60784c 100644
--- a/backend/Property.py
+++ b/backend/Property.py
@@ -18,7 +18,6 @@ from recommendations.recommendation_utils import (
)
ENVIRONMENT = os.environ.get('ENVIRONMENT', 'dev')
-EPC_AUTH_TOKEN = os.environ.get('EPC_AUTH_TOKEN')
DATA_BUCKET = os.environ.get('DATA_BUCKET', 'retrofit-data-dev' if ENVIRONMENT == 'dev' else None)
logger = setup_logger()
@@ -49,16 +48,18 @@ class Property(Definitions):
spatial = None
- def __init__(self, id, postcode, address1, epc_client=None, data=None):
+ def __init__(self, id, address, postcode, data=None, old_data=None, full_sap_epc=None):
self.id = id
+
+ self.address = address
self.postcode = postcode
- self.address1 = address1
self.data = data
- self.old_data = None
+ self.old_data = old_data
+ self.full_sap_epc = full_sap_epc
self.property_dimensions = None
- self.uprn = None
- self.full_sap_epc = None
+ self.uprn = None if data is None else int(data["uprn"])
+
self.in_conservation_area, self.is_listed, self.is_heritage = None, None, None
self.restricted_measures = False
self.year_built = None
@@ -92,47 +93,6 @@ class Property(Definitions):
self.current_adjusted_energy = None
self.expected_adjusted_energy = None
- if epc_client:
- self.epc_client = epc_client
- else:
- self.epc_client = EpcClient(auth_token=EPC_AUTH_TOKEN)
-
- def search_address_epc(self):
- """
- This method searches for an address in the EPC database and returns the first result
- :return: property data
- """
- if self.data:
- return
-
- # This will fail if a property does not have an EPC - this has been documented as a case to handle
- response = self.epc_client.domestic.search(params={"address": self.address1, "postcode": self.postcode})
-
- # Check if we have a full sap EPC
- self.full_sap_epc = [r for r in response["rows"] if r["transaction-type"] == "new dwelling"]
- self.full_sap_epc = self.full_sap_epc[0] if self.full_sap_epc else self.full_sap_epc
-
- if len(response["rows"]) > 1:
- newest_response = [
- r for r in response["rows"] if
- r["lodgement-datetime"] == max([x["lodgement-datetime"] for x in response["rows"]])
- ]
- if len(newest_response) > 1:
- raise Exception("More than one result found for this address - investigate me")
-
- # We'll keep old EPCs in case it contains information, not present on the newest one
- self.old_data = [epc for epc in response["rows"] if epc["lmk-key"] != newest_response[0]["lmk-key"]]
-
- response["rows"] = newest_response
-
- self.data = response["rows"][0]
- # For the moment, if we don't have a UPRN, we don't do anything about it, however we'll handle this in
- # the future by using the Ordnance Survey places API
- if not self.data["uprn"]:
- logger.warning("We do not have a UPRN for this property")
- else:
- self.uprn = int(self.data["uprn"])
-
def set_energy(self):
"""
Extracts and formats data about the home's energy and co2 consumption
@@ -282,6 +242,7 @@ class Property(Definitions):
if self.data["property-type"] == "Flat":
self.data["built-form"] = "Semi-Detached"
+ self.set_year_built()
self.set_energy()
self.set_ventilation()
self.set_solar_pv()
@@ -498,7 +459,7 @@ class Property(Definitions):
"""
Utility function for usage in the lambda, for preparing the _rating fields
"""
- return rating_lookup[field].value if field not in cls.DATA_ANOMALY_MATCHES else None
+ return rating_lookup[field].value if (field not in cls.DATA_ANOMALY_MATCHES) and (field is not None) else None
def get_property_details_epc(self, portfolio_id: int, rating_lookup):
@@ -539,6 +500,7 @@ class Property(Definitions):
"primary_energy_consumption": self.energy["primary_energy_consumption"],
"co2_emissions": self.energy["co2_emissions"],
"adjusted_energy_consumption": self.current_adjusted_energy,
+ "estimated": self.data.get("estimated", False)
}
return property_details_epc
diff --git a/backend/SearchEpc.py b/backend/SearchEpc.py
index 238ae465..b3f58b04 100644
--- a/backend/SearchEpc.py
+++ b/backend/SearchEpc.py
@@ -1,12 +1,114 @@
import os
import time
+import re
+
+import usaddress
+import pandas as pd
+import numpy as np
from epc_api.client import EpcClient
+from backend.OrdnanceSurvey import OrdnanceSuveyClient
+from BaseUtility import Definitions
from utils.logger import setup_logger
from typing import List
from fuzzywuzzy import process
logger = setup_logger()
+# Maps each EPC field that is estimated from neighbouring properties to the dtype used to pick the
+# estimation method: "Int64" and "float" fields are weighted averages, "str" fields use the weighted mode.
+# Commented-out fields are not estimated through this mapping.
+vartypes = {
+    'low-energy-fixed-light-count': "Int64",
+    # 'address': 'str',
+    # 'uprn-source': 'str',
+    'floor-height': 'float',
+    'heating-cost-potential': 'float',
+    'unheated-corridor-length': 'float',
+    'hot-water-cost-potential': 'float',
+    'construction-age-band': 'str',
+    'potential-energy-rating': 'str',
+    'mainheat-energy-eff': 'str',
+    'windows-env-eff': 'str',
+    'lighting-energy-eff': 'str',
+    'environment-impact-potential': "Int64",
+    'glazed-type': 'str',
+    'heating-cost-current': 'float',
+    'address3': 'str',
+    'mainheatcont-description': 'str',
+    'sheating-energy-eff': 'str',
+    'property-type': 'str',
+    'local-authority-label': 'str',
+    'fixed-lighting-outlets-count': "Int64",
+    'energy-tariff': 'str',
+    'mechanical-ventilation': 'str',
+    # A cost, like the other *-cost-* fields, so it is averaged rather than treated as a category
+    'hot-water-cost-current': 'float',
+    'county': 'str',
+    'postcode': 'str',
+    'solar-water-heating-flag': 'str',
+    'constituency': 'str',
+    'co2-emissions-potential': 'float',
+    'number-heated-rooms': 'float',
+    'floor-description': 'str',
+    'energy-consumption-potential': 'float',
+    'local-authority': 'str',
+    'built-form': 'str',
+    'number-open-fireplaces': "Int64",
+    'windows-description': 'str',
+    'glazed-area': 'str',
+    # 'inspection-date': str,
+    'mains-gas-flag': 'str',
+    'co2-emiss-curr-per-floor-area': 'float',
+    'address1': 'str',
+    'heat-loss-corridor': 'str',
+    'flat-storey-count': "Int64",
+    'constituency-label': 'str',
+    'roof-energy-eff': 'str',
+    'total-floor-area': 'float',
+    'building-reference-number': 'str',
+    'environment-impact-current': 'float',
+    'co2-emissions-current': 'float',
+    'roof-description': 'str',
+    'floor-energy-eff': 'str',
+    'number-habitable-rooms': 'float',
+    'address2': 'str',
+    'hot-water-env-eff': 'str',
+    'posttown': 'str',
+    'mainheatc-energy-eff': 'str',
+    'main-fuel': 'str',
+    'lighting-env-eff': 'str',
+    'windows-energy-eff': 'str',
+    'floor-env-eff': 'str',
+    'sheating-env-eff': 'str',
+    'lighting-description': 'str',
+    'roof-env-eff': 'str',
+    'walls-energy-eff': 'str',
+    'photo-supply': 'float',
+    'lighting-cost-potential': 'float',
+    'mainheat-env-eff': 'str',
+    'multi-glaze-proportion': 'float',
+    'main-heating-controls': 'str',
+    # 'lodgement-datetime',
+    'flat-top-storey': 'str',
+    'current-energy-rating': 'str',
+    'secondheat-description': 'str',
+    'walls-env-eff': 'str',
+    'transaction-type': 'str',
+    # 'uprn': "Int64",
+    'current-energy-efficiency': 'float',
+    'energy-consumption-current': 'float',
+    'mainheat-description': 'str',
+    'lighting-cost-current': 'float',
+    # 'lodgement-date',
+    'extension-count': "Int64",
+    'mainheatc-env-eff': 'str',
+    'lmk-key': 'str',
+    'wind-turbine-count': "Int64",
+    'tenure': 'str',
+    'floor-level': 'str',
+    'potential-energy-efficiency': "Int64",
+    'hot-water-energy-eff': 'str',
+    'low-energy-lighting': 'float',
+    'walls-description': 'str',
+    'hotwater-description': 'str'
+}
+
class SearchEpc:
"""
@@ -38,9 +140,9 @@ class SearchEpc:
self,
address1: str,
postcode: str,
- address2: str = None,
- address3: str = None,
- address4: str = None,
+ auth_token: str,
+ os_api_key: str,
+ full_address: str | None = None,
max_retries: int = None,
uprn: [int, None] = None,
size=None,
@@ -50,9 +152,7 @@ class SearchEpc:
but can be used to find the epc for the home, if address1 and postcode are insufficient
:param address1: string, propery's address line 1
:param postcode: string, propery's postcode
- :param address2: string, optional, propery's address line 2
- :param address3: string, optional, propery's address line 3
- :param address4: string, optional, propery's address line 4
+ :param full_address: string, optional parameter, the full address of the property
:param max_retries: int, optional, number of retries to make when searching the api
:param uprn: int, optional, the uprn of the property
:param size: int, optional, the number of results to return. If not provided, defaults to 25 which is the api's
@@ -61,46 +161,102 @@ class SearchEpc:
self.address1 = address1
self.postcode = postcode
- self.address2 = address2
- self.address3 = address3
- self.address4 = address4
+ self.full_address = full_address
self.uprn = uprn
+ self.house_number = self.get_house_number(self.address1)
+ self.numeric_house_number = self.extract_numeric_housenumber_part(self.house_number)
self.max_retries = max_retries if max_retries is not None else self.MAX_RETRIES
- self.client = EpcClient(auth_token=os.getenv("EPC_AUTH_TOKEN"))
+ self.client = EpcClient(auth_token=auth_token)
+ self.ordnance_survey_client = OrdnanceSuveyClient(
+ address=self.address1, postcode=self.postcode, api_key=os_api_key
+ )
self.data = None
+ self.newest_epc = None
+ self.older_epcs = None
+ self.full_sap_epc = None
+
+ # These are the address and postcode values, which we store in the database
+ self.address_clean = None
+ self.postcode_clean = None
self.size = size if size is not None else 25
- def search(self):
+ @classmethod
+ def get_house_number(cls, address: str) -> str | None:
+     """
+     Extract the house (or flat) number from a free-text address.
+
+     The usaddress parser is tried first; when it reports no "AddressNumber" token, a regular
+     expression is used as a fallback.
+
+     :param address: the address string to parse
+     :return: the house number as a string, or None if none could be found
+     """
+
+     # Tokens that usaddress labelled as the address number, in the order they appear
+     number_tokens = [token for token, label in usaddress.parse(address) if label == "AddressNumber"]
+     if number_tokens:
+         # Strip any commas attached to the first such token
+         return number_tokens[0].replace(",", "")
+
+     # usaddress isn't optimal for addresses with some prefixes such as 'Flat', so fall back to a custom
+     # pattern: 'Flat' or 'Apartment' followed by a number, or just a number at the beginning
+     fallback = re.search(r'(?i)(?:flat|apartment)\s*(\d+)|^\s*(\d+)', address)
+     if fallback is None:
+         return None
+
+     # Only one side of the alternation can match, so take whichever capture group is populated
+     return next(group for group in fallback.groups() if group is not None)
+
+ @staticmethod
+ def extract_numeric_housenumber_part(house_number: str | None) -> int | None:
+     """
+     Return the first run of digits found in ``house_number`` as an int.
+
+     :param house_number: house number text such as "12A", or None
+     :return: the integer value of the first digit sequence, or None when the input is None or
+         contains no digits
+     """
+     # A missing house number has no numeric part
+     if house_number is None:
+         return None
+
+     # First occurrence of one or more consecutive digits
+     digits = re.search(r'\d+', house_number)
+     if digits is None:
+         return None
+
+     return int(digits.group())
+
+ def get_epc(self, params=None, size=None):
# Get the EPC data with retries
+ size = size if size is not None else self.size
+ if params is None:
+ if self.uprn:
+ params = {"uprn": self.uprn}
+ else:
+ params = {"address": self.address1, "postcode": self.postcode}
for retry in range(self.max_retries):
try:
- if self.uprn:
+ if "uprn" in params:
# We use the direct call method inside, since we need to implement uprn as a valid
# parameter for the search function
url = os.path.join(self.client.domestic.host, "search")
- response = self.client.domestic.call(method="get", url=url, params={"uprn": self.uprn})
+ response = self.client.domestic.call(method="get", url=url, params=params)
else:
- response = self.client.domestic.search(
- params={"address": self.address1, "postcode": self.postcode}, size=self.size
- )
+ response = self.client.domestic.search(params=params, size=size)
if response:
self.data = response
return self.SUCCESS
if retry > 0:
- print("Failed previous attempt but retry successful")
+ logger.info("Failed previous attempt but retry successful")
# If we got nothing, final try
if not response:
- # TODO: Make a call to OS uprn service and get the address' uprn, just in case there is an
- # issue with how we are searching the api
-
return {
"status": 204,
"message": "no data",
@@ -162,7 +318,24 @@ class SearchEpc:
return rows
- def retrieve(self, property_type=None, address=None):
+ @staticmethod
+ def format_address(newest_epc):
+     """
+     Build the address and postcode values that are stored in the database.
+
+     :param newest_epc: EPC record (mapping) providing "postcode" and "address"
+     :return: tuple of (address, postcode) - the address with the postcode and any trailing comma
+         removed and converted to title case, and the postcode in upper case
+     """
+     raw_postcode = newest_epc["postcode"]
+
+     # The postcode is returned separately, so drop it from the address before tidying up
+     without_postcode = newest_epc["address"].replace(raw_postcode, "").strip()
+     cleaned_address = without_postcode.rstrip(",").strip().title()
+
+     return cleaned_address, raw_postcode.upper()
+
+ def extract_epc_data(self, property_type=None, address=None):
"""
Given a successful search, this method will format the data and return it
@@ -188,7 +361,16 @@ class SearchEpc:
# Finally, we identify the newest epc and the rest, and then return
newest_epc, older_epcs = self.filter_newest_epc(list_of_epcs=rows)
- return newest_epc, older_epcs, full_sap_epc
+ # Retrieve postcode and address
+ address_epc, postcode_epc = self.format_address(newest_epc=newest_epc)
+
+ # Ge the uprn from the newest record for this home
+ uprns = {r["uprn"] for r in rows if r["uprn"]}
+ if len(uprns) != 1:
+ raise ValueError("Multiple UPRNs found - investigate me")
+ uprn = uprns.pop()
+
+ return newest_epc, older_epcs, full_sap_epc, address_epc, postcode_epc, uprn
@staticmethod
def filter_newest_epc(list_of_epcs: List):
@@ -208,3 +390,311 @@ class SearchEpc:
older_epcs = [epc for epc in list_of_epcs if epc["lmk-key"] != newest_response[0]["lmk-key"]]
return newest_response[0], older_epcs
+
+ @staticmethod
+ def _get_epc_mode(col: str, epc_data: pd.DataFrame):
+     """
+     Return the single most frequent non-null value of one column of the EPC data.
+
+     :param col: name of the column to take the mode of
+     :param epc_data: pandas dataframe of epc data
+     :return: the mode of ``col``
+     :raises NotImplementedError: if the column does not have exactly one mode
+     """
+
+     modes = epc_data[[col]].mode(dropna=True)
+     if len(modes) == 1:
+         return modes.iloc[0][col]
+
+     raise NotImplementedError("TODO: Handle multiple modes")
+
+ def fetch_nearby_epcs(
+ self, initial_postcode: str,
+ lmks_to_drop: list[str] | None = None,
+ built_form: str = "",
+ property_type: str = ""
+ ):
+ """
+ Fetches and processes EPC data for a given initial postcode, applying successive trimming
+ to the postcode and filtering the data until a non-empty result set is found.
+
+ The function queries the EPC API with the provided postcode, and if no data is found or
+ if the data doesn't meet certain criteria, it progressively shortens the postcode by
+ removing the last character and retries the query. This process continues until a valid
+ set of EPC data is obtained or the postcode is exhausted.
+
+ Additional filtering is applied to the obtained EPC data based on 'lmk-key', 'built-form',
+ and 'property-type'. The data is also processed to extract and numerically interpret house
+ numbers, calculate house number distances, and apply weights based on these distances.
+
+ :param initial_postcode: The initial full postcode for the EPC data query.
+ :param lmks_to_drop: List of 'lmk-key' values to be excluded from the EPC data.
+ :param built_form: The 'built-form' value to be used for filtering the EPC data.
+ :param property_type: The 'property-type' value to be used for filtering the EPC data. When provided it
+ must be one of the keys of property_type_api_map below.
+ :return: pandas dataframe of the EPC rows that remain after filtering, including a "weight" column
+ :raises Exception: if no usable EPC data is found before the postcode has been trimmed to nothing
+ """
+
+ # Translates the property type labels used by callers to the values sent as the "property-type" parameter
+ property_type_api_map = {
+ "Bungalow": "bungalow",
+ "Flat": "flat",
+ "House": "house",
+ "Maisonette": "maisonette",
+ "Park home": "park home",
+ }
+
+ postcode = initial_postcode
+ while postcode:
+ # Fetch data from EPC API
+ params = {"postcode": postcode}
+ if property_type:
+ params["property-type"] = property_type_api_map[property_type]
+
+ # We cap the request at 100 results of the relevant type, so as not to pull in too many irrelevant homes
+ epc_response = self.get_epc(params=params, size=100)
+
+ if epc_response["status"] == 200:
+ epc_data = pd.DataFrame(self.data["rows"])
+
+ if lmks_to_drop is not None:
+ epc_data = epc_data[~epc_data["lmk-key"].isin(lmks_to_drop)]
+
+ if not epc_data.empty:
+ # Further processing of the EPC data
+ epc_data['lodgement-datetime'] = pd.to_datetime(epc_data['lodgement-datetime'])
+ # Keep only the most recently lodged record for each uprn
+ epc_data = epc_data.sort_values("lodgement-datetime", ascending=False).groupby("uprn").head(1)
+ epc_data["house_number"] = epc_data["address"].apply(lambda add1: self.get_house_number(add1))
+ epc_data["numeric_house_number"] = epc_data["house_number"].apply(
+ lambda house_num: self.extract_numeric_housenumber_part(house_num)
+ )
+
+ if self.numeric_house_number is None:
+ # If we don't have a house number, we treat all weights as equal
+ epc_data["weight"] = 1
+ else:
+ epc_data["house_number_distance"] = abs(
+ epc_data["numeric_house_number"] - self.numeric_house_number
+ )
+ # # We add 1, just in case we have a 0 weight (e.g. comparing house number 7a to 7b, or 9A to 9)
+ # epc_data["weight"] = 1 / (epc_data["house_number_distance"] + 1)
+ # # If we have a home without a house number, fill that weight with average
+ # epc_data["weight"] = epc_data["weight"].fillna(epc_data["weight"].mean())
+ # # Finally, we might not have any house numbers whatsoever so everything could be
+ # # missing, so we fill with 1
+ # epc_data["weight"] = epc_data["weight"].fillna(1)
+ # TODO: Testing
+ # If the postcode is different from the initial postcode, it doesn't make sense to have
+ # any weightings
+ if all(pd.isnull(epc_data["house_number_distance"])) or (postcode != initial_postcode):
+ epc_data["weight"] = 1
+ else:
+ # Closer house numbers weigh more; the +1 avoids dividing by zero at distance 0
+ epc_data["weight"] = 1 / np.sqrt(epc_data["house_number_distance"] + 1)
+ # Rows without a usable house number get the average weight
+ epc_data["weight"] = epc_data["weight"].fillna(epc_data["weight"].mean())
+
+ # Use the caller's property type when given, otherwise the weighted most common one in the data
+ estimation_property_type = self._estimate_str(
+ key="property-type", estimation_data=epc_data
+ ) if property_type == "" else property_type
+
+ epc_built_form = self._estimate_str(
+ key="built-form",
+ estimation_data=epc_data[epc_data["property-type"] == estimation_property_type]
+ )
+
+ # NOTE(review): "End-Terraced"/"Mid-Terraced" are compared with, and later used to filter on, values
+ # of the EPC "built-form" column - confirm these spellings match what the API returns
+ if built_form == "Semi-Detached" and epc_built_form in ["End-Terraced", "Mid-Terraced"]:
+ estimation_built_form = "End-Terraced"
+ elif (built_form == "") or (pd.isnull(built_form)):
+ estimation_built_form = epc_built_form
+ else:
+ estimation_built_form = built_form
+
+ # We handle some edge cases experienced with maisonettes - if built form is detached or semi-detached,
+ # just filter on maisonette
+ # We also add some additional logic for Park homes, because they are far less common than other
+ # property types
+
+ is_maisonette_with_bad_built_form = (estimation_property_type == "Maisonette") & (
+ estimation_built_form in ["Detached", "Semi-Detached"]
+ )
+
+ is_park_home_without_built_form = (estimation_property_type == "Park home") & (
+ sum(epc_data["built-form"] == estimation_built_form) == 0
+ )
+
+ has_missing_built_form = not estimation_built_form
+
+ if is_maisonette_with_bad_built_form or is_park_home_without_built_form or has_missing_built_form:
+ epc_data = epc_data[epc_data["property-type"] == estimation_property_type]
+ else:
+ epc_data = epc_data[
+ (epc_data["built-form"] == estimation_built_form) & (
+ epc_data["property-type"] == estimation_property_type)
+ ]
+
+ if not epc_data.empty:
+ return epc_data # Return the filtered data if it's not empty
+
+ # Shorten the postcode by one character for the next iteration
+ postcode = postcode[:-1].rstrip()
+
+ # If loop finishes without a valid response, raise an exception
+ raise Exception("Unable to find postcode data after trimming - investigate me")
+
+ def estimate_epc(self, property_type, built_form, lmks_to_drop=None):
+ """
+ For a property that does not have an EPC, we retrieve the EPC data for the closest properties
+ and estimate the EPC for the property in question.
+
+ Note - do we have postcodes with just a single address? We would need to use a different approach
+ to find the closest homes
+ :param property_type: This is the property type of the property we are estimating, that can be retrieved from
+ the ordnance survey api
+ :param built_form: This is the built form of the property we are estimating, that can be retrieved from
+ the ordnance survey api
+ :param lmks_to_drop: This is a list of LMK keys that should be dropped from the estimation process. This
+ is used as an override for testing, to drop EPCs for the property we are testing
+ :return: dict holding one estimated value (or None when no usable data exists) for every field in
+ ``vartypes``, plus "lodgement-datetime", "lodgement-date", "postcode", "uprn", "address" and
+ "estimated" (always True)
+ """
+
+ # From the ordnance survey data, we want to determine the property type and then use only similar property
+ # types for the estimation process
+ epc_data = self.fetch_nearby_epcs(
+ initial_postcode=self.postcode,
+ lmks_to_drop=lmks_to_drop,
+ built_form=built_form,
+ property_type=property_type
+ )
+
+ # For each attribute, we need to determine the datatype and use an appropriate method
+ # to estimate.
+ estimated_epc = {}
+ for key, vartype in vartypes.items():
+ # Normalise missing values and empty strings to None so they are excluded from the estimate
+ epc_data[key] = np.where(pd.isnull(epc_data[key]), None, epc_data[key])
+ epc_data[key] = np.where(epc_data[key] == "", None, epc_data[key])
+ estimation_data = epc_data[[key, "weight", "lodgement-datetime"]].copy()
+ estimation_data = estimation_data[~pd.isnull(estimation_data[key])]
+ # Drop values listed in Definitions.DATA_ANOMALY_MATCHES
+ estimation_data = estimation_data[~estimation_data[key].isin(Definitions.DATA_ANOMALY_MATCHES)]
+ if vartype == "Int64":
+ # We have some edge cases where we get the error "invalid literal for int() with base 10: '1.0'"
+ # so this handles this
+ estimation_data[key] = estimation_data[key].astype(float).astype(vartype)
+ else:
+ estimation_data[key] = estimation_data[key].astype(vartype)
+
+ # Nothing left to estimate from for this field
+ if estimation_data.shape[0] == 0:
+ estimated_epc[key] = None
+ continue
+
+ if vartype == "Int64":
+ estimated_value = self._estimate_int(estimation_data, key)
+ elif vartype == "float":
+ estimated_value = self._estimate_float(estimation_data, key)
+ elif vartype == "str":
+ estimated_value = self._estimate_str(estimation_data, key)
+ else:
+ raise NotImplementedError("estimation method not implemented for type")
+
+ estimated_epc[key] = estimated_value
+
+ # Insert an estimated lodgement datetime, with a weighted average
+ estimated_epc["lodgement-datetime"] = self.calculate_weighted_lodgement_datetime(epc_data=epc_data)
+ # Extract lodgement date
+ estimated_epc["lodgement-date"] = estimated_epc["lodgement-datetime"].strftime("%Y-%m-%d")
+
+ # Identity fields come from this searcher rather than from the neighbouring EPCs
+ estimated_epc["postcode"] = self.postcode
+ estimated_epc["uprn"] = self.uprn
+ estimated_epc["address"] = self.full_address
+ # Indicate that this epc was estimated
+ estimated_epc["estimated"] = True
+
+ return estimated_epc
+
+ @staticmethod
+ def calculate_weighted_lodgement_datetime(epc_data):
+     """
+     Return the weight-averaged 'lodgement-datetime' of ``epc_data``.
+
+     :param epc_data: dataframe with a 'lodgement-datetime' column and a numeric 'weight' column
+     :return: pandas Timestamp of the weighted mean lodgement datetime
+     """
+     # Nanoseconds since the epoch. astype is used instead of Series.view, which pandas has deprecated
+     numeric_dates = pd.to_datetime(epc_data['lodgement-datetime']).astype('int64')
+
+     # Average the offsets from the earliest date, in floating point. Multiplying raw epoch nanoseconds
+     # by integer weights and summing them silently wraps around int64 after only a handful of rows
+     earliest = int(numeric_dates.min())
+     offsets = (numeric_dates - earliest).astype(float)
+     mean_offset = np.average(offsets, weights=epc_data['weight'].astype(float))
+
+     # Convert the numeric weighted mean back to datetime
+     return pd.to_datetime(earliest + int(round(mean_offset)))
+
+ @staticmethod
+ def _estimate_int(estimation_data, key):
+     """Return the average of column ``key``, weighted by the "weight" column, rounded to an integer."""
+     values, weights = estimation_data[key], estimation_data["weight"]
+     return round(np.average(a=values, weights=weights))
+
+ @staticmethod
+ def _estimate_float(estimation_data, key):
+     """Return the average of column ``key``, weighted by the "weight" column, rounded to 2 decimal places."""
+     values, weights = estimation_data[key], estimation_data["weight"]
+     weighted_mean = np.average(a=values, weights=weights)
+     return round(weighted_mean, 2)
+
+ @staticmethod
+ def _estimate_str(estimation_data, key):
+     """
+     Return the value of column ``key`` with the largest total weight.
+
+     :param estimation_data: dataframe with the ``key``, "weight" and "lodgement-datetime" columns
+     :param key: name of the column to estimate
+     :return: the value whose summed "weight" is highest; ties are broken in favour of the value
+         whose rows have the latest mean "lodgement-datetime"
+     """
+     totals = estimation_data.groupby(key)["weight"].sum().reset_index()
+     leaders = totals.loc[totals["weight"] == totals["weight"].max()]
+
+     # A single winner needs no tie-break
+     if leaders.shape[0] == 1:
+         return leaders[key].values[0]
+
+     # If we have multiple modes, we take the more recent data on average
+     is_tied = estimation_data[key].isin(leaders[key].values)
+     mean_lodgement = estimation_data[is_tied].groupby(key)["lodgement-datetime"].mean()
+     return mean_lodgement.idxmax()
+
+ def find_property(self):
+ """
+ This method will attempt to identify a property. It will, at first, use the EPC api to try and
+ find the EPC for the property and the associated UPRN. If this fails, it will use the Ordnance Survey API to
+ find the UPRN of the address.
+
+ Because no result may have been provided by the EPC api because of formatting issues with the address,
+ if the ordnance survey api is used and the uprn retrieved, the EPC api is queried again with the UPRN, just
+ as a final check to see if there is any EPC data.
+
+ If there is no EPC data, the epc data will be estimated based on the surrounding properties
+
+ Results are stored on the instance rather than returned: ``newest_epc``, ``older_epcs``, ``full_sap_epc``,
+ ``address_clean``, ``postcode_clean`` and ``uprn``.
+
+ :return: None
+ :raises Exception: if the Ordnance Survey lookup does not report status 200
+ """
+
+ # Step 1: use the epc api to find the property and uprn
+ response = self.get_epc()
+
+ if response["status"] == 200:
+ (
+ self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.uprn
+ ) = self.extract_epc_data(address=self.full_address)
+ return
+
+ # Step 2: If we don't have an EPC, we use the ordnance survey api to find the uprn
+ os_response = self.ordnance_survey_client.get_places_api()
+
+ if os_response["status"] != 200:
+ # Investigate this if it happens
+ raise Exception("Unable to find property - investigate me")
+
+ # Step 3: Now that we have a uprn, do another check against the epc api, this time searching with the uprn
+ # (get_epc builds a uprn query whenever self.uprn is set, so the uprn must be assigned before the call)
+ self.uprn = self.ordnance_survey_client.most_relevant_result["UPRN"]
+ response = self.get_epc()
+ if response["status"] == 200:
+ (
+ self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.uprn
+ ) = self.extract_epc_data()
+ return
+
+ # Step 4: If we still don't have an EPC, we estimate the EPC data
+ # estimate_epc copies self.full_address into the estimate, so the Ordnance Survey address is set first
+ self.full_address = self.ordnance_survey_client.most_relevant_result["ADDRESS"]
+ estimated_epc = self.estimate_epc(
+ property_type=self.ordnance_survey_client.property_type,
+ built_form=self.ordnance_survey_client.built_form
+ )
+ self.newest_epc = estimated_epc
+ self.older_epcs = []
+ self.full_sap_epc = {}
+
+ # Finally, set a standardised address 1 and postcode
+ self.address_clean = self.ordnance_survey_client.address_os
+ self.postcode_clean = self.ordnance_survey_client.postcode_os
+ return
diff --git a/backend/app/config.py b/backend/app/config.py
index 22621972..764bddf5 100644
--- a/backend/app/config.py
+++ b/backend/app/config.py
@@ -13,6 +13,7 @@ class Settings(BaseSettings):
HEAT_PREDICTIONS_BUCKET: str
PLAN_TRIGGER_BUCKET: str
EPC_AUTH_TOKEN: str
+ ORDNANCE_SURVEY_API_KEY: str
DB_HOST: str
DB_PASSWORD: str
DB_USERNAME: str
diff --git a/backend/app/db/functions/property_functions.py b/backend/app/db/functions/property_functions.py
index 93dc0c49..88b4e87d 100644
--- a/backend/app/db/functions/property_functions.py
+++ b/backend/app/db/functions/property_functions.py
@@ -11,7 +11,7 @@ from backend.app.db.models.portfolio import (
from sqlalchemy.orm.exc import NoResultFound
-def create_property(session: Session, portfolio_id: int, address: str, postcode: str) -> (int, bool):
+def create_property(session: Session, portfolio_id: int, address: str, postcode: str, uprn: str) -> (int, bool):
"""
This function will create a record for the property in the database if it does not exist.
If it does exist, it will just update the updated_at field.
@@ -25,7 +25,7 @@ def create_property(session: Session, portfolio_id: int, address: str, postcode:
try:
# Attempt to fetch the existing property
existing_property = session.query(PropertyModel).filter_by(
- address=address, postcode=postcode, portfolio_id=portfolio_id
+ uprn=uprn, portfolio_id=portfolio_id
).one()
# Update the 'updated_at' field
@@ -43,6 +43,7 @@ def create_property(session: Session, portfolio_id: int, address: str, postcode:
address=address,
postcode=postcode,
portfolio_id=portfolio_id,
+ uprn=uprn,
creation_status=PropertyCreationStatus.LOADING,
status=PortfolioStatus.ASSESSMENT.value,
has_pre_condition_report=False,
diff --git a/backend/app/db/models/portfolio.py b/backend/app/db/models/portfolio.py
index 6f865381..f7c0370b 100644
--- a/backend/app/db/models/portfolio.py
+++ b/backend/app/db/models/portfolio.py
@@ -153,6 +153,7 @@ class PropertyDetailsEpcModel(Base):
primary_energy_consumption = Column(Float)
co2_emissions = Column(Float)
adjusted_energy_consumption = Column(Float)
+ estimated = Column(Boolean, default=False)
class PropertyDetailsSpatial(Base):
diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py
index a284e50b..89347be2 100644
--- a/backend/app/plan/router.py
+++ b/backend/app/plan/router.py
@@ -2,7 +2,7 @@ from datetime import datetime
import numpy as np
import pandas as pd
-from epc_api.client import EpcClient
+from backend.SearchEpc import SearchEpc
from fastapi import APIRouter, Depends
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.orm import sessionmaker
@@ -59,7 +59,6 @@ async def trigger_plan(body: PlanTriggerRequest):
try:
session.begin()
logger.info("Getting the inputs")
- epc_client = EpcClient(auth_token=get_settings().EPC_AUTH_TOKEN)
plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
uprn_filenames = read_dataframe_from_s3_parquet(
@@ -72,16 +71,21 @@ async def trigger_plan(body: PlanTriggerRequest):
input_properties = []
for config in plan_input:
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
- # TODO: implment validation. We should also standardise postcode and address in some fashion as
- # a postcode of abcdef would be considered different to ABCDEF
+
+ epc_searcher = SearchEpc(
+ address1=config["address"],
+ postcode=config["postcode"],
+ auth_token=get_settings().EPC_AUTH_TOKEN,
+ os_api_key=get_settings().ORDNANCE_SURVEY_API_KEY
+ )
+ epc_searcher.find_property()
# Create a record in db
property_id, is_new = create_property(
- session, portfolio_id=body.portfolio_id, address=config['address'], postcode=config['postcode']
+ session, body.portfolio_id, epc_searcher.address_clean, epc_searcher.postcode_clean, epc_searcher.uprn
)
# if a new record was not created, we don't produduce recommendations
if not is_new:
continue
- # TODO: Need to add heat demand target
create_property_targets(
session,
@@ -93,20 +97,20 @@ async def trigger_plan(body: PlanTriggerRequest):
input_properties.append(
Property(
- postcode=config['postcode'],
- address1=config['address'],
- epc_client=epc_client,
- id=property_id
+ id=property_id,
+ address=epc_searcher.address_clean,
+ postcode=epc_searcher.postcode_clean,
+ data=epc_searcher.newest_epc,
+ old_data=epc_searcher.older_epcs,
+ full_sap_epc=epc_searcher.full_sap_epc,
)
)
if not input_properties:
return Response(status_code=204)
- logger.info("Getting EPC, and spatial data")
+ logger.info("Getting spatial data")
for p in input_properties:
- p.search_address_epc()
- p.set_year_built()
p.get_spatial_data(uprn_filenames)
# The materials data could be cached or local so we don't need to make
@@ -146,9 +150,6 @@ async def trigger_plan(body: PlanTriggerRequest):
# Finally, we'll prepare data for predicting the impact on SAP
data_processor = DataProcessor(None, newdata=True)
data_processor.insert_data(pd.DataFrame([p.get_model_data()]))
- # TODO: Temp
- if data_processor.data["UPRN"].values[0] == "":
- data_processor.data["UPRN"] = 0
data_processor.pre_process()
@@ -510,11 +511,6 @@ async def trigger_plan(body: PlanTriggerRequest):
update_or_create_property_spatial_details(session, p.uprn, p.spatial)
- # TODO: TEMP
- if p.data["uprn"] == "":
- print("Get rid of me!")
- p.data["uprn"] = 0
-
property_data = p.get_full_property_data()
update_property_data(
session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data
@@ -562,7 +558,7 @@ async def trigger_plan(body: PlanTriggerRequest):
# recommendation from being default to not default, we'll need to re-run this process to re-calculate the
# the portfolion level impact
- total_valuation_increase = sum(property_valuation_increases)
+ total_valuation_increase = sum([v for v in property_valuation_increases if v is not None])
labour_days = round(max(
[sum(r["labour_days"] for r in rec_group if r["default"]) for p_id, rec_group in recommendations.items()]
))
diff --git a/backend/requirements/base.txt b/backend/requirements/base.txt
index 7a925030..3173f7f8 100644
--- a/backend/requirements/base.txt
+++ b/backend/requirements/base.txt
@@ -35,4 +35,5 @@ mip==1.15.0
boto3==1.28.3
pandas==1.5.3
pyarrow==12.0.1
-textblob
\ No newline at end of file
+textblob
+usaddress==0.5.10
\ No newline at end of file
diff --git a/etl/epc/property_change_app.py b/etl/epc/property_change_app.py
index f8c293a1..7e2262d0 100644
--- a/etl/epc/property_change_app.py
+++ b/etl/epc/property_change_app.py
@@ -637,13 +637,6 @@ def app():
file_key="sap_change_model/dataset_test.parquet",
)
- z = dataset[dataset["CONSTITUENCY"].isin(["E14000707", "E14000909"])]
- z["CONSTITUENCY"].value_counts()
-
- z[z["CONSTITUENCY"] == "E14000909"]["UPRN"].sample(1)
-
- self.data[self.data["UPRN"] == "100030549358"]
-
if __name__ == "__main__":
app()
diff --git a/etl/testing_data/estimate_epc.py b/etl/testing_data/estimate_epc.py
new file mode 100644
index 00000000..9e460678
--- /dev/null
+++ b/etl/testing_data/estimate_epc.py
@@ -0,0 +1,190 @@
+from pathlib import Path
+from random import choices, sample
+
+import os
+import pandas as pd
+from tqdm import tqdm
+from dotenv import load_dotenv
+from utils.logger import setup_logger
+from backend.SearchEpc import SearchEpc, vartypes
+from BaseUtility import Definitions
+from etl.epc.settings import BUILT_FORM_REMAP
+
+ENV_FILE = Path(__file__).parent / "backend" / ".env"
+
+logger = setup_logger()
+
+DATA_DIRECTORY = Path(__file__).parent / "local_data" / "all-domestic-certificates"
+DIR_SAMPLE_SIZE = 500
+N_DIRECTORIES = 50
+
+# The .env file has to be loaded before the token is read, otherwise a token defined only there is missed
+load_dotenv(ENV_FILE)
+EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
+
+CATETORICALS_TO_IGNORE = [
+    "postcode", "constituency", "local-authority", "built-form", "property-type", "address1", "constituency-label",
+    "building-reference-number", "address2", "posttown", "transaction-type", "lmk-key", "address3",
+    "local-authority-label", "county",
+]
+
+
+def check_numeric_performance(estimated_value, actual_value):
+    """
+    Return the relative error of an estimated value against the actual value.
+    :param estimated_value: The value produced by the estimation process
+    :param actual_value: The value from the original record
+    :return: None if there is no actual value, 1 if the estimate is missing or wrongly non-zero, else the error
+    """
+    if pd.isnull(actual_value):
+        return None
+    if pd.isnull(estimated_value):
+        return 1
+    if actual_value == 0:
+        return 0 if estimated_value == 0 else 1
+    # Divide by the magnitude so that a negative actual value cannot yield a negative error
+    return abs(estimated_value - actual_value) / abs(actual_value)
+
+
+def app():
+    """
+    This script is used to test the EPC estimation process.
+
+    For a sample of UPRNs from each sampled directory, an EPC is estimated with the LMK keys of every certificate
+    for that UPRN passed as lmks_to_drop, and the estimate is scored against the newest certificate for the UPRN.
+    """
+    numerical_vartypes = {key: value for key, value in vartypes.items() if value in ["float", "Int64"]}
+    str_var_types = {key: value for key, value in vartypes.items() if value == "str"}
+    # Make sure we haven't missed any keys
+    if len(numerical_vartypes) + len(str_var_types) != len(vartypes):
+        raise ValueError("Not all vartypes have been accounted for")
+
+    # Drop some keys that aren't important
+    for k in CATETORICALS_TO_IGNORE:
+        str_var_types.pop(k, None)
+
+    directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
+    # Sample without replacement so that no directory is scored twice, capped at the number available
+    directory_sample = sample(directories, min(N_DIRECTORIES, len(directories)))
+
+    results = []
+
+    for directory in tqdm(directory_sample):
+        filepath = directory / "certificates.csv"
+        df = pd.read_csv(filepath, low_memory=False)
+        # Drop missing UPRNs before the string conversion, which would otherwise turn them into "<NA>" strings
+        df = df[~pd.isnull(df["UPRN"])].copy()
+        df["UPRN"] = df["UPRN"].astype("Int64").astype("str")
+        unique_uprns = df["UPRN"].unique().tolist()
+        uprn_sample = sample(unique_uprns, min(DIR_SAMPLE_SIZE, len(unique_uprns)))
+        df_sample = df[df["UPRN"].isin(uprn_sample)]
+        # Take the record with the newest LODGEMENT_DATETIME by uprn
+        df_sample = df_sample.sort_values("LODGEMENT_DATETIME", ascending=False).drop_duplicates("UPRN")
+        # Convert the columns to lower case and replace underscores with hyphens, the same as the api
+        df_sample.columns = df_sample.columns.str.lower().str.replace("_", "-")
+
+        # For each epc, we test the estimation process
+        for _, epc in df_sample.iterrows():
+            epc = epc.to_dict()
+            address1 = epc["address1"]
+            postcode = epc["postcode"]
+
+            # Get all EPCs for this uprn and we make sure they get dropped from the estimate_epc function
+            epcs_for_uprn = df[df["UPRN"] == epc["uprn"]]
+            lmks_to_drop = epcs_for_uprn["LMK_KEY"].tolist()
+            searcher = SearchEpc(address1, postcode, auth_token=EPC_AUTH_TOKEN, os_api_key="")
+            searcher.uprn = epc["uprn"]
+
+            # Perform the same remapping for built-form as in the Property class for this test, in case we get (e.g.)
+            # Enclosed End-Terrace
+            built_form = BUILT_FORM_REMAP.get(epc["built-form"], epc["built-form"])
+            if (epc["property-type"] == "Maisonette" and built_form == "Detached") or (
+                built_form in Definitions.DATA_ANOMALY_MATCHES
+            ):
+                built_form = ""
+
+            estimated_epc = searcher.estimate_epc(
+                property_type=epc["property-type"], built_form=built_form, lmks_to_drop=lmks_to_drop
+            )
+
+            # We now compare the difference between the estimated and original
+            # TODO: We can convert windows and lighting to numeric versions and estimate how close we are
+            numeric_performance = {
+                key: check_numeric_performance(estimated_epc[key], epc[key]) for key in numerical_vartypes
+            }
+            # Remove Nones
+            numeric_errors = [value for value in numeric_performance.values() if value is not None]
+            # Get an average - when nothing could be compared there is no score, rather than a ZeroDivisionError
+            numeric_success = (1 - sum(numeric_errors) / len(numeric_errors)) if numeric_errors else None
+
+            # categorical performance
+            categorical_performance = {
+                key: 0 if estimated_epc[key] != epc[key] else 1 for key in str_var_types
+            }
+            # Get an average
+            categorical_success = sum(categorical_performance.values()) / len(categorical_performance)
+
+            results.append(
+                {
+                    "uprn": epc["uprn"],
+                    "numeric_success": numeric_success,
+                    "categorical_success": categorical_success,
+                    "property_type": epc["property-type"],
+                    "built_form": epc["built-form"],
+                    "tenure": epc["tenure"],
+                }
+            )
+
+    # Get aggregate performance figures
+    results_df = pd.DataFrame(results)
+    results_df["tenure"] = results_df["tenure"].replace("Rented (social)", "rental (social)")
+
+    avg_numeric_success = results_df["numeric_success"].median()
+    avg_categorical_success = results_df["categorical_success"].median()
+
+    # With 20 nearest homes
+    # 0.7718100840549558
+    # 0.5116279069767442
+    # 100 nearest homes
+    # 0.7859617377809409
+    # 0.5348837209302325
+
+    # Group by tenure
+    by_tenure = results_df.groupby("tenure").agg(
+        {"numeric_success": "median", "categorical_success": "median", "uprn": "count"}
+    )
+    pd.set_option('display.max_rows', 500)
+    pd.set_option('display.max_columns', 500)
+    pd.set_option('display.width', 1000)
+
+    # With 20 nearest homes
+    # numeric_success categorical_success uprn
+    # tenure
+    # NO DATA! 0.847840 0.581395 278
+    # Not defined - use in the case of a new dwelling... 0.930282 0.651163 617
+    # Owner-occupied 0.770330 0.511628 2588
+    # Rented (private) 0.791885 0.558140 1232
+    # owner-occupied 0.741088 0.488372 10912
+    # rental (private) 0.749064 0.488372 3252
+    # rental (social) 0.822109 0.581395 3878
+    # unknown 0.895840 0.627907 1820
+
+    # 100 nearest homes
+    # tenure
+    # NO DATA! 0.899566 0.604651 233
+    # Not defined - use in the case of a new dwelling... 0.927518 0.674419 608
+    # Owner-occupied 0.777026 0.511628 3167
+    # Rented (private) 0.805646 0.534884 1316
+    # owner-occupied 0.762180 0.488372 10835
+    # rental (private) 0.760503 0.511628 3181
+    # rental (social) 0.830057 0.604651 3705
+    # unknown 0.899948 0.627907 1571
+
+    # By property type - we also want to see how many properties we have for each property type
+    by_property_type = results_df.groupby("property_type").agg(
+        {"numeric_success": "median", "categorical_success": "median", "uprn": "count"}
+    )
+    # By property_type & built form
+    by_property_type_built_form = results_df.groupby(["property_type", "built_form"]).agg(
+        {"numeric_success": "median", "categorical_success": "median", "uprn": "count"}
+    )
diff --git a/etl/testing_data/no_epc_input.py b/etl/testing_data/no_epc_input.py
new file mode 100644
index 00000000..0745ff7a
--- /dev/null
+++ b/etl/testing_data/no_epc_input.py
@@ -0,0 +1,42 @@
+"""
+This script will create an input csv for the recommendation engine and upload it to S3, which can be used for
+testing
+"""
+import pandas as pd
+from utils.s3 import save_csv_to_s3
+
+USER_ID = 8  # First segment of the S3 key the test csv is saved under
+PORTFOLIO_ID = 57  # Second segment of the S3 key, and the "portfolio_id" of the printed request body
+
+
+def app():
+    """
+    Build a four property test portfolio, save it to S3 as a csv and print the request body that references it
+    :return: None
+    """
+
+    postcode = "E2 0PN"
+    addresses = [
+        "21 Butler House", "22 Butler House",
+        "23 Butler House", "24 Butler House",
+    ]
+    rows = [{"address": address, "postcode": postcode, "Notes": None} for address in addresses]
+    portfolio_df = pd.DataFrame(rows)
+
+    # Store the data in s3, under a key made up of the user and portfolio ids
+    s3_key = f"{USER_ID}/{PORTFOLIO_ID}/no_epc.csv"
+    save_csv_to_s3(
+        bucket_name="retrofit-plan-inputs-dev",
+        file_name=s3_key,
+        dataframe=portfolio_df,
+    )
+
+    # The request body that points at the file stored above
+    request_body = dict(
+        portfolio_id=str(PORTFOLIO_ID),
+        housing_type="Social",
+        goal="Increase EPC",
+        goal_value="A",
+        trigger_file_path=s3_key,
+    )
+    print(request_body)
diff --git a/recommendations/Recommendations.py b/recommendations/Recommendations.py
index 4b54cb52..a5d1f35c 100644
--- a/recommendations/Recommendations.py
+++ b/recommendations/Recommendations.py
@@ -155,6 +155,8 @@ class Recommendations:
# For the moment, we cap the number of SAP points that can be achieved by ventilation at 2
rec["sap_points"] = min(rec["sap_points"], VentilationRecommendations.SAP_LIMIT)
+ # Round to 2 decimal places
+ rec["sap_points"] = round(rec["sap_points"], 2)
rec["co2_equivalent_savings"] = float(property_instance.data["co2-emissions-current"]) - new_carbon
# Energy consumption current is per meter squared, so we need to multiply by the floor area to get