From 736e02cb4a6ac25f20a0ad774c1f64779af26dc8 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 12 Dec 2023 17:37:47 +0000 Subject: [PATCH] completed ha15 --- etl/eligibility/Eligibility.py | 13 +- etl/eligibility/ha_15_32/app.py | 410 ++++++++++++++++++++++++++++++-- 2 files changed, 405 insertions(+), 18 deletions(-) diff --git a/etl/eligibility/Eligibility.py b/etl/eligibility/Eligibility.py index a24fd2d5..730ff6e1 100644 --- a/etl/eligibility/Eligibility.py +++ b/etl/eligibility/Eligibility.py @@ -46,10 +46,17 @@ class Eligibility: return remapped[0] if key == "walls-description": - return WallAttributes(self.epc["walls-description"]).process() + cleaner_cls = WallAttributes(self.epc["roof-description"]) - if key == "roof-description": - return RoofAttributes(self.epc["roof-description"]).process() + elif key == "roof-description": + cleaner_cls = RoofAttributes(self.epc["roof-description"]) + + else: + raise ValueError("Invalid key") + output = cleaner_cls.process() + output["clean_description"] = cleaner_cls.description.replace("(assumed)", "").rstrip().capitalize() + + return output def loft_insulation(self, loft_thickness_threshold: int = None): """ diff --git a/etl/eligibility/ha_15_32/app.py b/etl/eligibility/ha_15_32/app.py index dd27f7c1..574f926b 100644 --- a/etl/eligibility/ha_15_32/app.py +++ b/etl/eligibility/ha_15_32/app.py @@ -3,6 +3,7 @@ This process has been created to compare the model based eligibility process aga used by the Warmfront team, to identify which properties are eligible for ECO4 and GBIS funding. This work is being done in December 2023, prior to completion of acquisition """ +import pickle from pathlib import Path from tqdm import tqdm import pandas as pd @@ -18,6 +19,8 @@ from etl.eligibility.Eligibility import Eligibility from etl.epc.DataProcessor import DataProcessor from backend.app.utils import read_parquet_from_s3 from backend.app.plan.utils import create_recommendation_scoring_data +from etl.epc.settings import COLUMNS_TO_MERGE_ON +from backend.ml_models.api import ModelApi ENV_FILE = Path(__file__).parent / "etl" / "eligibility" / "ha_15_32" / ".env" @@ -462,6 +465,9 @@ def get_ha_32data(ha_data, cleaned, cleaning_data, created_at): "warmfront_identified": house["identified"], "gbis_eligible": None, "eco4_eligible": None, + "sap": None, + "roof": None, + "walls": None, "date_epc": None, "message": "No EPC found", } @@ -481,13 +487,18 @@ def get_ha_32data(ha_data, cleaned, cleaning_data, created_at): eligibility.check_eco4() # If there is no eligibility, we need to check the penultimate epc - if (not eligibility.eco4["eligible"]) and (not eligibility.gbis): + # However, we only check the penultimate epc if the property is identified + # This is because if the property was identified, it's possible that the newest EPC is a post-retrofit + # EPC, which would mean that the penultimate EPC is the pre-retrofit EPC + # However, if the property HAS been identified, we don't want to check the penultimate EPC since + # The newest EPC will reflect the current state of the home and therefore we determine if there is a new + # opportunity for retrofit + if (not eligibility.eco4["eligible"]) and (not eligibility.gbis) and (house["identified"]): eligibility = Eligibility(epc=penultimate_epc, cleaned=cleaned) eligibility.check_gbis() eligibility.check_eco4() if eligibility.eco4["eligible"]: - # TODO: Check me scoring_dictionary = prepare_model_data_row( property_id=house["row_id"], modelling_epc=eligibility.epc, @@ -502,6 +513,9 @@ def get_ha_32data(ha_data, cleaned, cleaning_data, created_at): "warmfront_identified": house["identified"], "gbis_eligible": eligibility.gbis, "eco4_eligible": eligibility.eco4["eligible"], + "sap": float(eligibility.epc["current-energy-efficiency"]), + "roof": eligibility.roof["clean_description"], + "walls": eligibility.walls["clean_description"], "date_epc": eligibility.epc["lodgement-date"], "message": "eco4 conditional on post sap", } @@ -519,22 +533,235 @@ def get_ha_32data(ha_data, cleaned, cleaning_data, created_at): "warmfront_identified": house["identified"], "gbis_eligible": eligibility.gbis, "eco4_eligible": eligibility.eco4["eligible"], + "sap": float(eligibility.epc["current-energy-efficiency"]), + "roof": eligibility.roof["clean_description"], + "walls": eligibility.walls["clean_description"], "date_epc": eligibility.epc["lodgement-date"], "message": None } ) - logger.info("no_house_numbers") + return results, scoring_data, no_house_numbers - return results, scoring_data + +def get_ha_15data(ha_data, cleaned, cleaning_data, created_at): + house_number_key = None + address_key = "Address Line 1" + postcode_key = "Postcode" + house_name = None + house_type_key = "Property Type" + + house_type_lookup = { + "Bungalow": "Bungalow", + "Flat": "Flat", + 'House': "House", + 'Flat over garage': "Flat", + 'Maisonette': "Maisonette", + } + + scoring_data = [] + results = [] + no_house_numbers = [] + for _, house in tqdm(ha_data.iterrows(), total=len(ha_data)): + + # If we don't have a house number, we'll continue since we won't realistically be able to find + # an address + if house_number_key is not None: + if pd.isnull(house[house_number_key]): + no_house_numbers.append(house["row_id"]) + continue + + if house_name is not None: + if not pd.isnull(house[house_name]): + address1 = " ".join([house[house_name], house[house_number_key], house[address_key]]) + else: + address1 = " ".join([house[house_number_key], house[address_key]]) + else: + address1 = house[address_key] + + searcher = SearchEpc( + address1=address1, + postcode=house[postcode_key] + ) + + response = searcher.search() + if response["status"] == 204: + # If the property is identified, we should fix this + # if house["identified"]: + # raise NotImplementedError("Check if we have an epc") + results.append( + { + "row_id": house["row_id"], + "warmfront_identified": house["identified"], + "gbis_eligible": None, + "eco4_eligible": None, + "sap": None, + "roof": None, + "walls": None, + "date_epc": None, + "message": "No EPC found", + } + ) + continue + + newest_epc, older_epcs, _ = searcher.retrieve( + property_type=house_type_lookup.get(house[house_type_key], None) + ) + # We also want to get the penultimate epc + penultimate_epc, _ = searcher.filter_newest_epc(older_epcs) + if not penultimate_epc: + penultimate_epc = newest_epc + + eligibility = Eligibility(epc=newest_epc, cleaned=cleaned) + eligibility.check_gbis() + eligibility.check_eco4() + + # If there is no eligibility, we need to check the penultimate epc + # However, we only check the penultimate epc if the property is identified + # This is because if the property was identified, it's possible that the newest EPC is a post-retrofit + # EPC, which would mean that the penultimate EPC is the pre-retrofit EPC + # However, if the property HAS been identified, we don't want to check the penultimate EPC since + # The newest EPC will reflect the current state of the home and therefore we determine if there is a new + # opportunity for retrofit + if (not eligibility.eco4["eligible"]) and (not eligibility.gbis) and (house["identified"]): + eligibility = Eligibility(epc=penultimate_epc, cleaned=cleaned) + eligibility.check_gbis() + eligibility.check_eco4() + + if eligibility.eco4["eligible"]: + scoring_dictionary = prepare_model_data_row( + property_id=house["row_id"], + modelling_epc=eligibility.epc, + cleaned=cleaned, + cleaning_data=cleaning_data, + created_at=created_at + ) + scoring_data.append(scoring_dictionary) + results.append( + { + "row_id": house["row_id"], + "warmfront_identified": house["identified"], + "gbis_eligible": eligibility.gbis, + "eco4_eligible": eligibility.eco4["eligible"], + "sap": float(eligibility.epc["current-energy-efficiency"]), + "roof": eligibility.roof["clean_description"], + "walls": eligibility.walls["clean_description"], + "date_epc": eligibility.epc["lodgement-date"], + "message": "eco4 conditional on post sap", + } + ) + continue + + # if (house["identified"] and not eligibility.gbis) and ( + # house["identified"] and not eligibility.eco4["eligible"]): + # raise NotImplementedError("Investigate ms") + + # If nothing is eligible or gbis is eligible, then we make a record this + results.append( + { + "row_id": house["row_id"], + "warmfront_identified": house["identified"], + "gbis_eligible": eligibility.gbis, + "eco4_eligible": eligibility.eco4["eligible"], + "sap": float(eligibility.epc["current-energy-efficiency"]), + "roof": eligibility.roof["clean_description"], + "walls": eligibility.walls["clean_description"], + "date_epc": eligibility.epc["lodgement-date"], + "message": None + } + ) + + # with open("ha_15_outputs.pickle", "rb") as f: + # results_dict = pickle.load(f) + # results = results_dict["results"] + # scoring_data = results_dict["scoring_data"] + # no_house_numbers = results_dict["no_house_numbers"] + + scoring_df = pd.DataFrame(scoring_data) + # Implement the same process that is being used in the recommendation engine to cleaning scoring_df + + # Perform the same cleaning as in the model - first clean number of room variables though + scoring_df = DataProcessor.apply_averages_cleaning( + data_to_clean=scoring_df, + cleaning_data=cleaning_data, + cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'], + colnames=["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"], + ) + + scoring_df = DataProcessor.apply_averages_cleaning( + data_to_clean=scoring_df, + cleaning_data=cleaning_data, + cols_to_merge_on=COLUMNS_TO_MERGE_ON + ["LOCAL_AUTHORITY"], + ).drop(columns=["LOCAL_AUTHORITY"]) + + scoring_df = DataProcessor.clean_missings_after_description_process( + scoring_df, + ignore_cols=[c for c in scoring_df.columns if ("thermal_transmittance" in c) or ( + "insulation_thickness" in c) or ("ENERGY_EFF" in c)] + ) + + scoring_df = DataProcessor.clean_efficiency_variables(scoring_df) + + model_api = ModelApi(portfolio_id="ha32-eligibility", timestamp=created_at) + all_predictions = model_api.predict_all( + df=scoring_df, + bucket="retrofit-data-dev", + prediction_buckets={ + "sap_change_predictions": "retrofit-sap-predictions-dev", + "heat_demand_predictions": "retrofit-heat-predictions-dev", + "carbon_change_predictions": "retrofit-carbon-predictions-dev" + } + ) + + # merge the predictions onto the scoring_df + predictions = all_predictions["sap_change_predictions"] + + results_df = pd.DataFrame(results) + + results_df = results_df.merge( + predictions[["predictions", "property_id"]].rename( + columns={"predictions": "post_install_sap", "property_id": "row_id"} + ), + how="left", + on="row_id" + ) + + # Our methodology for identifying properties is to use the post-install SAP score + # We produce the following classifications, which accomodate the fact that the model can be wrong + # 1) If the post-install SAP score is above 71, we say the property is eligible and we hve high confidence + # 2) If the post-install SAP score is above 69, we say that the property is eligible + # 3) If the post-install SAP score is above 67, we say that the property is eligible, but we are not confident + # 4) If the post-install SAP score is below 67, we say that the property is unlikely to be eligible + + eligibility_assessment = [] + for _, row in results_df[results_df["eco4_eligible"] == True].iterrows(): + + if row["post_install_sap"] >= 71: + eligibility_classification = "highest confidence" + elif row["post_install_sap"] >= 69: + eligibility_classification = "high confidence" + elif row["post_install_sap"] >= 67: + eligibility_classification = "medium confidence" + else: + eligibility_classification = "unlikely" + + eligibility_assessment.append( + { + "row_id": row["row_id"], + "eligibility_classification": eligibility_classification + } + ) + + eligibility_assessment = pd.DataFrame(eligibility_assessment) + + results_df = results_df.merge( + eligibility_assessment, how="left", on="row_id" + ) + + return results_df, scoring_df, no_house_numbers def analyse_ha_32_results(results, ha32, no_house_numbers): - results_df = pd.DataFrame(results) - import pickle - # with open("ha_32_results.pickle", "wb") as f: - # pickle.dump(results_df, f) - """ We want to know: 1) What proportion of identified properties we get correct @@ -544,6 +771,8 @@ def analyse_ha_32_results(results, ha32, no_house_numbers): For HA32, most of these (if not all) properties were identified under gbis """ + results_df = pd.DataFrame(results) + # What proportio warmfront_identified = results_df[ results_df["warmfront_identified"] @@ -552,9 +781,23 @@ def analyse_ha_32_results(results, ha32, no_house_numbers): success_rate = warmfront_identified["gbis_eligible"].sum() / warmfront_identified.shape[0] # For HA32, this is 89% - # missed = results_df[ - # results_df["warmfront_identified"] & (warmfront_identified["gbis_eligible"] != True) - # ] + missed = results_df[ + results_df["warmfront_identified"] & (warmfront_identified["gbis_eligible"] != True) + ] + + sap_too_high = missed[ + missed["sap"] >= 69 + ] + + sap_low_enough = missed[ + missed["sap"] < 69 + ] + + investigate_1 = ha32[ha32["row_id"].isin(sap_too_high["row_id"])][ + ["row_id", "Postcode", "Address", "Dwelling num", "Street"]] + + investigate_2 = ha32[ha32["row_id"].isin(sap_low_enough["row_id"])][ + ["row_id", "Postcode", "Address", "Dwelling num", "Street"]] # to_check = missed[pd.isnull(missed["message"])] @@ -605,7 +848,109 @@ def analyse_ha_32_results(results, ha32, no_house_numbers): if no_house_numbers_ha32: logger.error("We have some identified properties that have no house numbers - investigate me") - return success_rate, new_possibilities + new = { + "n_new_possibilities": new_possibilities.shape[0], + "new_possibilities_confidence": new_possibilities["high_confidence"].value_counts() + } + + return success_rate, new + + +def analyse_ha_15_results(results_df, ha15, no_house_numbers): + """ + We want to know: + 1) What proportion of identified properties we get correct + 2) If we miss identified properties, why + 3) Which properties do we identify that were not identified by warmfront. What is our confidence on these? + + For HA32, most of these (if not all) properties were identified under gbis + """ + + # What proportio + warmfront_identified = results_df[ + results_df["warmfront_identified"] + ] + + n_identified = (warmfront_identified["gbis_eligible"] | warmfront_identified["eco4_eligible"]).sum() + + success_rate = n_identified / warmfront_identified.shape[0] + + eco_identified_confidence = warmfront_identified[warmfront_identified["eco4_eligible"] == True][ + "eligibility_classification"].value_counts() + # For HA15 this is 50.3% + + # of the properties we identify, what is the mix of confidenc + + missed = results_df[ + results_df["warmfront_identified"] & ( + (warmfront_identified["gbis_eligible"] != True) & (warmfront_identified["eco4_eligible"] != True) + ) + ] + + missed_no_data = missed[missed["message"] == "No EPC found"].shape[0] + + sap_too_high = missed[ + missed["sap"] >= 69 + ] + + sap_low_enough = missed[ + missed["sap"] < 69 + ] + + sap_low_enough["walls"].value_counts() + + investigate_1 = ha15[ha15["row_id"].isin(sap_too_high["row_id"])][ + ["row_id", "Postcode", "Address Line 1", "Address Line 2", "Address Line 3"]] + + investigate_2 = ha15[ha15["row_id"].isin(sap_low_enough["row_id"])][ + ["row_id", "Postcode", "Address Line 1", "Address Line 2", "Address Line 3"]] + + missed["message"].value_counts() + + # We now look for properties that we identified, that were not identified by Warmfront + + new_possibilities = results_df[ + (~results_df["warmfront_identified"]) & + (results_df["gbis_eligible"] | results_df["eco4_eligible"]) + ].copy() + + # We deem that Any EPC that is produced in the last 3 years gives us good confidence for GBIS + cutoff_date = datetime.now() - timedelta(days=3 * 365) + + new_possibilities["high_confidence"] = pd.to_datetime(new_possibilities["date_epc"]) >= cutoff_date + + eco_new_possibilities = new_possibilities["eco4_eligible"].sum() + eco_new_possibilities_confidence = new_possibilities[ + new_possibilities["eco4_eligible"] + ]["eligibility_classification"].value_counts() + + gbis_new_possibilites = new_possibilities["gbis_eligible"].sum() + gbis_new_possibilites_confidence = new_possibilities[ + new_possibilities["gbis_eligible"] + ]["high_confidence"].value_counts() + + new = { + "new_possibilities": new_possibilities, + "eco_new_possibilities": eco_new_possibilities, + "eco_new_possibilities_confidence": eco_new_possibilities_confidence, + "gbis_new_possibilites": gbis_new_possibilites, + "gbis_new_possibilites_confidence": gbis_new_possibilites_confidence + } + + identified_results = { + "n_identified": n_identified, + "success_rate": success_rate, + "eco_identified_confidence": eco_identified_confidence + } + + missed_results = { + "n_missed": missed.shape[0], + "n_sap_too_high": sap_too_high.shape[0], + "n_sap_low_enough": sap_low_enough.shape[0], + "missed_no_data": missed_no_data + } + + return success_rate, new, identified_results, missed_results def app(): @@ -631,6 +976,41 @@ def app(): # We want to retrieve EPCs for every single property # NOTE: HA32 is MOSTLY cavity via GBIS - ha_data = ha32 - ha_32_results = get_ha_32data(ha_data, cleaned, cleaning_data, created_at) + ha32_results, ha32_scoring_data, ha32_no_house_numbers = get_ha_32data( + ha_data=ha32, + cleaned=cleaned, + cleaning_data=cleaning_data, + created_at=created_at + ) + + # with open("ha32.pickle", "wb") as f: + # pickle.dump( + # { + # "ha32_results": ha32_results, + # "ha32_scoring_data": ha32_scoring_data, + # "ha32_no_house_numbers": ha32_no_house_numbers + # }, + # f + # ) + + ha32_success_rate, ha32_new_possibilities = analyse_ha_32_results( + results=ha32_results, ha32=ha32, no_house_numbers=ha32_no_house_numbers + ) + + # HA 15 + ha15_results_df, ha15_scoring_df, ha15_no_house_numbers = get_ha_15data(ha15, cleaned, cleaning_data, created_at) + + # with open("ha15.pickle", "wb") as f: + # pickle.dump( + # { + # "ha15_results_df": ha15_results_df, + # "ha15_scoring_df": ha15_scoring_df, + # "ha15_no_house_numbers": ha15_no_house_numbers + # }, + # f + # ) + + ha15_success_rate, ha15_new, ha15_identified_results, ha15_missed_results = analyse_ha_15_results( + ha15_results_df, ha15, ha15_no_house_numbers + )