finished ha32 analysis

This commit is contained in:
Khalim Conn-Kowlessar 2023-12-11 16:17:43 +00:00
parent b41fa37072
commit 739afbd79b
3 changed files with 248 additions and 96 deletions

View file

@ -66,7 +66,7 @@ class SearchEpc:
def search(self):
# Get the EPC data with retries
response = {}
for retry in range(self.max_retries):
try:
response = self.client.domestic.search(
@ -81,14 +81,15 @@ class SearchEpc:
print("Failed previous attempt but retry successful")
# If we got nothing, final try
if not response:
raise NotImplementedError("Implement me")
# response = client.domestic.search(
# params={"address": " ".join([home["Dwelling num"], home["Street"]]),
# "postcode": home["Postcode"]}
# )
# TODO: Make a call to OS uprn service and get the address' uprn, just in case there is an
# issue with how we are searching the api
return {
"status": 204,
"message": "no data",
"error": None
}
# TODO: Eventually, if we have nothing, we should exit with a 201 or 202, saying that
# there is no data for this property
return {
"status": 200,
"message": "success",
@ -107,7 +108,7 @@ class SearchEpc:
"error": str(e)
}
def retrieve(self):
def retrieve(self, property_type=None):
"""
Given a successful search, this method will format the data and return it
@ -125,7 +126,12 @@ class SearchEpc:
uprns = {r["uprn"] for r in rows}
if len(uprns) != 1:
raise NotImplementedError("More than one unique UPRN, need to handle this case")
logger.error("Multiple EPCs found - we should use an alternate method of searching - TODO")
if property_type is not None:
# We can do a filter on the property type
rows_filtered = [r for r in rows if r["property-type"] == property_type]
if rows_filtered:
rows = rows_filtered
# We now check for a full sap epc:
full_sap_epc = [r for r in rows if r["transaction-type"] == "new dwelling"]

View file

@ -37,20 +37,19 @@ class Eligibility:
def parse_fabric(self, key):
if "thermal transmittance" in self.epc[key]:
if key == "walls-description":
return WallAttributes(self.epc["walls-description"]).process()
if key == "roof-description":
return RoofAttributes(self.epc["roof-description"]).process()
raise ValueError("Invalid Key")
# Get the cleaned version of the description
return [
remapped = [
data for data in self.cleaned[key] if
data["original_description"] == self.epc[key]
][0]
]
if remapped:
return remapped[0]
if key == "walls-description":
return WallAttributes(self.epc["walls-description"]).process()
if key == "roof-description":
return RoofAttributes(self.epc["roof-description"]).process()
def loft_insulation(self, loft_thickness_threshold: int = None):
"""
@ -72,7 +71,7 @@ class Eligibility:
if not is_loft:
self.loft = {
"suitablility": False,
"suitability": False,
"thickness": None,
"reason": "roof not loft"
}
@ -88,14 +87,14 @@ class Eligibility:
if insulation_thickness > loft_thickness_threshold:
# Insulation is already thick enough
self.loft = {
"suitablility": False,
"suitability": False,
"thickness": insulation_thickness,
"reason": "existing insulation"
}
return
self.loft = {
"suitablility": True,
"suitability": True,
"thickness": insulation_thickness,
"reason": None
}
@ -121,7 +120,7 @@ class Eligibility:
if is_unfilled_cavity:
self.cavity = {
"suitablility": True,
"suitability": True,
"type": "empty",
}
return
@ -170,7 +169,11 @@ class Eligibility:
self.cavity_insulation()
self.loft_insulation()
self.gbis = (self.cavity["suitablility"] or self.loft["suitibility"]) and (
# self.gbis = (self.cavity["suitability"] or self.loft["suitability"]) and (
# int(self.epc["current-energy-efficiency"]) <= 68
# )
self.gbis = (self.cavity["suitability"]) and (
int(self.epc["current-energy-efficiency"]) <= 68
)
@ -214,7 +217,7 @@ class Eligibility:
self.loft_insulation()
# make sure conditions 2 and 3 are true
is_eligible = self.cavity["suitablility"] & self.loft["suitablility"]
is_eligible = self.cavity["suitability"] & self.loft["suitability"]
if post_retrofit_sap is None:
self.eco4 = {

View file

@ -8,7 +8,7 @@ from tqdm import tqdm
import pandas as pd
import numpy as np
import msgpack
from datetime import datetime
from datetime import datetime, timedelta
from utils.logger import setup_logger
from utils.s3 import read_from_s3
from dotenv import load_dotenv
@ -87,6 +87,14 @@ def marge_ha_32(asset_list, identified_addresses):
identified_addresses["Address"]
)
# Update how 7 Norton grove is listed as it has the wrong postcode
asset_list["Postcode"] = np.where(
(asset_list["Street"] == "Norton Grove") & (asset_list["Postcode"] == "HU4 6HQ") & (
asset_list["Dwelling num"] == "7"),
"HU4 6HG",
asset_list["Postcode"]
)
asset_list["merge_key"] = (
asset_list["Dwelling num"].astype(str).str.lower().str.strip().str.replace(" ", "") +
asset_list["Street"].astype(str).str.lower().str.strip().str.replace(" ", "") +
@ -398,6 +406,208 @@ def prepare_model_data_row(property_id, modelling_epc, cleaned, cleaning_data, c
return scoring_dict
def _ha32_result_row(house, gbis_eligible, eco4_eligible, date_epc, message):
    """Build one per-property result record for the HA32 analysis."""
    return {
        "row_id": house["row_id"],
        "warmfront_identified": house["identified"],
        "gbis_eligible": gbis_eligible,
        "eco4_eligible": eco4_eligible,
        "date_epc": date_epc,
        "message": message,
    }


def get_ha_32data(ha_data, cleaned, cleaning_data, created_at):
    """
    Search for an EPC for every property in the HA32 asset list and check its GBIS / ECO4
    eligibility.

    For each row we build an address from the "Dwelling name" (when present), "Dwelling num" and
    "Street" columns, search for EPCs with that address and the "Postcode", and run the
    eligibility checks on the newest EPC. If the newest EPC is eligible for neither scheme, the
    checks are re-run on the penultimate EPC. Rows without a house number are skipped.

    ha_data: DataFrame of properties; the columns read are "Dwelling num", "Street", "Postcode",
        "Dwelling name", "Dwelling type", "row_id" and "identified"
    cleaned: passed through to Eligibility and prepare_model_data_row
    cleaning_data: passed through to prepare_model_data_row
    created_at: passed through to prepare_model_data_row

    Returns a tuple (results, scoring_data): results holds one dict per searched property and
    scoring_data holds one model-input dict per ECO4-eligible property.
    """
    house_number_key = "Dwelling num"
    address_key = "Street"
    postcode_key = "Postcode"
    house_name = "Dwelling name"
    house_type_key = "Dwelling type"

    # Maps the asset list's dwelling type onto the value used to filter EPC rows on
    # "property-type". None (or an unknown dwelling type) means no property type filter is applied
    house_type_lookup = {
        "Bungalow": "Bungalow",
        "Flat": "Flat",
        "House": "House",
        "Store Room": None,
        "Bungalow Disabled": "Bungalow",
        "Flat Disabled": "Flat",
        "Dormer Bungalow": "Bungalow",
        "Pop-In": None,
        "Laundry": None,
        "Shed": None,
        "Bedsit": None,
    }

    scoring_data = []
    results = []
    no_house_numbers = []

    for _, house in tqdm(ha_data.iterrows(), total=len(ha_data)):
        # If we don't have a house number, we'll continue since we won't realistically be able
        # to find an address
        if pd.isnull(house[house_number_key]):
            no_house_numbers.append(house["row_id"])
            continue

        # The dwelling name, when we have one, goes in front of the house number and street
        address_parts = [house[house_number_key], house[address_key]]
        if not pd.isnull(house[house_name]):
            address_parts.insert(0, house[house_name])
        address1 = " ".join(address_parts)

        searcher = SearchEpc(
            address1=address1,
            postcode=house[postcode_key]
        )
        response = searcher.search()

        if response["status"] == 204:
            # TODO: if the property was identified by Warmfront, we should investigate why no
            #  EPC was found
            results.append(_ha32_result_row(house, None, None, None, "No EPC found"))
            continue

        newest_epc, older_epcs, _ = searcher.retrieve(
            property_type=house_type_lookup.get(house[house_type_key], None)
        )

        # We also want to get the penultimate epc
        penultimate_epc, _ = searcher.filter_newest_epc(older_epcs)
        if not penultimate_epc:
            penultimate_epc = newest_epc

        eligibility = Eligibility(epc=newest_epc, cleaned=cleaned)
        eligibility.check_gbis()
        eligibility.check_eco4()

        # If there is no eligibility, we need to check the penultimate epc
        if (not eligibility.eco4["eligible"]) and (not eligibility.gbis):
            eligibility = Eligibility(epc=penultimate_epc, cleaned=cleaned)
            eligibility.check_gbis()
            eligibility.check_eco4()

        if eligibility.eco4["eligible"]:
            # TODO: Check me
            scoring_dictionary = prepare_model_data_row(
                property_id=house["row_id"],
                modelling_epc=eligibility.epc,
                cleaned=cleaned,
                cleaning_data=cleaning_data,
                created_at=created_at
            )
            scoring_data.append(scoring_dictionary)
            message = "eco4 conditional on post sap"
        else:
            # If nothing is eligible or only gbis is eligible, we still make a record of this
            message = None

        results.append(
            _ha32_result_row(
                house,
                eligibility.gbis,
                eligibility.eco4["eligible"],
                eligibility.epc["lodgement-date"],
                message,
            )
        )

    # Log the skipped row ids themselves rather than just the variable name.
    # NOTE(review): analyse_ha_32_results takes this list as an argument but it is not part of
    # the return value here - confirm how callers are expected to get hold of it
    logger.info(f"no_house_numbers: {no_house_numbers}")

    return results, scoring_data
def analyse_ha_32_results(results, ha32, no_house_numbers):
    """
    Compare our eligibility results for HA32 against the properties identified by Warmfront.

    We want to know:
    1) What proportion of identified properties we get correct
    2) If we miss identified properties, why
    3) Which properties do we identify that were not identified by warmfront. What is our
       confidence on these?

    For HA32, most of these (if not all) properties were identified under gbis

    results: list of per-property dicts; the keys read are "warmfront_identified",
        "gbis_eligible", "eco4_eligible" and "date_epc"
    ha32: DataFrame with "row_id" and "identified" columns
    no_house_numbers: row ids of the properties that didn't have a house number

    Returns a tuple (success_rate, new_possibilities). success_rate is the proportion of
    Warmfront identified properties that we flag as gbis eligible (NaN when there are no
    identified properties). new_possibilities is a DataFrame of the properties we flag as
    eligible that Warmfront did not identify, with an added "high_confidence" column.
    """
    results_df = pd.DataFrame(results)

    # What proportion of the warmfront identified properties do we also identify
    warmfront_identified = results_df[
        results_df["warmfront_identified"]
    ]
    n_identified = warmfront_identified.shape[0]
    if n_identified:
        success_rate = warmfront_identified["gbis_eligible"].sum() / n_identified
    else:
        # Nothing was identified by warmfront, so the rate is undefined - avoid dividing by zero
        success_rate = float("nan")
    # For HA32, this is 89%

    # missed = results_df[
    #     results_df["warmfront_identified"] & (warmfront_identified["gbis_eligible"] != True)
    # ]
    # to_check = missed[pd.isnull(missed["message"])]
    # ha32[ha32["row_id"] == to_check["row_id"].values[14]].squeeze()
    # to_check[to_check["row_id"] == to_check["row_id"].values[14]].squeeze()

    # For these properties, warmfront identified all of them, however two did not seem to look valid.
    # We could perhaps update our detection, if the properties not found are not currently EPC C or above, but
    # do not look eligible from a building materials perspective
    # E.g.:
    # row_ids = ha32[ha32["Postcode"] == "HU4 6TG"]["row_id"].values
    # z = results_df[results_df["row_id"].isin(row_ids)]

    # Reason 1: The EPC indicates that the cavity is filled (GBIS allows for more than just cavity measures, however
    #           we check just the cavity for GBIS homes, since I believe this is what Warmfront have in place with
    #           regards to commercial agreements with the installer). An example of this is 30 Coxwold Grove,
    #           HU4 6HH.
    #
    # Reason 2: Some properties do not have any existing data. This accounts for 16 of the 50 that we missed.
    #           We will be implementing a solution to interpolate homes that do not have any data, based on their
    #           neighbours. An example of this is 979 Hessle Road, HU4 6QG. If we look at the neighbours, we would
    #           likely infer that this property has an empty cavity and therefore would identify it
    #
    # Reason 3: Some properties, e.g. 975 Hessle Road, HU4 6QG, look like they would qualify for GBIS,
    #           but are already a C, based on the Nov 2022 EPC (it was a C before that too). I'm personally not sure
    #           why this home would get identified as you would not be able to get GBIS funding. Same for 977 Hessle
    #           Road. This was the most common reason. Another example is 8 Edith Cavell Court, HU5 4BA
    #
    # Reason 4: Some properties are a combination of reason 1 and 3. This could be to do with inaccurate EPCs as
    #           empirically speaking, when going through this manually, it seemed like the ones that fall into this
    #           category had slightly older EPCs (pre-2019). There are a few like this but e.g.
    #           3, Summergroves Way HU4 6SZ

    # We now look for properties that we identified, that were not identified by Warmfront
    new_possibilities = results_df[
        (~results_df["warmfront_identified"]) &
        (results_df["gbis_eligible"] | results_df["eco4_eligible"])
    ].copy()

    # We deem that any EPC that was produced in the last 3 years gives us good confidence
    cutoff_date = datetime.now() - timedelta(days=3 * 365)
    new_possibilities["high_confidence"] = pd.to_datetime(new_possibilities["date_epc"]) >= cutoff_date

    # We do a quick check on properties that didn't have a house number:
    no_house_numbers_ha32 = ha32[ha32["row_id"].isin(no_house_numbers)]["identified"].sum()
    if no_house_numbers_ha32:
        logger.error("We have some identified properties that have no house numbers - investigate me")

    return success_rate, new_possibilities
def app():
ha32_asset_list, ha15_asset_list, ha32_identified_addresses, ha15_identified_addresses = load_data()
@ -422,72 +632,5 @@ def app():
# We want to retrieve EPCs for every single property
# NOTE: HA32 is MOSTLY cavity via GBIS
ha_data = ha32
house_number_key = "Dwelling num"
address_key = "Street"
postcode_key = "Postcode"
def get_data(ha_data, house_number_key, address_key, postcode_key):
scoring_data = []
results = []
for _, house in tqdm(ha_data.iterrows(), total=len(ha_data)):
searcher = SearchEpc(
address1=" ".join([house[house_number_key], house[address_key]]),
postcode=house[postcode_key]
)
searcher.search()
newest_epc, older_epcs, _ = searcher.retrieve()
# We also want to get the penultimate epc
penultimate_epc, _ = searcher.filter_newest_epc(older_epcs)
if not penultimate_epc:
penultimate_epc = newest_epc
eligibility = Eligibility(epc=newest_epc, cleaned=cleaned)
eligibility.check_gbis()
eligibility.check_eco4()
# If there is no eligibility, we need to check the penultimate epc
if (not eligibility.eco4["eligible"]) and (not eligibility.gbis):
eligibility = Eligibility(epc=penultimate_epc, cleaned=cleaned)
eligibility.check_gbis()
eligibility.check_eco4()
if eligibility.eco4["eligible"]:
# TODO: Check me
scoring_dictionary = prepare_model_data_row(
property_id=house["row_id"],
modelling_epc=eligibility.epc,
cleaned=cleaned,
cleaning_data=cleaning_data,
created_at=created_at
)
scoring_data.append(scoring_dictionary)
results.append(
{
"row_id": house["row_id"],
"warmfront_identified": house["identified"],
"gbis_eligible": eligibility.gbis,
"eco4_eligible": eligibility.eco4["eligible"],
"date_epc": eligibility.epc["lodgement-date"],
"eco4_note": "conditional on post sap"
}
)
continue
if (house["identified"] and not eligibility.gbis) and (
house["identified"] and not eligibility.eco4["eligible"]):
raise NotImplementedError("Investigate ms")
# If nothing is eligible or gbis is eligible, then we make a record this
results.append(
{
"row_id": house["row_id"],
"warmfront_identified": house["identified"],
"gbis_eligible": eligibility.gbis,
"eco4_eligible": eligibility.eco4["eligible"],
"date_epc": eligibility.epc["lodgement-date"],
"eco4_note": None
}
)
ha_32_results = get_ha_32data(ha_data, cleaned, cleaning_data, created_at)