working on ha33 app

This commit is contained in:
Khalim Conn-Kowlessar 2023-12-15 11:32:22 +00:00
parent 707724cdb1
commit 538e38dc1e
6 changed files with 210 additions and 18 deletions

View file

@ -126,7 +126,7 @@ class SearchEpc:
uprns = {r["uprn"] for r in rows}
if len(uprns) != 1:
logger.error("Multiple EPCs found - we should use an alternate method of searching - TODO")
logger.error("Multiple UPRNS found - we should use an alternate method of searching - TODO")
if property_type is not None:
# We can do a filter on the property type
rows_filtered = [r for r in rows if r["property-type"] == property_type]

View file

@ -56,9 +56,6 @@ class Eligibility:
self.roof = self.parse_fabric("roof-description")
self.floor = self.parse_fabric("floor-description")
self.loft_insulation()
self.cavity_insulation()
self.tenure = self.tenure_remap.get(self.epc["tenure"], None)
def parse_fabric(self, key):
@ -71,14 +68,19 @@ class Eligibility:
if remapped:
return remapped[0]
if "SAP05:" in self.epc[key]:
# This is a placeholder method for handling this but this will occur in the case of a very old
# EPC and therefore we just skip
self.epc[key] = "(assumed)"
if key == "walls-description":
cleaner_cls = WallAttributes(self.epc["roof-description"])
cleaner_cls = WallAttributes(self.epc[key])
elif key == "roof-description":
cleaner_cls = RoofAttributes(self.epc["roof-description"])
cleaner_cls = RoofAttributes(self.epc[key])
elif key == "floor-description":
cleaner_cls = FloorAttributes(self.epc["floor-description"])
cleaner_cls = FloorAttributes(self.epc[key])
else:
raise ValueError("Invalid key")

View file

@ -380,7 +380,7 @@ def prepare_model_data_row(property_id, modelling_epc, cleaned, cleaning_data, c
{
"recommendation_id": "-".join([property_id, "cavity"]),
"type": "cavity_wall_insulation",
"new_u_value": 0.55,
"new_u_value": 0.35,
"parts": [{}]
},
{
@ -997,21 +997,12 @@ def analyse_ha_15_results(results_df, ha15, no_house_numbers):
(results_df["eco4_eligible_future"] == True) & (~(results_df["gbis_eligible"] | results_df["eco4_eligible"]))
].copy()
rids = new_possibilities_eco[new_possibilities_eco["sap"] == 54]["row_id"]
z = ha15[ha15["row_id"].isin(rids)]
new_possibilities_gbis = results_df[
(~results_df["warmfront_identified"]) &
(results_df["gbis_eligible_future"] == True) & (results_df["eco4_eligible_future"] == False) & (
~(results_df["gbis_eligible"] | results_df["eco4_eligible"]))
].copy()
not_new = results_df[
(~results_df["warmfront_identified"]) &
(results_df["gbis_eligible_future"] != True) & (results_df["eco4_eligible_future"] != True) & (
~(results_df["gbis_eligible"] | results_df["eco4_eligible"]))
].copy()
# We deem that Any EPC that is produced in the last 3 years gives us good confidence for GBIS
cutoff_date = datetime.now() - timedelta(days=3 * 365)
@ -1094,7 +1085,7 @@ def app():
# with open("ha32.pickle", "rb") as f:
# ha32_dict = pickle.load(f)
#
# ha32_results = ha32_dict["ha32_results"]
# ha32_scoring_data = ha32_dict["ha32_scoring_data"]
# ha32_no_house_numbers = ha32_dict["ha32_no_house_numbers"]
@ -1116,6 +1107,13 @@ def app():
# f
# )
# with open("ha15.pickle", "rb") as f:
# ha15_dict = pickle.load(f)
#
# ha15_results_df = ha15_dict["ha15_results_df"]
# ha15_scoring_df = ha15_dict["ha15_scoring_df"]
# ha15_no_house_numbers = ha15_dict["ha15_no_house_numbers"]
ha15_success_rate, ha15_new, ha15_identified_results, ha15_missed_results = analyse_ha_15_results(
results_df=ha15_results_df,
ha15=ha15,

View file

@ -0,0 +1,174 @@
import msgpack
from pathlib import Path
from datetime import datetime
import pandas as pd
from utils.s3 import read_from_s3
from utils.logger import setup_logger
from dotenv import load_dotenv
from backend.app.utils import read_parquet_from_s3
from tqdm import tqdm
from backend.SearchEpc import SearchEpc
from etl.eligibility.Eligibility import Eligibility
from etl.eligibility.ha_15_32.app import prepare_model_data_row
import re
# Path of the .env file, resolved relative to the directory containing this module.
# NOTE(review): load_ha_33 reads its CSVs from the working-directory-relative folder
# "etl/eligibility/ha_15_32/"; confirm that this module-relative path points at the same folder.
ENV_FILE = Path(__file__).parent / "etl" / "eligibility" / "ha_15_32" / ".env"
# Module-level logger built by the project's setup_logger helper.
logger = setup_logger()
# Read the variables declared in ENV_FILE at import time.
load_dotenv(ENV_FILE)
def load_ha_33():
    """
    Read the four HA33 asset CSV exports and stack them into a single DataFrame.

    As a side effect the pandas display options (max rows, max columns, width) are widened.

    :return: the concatenated parts, with every column whose name contains "Unnamed:" removed.
        Each part keeps its own row index, so index values repeat across parts.
    """
    for option, value in (
        ('display.max_rows', 500),
        ('display.max_columns', 500),
        ('display.width', 1000),
    ):
        pd.set_option(option, value)

    files = [
        "HA 33 Assets 1 of 4.csv",
        "HA 33 Assets 2 of 4.csv",
        "HA 33 Assets 3 of 4.csv",
        "HA 33 Assets 4 of 4.csv"
    ]

    def read_part(file):
        # Load one export and discard the unnamed columns it carries
        frame = pd.read_csv(f"etl/eligibility/ha_15_32/{file}", low_memory=False)
        unnamed = [name for name in frame.columns if "Unnamed:" in name]
        return frame.drop(columns=unnamed)

    return pd.concat([read_part(file) for file in files])
def standardise_ha33(data):
    """
    Split the free-text ADDRESS column into five address columns and tidy the column names.

    The address is split on commas into address1..address5. Addresses with fewer than five
    parts leave the trailing columns empty; addresses with more than five parts keep the
    remainder (commas included) in address5. A leading "FT <number>" / "FT<number>" in
    address1 is reduced to the bare number.

    :param data: DataFrame with an "ADDRESS" column of comma-separated address strings
    :return: a new DataFrame with the address1..address5 columns appended and every column
        name stripped of surrounding whitespace
    """
    address_columns = ['address1', 'address2', 'address3', 'address4', 'address5']

    # Cap the number of splits so an address with extra commas cannot produce a sixth column
    split_addresses = data['ADDRESS'].str.split(',', n=len(address_columns) - 1, expand=True)
    # Pad to exactly five columns when no address in the data has that many parts;
    # without this the column rename below fails with a length mismatch
    split_addresses = split_addresses.reindex(columns=range(len(address_columns)))
    split_addresses.columns = address_columns

    data = pd.concat([data, split_addresses], axis=1)

    # Using regex to replace 'FT {number}' or 'FT{number}', with '{number}'
    # NOTE(review): the pattern is not anchored on a word boundary, so it also rewrites
    # text such as "LOFT 3" - confirm whether that can occur in address1
    data['address1'] = data['address1'].str.replace(r'FT\s*(\d+)', r'\1', regex=True)

    data.columns = [col.strip() for col in data.columns]

    # TODO: we have 23 THIRTY SEVENTH AVENUE, can we replace THIRTY SEVENTH with 37TH

    return data
def _ha33_search_address(house, flat_pattern):
    """
    Build the first address line used for the EPC search from one HA33 row.

    :param house: row exposing "address1" and "address2"
    :param flat_pattern: compiled, case-insensitive pattern matching "flat <number>"
    :return: address2 when it names a flat; otherwise address1, with address2 appended when
        address1 is three characters or fewer (i.e. just a house number)
    """
    second_part = house["address2"]
    # address2 is not a string when the address had no second comma-separated part
    second_part = second_part.strip() if isinstance(second_part, str) else None

    # The flat number lives in address2 for some rows; prefer it when present
    if second_part is not None and flat_pattern.search(second_part):
        return second_part

    address1 = house["address1"].strip()
    # I.e. just a number
    if len(address1) <= 3 and second_part is not None:
        address1 = address1 + " " + second_part
    return address1


def get_ha_33data(data, cleaned, cleaning_data, created_at):
    """
    Look up the EPC for every HA33 property and assess its GBIS / ECO4 eligibility.

    :param data: standardised HA33 DataFrame; the columns read here are "row_id", "address1",
        "address2", "POST CODE" and "PROPERTY TYPE"
    :param cleaned: cleaned EPC lookup passed through to Eligibility and prepare_model_data_row
    :param cleaning_data: cleaning dataset passed through to prepare_model_data_row
    :param created_at: timestamp passed through to prepare_model_data_row
    :return: tuple (results, scoring_data, nodata) where results holds one dict per property
        that was searched successfully, scoring_data holds the model rows of the properties
        flagged eligible by the ECO4 warmfront check, and nodata lists the row_ids whose
        search response had status 204
    """
    house_type_lookup = {
        "Bungalow": "Bungalow",
        "Flat": "Flat",
        'House': "House",
        'Maisonette': "Maisonette",
        'Flalolflfp mujjjjunjimj': "Flat",
        'STUDIO': "Flat",
    }

    # Compiled once rather than on every row
    flat_pattern = re.compile(r'flat\s+(\d+)', re.IGNORECASE)

    scoring_data = []
    results = []
    nodata = []
    for _, house in tqdm(data.iterrows(), total=len(data)):
        address1 = _ha33_search_address(house, flat_pattern)

        searcher = SearchEpc(
            address1=address1,
            postcode=house["POST CODE"]
        )
        response = searcher.search()

        # Status 204 is recorded as "no data" and the property is skipped
        if response["status"] == 204:
            nodata.append(house["row_id"])
            continue

        newest_epc, older_epcs, _ = searcher.retrieve(
            property_type=house_type_lookup.get(house["PROPERTY TYPE"], None)
        )

        eligibility = Eligibility(epc=newest_epc, cleaned=cleaned)

        eligibility.check_gbis_warmfront()
        eligibility.check_eco4_warmfront()

        # The full gbis and eco4 checks are run for every property, whatever the warmfront outcome
        eligibility.check_gbis()
        eligibility.check_eco4()

        if eligibility.eco4_warmfront["eligible"]:
            scoring_dictionary = prepare_model_data_row(
                property_id=house["row_id"],
                modelling_epc=eligibility.epc,
                cleaned=cleaned,
                cleaning_data=cleaning_data,
                created_at=created_at
            )
            scoring_data.append(scoring_dictionary)

        # A result row is recorded for every property that had an EPC, eligible or not
        results.append(
            {
                "row_id": house["row_id"],
                "gbis_eligible": eligibility.gbis_warmfront,
                "eco4_eligible": eligibility.eco4_warmfront["eligible"],
                "eco4_message": eligibility.eco4_warmfront["message"],
                "sap": float(eligibility.epc["current-energy-efficiency"]),
                "gbis_eligible_future": eligibility.gbis["eligible"],
                "gbis_eligible_future_message": eligibility.gbis["message"],
                "eco4_eligible_future": eligibility.eco4["eligible"],
                "eco4_eligible_future_message": eligibility.eco4["message"],
                # Property components
                "roof": eligibility.roof["clean_description"],
                "walls": eligibility.walls["clean_description"],
                "heating": eligibility.epc["mainheat-description"],
                "tenure": eligibility.tenure,
                "date_epc": eligibility.epc["lodgement-date"],
            }
        )

    return results, scoring_data, nodata
def app():
    """
    Run the HA33 eligibility pipeline end to end.

    HA33 is large, so it is handled separately: the asset files are loaded and standardised,
    each row is given an "h33<n>" row_id, the cleaned EPC lookup and the cleaning dataset are
    fetched from S3, and every property is then passed through get_ha_33data.

    :return: None (the output of get_ha_33data is not kept)
    """
    frame = standardise_ha33(load_ha_33())
    frame["row_id"] = [f"h33{position}" for position in range(len(frame))]

    bucket = "retrofit-data-dev"

    # The cleaned lookup is stored msgpack-encoded
    packed_cleaned = read_from_s3(
        s3_file_name="cleaned_epc_data/cleaned.bson",
        bucket_name=bucket
    )
    cleaned = msgpack.unpackb(packed_cleaned, raw=False)

    cleaning_data = read_parquet_from_s3(
        bucket_name=bucket,
        file_key="sap_change_model/cleaning_dataset.parquet",
    )

    created_at = datetime.now().isoformat()

    get_ha_33data(frame, cleaned, cleaning_data, created_at)

View file

@ -33,6 +33,12 @@ class RoofAttributes(Definitions):
"ystafell(oedd) to, dim inswleiddio": "roof room(s), no insulation",
}
# Result keys that process() fills in when self.nodata is set: every key listed here is
# assigned False before the result is returned.
# NOTE(review): this includes keys whose names suggest non-boolean values
# ('thermal_transmittance', 'thermal_transmittance_unit', 'insulation_thickness') - confirm
# that False is the intended placeholder for them.
DEFAULT_KEYS = [
'thermal_transmittance', 'thermal_transmittance_unit', 'is_pitched', 'is_roof_room',
'is_loft', 'is_flat', 'is_thatched', 'is_at_rafters', 'is_assumed', 'has_dwelling_above',
'is_valid', 'insulation_thickness'
]
def __init__(self, description: str):
"""
:param description: Description of the roof.
@ -95,6 +101,8 @@ class RoofAttributes(Definitions):
result: Dict[str, Union[float, str, bool, None]] = {}
if self.nodata:
for key in self.DEFAULT_KEYS:
result[key] = False
return result
description = self.description

View file

@ -68,6 +68,13 @@ class WallAttributes(Definitions):
'Cowith external insulation': 'Cob, with external insulation',
}
# Result keys that process() fills in when self.nodata is set: every key listed here is
# assigned False before the result is returned.
# NOTE(review): this includes keys whose names suggest non-boolean values
# ('thermal_transmittance', 'thermal_transmittance_unit', 'insulation_thickness') - confirm
# that False is the intended placeholder for them.
DEFAULT_KEYS = [
'thermal_transmittance', 'thermal_transmittance_unit', 'is_cavity_wall', 'is_filled_cavity',
'is_solid_brick', 'is_system_built', 'is_timber_frame', 'is_granite_or_whinstone',
'is_as_built', 'is_cob', 'is_assumed', 'is_sandstone_or_limestone',
'insulation_thickness', 'external_insulation', 'internal_insulation'
]
def __init__(self, description: str):
"""
:param description: Description of the walls.
@ -98,6 +105,9 @@ class WallAttributes(Definitions):
def process(self) -> Dict[str, Union[float, str, bool, None]]:
result: Dict[str, Union[float, str, bool, None]] = {}
if self.nodata:
for key in self.DEFAULT_KEYS:
result[key] = False
return result
description = self.description.lower()