working on new forecast approach for warmfront remaining sales

This commit is contained in:
Khalim Conn-Kowlessar 2024-03-01 16:29:19 +00:00
parent 6ae21bbcb0
commit 8b8e2bf902
2 changed files with 768 additions and 45 deletions

View file

@ -17,6 +17,7 @@ from etl.eligibility.ha_15_32.app import prepare_model_data_row
from backend.ml_models.api import ModelApi
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
from recommendations.recommendation_utils import calculate_cavity_age
from etl.epc.Record import EPCRecord
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
ENV_FILE = Path(__file__).parent / "etl" / "eligibility" / "ha_15_32" / ".env"
@ -181,25 +182,25 @@ class DataLoader:
if ha_name in ["HA1", "HA6", "HA16", "HA24"]:
asset_list["matching_address"] = asset_list[
self.COLUMN_CONFIG[ha_name]["address"]
].str.lower().str.strip()
].astype(str).str.lower().str.strip()
asset_list["matching_postcode"] = asset_list[
self.COLUMN_CONFIG[ha_name]["postcode"]
].str.lower().str.strip()
].astype(str).str.lower().str.strip()
elif ha_name == "HA7":
# Create matching_address by concatenating Address 1, Address 2, Address 3, Address 4, Postcode
asset_list["matching_address"] = asset_list["Address"].str.lower().str.strip() + ", " + \
asset_list["Address2"].str.lower().str.strip() + ", " + \
asset_list["Address3"].str.lower().str.strip() + ", " + \
asset_list["Postcode"].str.lower().str.strip()
asset_list["matching_postcode"] = asset_list["Postcode"].str.lower().str.strip()
asset_list["matching_address"] = asset_list["Address"].astype(str).str.lower().str.strip() + ", " + \
asset_list["Address2"].astype(str).str.lower().str.strip() + ", " + \
asset_list["Address3"].astype(str).str.lower().str.strip() + ", " + \
asset_list["Postcode"].astype(str).str.lower().str.strip()
asset_list["matching_postcode"] = asset_list["Postcode"].astype(str).str.lower().str.strip()
elif ha_name == "HA14":
# Create matching_address by concatenating Address 1, Address 2, Address 3, Address 4, Postcode
asset_list["matching_address"] = asset_list["Address 1"].str.lower().str.strip() + ", " + \
asset_list["Address 2"].str.lower().str.strip() + ", " + \
asset_list["Address 3"].str.lower().str.strip() + ", " + \
asset_list["Address 4"].str.lower().str.strip() + ", " + \
asset_list["Postcode"].str.lower().str.strip()
asset_list["matching_postcode"] = asset_list["Postcode"].str.lower().str.strip()
asset_list["matching_address"] = asset_list["Address 1"].astype(str).str.lower().str.strip() + ", " + \
asset_list["Address 2"].astype(str).str.lower().str.strip() + ", " + \
asset_list["Address 3"].astype(str).str.lower().str.strip() + ", " + \
asset_list["Address 4"].astype(str).str.lower().str.strip() + ", " + \
asset_list["Postcode"].astype(str).str.lower().str.strip()
asset_list["matching_postcode"] = asset_list["Postcode"].astype(str).str.lower().str.strip()
elif ha_name == "HA39":
# Create matching_address by concatenating add_1, add_2, add_3, add_4, add_5, post_code
@ -209,7 +210,7 @@ class DataLoader:
asset_list["add_4"].astype(str).str.lower().str.strip() + ", " + \
asset_list["add_5"].astype(str).str.lower().str.strip() + ", " + \
asset_list["post_code"].astype(str).str.lower().str.strip()
asset_list["matching_postcode"] = asset_list["post_code"].str.lower().str.strip()
asset_list["matching_postcode"] = asset_list["post_code"].astype(str).str.lower().str.strip()
elif ha_name == "HA107":
# Create matching_address by concatenating House No, Street, Town, District, Postcode
asset_list["matching_address"] = asset_list["House No"].astype(str).str.lower().str.strip() + ", " + \
@ -1098,8 +1099,8 @@ class DataLoader:
self.december_figures = pd.read_csv(self.december_figures_filepath)
# Remove the spaces in HA Name
self.december_figures["HA Name"] = self.december_figures["HA Name"].str.replace(" ", "")
self.december_figures["ECO4"] = self.december_figures["ECO4"].astype("Int64")
self.december_figures["GBIS"] = self.december_figures["GBIS"].astype("Int64")
for col in ["ECO4", "GBIS", "ECO4 remaining", "GBIS remaining"]:
self.december_figures[col] = self.december_figures[col].astype("Int64")
if self.use_cache:
self.data = read_pickle_from_s3(
@ -1203,7 +1204,6 @@ class DataLoader:
# Update the asset list with the categorisations and rename changes
if asset_list.shape[0] != asset_list_starting_size:
raise ValueError("The asset list has changed in size")
self.data[ha_name]["asset_list"] = asset_list
# Report on sales
sales_report = {}
@ -1259,7 +1259,31 @@ class DataLoader:
)
# We get the sales
sales_report = survey_list["installation_status"].value_counts().to_dict()
sales_report = {
"ECO4 - surveys sold": survey_list.shape[0],
**survey_list["installation_status"].value_counts().to_dict()
}
# We find some cases where properties have sold but are missing CIGA checks
survey_list_to_merge = survey_list[["asset_list_row_id"]].copy()
survey_list_to_merge["has_a_survey_record"] = True
survey_list_to_merge = survey_list_to_merge[~pd.isnull(survey_list_to_merge["asset_list_row_id"])]
asset_list = asset_list.merge(survey_list_to_merge, how='left', on="asset_list_row_id")
asset_list["ECO Eligibility"] = np.where(
(asset_list["ECO Eligibility"] == "eco4 (subject to ciga)") & (
asset_list["has_a_survey_record"] == True
),
"eco4 - passed ciga",
asset_list["ECO Eligibility"]
)
asset_list = asset_list.drop(columns=["has_a_survey_record"])
# Update the survey list with installation status
self.data[ha_name]["survey_list"] = survey_list
# Insert updated asset list
self.data[ha_name]["asset_list"] = asset_list
ha_facts_and_figures.append(
{
@ -1687,7 +1711,21 @@ def analyse_ha_data(outputs, loader):
:return:
"""
eco4_rate = 1710
gbis_rate = 600
old_eco4_rate = 1456
old_gbis_rate = 432
epc_c_threshold = 80
scheme_map = {
"ECO4": "ECO4",
"AFFORDABLE WARMTH": "ECO4",
"ECO4 A/W": "ECO4",
"ECO4 GBIS (ECO+)": "GBIS"
}
ha_analysis_results = []
total_revenue_results = []
for ha_name, datasets in outputs.items():
inputs = [x for k, x in loader.data.items() if k == ha_name][0]
@ -1702,6 +1740,88 @@ def analyse_ha_data(outputs, loader):
left_on="asset_list_row_id"
)
analysis_data["is_remaining"] = True
n_sold_eco4 = 0
n_sold_gbis = 0
if not inputs["survey_list"].empty:
# Merge on the survey list and signal everything that is remaining or not (i.e. anything that hasn't had
# a survey)
survey_list = inputs["survey_list"].copy()
# TODO: TEMP
scheme_column = survey_list.columns[0]
# We clean up the survey list installation or cancelled
survey_list["installed_or_cancelled_clean"] = survey_list["INSTALLED OR CANCELLED"].str.lower()
# Remove all punctuation
survey_list["installed_or_cancelled_clean"] = survey_list["installed_or_cancelled_clean"].str.replace(
r'[^\w\s]', '', regex=True
)
# Remove double spaces
survey_list["installed_or_cancelled_clean"] = survey_list["installed_or_cancelled_clean"].str.replace(
r'\s+', ' ', regex=True
)
# Remove trailing spaces
survey_list["installed_or_cancelled_clean"] = survey_list["installed_or_cancelled_clean"].str.strip()
# Remap the values in the scheme column
survey_list[scheme_column] = survey_list[scheme_column].replace(scheme_map)
survey_list["installation_status"] = None
survey_list["installation_status"] = np.where(
survey_list["installed_or_cancelled_clean"].isin(["installed", "installed see notes"]),
"installed",
survey_list["installation_status"]
)
survey_list["installation_status"] = np.where(
survey_list["installed_or_cancelled_clean"].isin(["cancelled"]),
"cancelled",
survey_list["installation_status"]
)
# Find partial installations
survey_list["installation_status"] = np.where(
survey_list["installed_or_cancelled_clean"].str.contains("still to be installed"),
"partially installed",
survey_list["installation_status"]
)
# Find partial cancellations
# TODO: We might have more indications of partial cancellations
survey_list["installation_status"] = np.where(
survey_list["installed_or_cancelled_clean"].isin(["loft cancelled"]),
"partially cancelled",
survey_list["installation_status"]
)
# Finally, for other cases, we set the status to "in progress"
survey_list["installation_status"] = survey_list["installation_status"].fillna("in progress")
# We concatenate the scheme name with the installation status
survey_list["installation_status"] = (
survey_list[scheme_column] + " - " + survey_list["installation_status"]
)
# TODO: END TEMP
survey_list_to_merge = survey_list[["asset_list_row_id", scheme_column]].copy()
survey_list_to_merge["is_remaining"] = False
analysis_data = analysis_data.drop(columns="is_remaining").merge(
survey_list_to_merge,
how="left", on="asset_list_row_id"
)
analysis_data["is_remaining"] = analysis_data["is_remaining"].fillna(True)
n_sold_eco4 = survey_list_to_merge[survey_list_to_merge[scheme_column] == "ECO4"].shape[0]
n_sold_gbis = survey_list_to_merge[survey_list_to_merge[scheme_column] == "GBIS"].shape[0]
# Take just remaining
analysis_data = analysis_data[analysis_data["is_remaining"]]
# Also, if the HA has started selling, we remove any that are still subject to ciga
n_eco4_missed_subject_to_ciga = 0
if not inputs["survey_list"].empty:
n_eco4_missed_subject_to_ciga = (analysis_data["ECO Eligibility"] == "eco4 (subject to ciga)").sum()
analysis_data = analysis_data[analysis_data["ECO Eligibility"] != "eco4 (subject to ciga)"]
################################################################################################
# We take the properties that strictly qualified under eco
################################################################################################
@ -1714,8 +1834,11 @@ def analyse_ha_data(outputs, loader):
eco4_identified["identification_type"]
)
# For expansive, the property can be no higher than an EPC C
eco4_identified["identification_type"] = np.where(
(eco4_identified["eco4_eligible"] == True) & (eco4_identified["eco4_strict"] == False),
(eco4_identified["eco4_eligible"] == True) & (eco4_identified["eco4_strict"] == False) & (
eco4_identified["sap"] <= epc_c_threshold
),
"expansive",
eco4_identified["identification_type"]
)
@ -1743,21 +1866,17 @@ def analyse_ha_data(outputs, loader):
"Meets fabric, fails SAP check",
"Meets cavity, loft borderline, meets sap",
]
),
) & (ciga_dependent_identified["sap"] <= epc_c_threshold),
"strict",
ciga_dependent_identified["identification_type"]
)
ciga_dependent_identified["identification_type"] = np.where(
(ciga_dependent_identified["eco4_message"].isin(["All conditions fail", "failed fabric check"])) &
(ciga_dependent_identified["walls"].isin(["Cavity wall, filled cavity"])),
"expansive",
ciga_dependent_identified["identification_type"]
)
ciga_dependent_identified["identification_type"] = np.where(
(ciga_dependent_identified["eco4_message"].isin(["Meets just cavity"])) | (
((ciga_dependent_identified["eco4_message"].isin(["Meets just cavity"])) | (
ciga_dependent_identified["walls"].isin(["Cavity wall, filled cavity"])
)) & (
(ciga_dependent_identified["sap"] <= epc_c_threshold) &
pd.isnull(ciga_dependent_identified["identification_type"])
),
"expansive",
ciga_dependent_identified["identification_type"]
@ -1775,7 +1894,9 @@ def analyse_ha_data(outputs, loader):
)
gbis_identified["identification_type"] = np.where(
(gbis_identified["gbis_eligible"] == True) & (gbis_identified["sap"] >= 69),
(gbis_identified["gbis_eligible"] == True) & (gbis_identified["sap"] <= epc_c_threshold) & (
pd.isnull(gbis_identified["identification_type"])
),
"expansive",
gbis_identified["identification_type"]
)
@ -1806,9 +1927,16 @@ def analyse_ha_data(outputs, loader):
]
surplus_gbis = surplus_gbis[surplus_gbis["is_estimated"] == False]
# Output variables
# Output variables - the data was sent to us in December, but the remaining figures are
# what was in November
november_remaining = loader.december_figures[loader.december_figures["HA Name"] == ha_name]
# ECO4
n_properties_in_asset_list = inputs["asset_list"].shape[0]
n_properties_remaining_in_asset_list = inputs["asset_list"].shape[0]
november_eco4_remaining = max(november_remaining["ECO4 remaining"].values[0], 0)
november_eco4_sold = november_remaining["No. of Tech surveys complete - Eco 4"].values[0]
eco4_sales_since_november = n_sold_eco4 - november_eco4_sold
n_warmfront_identified_eco4 = eco4_identified.shape[0] + ciga_dependent_identified.shape[0]
eco4_of_which_identified_strict = (
eco4_identified[eco4_identified["identification_type"] == "strict"].shape[0] +
@ -1820,26 +1948,37 @@ def analyse_ha_data(outputs, loader):
)
# GBIS
n_warmfront_identified_gbis = gbis_identified.shape[0]
november_gbis_remaining = max(november_remaining["GBIS remaining"].values[0], 0)
november_gbis_sold = november_remaining["No. of Tech surveys complete - GBIS"].values[0]
gbis_sales_since_november = n_sold_gbis - november_gbis_sold
gbis_of_which_identified_strict = gbis_identified[gbis_identified["identification_type"] == "strict"].shape[0]
gbis_of_which_identified_expansive = \
gbis_identified[gbis_identified["identification_type"] == "expansive"].shape[0]
to_append = {
("", "HA Name"): ha_name,
("", "# Properties in asset list"): n_properties_in_asset_list,
("", "# properties in asset list"): n_properties_remaining_in_asset_list,
############
# ECO4
############
("ECO4", "# Properties identieid by Warmfront"): n_warmfront_identified_eco4,
("ECO4", "# remaining November file"): november_eco4_remaining,
("ECO4", "# sold in November file"): november_eco4_sold,
("ECO4", "# sold (survey list)"): n_sold_eco4,
("ECO4", "# that missed CIGA check"): n_eco4_missed_subject_to_ciga,
("ECO4", "# Remaining properties (asset list)"): n_warmfront_identified_eco4,
("ECO4", "Of which identified by model - strict"): eco4_of_which_identified_strict,
("ECO4", "Of which identified by model - expansive"): eco4_of_which_identified_expansive,
("ECO4", "Of which identified by model - total"): (
eco4_of_which_identified_strict + eco4_of_which_identified_expansive),
eco4_of_which_identified_strict + eco4_of_which_identified_expansive
),
("ECO4", "Additional properties"): surplus_eco4.shape[0],
############
# GBIS
############
("GBIS", "# Properties identieid by Warmfront"): n_warmfront_identified_gbis,
("GBIS", "# remaining November file"): november_gbis_remaining,
("GBIS", "# sold in November file"): november_gbis_sold,
("GBIS", "# sold (survey list)"): n_sold_gbis,
("GBIS", "# Remaining properties (asset list)"): n_warmfront_identified_gbis,
("GBIS", "Of which identified by model - strict"): gbis_of_which_identified_strict,
("GBIS", "Of which identified by model - expansive"): gbis_of_which_identified_expansive,
("GBIS", "Of which identified by model - total"): (
@ -1850,6 +1989,24 @@ def analyse_ha_data(outputs, loader):
ha_analysis_results.append(to_append)
# Calculate the revenue results
to_append_revenue = {
("", "HA Name"): ha_name,
# Eco4 revenue
("ECO4", "£ remaining November file"): november_eco4_remaining * eco4_rate,
("ECO4", "£ sold November file"): november_eco4_sold * old_eco4_rate,
("ECO4", "£ sold since November"): eco4_sales_since_november * eco4_rate,
("ECO4", "£ stuck at ciga check"): n_eco4_missed_subject_to_ciga * eco4_rate,
("ECO4", "£ remaining (asset list)"): n_warmfront_identified_eco4 * eco4_rate,
("ECO4", "Of which identified by model - strict"): eco4_of_which_identified_strict * eco4_rate,
("ECO4", "Of which identified by model - expansive"): eco4_of_which_identified_expansive * eco4_rate,
("ECO4", "Of which identified by model - total"): eco4_rate * (
eco4_of_which_identified_strict + eco4_of_which_identified_expansive
),
("ECO4", "Additional properties"): eco4_rate * surplus_eco4.shape[0],
}
total_revenue_results.append(to_append_revenue)
ha_analysis_results = pd.DataFrame(ha_analysis_results)
ha_analysis_results.columns = pd.MultiIndex.from_tuples(ha_analysis_results.columns)
@ -1862,8 +2019,8 @@ def analyse_ha_data(outputs, loader):
facts_and_figures = facts_and_figures.rename(
columns={
# ECO4 cols
"ECO4": "ECO4 - December",
"GBIS": "GBIS - December",
"ECO4": "ECO4 - November",
"GBIS": "GBIS - November",
"eco4 (subject to ciga)": "ECO4 - subject to ciga",
"eco4": "ECO4 - doesn't need CIGA",
"eco4 - passed ciga": "ECO4 - passed CIGA",
@ -1880,19 +2037,27 @@ def analyse_ha_data(outputs, loader):
# ECO4 - doesn't need CIGA + ECO4 - passed CIGA
# 2) if ciga checks haven't been completed (i.e. ECO4 - passed ciga is missing), this sum is
# ECO4 - doesn't need CIGA + ECO4 - subject to ciga
facts_and_figures["ECO4 total (asset list)"] = np.where(
facts_and_figures["ECO4 total (asset list - pre ciga)"] = (
facts_and_figures["ECO4 - doesn't need CIGA"] +
facts_and_figures["ECO4 - subject to ciga"] +
facts_and_figures["ECO4 - passed CIGA"]
)
facts_and_figures["ECO4 total (asset list - post ciga)"] = None
facts_and_figures["ECO4 total (asset list - post ciga)"] = np.where(
facts_and_figures["ECO4 - passed CIGA"] > 0,
facts_and_figures["ECO4 - doesn't need CIGA"] + facts_and_figures["ECO4 - passed CIGA"],
facts_and_figures["ECO4 - doesn't need CIGA"] + facts_and_figures["ECO4 - subject to ciga"]
facts_and_figures["ECO4 total (asset list - post ciga)"]
)
# Re-arrange the columns
facts_and_figures = facts_and_figures[
[
'HA Name',
'ECO4 - December',
'GBIS - December',
'ECO4 total (asset list)',
'ECO4 - November',
'GBIS - November',
'ECO4 total (asset list - pre ciga)',
'ECO4 total (asset list - post ciga)',
'GBIS total (asset list)',
'ECO4 - subject to ciga',
"ECO4 - doesn't need CIGA",
@ -1916,6 +2081,8 @@ def analyse_ha_data(outputs, loader):
facts_and_figures["Missed CIGA checks opportunity"]
)
facts_and_figures.to_csv("Facts and figures sample.csv")
# Re arrage the columns
# Also sort ha_analysis_results by ha number
@ -1937,6 +2104,333 @@ def analyse_ha_data(outputs, loader):
for i, width in enumerate(get_col_widths(df)):
writer.sheets[sheet].set_column(i, i, width)
# Inspection: - Looking into the proportion of homes with "cavity, as built, insulated (assumed)" as their
# description, and what proportion of time they get identified via non-invasive surveys
# true_eco4_assets = []
# ciga_dependent_assets = []
# not_eligible = []
# as_built_insulated = []
# date_cols = {
# "HA39": "date_built",
# "HA14": "Built In Year",
# "HA6": "Construction Year",
# "HA1": "Build Date",
# "HA107": "YEAR BUILT"
# }
# for ha_name, data_objects in outputs.items():
# inputs = [x for k, x in loader.data.items() if k == ha_name][0]
#
# date_col = date_cols[ha_name]
# results_df = data_objects["results_df"].copy()
# df = inputs["asset_list"][['asset_list_row_id', "ECO Eligibility", date_col]].rename(
# columns={"row_meaning": "asset_identification_status", date_col: "date_built"}
# ).merge(
# results_df,
# how="left",
# right_on="row_id",
# left_on="asset_list_row_id"
# )
#
# # take the true ECO4
# true_eco4 = df[df["ECO Eligibility"] == "eco4"].copy()
# ciga_dependent = df[
# df["ECO Eligibility"].isin(
# [
# "eco4 (subject to ciga)",
# "failed ciga",
# "eco4 - passed ciga"
# ]
# )
# ]
# insulated_assumed = df[df["walls"] == "Cavity wall, as built, insulated"].copy()
# # We convert date built to datetime
# try:
# insulated_assumed = insulated_assumed[~pd.isnull(insulated_assumed["date_built"])]
# insulated_assumed["year_built"] = pd.to_datetime(insulated_assumed["date_built"].astype(str)).dt.year
# as_built_insulated.append(insulated_assumed)
# except Exception as e:
# print("oh well")
#
# true_eco4_assets.append(true_eco4)
# ciga_dependent_assets.append(ciga_dependent)
#
# true_eco4_assets = pd.concat(true_eco4_assets)
# ciga_dependent_assets = pd.concat(ciga_dependent_assets)
# as_built_insulated = pd.concat(as_built_insulated)
#
# true_eco4_assets["walls"].value_counts(normalize=True)
# ciga_dependent_assets["walls"].value_counts(normalize=True)
#
# from recommendations.recommendation_utils import extract_insulation_thickness
#
# true_eco4_assets["roof_insulation_thickness"] = true_eco4_assets["roof"].apply(
# lambda x: extract_insulation_thickness(x)
# )
#
# true_eco4_assets["e"] = true_eco4_assets.merge(
# pd.DataFrame(cleaned["roof-description"])[["original_description", "insulation_thickness"]],
# how="left",
# left_on="roof",
# right_on="original_description"
# )
#
# true_eco4_assets["sap"].mean()
#
# true_eco4_assets["insulation_thickness"].isin(
# ["250", "150", "200", "100", "75", "50"]
# ).sum() / true_eco4_assets.shape[0]
#
# true_eco4_assets["insulation_thickness"].isin(
# ["100"]
# ).sum() / true_eco4_assets.shape[0]
#
# as_built_insulated.groupby("property_type")["ECO Eligibility"].value_counts(normalize=True)
def get_propensity_model_data(
loader, cleaned, cleaning_data, created_at, photo_supply_lookup,
floor_area_decile_thresholds, pull_data=True
):
# TODO: Set a seed!
model_data = []
for ha_name, data_assets in loader.data.items():
logger.info("Processing HA: %s", ha_name)
if data_assets["survey_list"].empty:
continue
number_sold = data_assets["survey_list"].shape[0]
# For each HA, we read pull in the data required, and store in S3
asset_list = data_assets["asset_list"].copy()
# We determine the number of properties that we should select that are eligible
asset_list_size = asset_list.shape[0]
# Number eligible
n_eligibile = asset_list[asset_list["ECO Eligibility"] != "not eligible"].shape[0]
success_rate = n_eligibile / asset_list_size
needed_sample_size = np.ceil(number_sold / success_rate)
number_negative_samples = int(needed_sample_size - number_sold)
sold_asset_list_ids = data_assets["survey_list"]["asset_list_row_id"].tolist()
negative_sample_asset_list_ids = asset_list["asset_list_row_id"].sample(number_negative_samples).tolist()
sample_ids = sold_asset_list_ids + negative_sample_asset_list_ids
sample_asset_list = asset_list[asset_list["asset_list_row_id"].isin(sample_ids)]
# In order to have the most confidence, we should take just properties that have 1 EPC. We might need to
# cut down the number of properties that we include because of this
# Note: This is an imbalanced problem so we will need to build a model accomadating of that
data = []
errors = []
for index, property_meta in tqdm(sample_asset_list.iterrows(), total=len(sample_asset_list)):
if property_meta["matching_postcode"] is None:
continue
property_type, built_form = get_property_type_and_built_form(
property_meta=property_meta, ha_name=ha_name
)
searcher = SearchEpc(
address1=str(property_meta["HouseNo"]),
postcode=property_meta["matching_postcode"],
auth_token=EPC_AUTH_TOKEN,
os_api_key="",
full_address=property_meta["matching_address"]
)
searcher.ordnance_survey_client.property_type = property_type
searcher.ordnance_survey_client.built_form = built_form
searcher.find_property(skip_os=True)
if searcher.newest_epc is None:
continue
if searcher.newest_epc.get("estimated"):
# We insert the row ID as our proxy for UPRN
searcher.newest_epc["uprn"] = int(property_meta["asset_list_row_id"].split(ha_name)[1])
newest_epc = searcher.newest_epc
older_epcs = searcher.older_epcs
full_sap_epc = searcher.full_sap_epc
# If we have more than 1 EPC for the moment we just continue
if older_epcs or full_sap_epc:
continue
try:
# We clean up the data
epc_records = {
'original_epc': newest_epc.copy(),
'full_sap_epc': full_sap_epc.copy(),
'old_data': older_epcs.copy(),
}
epc_record = EPCRecord(
epc_records=epc_records,
run_mode="newdata",
cleaning_data=cleaning_data
)
# If we have some data, continue
data.append(
{
"ECO Eligibility": property_meta["ECO Eligibility"],
"asset_list_row_id": property_meta["asset_list_row_id"],
**epc_record.get("prepared_epc")
}
)
except Exception as e:
errors.append(
{
"error": str(e),
"asset_list_row_id": property_meta["asset_list_row_id"],
"matching_postcode": property_meta["matching_postcode"],
"matching_address": property_meta["matching_address"]
}
)
data = pd.DataFrame(data)
# We store the results in S3 as a pickle
save_pickle_to_s3(
data=data,
bucket_name="retrofit-datalake-dev",
s3_file_name=f"propensity_model_data/{ha_name}/train.pickle"
)
# Store the errors
if errors:
save_pickle_to_s3(
data=errors,
bucket_name="retrofit-datalake-dev",
s3_file_name=f"propensity_model_data/{ha_name}/errors.pickle"
)
model_data.append(data)
return model_data
def conversion_model(loader):
# Read in the model data
model_data = []
for ha_name in loader.data.keys():
try:
picked = read_pickle_from_s3(
bucket_name="retrofit-datalake-dev",
s3_file_name=f"propensity_model_data/{ha_name}/train.pickle"
)
data = pd.DataFrame(picked)
# We merge on the sales data
sales_data = loader.data[ha_name]["survey_list"].copy()
data = data.merge(
sales_data[["asset_list_row_id", "installation_status"]],
how="left",
on="asset_list_row_id"
)
data["ha_name"] = ha_name
except Exception as e:
logger.error("Error reading in the data for %s", ha_name)
continue
model_data.append(data)
model_data = pd.concat(model_data)
model_data["response"] = model_data["installation_status"].isin(
[
"ECO4 - in progress",
"ECO4 - installed"
]
).astype(int)
# Because of how we pulled the data, we need to re-balance the sample
ha_names = model_data["ha_name"].unique()
balanced_sample = []
for ha_name in ha_names:
df = model_data[model_data["ha_name"] == ha_name]
positive_samples = df[df["response"] == 1]
negative_samples = df[df["response"] != 1]
inputs = [x for k, x in loader.data.items() if k == ha_name][0]
asset_list = inputs["asset_list"].copy()
asset_list_size = asset_list.shape[0]
n_eligibile = asset_list[asset_list["ECO Eligibility"] != "not eligible"].shape[0]
success_rate = n_eligibile / asset_list_size
needed_sample_size = np.ceil(positive_samples.shape[0] / success_rate)
number_negative_samples = int(needed_sample_size - positive_samples.shape[0])
negative_samples_subset = negative_samples.sample(number_negative_samples)
output = pd.concat([positive_samples, negative_samples_subset])
balanced_sample.append(output)
balanced_sample = pd.concat(balanced_sample)
# We work with a small sample
# Drop the ECO Eligibility column and installation_status column
# We keep the ID column
balanced_sample = balanced_sample.drop(
columns=['ECO Eligibility', 'asset_list_row_id', 'address', 'uprn_source', 'address3', 'local_authority_label',
'county', 'postcode', 'constituency', 'local_authority', 'inspection_date', 'address1',
'constituency_label', 'building_reference_number', 'address2', 'posttown', 'lodgement_datetime',
'uprn', 'lodgement_date', 'lmk_key', 'installation_status', 'ha_name']
)
# POC model
df = balanced_sample.copy()
# FIll missings with means, if they exist
numeric_cols = df.select_dtypes(include=['float64', 'int64']).columns
df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].mean())
categorical_cols = df.select_dtypes(include=['object', 'category']).columns
df[categorical_cols] = df[categorical_cols].fillna("other")
# Reduce the number of categories to a specific number and the rest to other
max_n_categories = 10
for col in categorical_cols:
top_categories = df[col].value_counts().nlargest(max_n_categories).index
df[col] = df[col].where(df[col].isin(top_categories), other="other")
# Use a model based approach to feature selection
import xgboost as xgb
from sklearn.model_selection import train_test_split
# Assuming your outcome column is named 'target'
X = df.drop(columns=['response'])
y = df['response']
df["low_energy_fixed_light_count"].va
# Encoding categorical variables if not already done
X = pd.get_dummies(X, drop_first=True)
# Splitting the data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Initialize an XGBoost classifier
model = xgb.XGBClassifier()
# Fit the model
model.fit(X_train, y_train)
# Get feature importances
feature_importances = model.feature_importances_
# Map feature importances to their corresponding column names
feature_importance_dict = {feature: importance for feature, importance in zip(X.columns, feature_importances)}
# Sort features by importance
sorted_features = sorted(feature_importance_dict.items(), key=lambda item: item[1], reverse=True)
# Display sorted features
for feature, importance in sorted_features:
print(f"{feature}: {importance}")
def patch_cleaned(cleaned):
# Patch to handle the a missing description
@ -2054,6 +2548,218 @@ def patch_cleaned(cleaned):
return cleaned
def forecast_remaining_sales(loader):
# Assumptions:
# We cap the ciga conversion rate at 75% because I expect future HAs to have a lower CIGA conversion rate
# and I don't want the numbers to change too much, depenent on the CIGA conversation rate
maximum_ciga_conversion = 0.75
gbis_rate = 600
eco4_rate = 1710
old_gbis_rate = 432
old_eco4_rate = 1456
# 1) Calculate the conversion rate from passed CIGA to actual sale
converted_ciga_jobs = []
for ha_name, input_data in loader.data.items():
asset_list = input_data["asset_list"].copy()
survey_list = input_data["survey_list"].copy()
if survey_list.empty:
continue
ciga_dependent_assets = asset_list[
asset_list["ECO Eligibility"] == "eco4 - passed ciga"
]
# These are now the ciga dependent assets at installation
ciga_dependent_assets_at_installation = ciga_dependent_assets.merge(
survey_list[["asset_list_row_id", "installation_status"]],
how="inner",
on="asset_list_row_id"
)
# We then calculate how many get cancelled
ciga_dependent_assets_sold = ciga_dependent_assets_at_installation[
ciga_dependent_assets_at_installation["installation_status"].isin(
[
"ECO4 - installed", "ECO4 - in progress"
]
)
]
ciga_dependent_assets_failed = ciga_dependent_assets_at_installation[
~ciga_dependent_assets_at_installation["installation_status"].isin(
[
"ECO4 - installed", "ECO4 - in progress"
]
)
]
converted_ciga_jobs.append(
{
"HA Name": ha_name,
"# Ciga dependent at installation": ciga_dependent_assets_at_installation.shape[0],
"# Ciga dependent successfully installed": ciga_dependent_assets_sold.shape[0],
"# Ciga dependent failed install": ciga_dependent_assets_failed.shape[0]
}
)
converted_ciga_jobs = pd.DataFrame(converted_ciga_jobs)
# We calculate a ciga pass to install conversaion rate
median_ciga_pass_to_install = (
converted_ciga_jobs["# Ciga dependent successfully installed"].sum() /
converted_ciga_jobs["# Ciga dependent at installation"].sum()
)
# 2) Calculate the conversion rate from CIGA dependent ciga passed
ciga_passrates = []
for ha_name, input_data in loader.data.items():
# If we don't have a ciga list, we can't do anything
if input_data["ciga_list"].empty:
continue
# 1) Calculate the conversion rate for CIGA to actual sale
asset_list = input_data["asset_list"].copy()
ciga_completed_assets = asset_list[
asset_list["ECO Eligibility"].isin(
[
"eco4 - passed ciga",
"failed ciga"
]
)
]
ciga_passed = ciga_completed_assets[
ciga_completed_assets["ECO Eligibility"].isin(
[
"eco4 - passed ciga"
]
)
]
ciga_passrates.append(
{
"Ha Name": ha_name,
"# CIGA dependent": ciga_completed_assets.shape[0],
"# CIGA passed": ciga_passed.shape[0],
}
)
ciga_passrates = pd.DataFrame(ciga_passrates)
median_ciga_pass_to_install = ciga_passrates["# CIGA passed"].sum() / ciga_passrates["# CIGA dependent"].sum()
# 3) Calculate the conversion rate of an ECO4 and a GBISjob, that doesn't need ciga, to install
eco4_ciga_independent_passrates = []
gbis_ciga_independent_passrates = []
for ha_name, input_data in loader.data.items():
asset_list = input_data["asset_list"].copy()
survey_list = input_data["survey_list"].copy()
if survey_list.empty:
continue
# For properties that were identified as a typical ECO4 job, we calculate the number of properties that
# installed
# vs cancelled
typical_eco4 = asset_list[asset_list["ECO Eligibility"] == "eco4"]
typical_gbis = asset_list[asset_list["ECO Eligibility"] == "gbis"]
# Merge on the surveys
typical_eco4_installed = typical_eco4.merge(
survey_list[["asset_list_row_id", "installation_status"]], how="inner", on="asset_list_row_id"
)
if not typical_eco4_installed.empty:
typical_eco4_sold = typical_eco4_installed[
typical_eco4_installed["installation_status"].isin(
[
"ECO4 - installed", "ECO4 - in progress"
]
)
]
eco4_ciga_independent_passrates.append(
{
"Ha Name": ha_name,
"# ECO4 at install stage": typical_eco4_installed.shape[0],
"# ECO4 successfully installed": typical_eco4_sold.shape[0]
}
)
typical_gbis_installed = typical_gbis.merge(
survey_list[["asset_list_row_id", "installation_status"]], how="inner", on="asset_list_row_id"
)
if not typical_gbis_installed.empty:
typical_gbis_sold = typical_gbis_installed[
typical_gbis_installed["installation_status"].isin(
[
"GBIS - in progress", "GBIS - installed"
]
)
]
gbis_ciga_independent_passrates.append(
{
"Ha Name": ha_name,
"# GBIS at install stage": typical_gbis_installed.shape[0],
"# GBIS successfully installed": typical_gbis_sold.shape[0]
}
)
eco4_ciga_independent_passrates = pd.DataFrame(eco4_ciga_independent_passrates)
gbis_ciga_independent_passrates = pd.DataFrame(gbis_ciga_independent_passrates)
median_eco4_to_install = (
eco4_ciga_independent_passrates["# ECO4 successfully installed"].sum() /
eco4_ciga_independent_passrates["# ECO4 at install stage"].sum()
)
median_gbis_to_install = (
gbis_ciga_independent_passrates["# GBIS successfully installed"].sum() /
gbis_ciga_independent_passrates["# GBIS at install stage"].sum()
)
# Produce the final output
december_figures = loader.december_figures.copy()
december_figures = december_figures.fillna(0)
results = []
for ha_name, input_data in loader.data.items():
# Original warmfront figures
original_warmfront_estimates = december_figures[december_figures["HA Name"] == ha_name]
original_warmfront_eco4 = original_warmfront_estimates["ECO4"].values[0]
original_warmfront_remaining_eco4 = original_warmfront_estimates["ECO4 remaining"].values[0]
original_warmfront_gbis = original_warmfront_estimates["GBIS"].values[0]
original_warmfront_remaining_gbis = original_warmfront_estimates["GBIS remaining"].values[0]
original_warmfront_eco4_revenue = (
original_warmfront_remaining_eco4 * eco4_rate +
(original_warmfront_eco4 - original_warmfront_remaining_eco4) * old_eco4_rate
)
original_warmfront_remaining_eco4_revenue = original_warmfront_remaining_eco4 * eco4_rate
original_warmfront_gbis_revenue = (
original_warmfront_remaining_gbis * gbis_rate +
(original_warmfront_gbis - original_warmfront_remaining_gbis) * old_gbis_rate
)
results.append(
{
("", "", "HA Name"): ha_name,
("Original Warmfront estimate", "Total - #", "ECO4 - November"): original_warmfront_eco4,
("", "Remaining - #", ""): original_warmfront_remaining_eco4,
("", "Total - £", ""): original_warmfront_eco4_revenue,
("", "Remaining - £", ""): original_warmfront_remaining_eco4_revenue,
}
)
def app():
"""
This app contains the housin association analysis for HAs 1, 6, 14, 39 and 107.
@ -2067,11 +2773,14 @@ def app():
pull_data = False
# List all of the data in the folder
directories = [str(list(entry.iterdir())[0]) for entry in DATA_FOLDER.iterdir() if entry.is_dir()]
directories = [str(file) for entry in DATA_FOLDER.iterdir() if entry.is_dir()
for file in entry.iterdir() if file.suffix == '.xlsx']
# Grab the December HA figures filepath
december_figures_filepath = "local_data/ha_data/HA_December_figures.csv"
priority_has = ["HA1", "HA6", "HA7", "HA14", "HA16", "HA24", "HA39", "HA107"]
# priority_has = ["HA1", "HA6", "HA7", "HA14", "HA16", "HA24", "HA39", "HA107"]
priority_has = ["HA1", "HA6", "HA7", "HA14", "HA16", "HA39", "HA107"]
# Filter down the directories to only the priority HAs
directories = [d for d in directories if d.split("/")[2] in priority_has]
@ -2103,3 +2812,17 @@ def app():
floor_area_decile_thresholds=floor_area_decile_thresholds,
pull_data=pull_data
)
analyse_ha_data(outputs, loader)
# import pickle
# with open("ha_analysis.pickle", "wb") as f:
# pickle.dump({"outputs": outputs, "loader": loader}, f)
# To read:
# import pickle
# with open("ha_analysis.pickle", "rb") as f:
# outputs = pickle.load(f)["outputs"]
#
# with open("loader.pickle", "rb") as f:
# loader = pickle.load(f)

View file

@ -184,7 +184,7 @@ def read_pickle_from_s3(bucket_name, s3_file_name):
logger.errpr("Incomplete credentials provided.")
return None
except Exception as e:
logger.errpr(f'Failed to download data from {bucket_name}/{s3_file_name}: {str(e)}')
logger.error(f'Failed to download data from {bucket_name}/{s3_file_name}: {str(e)}')
return None
# Deserialize data from pickle format