working on new ha batch

This commit is contained in:
Khalim Conn-Kowlessar 2024-01-24 00:17:17 +00:00
parent 4b73aa75b2
commit 04aeaae613
4 changed files with 167 additions and 63 deletions

View file

@ -68,7 +68,7 @@ class Property(Definitions):
self.in_conservation_area, self.is_listed, self.is_heritage = None, None, None
self.restricted_measures = False
self.year_built = epc_record.get("year_built")
self.number_of_rooms = epc_record.prepared_epc.get("number_heated_rooms")
self.number_of_rooms = epc_record.prepared_epc.get("number_habitable_rooms")
self.age_band = epc_record.get("age_band")
self.construction_age_band = epc_record.get("construction_age_band")
self.number_of_floors = epc_record.get("number_of_floors")

View file

@ -4,6 +4,7 @@ used by the Warmfront team, to identify which properties are eligible for ECO4 a
work is being done in December 2023, prior to completion of acquisition
"""
import pickle
from etl.epc.Record import EPCRecord
from pathlib import Path
from tqdm import tqdm
import pandas as pd
@ -345,48 +346,31 @@ def prepare_model_data_row(
:param modelling_epc:
:return:
"""
epc_records = {
'original_epc': modelling_epc.copy(),
'full_sap_epc': full_sap_epc.copy(),
'old_data': old_data.copy(),
}
prepared_epc = EPCRecord(
epc_records=epc_records,
run_mode="newdata",
cleaning_data=cleaning_data
)
p = Property(
id=property_id,
postcode=modelling_epc["postcode"],
address=modelling_epc["address1"],
data=modelling_epc,
old_data=old_data,
full_sap_epc=full_sap_epc
epc_record=prepared_epc
)
p.get_components(cleaned, photo_supply_lookup=photo_supply_lookup,
floor_area_decile_thresholds=floor_area_decile_thresholds)
# THIS IS TEMP AND SHOULDN'T BE HERE
data_to_clean = p.get_model_data()
if data_to_clean["NUMBER_HEATED_ROOMS"] in ['', None]:
data_to_clean["NUMBER_HEATED_ROOMS"] = data_to_clean["NUMBER_HABITABLE_ROOMS"]
p.data["number-heated-rooms"] = data_to_clean["NUMBER_HABITABLE_ROOMS"]
# This is temp - this should happen after scoring
cleaned_property_data = DataProcessor.apply_averages_cleaning(
data_to_clean=pd.DataFrame([dict(**data_to_clean, LOCAL_AUTHORITY=p.data["local-authority"])]),
cleaning_data=cleaning_data,
cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
p.get_components(
cleaned, photo_supply_lookup=photo_supply_lookup, floor_area_decile_thresholds=floor_area_decile_thresholds
)
p.set_number_lighting_outlets(cleaned_property_data)
data_processor = DataProcessor(None, newdata=True)
data_processor.insert_data(pd.DataFrame([p.get_model_data()]))
data_processor.pre_process()
starting_epc_data = data_processor.get_component_features(suffix="_STARTING")
ending_epc_data = data_processor.get_component_features(suffix="_ENDING")
fixed_data = data_processor.get_fixed_features()
# We update the ending record with the recommended updates and we set lodgement date to today
ending_epc_data["DAYS_TO_ENDING"] = data_processor.calculate_days_to(created_at)
# We simulate the impact of the retrofit using expected performance of the wall and roof,
# after retrofit. We use the minimal u-values required to meet building regulations part L
# TODO: Check the performance of the materials warmfront's installers use, particularly for
# cavity
p.create_base_difference_epc_record(cleaned_lookup=cleaned)
cavity_simulation = {
"recommendation_id": "-".join([property_id, "cavity"]),
@ -402,21 +386,16 @@ def prepare_model_data_row(
"parts": [{"depth": 270}]
}
cavity_scoring = create_recommendation_scoring_data(
property=p,
recommendation=cavity_simulation,
starting_epc_data=starting_epc_data,
ending_epc_data=ending_epc_data,
fixed_data=fixed_data,
)
simulations = [
[cavity_simulation],
[loft_simulation]
]
loft_scoring = create_recommendation_scoring_data(
property=p,
recommendation=loft_simulation,
starting_epc_data=starting_epc_data,
ending_epc_data=ending_epc_data,
fixed_data=fixed_data,
)
p.adjust_difference_record_with_recommendations(simulations)
# Make sure we definitely have the correct data
cavity_scoring = [x for x in p.recommendations_scoring_data if "cavity" in x["id"]][0]
loft_scoring = [x for x in p.recommendations_scoring_data if "loft" in x["id"]][0]
return [cavity_scoring, loft_scoring]

View file

@ -460,29 +460,155 @@ class DataLoader:
)
def get_epc_data(loader):
def get_epc_data(
loader, cleaned, cleaning_data, created_at, photo_supply_lookup, floor_area_decile_thresholds
):
if not loader.data:
raise ValueError("Data not found - please run loader.load() first")
property_type_lookup = {}
property_type_lookup = {
"ha_1": {
"built_form": {
'Mid Terrace': 'Mid-Terrace',
'Semi-Detached': 'Semi-Detached',
'End Terrace': 'End-Terrace',
'Detached': 'Detached',
'Enclosed Mid': 'Mid-Terrace',
'Detached Local Connect': 'Detached',
}
}
}
for ha_name, data_assets in loader.data.items():
# For each HA, we pull in the data required, and store it in S3
asset_list = data_assets["asset_list"]
asset_list = data_assets["asset_list"].copy()
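# If the survey list is missing, it means we have not yet completed any surveys and therefore should only
# consider the most recent EPC
# NOTE(review): the flag below is True when survey_list is None, which appears to contradict this comment - confirm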
consider_penultimate_epc = data_assets["survey_list"] is None
# We iterate through the asset list and pull what we need
results = []
scoring_data = []
for index, property_meta in tqdm(asset_list.iterrows(), total=len(asset_list)):
if ha_name == "ha_1":
property_type = property_meta["Asset Type"]
# We correct a small error
if property_type == "a":
property_type = "House"
# Remap bedsits to flats
if property_type in ["Bedsit", "Room"]:
property_type = "Flat"
built_form = property_type_lookup[ha_name]["built_form"].get(property_meta["Property Type"], None)
else:
raise NotImplementedError("Implement me")
searcher = SearchEpc(
address1=property_meta["No."],
postcode=property_meta["Postcode"],
address1=property_meta["HouseNo"],
postcode=property_meta["matching_postcode"],
auth_token=EPC_AUTH_TOKEN,
os_api_key=None,
full_address=property_meta["Address"]
full_address=property_meta["matching_address"]
)
searcher.ordnance_survey_client.property_type = property_type_lookup[property_meta["Type"]]["property-type"]
searcher.ordnance_survey_client.built_form = property_type_lookup[property_meta["Type"]]["built-form"]
searcher.ordnance_survey_client.property_type = property_type
searcher.ordnance_survey_client.built_form = built_form
searcher.find_property(skip_os=True)
if searcher.newest_epc.get("estimated"):
# We insert the row ID as our proxy for UPRN
searcher.newest_epc["uprn"] = int(property_meta["asset_list_row_id"].split(ha_name)[1])
newest_epc = searcher.newest_epc
older_epcs = searcher.older_epcs
full_sap_epc = searcher.full_sap_epc
# If we have a survey list, we check the penultimate, because the property might have been installed
penultimate_epc = newest_epc
if consider_penultimate_epc:
# We also want to get the penultimate epc
penultimate_epc, _ = searcher.filter_newest_epc(older_epcs)
if not penultimate_epc:
penultimate_epc = newest_epc
eligibility = Eligibility(epc=newest_epc, cleaned=cleaned)
eligibility.check_gbis_warmfront()
eligibility.check_eco4_warmfront()
if (not eligibility.eco4_warmfront["eligible"]) and (
not eligibility.gbis_warmfront
) and consider_penultimate_epc:
# We check the penultimate epc
eligibility = Eligibility(epc=penultimate_epc, cleaned=cleaned)
eligibility.check_gbis_warmfront()
eligibility.check_eco4_warmfront()
# If this is the case, we need to update the older epcs
# We don't update just to make data cleaning easier
if penultimate_epc.get("estimated") is None:
older_epcs = [x for x in searcher.data["rows"] if x["lmk-key"] != penultimate_epc["lmk-key"]]
# If the property is a cavity wall and it's filled, we produce an estimate for the age of the cavity
# Loft MUST be suitable
cavity_age = None
if (
eligibility.walls["is_cavity_wall"] and
eligibility.walls["is_filled_cavity"] and
eligibility.loft["suitability"] and
eligibility.eco4_warmfront["message"] == "Failed due to full cavity - check cavity age"
):
# We check the age of the cavity and if it's particularly old, we flag it
cavity_age = calculate_cavity_age(newest_epc, older_epcs, cleaned)
# Full checks
eligibility.check_gbis()
eligibility.check_eco4()
if eligibility.eco4_warmfront["eligible"]:
if eligibility.epc["uprn"] == "":
eligibility.epc["uprn"] = int(property_meta["asset_list_row_id"].split(ha_name)[1])
scoring_dictionary = prepare_model_data_row(
property_id=property_meta["asset_list_row_id"],
modelling_epc=eligibility.epc,
cleaned=cleaned,
cleaning_data=cleaning_data,
created_at=created_at,
old_data=older_epcs,
full_sap_epc=full_sap_epc,
photo_supply_lookup=photo_supply_lookup,
floor_area_decile_thresholds=floor_area_decile_thresholds
)
scoring_data.extend(scoring_dictionary)
results.append(
{
"row_id": property_meta["asset_list_row_id"],
"uprn": eligibility.epc["uprn"],
"property_type": eligibility.epc["property-type"],
"gbis_eligible": eligibility.gbis_warmfront,
"eco4_eligible": eligibility.eco4_warmfront["eligible"],
"eco4_message": eligibility.eco4_warmfront["message"],
"sap": float(eligibility.epc["current-energy-efficiency"]),
"gbis_eligible_future": eligibility.gbis["eligible"],
"gbis_eligible_future_message": eligibility.gbis["message"],
"eco4_eligible_future": eligibility.eco4["eligible"],
"eco4_eligible_future_message": eligibility.eco4["message"],
# Property components
"roof": eligibility.roof["clean_description"],
"walls": eligibility.walls["clean_description"],
"cavity_type": eligibility.cavity["type"],
"heating": eligibility.epc["mainheat-description"],
"tenure": eligibility.tenure,
"date_epc": eligibility.epc["lodgement-date"],
"loft_thickness": eligibility.roof["insulation_thickness"],
"cavity_age": cavity_age,
**eligibility.walls,
**eligibility.roof,
}
)
def app():
"""
@ -491,7 +617,7 @@ def app():
:return:
"""
use_cache = False
use_cache = True
files = {
"ha_1": {

View file

@ -361,7 +361,7 @@ class EPCRecord:
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
if self.prepared_epc["fixed-lighting-outlets-count"] == "":
if self.prepared_epc["fixed-lighting-outlets-count"] in ["", None] + list(DATA_ANOMALY_MATCHES):
# We check old EPCs and the full SAP EPC
@ -537,7 +537,7 @@ class EPCRecord:
else:
value = 0
else:
value = int(value)
value = int(float(value))
self.prepared_epc[attribute] = value
@ -583,9 +583,8 @@ class EPCRecord:
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc['photo-supply'] = float(self.prepared_epc['photo-supply']) if self.prepared_epc[
'photo-supply'] != "" \
else None
self.prepared_epc['photo-supply'] = float(self.prepared_epc['photo-supply']) if (
self.prepared_epc['photo-supply'] not in [None, ""]) else None
def _clean_energy(self):
"""