testing with another stupid effing method

This commit is contained in:
Khalim Conn-Kowlessar 2024-03-14 13:58:29 +00:00
parent 6a327629bf
commit ddb5de50e5
3 changed files with 241 additions and 8 deletions

2
.idea/.gitignore generated vendored
View file

@ -1,3 +1,5 @@
# Default ignored files
/shelf/
/workspace.xml
# GitHub Copilot persisted chat sessions
/copilot/chatSessions

View file

@ -7,7 +7,9 @@ import msgpack
from datetime import datetime
import pandas as pd
import numpy as np
from utils.s3 import read_from_s3, read_dataframe_from_s3_parquet, save_pickle_to_s3, read_pickle_from_s3
from utils.s3 import (
read_from_s3, read_dataframe_from_s3_parquet, save_pickle_to_s3, read_pickle_from_s3, save_dataframe_to_s3_parquet
)
from utils.logger import setup_logger
from dotenv import load_dotenv
from tqdm import tqdm
@ -2860,8 +2862,8 @@ def get_property_type_and_built_form(property_meta, ha_name):
property_type = PROPERTY_TYPE_LOOKUP[ha_name]["property_type"][property_meta["Dwelling type"]]
built_form = property_meta["built_form"]
elif ha_name == "HA7":
property_type = PROPERTY_TYPE_LOOKUP[ha_name]["property_type"][property_meta["Archetype"]]
built_form = PROPERTY_TYPE_LOOKUP[ha_name]["built_form"][property_meta["Property Type"]]
property_type = PROPERTY_TYPE_LOOKUP[ha_name]["property_type"].get(property_meta["Archetype"])
built_form = PROPERTY_TYPE_LOOKUP[ha_name]["built_form"].get(property_meta["Property Type"])
elif ha_name == "HA14":
if property_meta["Asset Type Description"] == "Block - Repair":
# We try and deduce if it's a flat or house, depending on if it has "room" or "flats" in the address
@ -4429,6 +4431,12 @@ def forecast_remaining_sales(loader):
for ha_name, input_data in loader.data.items():
# Original warmfront figures - ECO4
original_warmfront_estimates = december_figures[december_figures["HA Name"] == ha_name]
if original_warmfront_estimates.empty:
# Append an empty row
original_warmfront_estimates = december_figures.head(1).copy()
for k in original_warmfront_estimates.columns:
original_warmfront_estimates[k] = 0
original_warmfront_estimates["HA Name"] = ha_name
original_warmfront_eco4 = original_warmfront_estimates["ECO4"].values[0]
original_warmfront_remaining_eco4 = original_warmfront_estimates["ECO4 remaining"].values[0]
@ -4742,6 +4750,12 @@ def forecast_remaining_sales(loader):
if gbis_variance_2 != 0:
raise ValueError("Something went wrong in gbis_variance2")
# Update the GBIS sold, since Warmfront often sold more GBIS than expected
original_warmfront_gbis_revenue = original_warmfront_sold_gbis + original_warmfront_remaining_gbis_revenue
original_warmfront_gbis = (
original_warmfront_sold_gbis / gbis_rate + original_warmfront_remaining_gbis_revenue / gbis_rate
)
to_append = {
("", "", "", "HA Name"): ha_name,
# ECO4 - original warmfront figures
@ -5077,6 +5091,216 @@ def forecast_remaining_sales(loader):
results.to_csv(file, header=True, index=False)
def fml_data_pull(loader):
    """
    Look up the newest EPC for every ECO-eligible property and pickle the results to S3.

    For each housing association processed here (currently only "HA7"), the rows of
    ``loader.data[ha]["asset_list"]`` whose "ECO Eligibility" is not "not eligible" are searched
    with ``SearchEpc`` (called with ``skip_os=True``). Properties for which no EPC is found are
    dropped. The EPCs that are found are stored, each tagged with its "asset_list_row_id", as a
    pickled DataFrame at ``ha-analysis/revised/{ha}/epc_data.pickle`` in the
    "retrofit-datalake-dev" bucket.

    The EPC API token is read from the ``EPC_AUTH_TOKEN`` environment variable so that the
    credential is not committed to the repository.
    NOTE(review): the token that used to be hard-coded here is in the git history and should be
    rotated.

    :param loader: object whose ``data`` attribute maps an HA name to a dict containing an
        "asset_list" DataFrame
    :raises RuntimeError: if ``EPC_AUTH_TOKEN`` is not set
    """
    import os

    from backend.SearchEpc import SearchEpc

    epc_api_key = os.environ.get("EPC_AUTH_TOKEN")
    if not epc_api_key:
        raise RuntimeError("EPC_AUTH_TOKEN environment variable must be set to query the EPC API")

    ha_names = ["HA7"]
    for ha in ha_names:
        asset_list = loader.data[ha]["asset_list"].copy()
        # Properties found as eligible
        eligible = asset_list[asset_list["ECO Eligibility"] != "not eligible"]

        # For each property, search for the latest EPC
        epc_data = []
        for _, row in tqdm(eligible.iterrows(), total=eligible.shape[0]):
            property_type, built_form = get_property_type_and_built_form(property_meta=row, ha_name=ha)
            searcher = SearchEpc(
                address1=row["HouseNo"],
                postcode=row["matching_postcode"],
                auth_token=epc_api_key,
                os_api_key="",
                property_type=property_type,
                full_address=row["matching_address"],
            )
            # The OS lookup itself is skipped below, but the client still carries the archetype
            searcher.ordnance_survey_client.property_type = property_type
            searcher.ordnance_survey_client.built_form = built_form
            searcher.find_property(skip_os=True)

            if searcher.newest_epc is None:
                # No EPC found for this property - nothing to record
                continue

            epc_data.append(
                {
                    "asset_list_row_id": row["asset_list_row_id"],
                    **searcher.newest_epc.copy(),
                }
            )

        # Stored as a pickle (not parquet): fml_analysis reads this key back with read_pickle_from_s3
        save_pickle_to_s3(
            data=pd.DataFrame(epc_data),
            bucket_name="retrofit-datalake-dev",
            s3_file_name=f"ha-analysis/revised/{ha}/epc_data.pickle"
        )
def extract_lower_bound(age_band):
    """
    Return the first year of a construction age band, falling back to 1930.

    The band is expected to look like "<region>: <start>-<end>"; the text between the first
    colon and the following hyphen is parsed as an integer year.

    :param age_band: construction age band string, or a missing value
    :return: the starting year as an int, or 1930 when the value is missing, has no colon, or
        the text after the colon does not start with a plain integer
    """
    fallback_year = 1930

    if pd.isna(age_band):
        return fallback_year

    try:
        year_range = age_band.split(':')[1]
        first_year = year_range.split('-')[0]
        return int(first_year.strip())
    except (ValueError, IndexError):
        # No colon in the band, or the leading token is not a number (e.g. "before 1900")
        return fallback_year
def fml_analysis(loader):
    """
    Re-estimate, from EPC data, how many ECO4 installs remain for each housing association.

    For every HA handled here (currently only "HA7") the ECO-eligible asset-list rows are joined
    to the EPC data read from ``ha-analysis/revised/{ha_name}/epc_data.pickle`` in the
    "retrofit-datalake-dev" bucket. Rows with a null "estimated" value are treated as having had
    a survey. Surveyed rows that are "subject to ciga" are narrowed to those whose EPC
    describes cavity walls and a pitched roof, has a current energy efficiency of at most 80 and
    has less than 270 of roof insulation; that count is scaled by the assumed CIGA pass rate. The
    resulting conversion rate is then applied to the unsurveyed "subject to ciga" rows.

    Cases the analysis does not cover yet raise instead of producing a figure (see below).

    :param loader: object exposing ``december_figures`` (DataFrame with "HA Name" and
        "ECO4 remaining" columns) and ``data`` (HA name -> dict holding "asset_list",
        "survey_list" and "ciga_list" DataFrames)
    :return: list with one dict per HA, holding "HA Name", "Original ECO4 Estimate - Remaining",
        "Proportion with a survey" and "total_expectation"
    :raises NotImplementedError: if the HA has a non-empty survey list or CIGA list
    :raises Exception: if the EPC join changes the number of rows, or if "eco4" rows exist that
        this analysis cannot estimate yet (surveyed rows matching the target archetype, or any
        unsurveyed rows)
    """
    from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
    from etl.epc.DataProcessor import EPCDataProcessor

    assumed_ciga_pass_rate = 0.731
    ha_names = ["HA7"]

    # Applied in order: "below average" must be rewritten before the bare "average"
    thickness_replacements = (
        ("below average", "50"),
        ("None", "0"),
        ("none", "0"),
        ("average", "150"),
    )

    def roof_insulation_thickness(roof_description):
        # Numeric-looking thickness string for one roof description; "0" when there is none
        if pd.isnull(roof_description):
            return "0"
        thickness = RoofAttributes(roof_description).process()["insulation_thickness"]
        # If there is a + in the thickness (e.g. "300+"), strip it out
        thickness = str(thickness).replace("+", "")
        for label, value in thickness_replacements:
            thickness = thickness.replace(label, value)
        return thickness

    def with_target_archetype(frame):
        # Rows whose EPC shows cavity walls, a pitched roof and an energy efficiency of at most 80
        has_cavity_wall = frame["walls-description"].str.lower().str.contains("cavity", na=False)
        has_pitched_roof = frame["roof-description"].str.lower().str.contains("pitched", na=False)
        low_efficiency = frame["current-energy-efficiency"].astype(float) <= 80
        return frame[has_cavity_wall & has_pitched_roof & low_efficiency]

    results = []
    for ha_name in ha_names:
        original_figures = loader.december_figures[loader.december_figures["HA Name"] == ha_name]
        original_remaining = original_figures["ECO4 remaining"].values[0]

        # Properties found as eligible, joined to the EPC data pulled earlier
        asset_list = loader.data[ha_name]["asset_list"].copy()
        eligible = asset_list[asset_list["ECO Eligibility"] != "not eligible"]
        epc_data = read_pickle_from_s3(
            bucket_name="retrofit-datalake-dev",
            s3_file_name=f"ha-analysis/revised/{ha_name}/epc_data.pickle"
        )
        properties = eligible.merge(epc_data, how="left", on="asset_list_row_id")
        if properties.shape[0] != eligible.shape[0]:
            raise Exception(
                f"Joining EPC data changed the row count for {ha_name} "
                f"({eligible.shape[0]} -> {properties.shape[0]}); "
                "the EPC data has duplicate asset_list_row_id values"
            )

        if not loader.data[ha_name]["survey_list"].empty:
            raise NotImplementedError(
                f"{ha_name} has a survey list: restricting to the remaining properties is not implemented"
            )

        # Computed row by row rather than merged back on uprn, so repeated or missing UPRNs
        # cannot duplicate rows and an HA with no roof descriptions does not break the join
        properties["roof_insulation_thickness"] = properties["roof-description"].apply(
            roof_insulation_thickness
        )

        properties["construction-age-band"] = properties["construction-age-band"].apply(
            EPCDataProcessor.clean_construction_age_band
        )
        properties["age_lower_bound"] = properties["construction-age-band"].apply(extract_lower_bound)

        # na=False: a missing eligibility counts as "not subject to CIGA" instead of breaking the mask
        needs_ciga = properties["ECO Eligibility"].str.contains("subject to ciga", na=False)
        is_eco4 = properties["ECO Eligibility"] == "eco4"
        surveyed = properties["estimated"].isnull()

        proportion_with_survey = 100 * int(surveyed.sum()) / properties.shape[0]

        # Let's look just at the ECO4 business
        # For things that had a survey, take the properties that didn't need a CIGA check
        no_ciga_with_archetype = with_target_archetype(properties[surveyed & is_eco4])
        if not no_ciga_with_archetype.empty:
            raise Exception(
                f"{ha_name} has surveyed 'eco4' properties matching the target archetype; "
                "estimating these is not implemented"
            )

        # TODO: WHAT ABOUT PASSED CIGA - don't need to apply the further deduction
        ciga_check_needed = properties[surveyed & needs_ciga]

        # We take just the cavity walls
        # UCL paper: https://discovery.ucl.ac.uk/id/eprint/10110371/
        # This paper is based on London properties
        # The proportion of EPCs with building characteristics errors are shown to
        # differ between variables; floor and wall type errors occur in ~10-15% of EPCs,
        # compared with ~5% for wall insulation and glazing performance
        ciga_with_archetype = with_target_archetype(ciga_check_needed)

        # We take properties that could feasibly be within install regions
        ciga_plausible = ciga_with_archetype[
            ciga_with_archetype["roof_insulation_thickness"].astype(float) < 270
        ]

        if not loader.data[ha_name]["ciga_list"].empty:
            raise NotImplementedError(
                f"{ha_name} has a CIGA list: deriving an HA-specific CIGA pass rate is not implemented"
            )
        ha_ciga_pass_rate = assumed_ciga_pass_rate

        ciga_check_expectation = np.round(ciga_plausible.shape[0] * ha_ciga_pass_rate)
        # Always 0 when reached (non-empty raises above); kept so the sum reads as intended
        without_ciga_expectation = no_ciga_with_archetype.shape[0]
        total_expectation = ciga_check_expectation + without_ciga_expectation

        if proportion_with_survey < 100:
            # We estimate the rest by applying the same conversion rate as the surveyed properties.
            # NOTE: with no surveyed CIGA properties the rate, and so the total, comes out as NaN.
            unsurveyed_needing_ciga = properties[~surveyed & needs_ciga]
            conversion_rate = ciga_check_expectation / ciga_check_needed.shape[0]
            total_expectation += np.round(unsurveyed_needing_ciga.shape[0] * conversion_rate)

            unsurveyed_without_ciga = properties[~surveyed & is_eco4]
            if not unsurveyed_without_ciga.empty:
                raise Exception(
                    f"{ha_name} has unsurveyed 'eco4' properties; estimating these is not implemented"
                )

        results.append(
            {
                "HA Name": ha_name,
                "Original ECO4 Estimate - Remaining": original_remaining,
                "Proportion with a survey": proportion_with_survey,
                "total_expectation": total_expectation
            }
        )

    return results
def app():
"""
This app contains the housing association analysis for HAs 1, 6, 14, 39 and 107.

View file

@ -122,6 +122,13 @@ class RoofAttributes(Definitions):
result["is_valid"] = "invalid" not in description
description = description.replace("invalid", "")
# We handle an edge case where the description is "pitched, 150 loft insulation" and is missing the mm
if result["is_pitched"] or result["is_loft"]:
# Search for a regular expression that matches 150 insulation
match = re.search(r"(\d+\+?)\s*insulation", description)
if match:
result['insulation_thickness'] = match.group(1)
# insulation thickness
thickness_map = {
"ceiling insulated": "average",
@ -137,11 +144,11 @@ class RoofAttributes(Definitions):
# Remove the match from the description
# description = description.replace(key, "")
break
else:
# Extract insulation thickness in mm, if present
match = re.search(r'(\d+\+?)\s*mm', description)
if match:
result['insulation_thickness'] = match.group(1)
# Extract insulation thickness in mm, if present
match = re.search(r'(\d+\+?)\s*mm', description)
if match:
result['insulation_thickness'] = match.group(1)
if "insulation_thickness" not in result:
result['insulation_thickness'] = None