From 8b8e2bf902f8cc6c588eab8b64253580f3364694 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Fri, 1 Mar 2024 16:29:19 +0000 Subject: [PATCH] working on new forecast approach for warmfront remaining sales --- .../ha_15_32/ha_analysis_batch_3.py | 811 +++++++++++++++++- utils/s3.py | 2 +- 2 files changed, 768 insertions(+), 45 deletions(-) diff --git a/etl/eligibility/ha_15_32/ha_analysis_batch_3.py b/etl/eligibility/ha_15_32/ha_analysis_batch_3.py index 61c4a243..bb27029e 100644 --- a/etl/eligibility/ha_15_32/ha_analysis_batch_3.py +++ b/etl/eligibility/ha_15_32/ha_analysis_batch_3.py @@ -17,6 +17,7 @@ from etl.eligibility.ha_15_32.app import prepare_model_data_row from backend.ml_models.api import ModelApi from etl.solar.SolarPhotoSupply import SolarPhotoSupply from recommendations.recommendation_utils import calculate_cavity_age +from etl.epc.Record import EPCRecord EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN") ENV_FILE = Path(__file__).parent / "etl" / "eligibility" / "ha_15_32" / ".env" @@ -181,25 +182,25 @@ class DataLoader: if ha_name in ["HA1", "HA6", "HA16", "HA24"]: asset_list["matching_address"] = asset_list[ self.COLUMN_CONFIG[ha_name]["address"] - ].str.lower().str.strip() + ].astype(str).str.lower().str.strip() asset_list["matching_postcode"] = asset_list[ self.COLUMN_CONFIG[ha_name]["postcode"] - ].str.lower().str.strip() + ].astype(str).str.lower().str.strip() elif ha_name == "HA7": # Create matching_address by concatenating Address 1, Address 2, Address 3, Address 4, Postcode - asset_list["matching_address"] = asset_list["Address"].str.lower().str.strip() + ", " + \ - asset_list["Address2"].str.lower().str.strip() + ", " + \ - asset_list["Address3"].str.lower().str.strip() + ", " + \ - asset_list["Postcode"].str.lower().str.strip() - asset_list["matching_postcode"] = asset_list["Postcode"].str.lower().str.strip() + asset_list["matching_address"] = asset_list["Address"].astype(str).str.lower().str.strip() + ", " + \ + asset_list["Address2"].astype(str).str.lower().str.strip() + ", " + \ + asset_list["Address3"].astype(str).str.lower().str.strip() + ", " + \ + asset_list["Postcode"].astype(str).str.lower().str.strip() + asset_list["matching_postcode"] = asset_list["Postcode"].astype(str).str.lower().str.strip() elif ha_name == "HA14": # Create matching_address by concatenating Address 1, Address 2, Address 3, Address 4, Postcode - asset_list["matching_address"] = asset_list["Address 1"].str.lower().str.strip() + ", " + \ - asset_list["Address 2"].str.lower().str.strip() + ", " + \ - asset_list["Address 3"].str.lower().str.strip() + ", " + \ - asset_list["Address 4"].str.lower().str.strip() + ", " + \ - asset_list["Postcode"].str.lower().str.strip() - asset_list["matching_postcode"] = asset_list["Postcode"].str.lower().str.strip() + asset_list["matching_address"] = asset_list["Address 1"].astype(str).str.lower().str.strip() + ", " + \ + asset_list["Address 2"].astype(str).str.lower().str.strip() + ", " + \ + asset_list["Address 3"].astype(str).str.lower().str.strip() + ", " + \ + asset_list["Address 4"].astype(str).str.lower().str.strip() + ", " + \ + asset_list["Postcode"].astype(str).str.lower().str.strip() + asset_list["matching_postcode"] = asset_list["Postcode"].astype(str).str.lower().str.strip() elif ha_name == "HA39": # Create matching_address by concatenating add_1, add_2, add_3, add_4, add_5, post_code @@ -209,7 +210,7 @@ class DataLoader: asset_list["add_4"].astype(str).str.lower().str.strip() + ", " + \ asset_list["add_5"].astype(str).str.lower().str.strip() + ", " + \ asset_list["post_code"].astype(str).str.lower().str.strip() - asset_list["matching_postcode"] = asset_list["post_code"].str.lower().str.strip() + asset_list["matching_postcode"] = asset_list["post_code"].astype(str).str.lower().str.strip() elif ha_name == "HA107": # Create matching_address by concatenating House No, Street, Town, District, Postcode asset_list["matching_address"] = asset_list["House No"].astype(str).str.lower().str.strip() + ", " + \ @@ -1098,8 +1099,8 @@ class DataLoader: self.december_figures = pd.read_csv(self.december_figures_filepath) # Remove the spaces in HA Name self.december_figures["HA Name"] = self.december_figures["HA Name"].str.replace(" ", "") - self.december_figures["ECO4"] = self.december_figures["ECO4"].astype("Int64") - self.december_figures["GBIS"] = self.december_figures["GBIS"].astype("Int64") + for col in ["ECO4", "GBIS", "ECO4 remaining", "GBIS remaining"]: + self.december_figures[col] = self.december_figures[col].astype("Int64") if self.use_cache: self.data = read_pickle_from_s3( @@ -1203,7 +1204,6 @@ class DataLoader: # Update the asset list with the categorisations and rename changes if asset_list.shape[0] != asset_list_starting_size: raise ValueError("The asset list has changed in size") - self.data[ha_name]["asset_list"] = asset_list # Report on sales sales_report = {} @@ -1259,7 +1259,31 @@ class DataLoader: ) # We get the sales - sales_report = survey_list["installation_status"].value_counts().to_dict() + sales_report = { + "ECO4 - surveys sold": survey_list.shape[0], + **survey_list["installation_status"].value_counts().to_dict() + } + + # We find some cases where properties have sold but are missing CIGA checks + survey_list_to_merge = survey_list[["asset_list_row_id"]].copy() + survey_list_to_merge["has_a_survey_record"] = True + survey_list_to_merge = survey_list_to_merge[~pd.isnull(survey_list_to_merge["asset_list_row_id"])] + + asset_list = asset_list.merge(survey_list_to_merge, how='left', on="asset_list_row_id") + asset_list["ECO Eligibility"] = np.where( + (asset_list["ECO Eligibility"] == "eco4 (subject to ciga)") & ( + asset_list["has_a_survey_record"] == True + ), + "eco4 - passed ciga", + asset_list["ECO Eligibility"] + ) + asset_list = asset_list.drop(columns=["has_a_survey_record"]) + + # Update the survey list with installation status + self.data[ha_name]["survey_list"] = survey_list + + # Insert updated asset list + self.data[ha_name]["asset_list"] = asset_list ha_facts_and_figures.append( { @@ -1687,7 +1711,21 @@ def analyse_ha_data(outputs, loader): :return: """ + eco4_rate = 1710 + gbis_rate = 600 + old_eco4_rate = 1456 + old_gbis_rate = 432 + + epc_c_threshold = 80 + scheme_map = { + "ECO4": "ECO4", + "AFFORDABLE WARMTH": "ECO4", + "ECO4 A/W": "ECO4", + "ECO4 GBIS (ECO+)": "GBIS" + } + ha_analysis_results = [] + total_revenue_results = [] for ha_name, datasets in outputs.items(): inputs = [x for k, x in loader.data.items() if k == ha_name][0] @@ -1702,6 +1740,88 @@ def analyse_ha_data(outputs, loader): left_on="asset_list_row_id" ) + analysis_data["is_remaining"] = True + + n_sold_eco4 = 0 + n_sold_gbis = 0 + if not inputs["survey_list"].empty: + # Merge on the survey list and signal everything that is remaining or not (i.e. anything that hasn't had + # a survey) + survey_list = inputs["survey_list"].copy() + + # TODO: TEMP + scheme_column = survey_list.columns[0] + # We clean up the survey list installation or cancelled + survey_list["installed_or_cancelled_clean"] = survey_list["INSTALLED OR CANCELLED"].str.lower() + # Remove all punctuation + survey_list["installed_or_cancelled_clean"] = survey_list["installed_or_cancelled_clean"].str.replace( + r'[^\w\s]', '', regex=True + ) + # Remove double spaces + survey_list["installed_or_cancelled_clean"] = survey_list["installed_or_cancelled_clean"].str.replace( + r'\s+', ' ', regex=True + ) + # Remove trailing spaces + survey_list["installed_or_cancelled_clean"] = survey_list["installed_or_cancelled_clean"].str.strip() + + # Remap the values in the scheme column + survey_list[scheme_column] = survey_list[scheme_column].replace(scheme_map) + + survey_list["installation_status"] = None + survey_list["installation_status"] = np.where( + survey_list["installed_or_cancelled_clean"].isin(["installed", "installed see notes"]), + "installed", + survey_list["installation_status"] + ) + survey_list["installation_status"] = np.where( + survey_list["installed_or_cancelled_clean"].isin(["cancelled"]), + "cancelled", + survey_list["installation_status"] + ) + # Find partial installations + survey_list["installation_status"] = np.where( + survey_list["installed_or_cancelled_clean"].str.contains("still to be installed"), + "partially installed", + survey_list["installation_status"] + ) + # Find partial cancellations + # TODO: We might have more indications of partial cancellations + survey_list["installation_status"] = np.where( + survey_list["installed_or_cancelled_clean"].isin(["loft cancelled"]), + "partially cancelled", + survey_list["installation_status"] + ) + + # Finally, for other cases, we set the status to "in progress" + survey_list["installation_status"] = survey_list["installation_status"].fillna("in progress") + + # We concatenate the scheme name with the installation status + survey_list["installation_status"] = ( + survey_list[scheme_column] + " - " + survey_list["installation_status"] + ) + + # TODO: END TEMP + + survey_list_to_merge = survey_list[["asset_list_row_id", scheme_column]].copy() + survey_list_to_merge["is_remaining"] = False + analysis_data = analysis_data.drop(columns="is_remaining").merge( + survey_list_to_merge, + how="left", on="asset_list_row_id" + ) + analysis_data["is_remaining"] = analysis_data["is_remaining"].fillna(True) + + n_sold_eco4 = survey_list_to_merge[survey_list_to_merge[scheme_column] == "ECO4"].shape[0] + n_sold_gbis = survey_list_to_merge[survey_list_to_merge[scheme_column] == "GBIS"].shape[0] + + # Take just remaining + analysis_data = analysis_data[analysis_data["is_remaining"]] + + # Also, if the HA has started selling, we remove any that are still subject to ciga + n_eco4_missed_subject_to_ciga = 0 + if not inputs["survey_list"].empty: + n_eco4_missed_subject_to_ciga = (analysis_data["ECO Eligibility"] == "eco4 (subject to ciga)").sum() + analysis_data = analysis_data[analysis_data["ECO Eligibility"] != "eco4 (subject to ciga)"] + ################################################################################################ # We take the properties that strictly qualified under eco ################################################################################################ @@ -1714,8 +1834,11 @@ def analyse_ha_data(outputs, loader): eco4_identified["identification_type"] ) + # For expansive, the property can be no higher than an EPC C eco4_identified["identification_type"] = np.where( - (eco4_identified["eco4_eligible"] == True) & (eco4_identified["eco4_strict"] == False), + (eco4_identified["eco4_eligible"] == True) & (eco4_identified["eco4_strict"] == False) & ( + eco4_identified["sap"] <= epc_c_threshold + ), "expansive", eco4_identified["identification_type"] ) @@ -1743,21 +1866,17 @@ def analyse_ha_data(outputs, loader): "Meets fabric, fails SAP check", "Meets cavity, loft borderline, meets sap", ] - ), + ) & (ciga_dependent_identified["sap"] <= epc_c_threshold), "strict", ciga_dependent_identified["identification_type"] ) ciga_dependent_identified["identification_type"] = np.where( - (ciga_dependent_identified["eco4_message"].isin(["All conditions fail", "failed fabric check"])) & - (ciga_dependent_identified["walls"].isin(["Cavity wall, filled cavity"])), - "expansive", - ciga_dependent_identified["identification_type"] - ) - - ciga_dependent_identified["identification_type"] = np.where( - (ciga_dependent_identified["eco4_message"].isin(["Meets just cavity"])) | ( + ((ciga_dependent_identified["eco4_message"].isin(["Meets just cavity"])) | ( ciga_dependent_identified["walls"].isin(["Cavity wall, filled cavity"]) + )) & ( + (ciga_dependent_identified["sap"] <= epc_c_threshold) & + pd.isnull(ciga_dependent_identified["identification_type"]) ), "expansive", ciga_dependent_identified["identification_type"] @@ -1775,7 +1894,9 @@ def analyse_ha_data(outputs, loader): ) gbis_identified["identification_type"] = np.where( - (gbis_identified["gbis_eligible"] == True) & (gbis_identified["sap"] >= 69), + (gbis_identified["gbis_eligible"] == True) & (gbis_identified["sap"] <= epc_c_threshold) & ( + pd.isnull(gbis_identified["identification_type"]) + ), "expansive", gbis_identified["identification_type"] ) @@ -1806,9 +1927,16 @@ def analyse_ha_data(outputs, loader): ] surplus_gbis = surplus_gbis[surplus_gbis["is_estimated"] == False] - # Output variables + # Output variables - the data was sent to us in December, but the remaining figures are + # what was in November + november_remaining = loader.december_figures[loader.december_figures["HA Name"] == ha_name] + # ECO4 - n_properties_in_asset_list = inputs["asset_list"].shape[0] + n_properties_remaining_in_asset_list = inputs["asset_list"].shape[0] + november_eco4_remaining = max(november_remaining["ECO4 remaining"].values[0], 0) + november_eco4_sold = november_remaining["No. of Tech surveys complete - Eco 4"].values[0] + eco4_sales_since_november = n_sold_eco4 - november_eco4_sold + n_warmfront_identified_eco4 = eco4_identified.shape[0] + ciga_dependent_identified.shape[0] eco4_of_which_identified_strict = ( eco4_identified[eco4_identified["identification_type"] == "strict"].shape[0] + @@ -1820,26 +1948,37 @@ def analyse_ha_data(outputs, loader): ) # GBIS n_warmfront_identified_gbis = gbis_identified.shape[0] + november_gbis_remaining = max(november_remaining["GBIS remaining"].values[0], 0) + november_gbis_sold = november_remaining["No. of Tech surveys complete - GBIS"].values[0] + gbis_sales_since_november = n_sold_gbis - november_gbis_sold gbis_of_which_identified_strict = gbis_identified[gbis_identified["identification_type"] == "strict"].shape[0] gbis_of_which_identified_expansive = \ gbis_identified[gbis_identified["identification_type"] == "expansive"].shape[0] to_append = { ("", "HA Name"): ha_name, - ("", "# Properties in asset list"): n_properties_in_asset_list, + ("", "# properties in asset list"): n_properties_remaining_in_asset_list, ############ # ECO4 ############ - ("ECO4", "# Properties identieid by Warmfront"): n_warmfront_identified_eco4, + ("ECO4", "# remaining November file"): november_eco4_remaining, + ("ECO4", "# sold in November file"): november_eco4_sold, + ("ECO4", "# sold (survey list)"): n_sold_eco4, + ("ECO4", "# that missed CIGA check"): n_eco4_missed_subject_to_ciga, + ("ECO4", "# Remaining properties (asset list)"): n_warmfront_identified_eco4, ("ECO4", "Of which identified by model - strict"): eco4_of_which_identified_strict, ("ECO4", "Of which identified by model - expansive"): eco4_of_which_identified_expansive, ("ECO4", "Of which identified by model - total"): ( - eco4_of_which_identified_strict + eco4_of_which_identified_expansive), + eco4_of_which_identified_strict + eco4_of_which_identified_expansive + ), ("ECO4", "Additional properties"): surplus_eco4.shape[0], ############ # GBIS ############ - ("GBIS", "# Properties identieid by Warmfront"): n_warmfront_identified_gbis, + ("GBIS", "# remaining November file"): november_gbis_remaining, + ("GBIS", "# sold in November file"): november_gbis_sold, + ("GBIS", "# sold (survey list)"): n_sold_gbis, + ("GBIS", "# Remaining properties (asset list)"): n_warmfront_identified_gbis, ("GBIS", "Of which identified by model - strict"): gbis_of_which_identified_strict, ("GBIS", "Of which identified by model - expansive"): gbis_of_which_identified_expansive, ("GBIS", "Of which identified by model - total"): ( @@ -1850,6 +1989,24 @@ def analyse_ha_data(outputs, loader): ha_analysis_results.append(to_append) + # Calculate the revenue results + to_append_revenue = { + ("", "HA Name"): ha_name, + # Eco4 revenue + ("ECO4", "£ remaining November file"): november_eco4_remaining * eco4_rate, + ("ECO4", "£ sold November file"): november_eco4_sold * old_eco4_rate, + ("ECO4", "£ sold since November"): eco4_sales_since_november * eco4_rate, + ("ECO4", "£ stuck at ciga check"): n_eco4_missed_subject_to_ciga * eco4_rate, + ("ECO4", "£ remaining (asset list)"): n_warmfront_identified_eco4 * eco4_rate, + ("ECO4", "Of which identified by model - strict"): eco4_of_which_identified_strict * eco4_rate, + ("ECO4", "Of which identified by model - expansive"): eco4_of_which_identified_expansive * eco4_rate, + ("ECO4", "Of which identified by model - total"): eco4_rate * ( + eco4_of_which_identified_strict + eco4_of_which_identified_expansive + ), + ("ECO4", "Additional properties"): eco4_rate * surplus_eco4.shape[0], + } + total_revenue_results.append(to_append_revenue) + ha_analysis_results = pd.DataFrame(ha_analysis_results) ha_analysis_results.columns = pd.MultiIndex.from_tuples(ha_analysis_results.columns) @@ -1862,8 +2019,8 @@ def analyse_ha_data(outputs, loader): facts_and_figures = facts_and_figures.rename( columns={ # ECO4 cols - "ECO4": "ECO4 - December", - "GBIS": "GBIS - December", + "ECO4": "ECO4 - November", + "GBIS": "GBIS - November", "eco4 (subject to ciga)": "ECO4 - subject to ciga", "eco4": "ECO4 - doesn't need CIGA", "eco4 - passed ciga": "ECO4 - passed CIGA", @@ -1880,19 +2037,27 @@ def analyse_ha_data(outputs, loader): # ECO4 - doesn't need CIGA + ECO4 - passed CIGA # 2) if ciga checks haven't been completed (i.e. ECO4 - passed ciga is missing), this sum is # ECO4 - doesn't need CIGA + ECO4 - subject to ciga - facts_and_figures["ECO4 total (asset list)"] = np.where( + facts_and_figures["ECO4 total (asset list - pre ciga)"] = ( + facts_and_figures["ECO4 - doesn't need CIGA"] + + facts_and_figures["ECO4 - subject to ciga"] + + facts_and_figures["ECO4 - passed CIGA"] + ) + + facts_and_figures["ECO4 total (asset list - post ciga)"] = None + facts_and_figures["ECO4 total (asset list - post ciga)"] = np.where( facts_and_figures["ECO4 - passed CIGA"] > 0, facts_and_figures["ECO4 - doesn't need CIGA"] + facts_and_figures["ECO4 - passed CIGA"], - facts_and_figures["ECO4 - doesn't need CIGA"] + facts_and_figures["ECO4 - subject to ciga"] + facts_and_figures["ECO4 total (asset list - post ciga)"] ) # Re-arrange the columns facts_and_figures = facts_and_figures[ [ 'HA Name', - 'ECO4 - December', - 'GBIS - December', - 'ECO4 total (asset list)', + 'ECO4 - November', + 'GBIS - November', + 'ECO4 total (asset list - pre ciga)', + 'ECO4 total (asset list - post ciga)', 'GBIS total (asset list)', 'ECO4 - subject to ciga', "ECO4 - doesn't need CIGA", @@ -1916,6 +2081,8 @@ def analyse_ha_data(outputs, loader): facts_and_figures["Missed CIGA checks opportunity"] ) + facts_and_figures.to_csv("Facts and figures sample.csv") + # Re arrage the columns # Also sort ha_analysis_results by ha number @@ -1937,6 +2104,333 @@ def analyse_ha_data(outputs, loader): for i, width in enumerate(get_col_widths(df)): writer.sheets[sheet].set_column(i, i, width) + # Inspection: - Looking into the proportion of homes with "cavity, as built, insulated (assumed)" as their + # description, and what proportion of time they get identified via non-invasive surveys + + # true_eco4_assets = [] + # ciga_dependent_assets = [] + # not_eligible = [] + # as_built_insulated = [] + # date_cols = { + # "HA39": "date_built", + # "HA14": "Built In Year", + # "HA6": "Construction Year", + # "HA1": "Build Date", + # "HA107": "YEAR BUILT" + # } + # for ha_name, data_objects in outputs.items(): + # inputs = [x for k, x in loader.data.items() if k == ha_name][0] + # + # date_col = date_cols[ha_name] + # results_df = data_objects["results_df"].copy() + # df = inputs["asset_list"][['asset_list_row_id', "ECO Eligibility", date_col]].rename( + # columns={"row_meaning": "asset_identification_status", date_col: "date_built"} + # ).merge( + # results_df, + # how="left", + # right_on="row_id", + # left_on="asset_list_row_id" + # ) + # + # # take the true ECO4 + # true_eco4 = df[df["ECO Eligibility"] == "eco4"].copy() + # ciga_dependent = df[ + # df["ECO Eligibility"].isin( + # [ + # "eco4 (subject to ciga)", + # "failed ciga", + # "eco4 - passed ciga" + # ] + # ) + # ] + # insulated_assumed = df[df["walls"] == "Cavity wall, as built, insulated"].copy() + # # We convert date built to datetime + # try: + # insulated_assumed = insulated_assumed[~pd.isnull(insulated_assumed["date_built"])] + # insulated_assumed["year_built"] = pd.to_datetime(insulated_assumed["date_built"].astype(str)).dt.year + # as_built_insulated.append(insulated_assumed) + # except Exception as e: + # print("oh well") + # + # true_eco4_assets.append(true_eco4) + # ciga_dependent_assets.append(ciga_dependent) + # + # true_eco4_assets = pd.concat(true_eco4_assets) + # ciga_dependent_assets = pd.concat(ciga_dependent_assets) + # as_built_insulated = pd.concat(as_built_insulated) + # + # true_eco4_assets["walls"].value_counts(normalize=True) + # ciga_dependent_assets["walls"].value_counts(normalize=True) + # + # from recommendations.recommendation_utils import extract_insulation_thickness + # + # true_eco4_assets["roof_insulation_thickness"] = true_eco4_assets["roof"].apply( + # lambda x: extract_insulation_thickness(x) + # ) + # + # true_eco4_assets["e"] = true_eco4_assets.merge( + # pd.DataFrame(cleaned["roof-description"])[["original_description", "insulation_thickness"]], + # how="left", + # left_on="roof", + # right_on="original_description" + # ) + # + # true_eco4_assets["sap"].mean() + # + # true_eco4_assets["insulation_thickness"].isin( + # ["250", "150", "200", "100", "75", "50"] + # ).sum() / true_eco4_assets.shape[0] + # + # true_eco4_assets["insulation_thickness"].isin( + # ["100"] + # ).sum() / true_eco4_assets.shape[0] + # + # as_built_insulated.groupby("property_type")["ECO Eligibility"].value_counts(normalize=True) + + +def get_propensity_model_data( + loader, cleaned, cleaning_data, created_at, photo_supply_lookup, + floor_area_decile_thresholds, pull_data=True +): + # TODO: Set a seed! + model_data = [] + for ha_name, data_assets in loader.data.items(): + + logger.info("Processing HA: %s", ha_name) + if data_assets["survey_list"].empty: + continue + + number_sold = data_assets["survey_list"].shape[0] + + # For each HA, we read pull in the data required, and store in S3 + asset_list = data_assets["asset_list"].copy() + # We determine the number of properties that we should select that are eligible + asset_list_size = asset_list.shape[0] + # Number eligible + n_eligibile = asset_list[asset_list["ECO Eligibility"] != "not eligible"].shape[0] + success_rate = n_eligibile / asset_list_size + needed_sample_size = np.ceil(number_sold / success_rate) + number_negative_samples = int(needed_sample_size - number_sold) + + sold_asset_list_ids = data_assets["survey_list"]["asset_list_row_id"].tolist() + negative_sample_asset_list_ids = asset_list["asset_list_row_id"].sample(number_negative_samples).tolist() + sample_ids = sold_asset_list_ids + negative_sample_asset_list_ids + + sample_asset_list = asset_list[asset_list["asset_list_row_id"].isin(sample_ids)] + + # In order to have the most confidence, we should take just properties that have 1 EPC. We might need to + # cut down the number of properties that we include because of this + # Note: This is an imbalanced problem so we will need to build a model accomadating of that + + data = [] + errors = [] + for index, property_meta in tqdm(sample_asset_list.iterrows(), total=len(sample_asset_list)): + + if property_meta["matching_postcode"] is None: + continue + + property_type, built_form = get_property_type_and_built_form( + property_meta=property_meta, ha_name=ha_name + ) + + searcher = SearchEpc( + address1=str(property_meta["HouseNo"]), + postcode=property_meta["matching_postcode"], + auth_token=EPC_AUTH_TOKEN, + os_api_key="", + full_address=property_meta["matching_address"] + ) + searcher.ordnance_survey_client.property_type = property_type + searcher.ordnance_survey_client.built_form = built_form + searcher.find_property(skip_os=True) + + if searcher.newest_epc is None: + continue + + if searcher.newest_epc.get("estimated"): + # We insert the row ID as our proxy for UPRN + searcher.newest_epc["uprn"] = int(property_meta["asset_list_row_id"].split(ha_name)[1]) + + newest_epc = searcher.newest_epc + older_epcs = searcher.older_epcs + full_sap_epc = searcher.full_sap_epc + + # If we have more than 1 EPC for the moment we just continue + if older_epcs or full_sap_epc: + continue + try: + + # We clean up the data + epc_records = { + 'original_epc': newest_epc.copy(), + 'full_sap_epc': full_sap_epc.copy(), + 'old_data': older_epcs.copy(), + } + + epc_record = EPCRecord( + epc_records=epc_records, + run_mode="newdata", + cleaning_data=cleaning_data + ) + + # If we have some data, continue + data.append( + { + "ECO Eligibility": property_meta["ECO Eligibility"], + "asset_list_row_id": property_meta["asset_list_row_id"], + **epc_record.get("prepared_epc") + } + ) + except Exception as e: + errors.append( + { + "error": str(e), + "asset_list_row_id": property_meta["asset_list_row_id"], + "matching_postcode": property_meta["matching_postcode"], + "matching_address": property_meta["matching_address"] + } + ) + + data = pd.DataFrame(data) + # We store the results in S3 as a pickle + save_pickle_to_s3( + data=data, + bucket_name="retrofit-datalake-dev", + s3_file_name=f"propensity_model_data/{ha_name}/train.pickle" + ) + + # Store the errors + if errors: + save_pickle_to_s3( + data=errors, + bucket_name="retrofit-datalake-dev", + s3_file_name=f"propensity_model_data/{ha_name}/errors.pickle" + ) + + model_data.append(data) + + return model_data + + +def conversion_model(loader): + # Read in the model data + + model_data = [] + for ha_name in loader.data.keys(): + try: + picked = read_pickle_from_s3( + bucket_name="retrofit-datalake-dev", + s3_file_name=f"propensity_model_data/{ha_name}/train.pickle" + ) + data = pd.DataFrame(picked) + + # We merge on the sales data + sales_data = loader.data[ha_name]["survey_list"].copy() + data = data.merge( + sales_data[["asset_list_row_id", "installation_status"]], + how="left", + on="asset_list_row_id" + ) + data["ha_name"] = ha_name + + except Exception as e: + logger.error("Error reading in the data for %s", ha_name) + continue + + model_data.append(data) + + model_data = pd.concat(model_data) + + model_data["response"] = model_data["installation_status"].isin( + [ + "ECO4 - in progress", + "ECO4 - installed" + ] + ).astype(int) + + # Because of how we pulled the data, we need to re-balance the sample + ha_names = model_data["ha_name"].unique() + + balanced_sample = [] + for ha_name in ha_names: + df = model_data[model_data["ha_name"] == ha_name] + positive_samples = df[df["response"] == 1] + negative_samples = df[df["response"] != 1] + + inputs = [x for k, x in loader.data.items() if k == ha_name][0] + asset_list = inputs["asset_list"].copy() + asset_list_size = asset_list.shape[0] + n_eligibile = asset_list[asset_list["ECO Eligibility"] != "not eligible"].shape[0] + success_rate = n_eligibile / asset_list_size + needed_sample_size = np.ceil(positive_samples.shape[0] / success_rate) + number_negative_samples = int(needed_sample_size - positive_samples.shape[0]) + negative_samples_subset = negative_samples.sample(number_negative_samples) + + output = pd.concat([positive_samples, negative_samples_subset]) + + balanced_sample.append(output) + + balanced_sample = pd.concat(balanced_sample) + + # We work with a small sample + # Drop the ECO Eligibility column and installation_status column + # We keep the ID column + balanced_sample = balanced_sample.drop( + columns=['ECO Eligibility', 'asset_list_row_id', 'address', 'uprn_source', 'address3', 'local_authority_label', + 'county', 'postcode', 'constituency', 'local_authority', 'inspection_date', 'address1', + 'constituency_label', 'building_reference_number', 'address2', 'posttown', 'lodgement_datetime', + 'uprn', 'lodgement_date', 'lmk_key', 'installation_status', 'ha_name'] + ) + + # POC model + df = balanced_sample.copy() + # FIll missings with means, if they exist + numeric_cols = df.select_dtypes(include=['float64', 'int64']).columns + df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].mean()) + + categorical_cols = df.select_dtypes(include=['object', 'category']).columns + df[categorical_cols] = df[categorical_cols].fillna("other") + + # Reduce the number of categories to a specific number and the rest to other + max_n_categories = 10 + for col in categorical_cols: + top_categories = df[col].value_counts().nlargest(max_n_categories).index + df[col] = df[col].where(df[col].isin(top_categories), other="other") + + # Use a model based approach to feature selection + import xgboost as xgb + from sklearn.model_selection import train_test_split + + # Assuming your outcome column is named 'target' + X = df.drop(columns=['response']) + y = df['response'] + df["low_energy_fixed_light_count"].va + + # Encoding categorical variables if not already done + X = pd.get_dummies(X, drop_first=True) + + # Splitting the data into train and test sets + X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) + + # Initialize an XGBoost classifier + model = xgb.XGBClassifier() + + # Fit the model + model.fit(X_train, y_train) + + # Get feature importances + feature_importances = model.feature_importances_ + + # Map feature importances to their corresponding column names + feature_importance_dict = {feature: importance for feature, importance in zip(X.columns, feature_importances)} + + # Sort features by importance + sorted_features = sorted(feature_importance_dict.items(), key=lambda item: item[1], reverse=True) + + # Display sorted features + for feature, importance in sorted_features: + print(f"{feature}: {importance}") + def patch_cleaned(cleaned): # Patch to handle the a missing description @@ -2054,6 +2548,218 @@ def patch_cleaned(cleaned): return cleaned +def forecast_remaining_sales(loader): + # Assumptions: + # We cap the ciga conversion rate at 75% because I expect future HAs to have a lower CIGA conversion rate + # and I don't want the numbers to change too much, depenent on the CIGA conversation rate + maximum_ciga_conversion = 0.75 + + gbis_rate = 600 + eco4_rate = 1710 + old_gbis_rate = 432 + old_eco4_rate = 1456 + + # 1) Calculate the conversion rate from passed CIGA to actual sale + converted_ciga_jobs = [] + for ha_name, input_data in loader.data.items(): + asset_list = input_data["asset_list"].copy() + survey_list = input_data["survey_list"].copy() + + if survey_list.empty: + continue + + ciga_dependent_assets = asset_list[ + asset_list["ECO Eligibility"] == "eco4 - passed ciga" + ] + + # These are now the ciga dependent assets at installation + ciga_dependent_assets_at_installation = ciga_dependent_assets.merge( + survey_list[["asset_list_row_id", "installation_status"]], + how="inner", + on="asset_list_row_id" + ) + + # We then calculate how many get cancelled + ciga_dependent_assets_sold = ciga_dependent_assets_at_installation[ + ciga_dependent_assets_at_installation["installation_status"].isin( + [ + "ECO4 - installed", "ECO4 - in progress" + ] + ) + ] + + ciga_dependent_assets_failed = ciga_dependent_assets_at_installation[ + ~ciga_dependent_assets_at_installation["installation_status"].isin( + [ + "ECO4 - installed", "ECO4 - in progress" + ] + ) + ] + + converted_ciga_jobs.append( + { + "HA Name": ha_name, + "# Ciga dependent at installation": ciga_dependent_assets_at_installation.shape[0], + "# Ciga dependent successfully installed": ciga_dependent_assets_sold.shape[0], + "# Ciga dependent failed install": ciga_dependent_assets_failed.shape[0] + } + ) + + converted_ciga_jobs = pd.DataFrame(converted_ciga_jobs) + + # We calculate a ciga pass to install conversaion rate + median_ciga_pass_to_install = ( + converted_ciga_jobs["# Ciga dependent successfully installed"].sum() / + converted_ciga_jobs["# Ciga dependent at installation"].sum() + ) + + # 2) Calculate the conversion rate from CIGA dependent ciga passed + ciga_passrates = [] + for ha_name, input_data in loader.data.items(): + + # If we don't have a ciga list, we can't do anything + if input_data["ciga_list"].empty: + continue + + # 1) Calculate the conversion rate for CIGA to actual sale + asset_list = input_data["asset_list"].copy() + + ciga_completed_assets = asset_list[ + asset_list["ECO Eligibility"].isin( + [ + "eco4 - passed ciga", + "failed ciga" + ] + ) + ] + + ciga_passed = ciga_completed_assets[ + ciga_completed_assets["ECO Eligibility"].isin( + [ + "eco4 - passed ciga" + ] + ) + ] + + ciga_passrates.append( + { + "Ha Name": ha_name, + "# CIGA dependent": ciga_completed_assets.shape[0], + "# CIGA passed": ciga_passed.shape[0], + } + ) + + ciga_passrates = pd.DataFrame(ciga_passrates) + + median_ciga_pass_to_install = ciga_passrates["# CIGA passed"].sum() / ciga_passrates["# CIGA dependent"].sum() + + # 3) Calculate the conversion rate of an ECO4 and a GBISjob, that doesn't need ciga, to install + eco4_ciga_independent_passrates = [] + gbis_ciga_independent_passrates = [] + for ha_name, input_data in loader.data.items(): + asset_list = input_data["asset_list"].copy() + survey_list = input_data["survey_list"].copy() + + if survey_list.empty: + continue + + # For properties that were identified as a typical ECO4 job, we calculate the number of properties that + # installed + # vs cancelled + + typical_eco4 = asset_list[asset_list["ECO Eligibility"] == "eco4"] + typical_gbis = asset_list[asset_list["ECO Eligibility"] == "gbis"] + + # Merge on the surveys + typical_eco4_installed = typical_eco4.merge( + survey_list[["asset_list_row_id", "installation_status"]], how="inner", on="asset_list_row_id" + ) + + if not typical_eco4_installed.empty: + typical_eco4_sold = typical_eco4_installed[ + typical_eco4_installed["installation_status"].isin( + [ + "ECO4 - installed", "ECO4 - in progress" + ] + ) + ] + + eco4_ciga_independent_passrates.append( + { + "Ha Name": ha_name, + "# ECO4 at install stage": typical_eco4_installed.shape[0], + "# ECO4 successfully installed": typical_eco4_sold.shape[0] + } + ) + + typical_gbis_installed = typical_gbis.merge( + survey_list[["asset_list_row_id", "installation_status"]], how="inner", on="asset_list_row_id" + ) + if not typical_gbis_installed.empty: + typical_gbis_sold = typical_gbis_installed[ + typical_gbis_installed["installation_status"].isin( + [ + "GBIS - in progress", "GBIS - installed" + ] + ) + ] + + gbis_ciga_independent_passrates.append( + { + "Ha Name": ha_name, + "# GBIS at install stage": typical_gbis_installed.shape[0], + "# GBIS successfully installed": typical_gbis_sold.shape[0] + } + ) + + eco4_ciga_independent_passrates = pd.DataFrame(eco4_ciga_independent_passrates) + gbis_ciga_independent_passrates = pd.DataFrame(gbis_ciga_independent_passrates) + + median_eco4_to_install = ( + eco4_ciga_independent_passrates["# ECO4 successfully installed"].sum() / + eco4_ciga_independent_passrates["# ECO4 at install stage"].sum() + ) + + median_gbis_to_install = ( + gbis_ciga_independent_passrates["# GBIS successfully installed"].sum() / + gbis_ciga_independent_passrates["# GBIS at install stage"].sum() + ) + + # Produce the final output + december_figures = loader.december_figures.copy() + december_figures = december_figures.fillna(0) + results = [] + for ha_name, input_data in loader.data.items(): + # Original warmfront figures + original_warmfront_estimates = december_figures[december_figures["HA Name"] == ha_name] + + original_warmfront_eco4 = original_warmfront_estimates["ECO4"].values[0] + original_warmfront_remaining_eco4 = original_warmfront_estimates["ECO4 remaining"].values[0] + original_warmfront_gbis = original_warmfront_estimates["GBIS"].values[0] + original_warmfront_remaining_gbis = original_warmfront_estimates["GBIS remaining"].values[0] + + original_warmfront_eco4_revenue = ( + original_warmfront_remaining_eco4 * eco4_rate + + (original_warmfront_eco4 - original_warmfront_remaining_eco4) * old_eco4_rate + ) + original_warmfront_remaining_eco4_revenue = original_warmfront_remaining_eco4 * eco4_rate + + original_warmfront_gbis_revenue = ( + original_warmfront_remaining_gbis * gbis_rate + + (original_warmfront_gbis - original_warmfront_remaining_gbis) * old_gbis_rate + ) + + results.append( + { + ("", "", "HA Name"): ha_name, + ("Original Warmfront estimate", "Total - #", "ECO4 - November"): original_warmfront_eco4, + ("", "Remaining - #", ""): original_warmfront_remaining_eco4, + ("", "Total - £", ""): original_warmfront_eco4_revenue, + ("", "Remaining - £", ""): original_warmfront_remaining_eco4_revenue, + } + ) + + def app(): """ This app contains the housin association analysis for HAs 1, 6, 14, 39 and 107. @@ -2067,11 +2773,14 @@ def app(): pull_data = False # List all of the data in the folder - directories = [str(list(entry.iterdir())[0]) for entry in DATA_FOLDER.iterdir() if entry.is_dir()] + + directories = [str(file) for entry in DATA_FOLDER.iterdir() if entry.is_dir() + for file in entry.iterdir() if file.suffix == '.xlsx'] # Grab the December HA figures filepath december_figures_filepath = "local_data/ha_data/HA_December_figures.csv" - priority_has = ["HA1", "HA6", "HA7", "HA14", "HA16", "HA24", "HA39", "HA107"] + # priority_has = ["HA1", "HA6", "HA7", "HA14", "HA16", "HA24", "HA39", "HA107"] + priority_has = ["HA1", "HA6", "HA7", "HA14", "HA16", "HA39", "HA107"] # Filter down the directories to only the priority HAs directories = [d for d in directories if d.split("/")[2] in priority_has] @@ -2103,3 +2812,17 @@ def app(): floor_area_decile_thresholds=floor_area_decile_thresholds, pull_data=pull_data ) + + analyse_ha_data(outputs, loader) + + # import pickle + # with open("ha_analysis.pickle", "wb") as f: + # pickle.dump({"outputs": outputs, "loader": loader}, f) + + # To read: + # import pickle + # with open("ha_analysis.pickle", "rb") as f: + # outputs = pickle.load(f)["outputs"] + # + # with open("loader.pickle", "rb") as f: + # loader = pickle.load(f) diff --git a/utils/s3.py b/utils/s3.py index cb55094a..8d36bdb3 100644 --- a/utils/s3.py +++ b/utils/s3.py @@ -184,7 +184,7 @@ def read_pickle_from_s3(bucket_name, s3_file_name): logger.errpr("Incomplete credentials provided.") return None except Exception as e: - logger.errpr(f'Failed to download data from {bucket_name}/{s3_file_name}: {str(e)}') + logger.error(f'Failed to download data from {bucket_name}/{s3_file_name}: {str(e)}') return None # Deserialize data from pickle format