mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
added in cavity age estimation
This commit is contained in:
parent
f78078384b
commit
04dba265de
5 changed files with 490 additions and 30 deletions
|
|
@ -19,7 +19,9 @@ class PropertyValuation:
|
|||
100070505235: 344000, # Based on Zoopla's estimation of 131 School road, which is also semi-detached
|
||||
100070513306: 182000, # Based on Zoopla's estimation of 61 Simmons Drive
|
||||
100071306896: 77000, # Based on Flat 2 of 44 Wedgewood Road on Zoopla
|
||||
100021192109: 650000 # Based on Zoopla
|
||||
100021192109: 650000, # Based on Zoopla
|
||||
766249482: 358000, # Based on Zoopla estimate for 19 Spring Lane, 3 bedroom semi-detached
|
||||
100120703802: 277000, # Based on Zoopla
|
||||
}
|
||||
|
||||
# We base our valuation uplifts on a number of sources
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@ class Eligibility:
|
|||
|
||||
# If the loft has less than 100mm of insulation, we classify the home has needing loft insulation
|
||||
LOFT_INSULATION_THRESHOLD = 100
|
||||
HIGH_LOFT_INSULATION_THRESHOLD = 269
|
||||
|
||||
# Because EPCS have different values for tenure, we need to remap them to a common set of values
|
||||
tenure_remap = {
|
||||
|
|
@ -104,6 +105,8 @@ class Eligibility:
|
|||
self.LOFT_INSULATION_THRESHOLD if loft_thickness_threshold is None else loft_thickness_threshold
|
||||
)
|
||||
|
||||
high_loft_thickness_threshold = self.HIGH_LOFT_INSULATION_THRESHOLD
|
||||
|
||||
# We firstly check if the roof is a loft
|
||||
is_loft = self.roof["is_pitched"] and (not self.roof["is_roof_room"])
|
||||
|
||||
|
|
@ -122,7 +125,22 @@ class Eligibility:
|
|||
is_flat=self.roof["is_flat"]
|
||||
)
|
||||
|
||||
if insulation_thickness > loft_thickness_threshold:
|
||||
if insulation_thickness <= loft_thickness_threshold:
|
||||
self.loft = {
|
||||
"suitability": True,
|
||||
"thickness": insulation_thickness,
|
||||
"reason": None
|
||||
}
|
||||
|
||||
if insulation_thickness <= high_loft_thickness_threshold:
|
||||
self.loft = {
|
||||
"suitability": True,
|
||||
"thickness": insulation_thickness,
|
||||
"reason": "high loft thickness but below regulation"
|
||||
}
|
||||
return
|
||||
|
||||
if insulation_thickness > high_loft_thickness_threshold:
|
||||
# Insulation is already thick enough
|
||||
self.loft = {
|
||||
"suitability": False,
|
||||
|
|
@ -131,12 +149,6 @@ class Eligibility:
|
|||
}
|
||||
return
|
||||
|
||||
self.loft = {
|
||||
"suitability": True,
|
||||
"thickness": insulation_thickness,
|
||||
"reason": None
|
||||
}
|
||||
|
||||
def cavity_insulation(self):
|
||||
|
||||
"""
|
||||
|
|
@ -161,6 +173,17 @@ class Eligibility:
|
|||
is_partial_filled_cavity = is_cavity and is_partial_filled
|
||||
is_underperforming_cavity = is_cavity and is_underperforming
|
||||
|
||||
# Check if it has internal or external wall insulation
|
||||
has_internal_wall_insulation = self.walls["internal_insulation"]
|
||||
has_external_wall_insulation = self.walls["external_insulation"]
|
||||
|
||||
if has_internal_wall_insulation or has_external_wall_insulation:
|
||||
self.cavity = {
|
||||
"suitability": False,
|
||||
"type": "internal or external wall insulation"
|
||||
}
|
||||
return
|
||||
|
||||
if is_unfilled_cavity:
|
||||
self.cavity = {
|
||||
"suitability": True,
|
||||
|
|
@ -354,6 +377,13 @@ class Eligibility:
|
|||
else:
|
||||
message = "subject to post retrofit sap" if is_eligible else "not eligible"
|
||||
|
||||
# Update the message to flag properties that failed just because of a full cavity.
|
||||
# We need to double check that the wall is a cavity, that the loft is suitable and that the
|
||||
# sap is within reason
|
||||
# We can then estimate the age of the cavity fill
|
||||
if not is_eligible and (current_sap < 69) and self.loft["suitability"] and self.walls["is_cavity_wall"]:
|
||||
message = "Failed due to full cavity - check cavity age"
|
||||
|
||||
self.eco4_warmfront = {
|
||||
"eligible": is_eligible,
|
||||
"message": message
|
||||
|
|
|
|||
|
|
@ -252,6 +252,31 @@ def load_data():
|
|||
return data, survey_list
|
||||
|
||||
|
||||
def calculate_cavity_age(newest_epc, older_epcs, cleaned):
|
||||
all_epcs = [newest_epc] + older_epcs
|
||||
|
||||
df = []
|
||||
for x in all_epcs:
|
||||
# Get the cleaned mapping
|
||||
mapped = [y for y in cleaned["walls-description"] if y["original_description"] == x["walls-description"]]
|
||||
if not mapped:
|
||||
continue
|
||||
df.append(
|
||||
{
|
||||
**mapped[0],
|
||||
"inspection-date": x["lodgement-date"],
|
||||
}
|
||||
)
|
||||
|
||||
df = pd.DataFrame(df)
|
||||
df = df[
|
||||
(df["is_cavity_wall"] == True) & (df["is_filled_cavity"] == True)
|
||||
]
|
||||
|
||||
cavity_age = (datetime.now() - pd.to_datetime(df["inspection-date"].max())).days
|
||||
return cavity_age
|
||||
|
||||
|
||||
def get_epc_data(data, cleaned, cleaning_data, created_at, photo_supply_lookup, floor_area_decile_thresholds):
|
||||
scoring_data = []
|
||||
results = []
|
||||
|
|
@ -319,6 +344,19 @@ def get_epc_data(data, cleaned, cleaning_data, created_at, photo_supply_lookup,
|
|||
if penultimate_epc.get("estimated") is None:
|
||||
older_epcs = [x for x in searcher.data["rows"] if x["lmk-key"] != penultimate_epc["lmk-key"]]
|
||||
|
||||
# If the property is a cavity wall and it's filled, we produce an estimate for the age of the cavity
|
||||
|
||||
# Loft MUST be suitable
|
||||
cavity_age = None
|
||||
if (
|
||||
eligibility.walls["is_cavity_wall"] and
|
||||
eligibility.walls["is_filled_cavity"] and
|
||||
eligibility.loft["suitability"] and
|
||||
eligibility.eco4_warmfront["message"] == "Failed due to full cavity - check cavity age"
|
||||
):
|
||||
# We check the age of the cavity and if it's particularly old, we flag it
|
||||
cavity_age = calculate_cavity_age(newest_epc, older_epcs, cleaned)
|
||||
|
||||
# Full checks
|
||||
eligibility.check_gbis()
|
||||
eligibility.check_eco4()
|
||||
|
|
@ -362,6 +400,10 @@ def get_epc_data(data, cleaned, cleaning_data, created_at, photo_supply_lookup,
|
|||
"heating": eligibility.epc["mainheat-description"],
|
||||
"tenure": eligibility.tenure,
|
||||
"date_epc": eligibility.epc["lodgement-date"],
|
||||
"loft_thickness": eligibility.roof["insulation_thickness"],
|
||||
"cavity_age": cavity_age,
|
||||
**eligibility.walls,
|
||||
**eligibility.roof,
|
||||
}
|
||||
)
|
||||
|
||||
|
|
@ -472,12 +514,46 @@ def analyse_results(results_df, data, survey_list):
|
|||
(analysis_data["eco4_eligible"])
|
||||
]
|
||||
|
||||
eco_eligible = analysis_data[analysis_data["eco4_eligible"] == True]
|
||||
eco_ineligible = analysis_data[analysis_data["eco4_eligible"] == False]
|
||||
|
||||
eco_ineligible["eco4_message"].value_counts()
|
||||
|
||||
# SAP too high:
|
||||
sap_too_high = eco_ineligible[eco_ineligible["eco4_message"] == "sap too high"].copy()
|
||||
further_possibilities = sap_too_high[
|
||||
sap_too_high["walls"].isin(
|
||||
[
|
||||
"Cavity wall, as built, insulated",
|
||||
"Cavity wall, as built, no insulation",
|
||||
"Cavity wall, as built, partial insulation",
|
||||
"Cavity wall, no insulation",
|
||||
"Cavity wall, partial insulation"
|
||||
]
|
||||
)
|
||||
]
|
||||
|
||||
filled_cavities = eco_ineligible[
|
||||
eco_ineligible["eco4_message"] == "sap too high"
|
||||
]
|
||||
|
||||
warmfront_identified = analysis_data[analysis_data["warmfront_identified"]]
|
||||
warmfront_identified["walls"].value_counts()
|
||||
|
||||
all_identified_gbis = analysis_data[
|
||||
(analysis_data["warmfront_identified"] & analysis_data["funding_scheme"].isin(
|
||||
["ECO4 GBIS (ECO+)"])) |
|
||||
(analysis_data["gbis_eligible"] & analysis_data["eco4_eligible"].isin([False, None]))
|
||||
]
|
||||
|
||||
empty_cavity_desriptions = [
|
||||
"Cavity wall, as built, no insulation", "Cavity wall, as built, partial insulation",
|
||||
"Cavity wall, no insulation", "Cavity wall, partial insulation"
|
||||
]
|
||||
|
||||
empty_cavities = analysis_data[analysis_data["walls"].isin(empty_cavity_desriptions)]
|
||||
remaining_empty = empty_cavities[~empty_cavities["warmfront_identified"]]
|
||||
|
||||
warmfront_identified = analysis_data[analysis_data["warmfront_identified"]]
|
||||
|
||||
# Of the ECO jobs, what proportion to we get right
|
||||
|
|
@ -553,7 +629,7 @@ def app():
|
|||
|
||||
# Read pickle
|
||||
# import pickle
|
||||
# with open("ha16.pickle", "rb") as f:
|
||||
# with open("ha16_8_jan_2.pickle", "rb") as f:
|
||||
# saved = pickle.load(f)
|
||||
# scoring_data = saved["scoring_data"]
|
||||
# results_df = saved["results"]
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
import os
|
||||
import msgpack
|
||||
import openpyxl
|
||||
from openpyxl.styles.colors import COLOR_INDEX
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
import pandas as pd
|
||||
|
|
@ -8,7 +8,7 @@ import numpy as np
|
|||
from utils.s3 import read_from_s3
|
||||
from utils.logger import setup_logger
|
||||
from dotenv import load_dotenv
|
||||
from backend.app.utils import read_parquet_from_s3
|
||||
from utils.s3 import read_dataframe_from_s3_parquet
|
||||
from tqdm import tqdm
|
||||
from backend.SearchEpc import SearchEpc
|
||||
from etl.eligibility.Eligibility import Eligibility
|
||||
|
|
@ -16,9 +16,11 @@ from etl.eligibility.ha_15_32.app import prepare_model_data_row
|
|||
from etl.epc.DataProcessor import DataProcessor
|
||||
from etl.epc.settings import COLUMNS_TO_MERGE_ON
|
||||
from backend.ml_models.api import ModelApi
|
||||
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
|
||||
|
||||
import re
|
||||
|
||||
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
|
||||
ENV_FILE = Path(__file__).parent / "etl" / "eligibility" / "ha_15_32" / ".env"
|
||||
|
||||
logger = setup_logger()
|
||||
|
|
@ -272,27 +274,99 @@ def load_data():
|
|||
)
|
||||
data["warmfront_identified"] = data["warmfront_identified"].fillna(False)
|
||||
|
||||
return data, eco4_prospects_survey_list
|
||||
lost_identified_properties = eco4_prospects_survey_list[
|
||||
~eco4_prospects_survey_list["survey_key"].isin(matched["survey_key"])
|
||||
]
|
||||
|
||||
return data, eco4_prospects_survey_list, lost_identified_properties
|
||||
|
||||
|
||||
def get_epc_data(data, cleaned, cleaning_data, created_at):
|
||||
def map_year_to_age_band(year):
|
||||
try:
|
||||
year = int(year)
|
||||
except ValueError:
|
||||
return "Invalid Year" # Or any other way you want to handle invalid inputs
|
||||
|
||||
if year < 1900:
|
||||
return "England and Wales: before 1900"
|
||||
elif 1900 <= year <= 1929:
|
||||
return "England and Wales: 1900-1929"
|
||||
elif 1930 <= year <= 1949:
|
||||
return "England and Wales: 1930-1949"
|
||||
elif 1950 <= year <= 1966:
|
||||
return "England and Wales: 1950-1966"
|
||||
elif 1967 <= year <= 1975:
|
||||
return "England and Wales: 1967-1975"
|
||||
elif 1976 <= year <= 1982:
|
||||
return "England and Wales: 1976-1982"
|
||||
elif 1983 <= year <= 1990:
|
||||
return "England and Wales: 1983-1990"
|
||||
elif 1991 <= year <= 1995:
|
||||
return "England and Wales: 1991-1995"
|
||||
elif 1996 <= year <= 2002:
|
||||
return "England and Wales: 1996-2002"
|
||||
elif 2003 <= year <= 2006:
|
||||
return "England and Wales: 2003-2006"
|
||||
elif 2007 <= year <= 2011:
|
||||
return "England and Wales: 2007-2011"
|
||||
else: # Assuming all remaining years are 2012 onwards
|
||||
return "England and Wales: 2012 onwards"
|
||||
|
||||
|
||||
def get_epc_data(data, cleaned, cleaning_data, created_at, photo_supply_lookup, floor_area_decile_thresholds):
|
||||
scoring_data = []
|
||||
results = []
|
||||
nodata = []
|
||||
|
||||
property_type_lookup = {
|
||||
"Flat": {"property-type": "Flat", "built-form": None},
|
||||
"Mid Terrace House": {"property-type": "House", "built-form": "Mid-Terrace"},
|
||||
"End Terrace House": {"property-type": "House", "built-form": "End-Terrace"},
|
||||
"Maisonnette": {"property-type": "Flat", "built-form": None},
|
||||
"Semi Detached House": {"property-type": "House", "built-form": "Semi-Detached"},
|
||||
"Detached House": {"property-type": "House", "built-form": "Detached"},
|
||||
"Coach House": {"property-type": "House", "built-form": "Detached"},
|
||||
"Bungalow": {"property-type": "Bungalow", "built-form": None},
|
||||
"Detached Bungalow": {"property-type": "Bungalow", "built-form": "Detached"},
|
||||
"House": {"property-type": "House", "built-form": None},
|
||||
"Semi Detached Bung": {"property-type": "Bungalow", "built-form": "Semi-Detached"},
|
||||
"Bedspace": {"property-type": None, "built-form": None},
|
||||
"Office Buildings": {"property-type": None, "built-form": None},
|
||||
"End Terrace Bungalow": {"property-type": "Bungalow", "built-form": "End-Terrace"},
|
||||
"Mid Terrace Bungalow": {"property-type": "Bungalow", "built-form": "Mid-Terrace"},
|
||||
"Bedsit": {"property-type": "Flat", "built-form": None},
|
||||
"Mid Terrace Housekeeping": {"property-type": "House", "built-form": "Mid-Terrace"},
|
||||
"Mid Terrace Housekeeping ": {"property-type": "House", "built-form": "Mid-Terrace"},
|
||||
"End Terrace Housex": {"property-type": "House", "built-form": "End-Terrace"},
|
||||
"Guest Room": {"property-type": None, "built-form": None}
|
||||
}
|
||||
|
||||
for _, property_meta in tqdm(data.iterrows(), total=len(data)):
|
||||
|
||||
searcher = SearchEpc(
|
||||
address1=property_meta["HouseNo"],
|
||||
postcode=property_meta["postcode"],
|
||||
size=1000
|
||||
auth_token=EPC_AUTH_TOKEN,
|
||||
os_api_key=None,
|
||||
full_address=property_meta["address"]
|
||||
)
|
||||
searcher.search()
|
||||
searcher.ordnance_survey_client.property_type = property_type_lookup[property_meta["T1_AssetType"]][
|
||||
"property-type"]
|
||||
searcher.ordnance_survey_client.built_form = property_type_lookup[property_meta["T1_AssetType"]]["built-form"]
|
||||
searcher.find_property(skip_os=True)
|
||||
|
||||
if searcher.data is None:
|
||||
if searcher.newest_epc is None:
|
||||
nodata.append(property_meta)
|
||||
continue
|
||||
|
||||
newest_epc, older_epcs, full_sap_epc = searcher.retrieve(address=property_meta["T1_Address"])
|
||||
if searcher.newest_epc.get("estimated"):
|
||||
# We insert the row ID as our proxy for UPRN
|
||||
proxy_uprn = int(property_meta["row_id"].split("_")[1])
|
||||
searcher.newest_epc["uprn"] = proxy_uprn
|
||||
|
||||
newest_epc = searcher.newest_epc
|
||||
older_epcs = searcher.older_epcs
|
||||
full_sap_epc = searcher.full_sap_epc
|
||||
# We also want to get the penultimate epc
|
||||
penultimate_epc, _ = searcher.filter_newest_epc(older_epcs)
|
||||
if not penultimate_epc:
|
||||
|
|
@ -302,25 +376,26 @@ def get_epc_data(data, cleaned, cleaning_data, created_at):
|
|||
eligibility.check_gbis_warmfront()
|
||||
eligibility.check_eco4_warmfront()
|
||||
|
||||
if (not eligibility.eco4_warmfront["eligible"]) and (not eligibility.gbis_warmfront) and (
|
||||
property_meta["warmfront_identified"]
|
||||
):
|
||||
if (not eligibility.eco4_warmfront["eligible"]) and (not eligibility.gbis_warmfront):
|
||||
eligibility = Eligibility(epc=penultimate_epc, cleaned=cleaned)
|
||||
eligibility.check_gbis_warmfront()
|
||||
eligibility.check_eco4_warmfront()
|
||||
# If this is the case, we need to update the older epcs
|
||||
older_epcs = [
|
||||
x for x in older_epcs if x["lmk-key"] not in [newest_epc["lmk-key"], penultimate_epc["lmk-key"]]
|
||||
]
|
||||
# We don't update just to make data cleaning easier
|
||||
if penultimate_epc.get("estimated") is None:
|
||||
older_epcs = [x for x in searcher.data["rows"] if x["lmk-key"] != penultimate_epc["lmk-key"]]
|
||||
|
||||
# Full checks
|
||||
eligibility.check_gbis()
|
||||
eligibility.check_eco4()
|
||||
|
||||
if eligibility.eco4_warmfront["eligible"]:
|
||||
if eligibility.epc["uprn"] == "":
|
||||
if eligibility.epc["uprn"] in ["", None]:
|
||||
eligibility.epc["uprn"] = int(property_meta["row_id"].split("_")[1])
|
||||
|
||||
if eligibility.epc["construction-age-band"] in ["", None]:
|
||||
eligibility.epc["construction-age-band"] = map_year_to_age_band(property_meta["Build Yr"])
|
||||
|
||||
scoring_dictionary = prepare_model_data_row(
|
||||
property_id=property_meta["row_id"],
|
||||
modelling_epc=eligibility.epc,
|
||||
|
|
@ -328,7 +403,9 @@ def get_epc_data(data, cleaned, cleaning_data, created_at):
|
|||
cleaning_data=cleaning_data,
|
||||
created_at=created_at,
|
||||
old_data=older_epcs,
|
||||
full_sap_epc=full_sap_epc
|
||||
full_sap_epc=full_sap_epc,
|
||||
photo_supply_lookup=photo_supply_lookup,
|
||||
floor_area_decile_thresholds=floor_area_decile_thresholds,
|
||||
)
|
||||
scoring_data.extend(scoring_dictionary)
|
||||
|
||||
|
|
@ -450,6 +527,232 @@ def get_epc_data(data, cleaned, cleaning_data, created_at):
|
|||
return results_df, scoring_data, nodata
|
||||
|
||||
|
||||
def get_epc_data_for_lost_surveys(
|
||||
lost_identified_properties, cleaned, cleaning_data, created_at, photo_supply_lookup,
|
||||
floor_area_decile_thresholds
|
||||
):
|
||||
lost_identified_properties["row_id"] = [
|
||||
"lost_surveys_ha25_" + str(i) for i in range(0, len(lost_identified_properties))
|
||||
]
|
||||
|
||||
scoring_data = []
|
||||
results = []
|
||||
nodata = []
|
||||
|
||||
property_type_lookup = {
|
||||
"MID-TERRACE": {"property-type": "House", "built-form": "Mid-Terrace"},
|
||||
"N/A": {"property-type": "House", "built-form": None},
|
||||
"END-TERRACE": {"property-type": "House", "built-form": "End-Terrace"},
|
||||
"GROUND-FLOOR": {"property-type": "House", "built-form": None},
|
||||
"TOP-FLOOR": {"property-type": "House", "built-form": None},
|
||||
"SEMI-DETACHED": {"property-type": "House", "built-form": "Semi-Detached"},
|
||||
"MID-FLOOR": {"property-type": "House", "built-form": None},
|
||||
"TOP-FLOOR FLAT": {"property-type": "House", "built-form": None},
|
||||
"DETACHED": {"property-type": "House", "built-form": "Detached"},
|
||||
"MID-FLOOR FLAT": {"property-type": "House", "built-form": None},
|
||||
"SEMI- DETACHED": {"property-type": "House", "built-form": "Semi-Detached"},
|
||||
"NO EPC ON GOV": {"property-type": "House", "built-form": None},
|
||||
"Top-floor flat": {"property-type": "House", "built-form": None},
|
||||
"GROUND-FLOOR FLAT": {"property-type": "House", "built-form": None},
|
||||
"NOT ON GOV SITE": {"property-type": "House", "built-form": None}
|
||||
}
|
||||
|
||||
for _, property_meta in tqdm(lost_identified_properties.iterrows(), total=len(lost_identified_properties)):
|
||||
|
||||
if property_meta["POSTCODE"] is None:
|
||||
continue
|
||||
|
||||
full_address = ", ".join(
|
||||
[str(x) for x in [
|
||||
property_meta["NO"], property_meta["ADDRESS 1"], property_meta["ADDRESS 2"], property_meta["ADDRESS 3"]
|
||||
] if x is not None]
|
||||
)
|
||||
|
||||
searcher = SearchEpc(
|
||||
address1=str(property_meta["NO"]),
|
||||
postcode=property_meta["POSTCODE"],
|
||||
auth_token=EPC_AUTH_TOKEN,
|
||||
os_api_key=None,
|
||||
full_address=full_address
|
||||
)
|
||||
|
||||
property_type_key = property_meta["PROPERTY TYPE"]
|
||||
if property_type_key is not None:
|
||||
searcher.ordnance_survey_client.property_type = property_type_lookup[property_type_key.strip()][
|
||||
"property-type"]
|
||||
searcher.ordnance_survey_client.built_form = property_type_lookup[property_type_key.strip()][
|
||||
"built-form"]
|
||||
searcher.find_property(skip_os=True)
|
||||
|
||||
if searcher.newest_epc is None:
|
||||
nodata.append(property_meta)
|
||||
continue
|
||||
|
||||
if searcher.newest_epc.get("estimated"):
|
||||
# We insert the row ID as our proxy for UPRN
|
||||
proxy_uprn = int(property_meta["row_id"].split("_")[-1])
|
||||
searcher.newest_epc["uprn"] = proxy_uprn
|
||||
|
||||
newest_epc = searcher.newest_epc
|
||||
older_epcs = searcher.older_epcs
|
||||
full_sap_epc = searcher.full_sap_epc
|
||||
# We also want to get the penultimate epc
|
||||
penultimate_epc, _ = searcher.filter_newest_epc(older_epcs)
|
||||
if not penultimate_epc:
|
||||
penultimate_epc = newest_epc
|
||||
|
||||
eligibility = Eligibility(epc=newest_epc, cleaned=cleaned)
|
||||
eligibility.check_gbis_warmfront()
|
||||
eligibility.check_eco4_warmfront()
|
||||
|
||||
if (not eligibility.eco4_warmfront["eligible"]) and (not eligibility.gbis_warmfront):
|
||||
eligibility = Eligibility(epc=penultimate_epc, cleaned=cleaned)
|
||||
eligibility.check_gbis_warmfront()
|
||||
eligibility.check_eco4_warmfront()
|
||||
# If this is the case, we need to update the older epcs
|
||||
# We don't update just to make data cleaning easier
|
||||
if penultimate_epc.get("estimated") is None:
|
||||
older_epcs = [x for x in searcher.data["rows"] if x["lmk-key"] != penultimate_epc["lmk-key"]]
|
||||
|
||||
# Full checks
|
||||
eligibility.check_gbis()
|
||||
eligibility.check_eco4()
|
||||
|
||||
if eligibility.eco4_warmfront["eligible"] & (eligibility.epc["construction-age-band"] not in ["", None]):
|
||||
if eligibility.epc["uprn"] in ["", None]:
|
||||
eligibility.epc["uprn"] = int(property_meta["row_id"].split("_")[1])
|
||||
|
||||
scoring_dictionary = prepare_model_data_row(
|
||||
property_id=property_meta["row_id"],
|
||||
modelling_epc=eligibility.epc,
|
||||
cleaned=cleaned,
|
||||
cleaning_data=cleaning_data,
|
||||
created_at=created_at,
|
||||
old_data=older_epcs,
|
||||
full_sap_epc=full_sap_epc,
|
||||
photo_supply_lookup=photo_supply_lookup,
|
||||
floor_area_decile_thresholds=floor_area_decile_thresholds,
|
||||
)
|
||||
scoring_data.extend(scoring_dictionary)
|
||||
|
||||
results.append(
|
||||
{
|
||||
"row_id": property_meta["row_id"],
|
||||
"uprn": eligibility.epc["uprn"],
|
||||
"Address": property_meta["ADDRESS 1"],
|
||||
"Postcode": property_meta["POSTCODE"],
|
||||
"property_type": eligibility.epc["property-type"],
|
||||
"gbis_eligible": eligibility.gbis_warmfront,
|
||||
"eco4_eligible": eligibility.eco4_warmfront["eligible"],
|
||||
"eco4_message": eligibility.eco4_warmfront["message"],
|
||||
"sap": float(eligibility.epc["current-energy-efficiency"]),
|
||||
"gbis_eligible_future": eligibility.gbis["eligible"],
|
||||
"gbis_eligible_future_message": eligibility.gbis["message"],
|
||||
"eco4_eligible_future": eligibility.eco4["eligible"],
|
||||
"eco4_eligible_future_message": eligibility.eco4["message"],
|
||||
# Property components
|
||||
"roof": eligibility.roof["clean_description"],
|
||||
"walls": eligibility.walls["clean_description"],
|
||||
"cavity_type": eligibility.cavity["type"],
|
||||
"heating": eligibility.epc["mainheat-description"],
|
||||
"tenure": eligibility.tenure,
|
||||
"date_epc": eligibility.epc["lodgement-date"],
|
||||
}
|
||||
)
|
||||
|
||||
scoring_df = pd.DataFrame(scoring_data)
|
||||
|
||||
# Perform the same cleaning as in the model - first clean number of room variables though
|
||||
scoring_df = DataProcessor.apply_averages_cleaning(
|
||||
data_to_clean=scoring_df,
|
||||
cleaning_data=cleaning_data,
|
||||
cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
|
||||
colnames=["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"],
|
||||
)
|
||||
|
||||
scoring_df = DataProcessor.apply_averages_cleaning(
|
||||
data_to_clean=scoring_df,
|
||||
cleaning_data=cleaning_data,
|
||||
cols_to_merge_on=COLUMNS_TO_MERGE_ON + ["LOCAL_AUTHORITY"],
|
||||
).drop(columns=["LOCAL_AUTHORITY"])
|
||||
|
||||
scoring_df = DataProcessor.clean_missings_after_description_process(
|
||||
scoring_df,
|
||||
ignore_cols=[c for c in scoring_df.columns if ("thermal_transmittance" in c) or (
|
||||
"insulation_thickness" in c) or ("ENERGY_EFF" in c)]
|
||||
)
|
||||
|
||||
scoring_df = DataProcessor.clean_efficiency_variables(scoring_df)
|
||||
scoring_df["UPRN"] = scoring_df["UPRN"].astype(int)
|
||||
|
||||
model_api = ModelApi(portfolio_id="ha33-eligibility", timestamp=created_at)
|
||||
all_predictions = model_api.predict_all(
|
||||
df=scoring_df,
|
||||
bucket="retrofit-data-dev",
|
||||
prediction_buckets={
|
||||
"sap_change_predictions": "retrofit-sap-predictions-dev",
|
||||
"heat_demand_predictions": "retrofit-heat-predictions-dev",
|
||||
"carbon_change_predictions": "retrofit-carbon-predictions-dev"
|
||||
}
|
||||
)
|
||||
|
||||
predictions = all_predictions["sap_change_predictions"].copy()
|
||||
|
||||
results_df = pd.DataFrame(results)
|
||||
|
||||
predictions = predictions.rename(columns={"property_id": "row_id"}).merge(
|
||||
results_df[["row_id", "sap"]], how="left", on="row_id"
|
||||
)
|
||||
predictions["sap_uplift"] = predictions["predictions"] - predictions["sap"]
|
||||
predictions = predictions.groupby("row_id")["sap_uplift"].sum().reset_index()
|
||||
|
||||
results_df = results_df.merge(
|
||||
predictions[["sap_uplift", "row_id"]],
|
||||
how="left",
|
||||
on="row_id"
|
||||
)
|
||||
results_df["post_install_sap"] = results_df["sap"] + results_df["sap_uplift"]
|
||||
|
||||
eligibility_assessment = []
|
||||
for _, row in results_df[results_df["eco4_eligible"] == True].iterrows():
|
||||
# The upgrade requirements are dependent on the current SAP
|
||||
|
||||
# If the property is an F or G, it only needs to upgrade to an %
|
||||
if row["sap"] <= 38:
|
||||
if row["post_install_sap"] >= 57:
|
||||
eligibility_classification = "highest confidence"
|
||||
elif row["post_install_sap"] >= 55:
|
||||
eligibility_classification = "high confidence"
|
||||
elif row["post_install_sap"] >= 53:
|
||||
eligibility_classification = "medium confidence"
|
||||
else:
|
||||
eligibility_classification = "unlikely"
|
||||
else:
|
||||
|
||||
if row["post_install_sap"] >= 71:
|
||||
eligibility_classification = "highest confidence"
|
||||
elif row["post_install_sap"] >= 69:
|
||||
eligibility_classification = "high confidence"
|
||||
elif row["post_install_sap"] >= 67:
|
||||
eligibility_classification = "medium confidence"
|
||||
else:
|
||||
eligibility_classification = "unlikely"
|
||||
|
||||
eligibility_assessment.append(
|
||||
{
|
||||
"row_id": row["row_id"],
|
||||
"eligibility_classification": eligibility_classification
|
||||
}
|
||||
)
|
||||
|
||||
eligibility_assessment = pd.DataFrame(eligibility_assessment)
|
||||
|
||||
results_df = results_df.merge(
|
||||
eligibility_assessment, how="left", on="row_id"
|
||||
)
|
||||
return results_df, scoring_data, nodata
|
||||
|
||||
|
||||
def analyse_results(results_df, data, eco4_prospects_survey_list):
|
||||
analysis_data = data[["row_id", "survey_key", "warmfront_identified"]].merge(
|
||||
results_df, how="left", on="row_id"
|
||||
|
|
@ -457,6 +760,18 @@ def analyse_results(results_df, data, eco4_prospects_survey_list):
|
|||
|
||||
warmfront_identified = analysis_data[analysis_data["warmfront_identified"]]
|
||||
|
||||
identified_eco = analysis_data[analysis_data["eco4_eligible"] == True]
|
||||
identified_eco = identified_eco[identified_eco["eco4_message"] == "subject to post retrofit sap"]
|
||||
|
||||
identified_gbis = analysis_data[
|
||||
(analysis_data["gbis_eligible"] == True) & (analysis_data["eco4_eligible"] == False)
|
||||
]
|
||||
|
||||
# Take just unfilled cavities and remove filled potentials
|
||||
identified_gbis["walls"].value_counts()
|
||||
|
||||
identified_gbis["walls"].value_counts()
|
||||
|
||||
# Of the ECO jobs, what proportion to we get right
|
||||
|
||||
success_rate = (warmfront_identified["eco4_eligible"] | warmfront_identified["gbis_eligible"]).sum() / \
|
||||
|
|
@ -490,8 +805,15 @@ def analyse_results(results_df, data, eco4_prospects_survey_list):
|
|||
].shape[0]
|
||||
|
||||
|
||||
def analyse_lost_surveys(results_df):
|
||||
identified_eco = results_df[results_df["eco4_eligible"] == True]
|
||||
# 59 for lost surveys
|
||||
identified_gbis = results_df[results_df["gbis_eligible"] == True]
|
||||
# 107
|
||||
|
||||
|
||||
def app():
|
||||
data, eco4_prospects_survey_list = load_data()
|
||||
data, eco4_prospects_survey_list, lost_identified_properties = load_data()
|
||||
|
||||
data["row_id"] = ["ha25_" + str(i) for i in range(0, len(data))]
|
||||
|
||||
|
|
@ -501,16 +823,21 @@ def app():
|
|||
)
|
||||
cleaned = msgpack.unpackb(cleaned, raw=False)
|
||||
|
||||
cleaning_data = read_parquet_from_s3(
|
||||
cleaning_data = read_dataframe_from_s3_parquet(
|
||||
bucket_name="retrofit-data-dev", file_key="sap_change_model/cleaning_dataset.parquet",
|
||||
)
|
||||
|
||||
created_at = datetime.now().isoformat()
|
||||
|
||||
results_df, scoring_data, nodata = get_epc_data(data, cleaned, cleaning_data, created_at)
|
||||
photo_supply_lookup, floor_area_decile_thresholds = SolarPhotoSupply.load(bucket="retrofit-data-dev")
|
||||
|
||||
results_df, scoring_data, nodata = get_epc_data(
|
||||
data, cleaned, cleaning_data, created_at, photo_supply_lookup, floor_area_decile_thresholds
|
||||
)
|
||||
# Pickle the outputs
|
||||
# Old data was ha25.pickle
|
||||
# import pickle
|
||||
# with open("ha25.pickle", "wb") as f:
|
||||
# with open("ha25_9_jan.pickle", "wb") as f:
|
||||
# pickle.dump(
|
||||
# {
|
||||
# "results_df": results_df,
|
||||
|
|
@ -519,3 +846,11 @@ def app():
|
|||
# },
|
||||
# f
|
||||
# )
|
||||
|
||||
# Load in pickle
|
||||
# import pickle
|
||||
# with open("ha25_9_jan.pickle", "rb") as f:
|
||||
# saved = pickle.load(f)
|
||||
# results_df = saved["results_df"]
|
||||
# scoring_data = saved["scoring_data"]
|
||||
# nodata = saved["nodata"]
|
||||
|
|
|
|||
|
|
@ -150,6 +150,7 @@ def get_ha7_data(data, cleaned, cleaning_data, created_at, photo_supply_lookup,
|
|||
"heating": eligibility.epc["mainheat-description"],
|
||||
"tenure": eligibility.tenure,
|
||||
"date_epc": eligibility.epc["lodgement-date"],
|
||||
**newest_epc,
|
||||
}
|
||||
)
|
||||
|
||||
|
|
@ -250,10 +251,18 @@ def get_ha7_data(data, cleaned, cleaning_data, created_at, photo_supply_lookup,
|
|||
|
||||
def analyse_ha_7(results_df, data):
|
||||
df = results_df.merge(
|
||||
data[["row_id", "row_code", "Property Type"]], how="left", on="row_id"
|
||||
data[["row_id", "row_code", "Property Type", "Construction Year Band"]], how="left", on="row_id"
|
||||
)
|
||||
warmfront_identification = df["row_code"].value_counts()
|
||||
warmfront_identified = df[df["row_code"] == "potential ECO4"]
|
||||
warmfront_identified["walls"].value_counts(normalize=True)
|
||||
|
||||
df["Construction Year Band"].value_counts(normalize=True)
|
||||
|
||||
# Number of days from today
|
||||
|
||||
days_to_today = (datetime.now() - pd.to_datetime(warmfront_identified["date_epc"])).dt.days
|
||||
days_to_today.mean()
|
||||
|
||||
property_types = df["Property Type"].value_counts()
|
||||
|
||||
|
|
@ -305,3 +314,11 @@ def app():
|
|||
# import pickle
|
||||
# with open("ha7_results.pkl", "wb") as f:
|
||||
# pickle.dump({"results_df": results_df, "scoring_data": scoring_data, "nodata": nodata}, f)
|
||||
|
||||
# Read in the old data
|
||||
# import pickle
|
||||
# with open("ha7_results.pkl", "rb") as f:
|
||||
# old_data = pickle.load(f)
|
||||
# results_df = old_data["results_df"]
|
||||
# scoring_data = old_data["scoring_data"]
|
||||
# nodata = old_data["nodata"]
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue