rough analysis for funding eligibility

This commit is contained in:
Khalim Conn-Kowlessar 2024-09-12 18:10:27 +01:00
parent 5316477b94
commit ceb34979e4
4 changed files with 504 additions and 1 deletions

View file

@ -0,0 +1,97 @@
import os
import pandas as pd
from backend.SearchEpc import SearchEpc
from dotenv import load_dotenv
from tqdm import tqdm
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
def app():
    """
    Pull the newest EPC record for every residential unit in the Cleethorpes Portfolio.

    Reads the tenancy schedule workbook, looks each unit up through ``SearchEpc`` (by UPRN when
    the schedule has one, otherwise by address + postcode) and writes two files next to the input:
    the schedule with EPC address / rating / SAP score / date merged on, and the raw EPC records.

    :return: None
    :raises ValueError: if no EPC record can be found for a unit
    """
    # Directory name (including its spelling) must match what is on disk, so it is kept verbatim
    portfolio_dir = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/sfr/Cleethorpes Portoflio"

    asset_list = pd.read_excel(
        f"{portfolio_dir}/Updated Tenancy Schedule Portfolio.xlsx",
    )
    # row_id lets us join the EPC results back onto the schedule after the loop
    asset_list["row_id"] = asset_list.index
    asset_list[" Street No."] = asset_list[" Street No."].astype(str)

    epc_data = []
    for _, unit in tqdm(asset_list.iterrows(), total=len(asset_list)):
        # Commercial units are skipped entirely
        if unit[" Street No."] == "Ground Floor Commercial":
            continue

        uprn = unit["Uprn"]
        if not pd.isnull(uprn):
            lookup = f"UPRN: {uprn}"
            searcher = SearchEpc(
                address1="",
                postcode="",
                auth_token=EPC_AUTH_TOKEN,
                os_api_key="",
                uprn=int(uprn)
            )
            # NOTE(review): find_property is expected to populate searcher.newest_epc itself - confirm
            searcher.find_property(skip_os=True)
        else:
            flat_no = unit[" Flat No."]
            street_no = unit[" Street No."].strip()
            if not pd.isnull(flat_no) and flat_no not in ["", " "]:
                address1 = flat_no.strip() + ", " + street_no
            else:
                address1 = street_no

            # Manual override for a single unit whose schedule address does not work as a search term
            if address1 == "1a Mews House 30":
                address1 = "1a Rear of"

            postcode = unit[" Postcode"].strip()
            lookup = f"address: {address1}, {postcode}"
            searcher = SearchEpc(
                address1=address1,
                postcode=postcode,
                auth_token=EPC_AUTH_TOKEN,
                os_api_key="",
                uprn=None,
            )
            searcher.get_epc()

            # Get the newest record on lodgement-date. An empty result must fall through to the
            # ValueError below rather than fail with an IndexError on [-1].
            sorted_epcs = sorted(
                searcher.data["rows"], key=lambda x: x["lodgement-date"]
            )
            searcher.newest_epc = sorted_epcs[-1] if sorted_epcs else None

        if searcher.newest_epc is None:
            raise ValueError(f"No EPC found for {lookup}")

        epc_data.append(
            {
                "row_id": unit["row_id"],
                **searcher.newest_epc
            }
        )

    epc_df = pd.DataFrame(epc_data)

    # Merge on data
    asset_list_with_epc = asset_list.merge(
        epc_df[["row_id", "address", "current-energy-rating", "current-energy-efficiency", "lodgement-date"]],
        how="left",
        on="row_id",
    ).rename(
        columns={
            "address": "EPC Address",
            "current-energy-rating": "Current EPC Rating",
            "current-energy-efficiency": "Current SAP Score",
            "lodgement-date": "EPC Date"
        }
    )

    asset_list_with_epc.to_excel(
        f"{portfolio_dir}/Portfolio with EPCs.xlsx",
        index=False
    )
    epc_df.to_csv(
        f"{portfolio_dir}/epc_data.csv",
        index=False
    )

View file

@ -102,7 +102,7 @@ analysis_epcs = analysis_epcs[
[
"UPRN", "TENURE", "CURRENT_ENERGY_RATING", "WALLS_DESCRIPTION", "ROOF_DESCRIPTION",
"CONSTRUCTION_AGE_BAND", "TOTAL_FLOOR_AREA", "PROPERTY_TYPE", "BUILT_FORM", "MAINHEAT_DESCRIPTION",
"eligibility_type",
"eligibility_type", "PHOTO_SUPPLY", "ADDRESS1", "POSTCODE"
]
]
analysis_epcs["grouped_epc_band"] = np.where(
@ -110,6 +110,14 @@ analysis_epcs["grouped_epc_band"] = np.where(
"EPC D",
"EPC E-G"
)
# Spot-check one record with no PHOTO_SUPPLY value (result is only useful when run interactively).
# DataFrame.sample(1) raises ValueError on an empty frame, so only sample when something is missing.
missing_photo_supply = analysis_epcs[pd.isnull(analysis_epcs["PHOTO_SUPPLY"])]
if not missing_photo_supply.empty:
    missing_photo_supply[["ADDRESS1", "POSTCODE"]].sample(1)

# A missing PHOTO_SUPPLY is treated as 0, which the flag below counts as "no solar"
analysis_epcs["PHOTO_SUPPLY"] = analysis_epcs["PHOTO_SUPPLY"].fillna(0).astype(float)
analysis_epcs["has_solar"] = np.where(analysis_epcs["PHOTO_SUPPLY"] > 0, 1, 0)
# Interactive sanity check of the flag distribution
analysis_epcs["has_solar"].value_counts()

analysis_epcs.to_csv("/Users/khalimconn-kowlessar/Documents/hestia/Customers/bcc tender/analysis_epcs.csv", index=False)
# Create aggregations and we store this information

View file

@ -5,6 +5,7 @@ from sqlalchemy.orm import sessionmaker
from backend.app.db.connection import db_engine
from backend.app.db.models.recommendations import Recommendation, Plan, PlanRecommendations, Scenario
from backend.app.db.models.portfolio import PropertyModel, PropertyDetailsEpcModel
from utils.s3 import read_csv_from_s3
def get_data(portfolio_id, scenario_ids):
@ -415,3 +416,241 @@ def slides():
pd.set_option('display.max_rows', None)
# Show more characters in a column
pd.set_option('display.max_colwidth', None)
# preparing of this data for the following 2 needs:
# 1) dataset to share with Nextgen heating
# 2) Breakdown of results by property type
# get the asset list
asset_list = read_csv_from_s3(bucket_name="retrofit-plan-inputs-dev", filepath="8/90/pilot.csv")
asset_list = pd.DataFrame(asset_list)
# Get non-invasive recommendations
non_intrusive_recommendations = read_csv_from_s3(
bucket_name="retrofit-plan-inputs-dev",
filepath="8/90/non_invasive_recommendations.csv"
)
non_intrusive_recommendations = pd.DataFrame(non_intrusive_recommendations)
# Unnest this
import ast
# Flatten the per-property recommendation lists into one row per UPRN holding the
# air source heat pump and solar PV suitability, sizing and cost.
survey_recs = []
for _, row in non_intrusive_recommendations.iterrows():
    # The "recommendations" cell is a stringified Python list of dicts; literal_eval parses it
    # without executing anything
    recs = ast.literal_eval(row["recommendations"])
    ashp_rec = next((r for r in recs if r["type"] == "air_source_heat_pump"), None)
    solar_rec = next((r for r in recs if r["type"] == "solar_pv"), None)

    # Default both flags to False so the columns always exist, even when no property is suitable
    to_append = {
        "uprn": row["uprn"],
        "ashp_suitable": False,
        "solar_suitable": False,
    }

    # next() falls back to None when a property has no recommendation of that type, so guard
    # before subscripting
    if ashp_rec is not None and ashp_rec["suitable"]:
        to_append = {
            **to_append,
            "ashp_suitable": True,
            "ashp_size_kw": ashp_rec["size"],
            "ashp_cost": ashp_rec["cost"],
        }

    if solar_rec is not None and solar_rec["suitable"]:
        to_append = {
            **to_append,
            "solar_suitable": True,
            "solar_size_kwp": solar_rec["array_wattage"],
            "solar_cost": solar_rec["cost"],
        }

    survey_recs.append(to_append)

survey_recs = pd.DataFrame(survey_recs)
asset_list["uprn"] = asset_list["uprn"].astype(int)
survey_recs["uprn"] = survey_recs["uprn"].astype(int)
vital_kwh = 7597
domna_kwh = 10850
scaling_factor = vital_kwh / domna_kwh
next_gen_dataset = properties_df[[
"uprn", "address", "postcode",
"property_type", "built_form", "current_energy_demand_heating_hotwater",
"mainfuel", "total_floor_area", "floor_height"
]].rename(
columns={
"mainfuel": "primary_fuel_type",
"total_floor_area": "gross_floor_area",
"current_energy_demand_heating_hotwater": "estimated_heating_hotwater_kwh"
}
).merge(
asset_list[["uprn", "number_of_floors"]],
how="left",
on="uprn"
).merge(
survey_recs,
how="left",
on="uprn"
)
next_gen_dataset["estimated_heating_hotwater_kwh_scaled"] = (
next_gen_dataset["estimated_heating_hotwater_kwh"] * scaling_factor
)
next_gen_dataset["ashp_suitable"] = next_gen_dataset["ashp_suitable"].fillna(False)
next_gen_dataset["solar_suitable"] = next_gen_dataset["solar_suitable"].fillna(False)
# We prepare the scenario outputs by property type
grouped_data = next_gen_dataset.copy()
grouped_data["property_sub_type"] = grouped_data["built_form"].copy()
# If a property is a flat, re-map sub_type just to flat
grouped_data.loc[grouped_data["property_type"] == "Flat", "property_sub_type"] = "Flat"
# Same for maisonettes
grouped_data.loc[grouped_data["property_type"] == "Maisonette", "property_sub_type"] = "Maisonette"
# We now pull out the recommendations impact by property type and sub type
property_scenario_impact = []
for scenario_id in scenario_ids:
# Get the recommendations for the scenario, default
scenario_recommendations = recommendations_df[
(recommendations_df["Scenario ID"] == scenario_id) &
(recommendations_df["default"] == True)
].copy()
scenario_recommendations['ligting_kwh'] = scenario_recommendations.apply(
lambda x: x['kwh_savings'] if x['type'] == 'low_energy_lighting' else 0,
axis=1)
scenario_recommendations['solar_kwh'] = scenario_recommendations.apply(
lambda x: x['kwh_savings'] if x['type'] == 'solar_pv' else 0, axis=1)
# Set 'Estimated Kwh Savings' to zero where specific kwh columns are used
scenario_recommendations['Estimated Kwh Savings'] = scenario_recommendations.apply(
lambda x: 0 if x['type'] in ['low_energy_lighting', 'solar_pv'] else x[
'kwh_savings'], axis=1)
scenario_grouped_data = scenario_recommendations.groupby(['property_id']).agg({
'Estimated Kwh Savings': 'sum',
"estimated_cost": "sum"
}).reset_index()
comparison = properties_df.drop_duplicates()[
["uprn", "property_id", "current_energy_demand_heating_hotwater"]
].merge(
scenario_grouped_data, on=["property_id"], how="left"
)
comparison["Estimated Kwh Savings"] = comparison["Estimated Kwh Savings"].fillna(0)
comparison["estimated_cost"] = comparison["estimated_cost"].fillna(0)
comparison["post_scenario_heating_hotwater_kwh"] = (
comparison["current_energy_demand_heating_hotwater"] - comparison["Estimated Kwh Savings"]
)
comparison["scenario_id"] = scenario_id
property_scenario_impact.append(comparison)
property_scenario_impact = pd.concat(property_scenario_impact)
property_scenario_impact = property_scenario_impact.drop(columns=["property_id", "Estimated Kwh Savings"])
# Scale
property_scenario_impact["post_scenario_heating_hotwater_kwh_scaled"] = (
property_scenario_impact["post_scenario_heating_hotwater_kwh"] * scaling_factor
)
grouped_data = grouped_data.merge(
property_scenario_impact, how="left", on="uprn"
)
# Agg the data
grouped_data = grouped_data.groupby(["property_type", "property_sub_type", "scenario_id"]).agg({
"estimated_heating_hotwater_kwh": "mean",
"estimated_heating_hotwater_kwh_scaled": "mean",
"estimated_cost": "mean",
"post_scenario_heating_hotwater_kwh": "mean",
"post_scenario_heating_hotwater_kwh_scaled": "mean"
}).reset_index()
scenario_names = pd.DataFrame(
[
{
"scenario_id": 47,
"scenario": "Demand Reduction cavity & roof insulation",
},
{
"scenario_id": 48,
"scenario": "Demand reduction no solid wall, floors or heating/renewables",
},
{
"scenario_id": 49,
"scenario": "Demand reduction no decant"
},
{
"scenario_id": 50,
"scenario": "Demand reduction no decant + heating & solar",
},
{
"scenario_id": 51,
"scenario": "Whole house retrofit"
}
]
)
grouped_data = grouped_data.merge(
scenario_names, how="left", on="scenario_id"
)
if not grouped_data[
grouped_data["estimated_heating_hotwater_kwh"] < grouped_data["post_scenario_heating_hotwater_kwh"]].empty:
raise Exception("someting went wrong")
if not grouped_data[grouped_data["estimated_heating_hotwater_kwh_scaled"] < grouped_data[
"post_scenario_heating_hotwater_kwh_scaled"]].empty:
raise Exception("someting went wrong")
# Reorder the columns
grouped_data = grouped_data[
[
'property_type',
'property_sub_type',
'scenario',
'estimated_heating_hotwater_kwh',
'post_scenario_heating_hotwater_kwh',
'estimated_heating_hotwater_kwh_scaled',
'post_scenario_heating_hotwater_kwh_scaled',
'estimated_cost',
]
]
grouped_data = grouped_data.rename(
columns={
"property_type": "Property Type",
"property_sub_type": "Property Sub Type",
"scenario": "Scenario",
"estimated_heating_hotwater_kwh": "Estimated Heating & Hot Water kwh",
"post_scenario_heating_hotwater_kwh": "Post Scenario Heating & Hot Water kwh",
"estimated_heating_hotwater_kwh_scaled": "Estimated Heating & Hot Water kwh (scaled)",
"post_scenario_heating_hotwater_kwh_scaled": "Post Scenario Heating & Hot Water kwh (scaled)",
"estimated_cost": "Estimated Cost or Retrofit",
}
)
grouped_data.to_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Newhaven/outputs/Scenario kWh Impact by Property "
"Type.xlsx",
index=False
)
property_scenario_impact = property_scenario_impact.merge(
scenario_names, how="left", on="scenario_id"
)
df_pivot = property_scenario_impact.pivot_table(index='uprn', columns='scenario',
values=['post_scenario_heating_hotwater_kwh',
'post_scenario_heating_hotwater_kwh_scaled'])
# Flattening multi-index columns
df_pivot.columns = [f'{col[0]}_{col[1]}' for col in df_pivot.columns]
# Reset the index to have a clean dataframe
df_pivot.reset_index(inplace=True)
next_gen_dataset = next_gen_dataset.merge(
df_pivot, how="left", on="uprn"
)
next_gen_dataset.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Newhaven/outputs/next_gen_dataset.csv", index=False
)

View file

@ -0,0 +1,159 @@
import msgpack
import pandas as pd
from utils.s3 import read_from_s3
from recommendations.recommendation_utils import (
estimate_number_of_floors, esimtate_pitched_roof_area, estimate_external_wall_area, estimate_perimeter
)
def app():
    """
    Aims to estimate the amount of GBIS funding eligible

    For every home in the portfolio EPC extract, work out whether it needs cavity wall insulation
    or roof insulation (cavity takes priority), estimate what that measure would cost, and cap the
    funding at the smaller of that cost and the home's ABS-based allowance. Homes needing neither
    measure are left out of the output. Results are written to CSV.

    :return: None
    """
    cleaned = read_from_s3(
        s3_file_name="cleaned_epc_data/cleaned.bson",
        bucket_name="retrofit-data-dev"
    )
    cleaned = msgpack.unpackb(cleaned, raw=False)

    epc_data = pd.read_excel(
        "/Users/khalimconn-kowlessar/Downloads/20240820 portfolio_epc_data.xlsx"
    )

    # For simplicity, get roofs or cavities
    epc_data = epc_data.merge(
        pd.DataFrame(cleaned["roof-description"]),
        how="left",
        left_on="ROOF_DESCRIPTION",
        right_on="original_description"
    )
    # Roof work is needed when the recorded insulation thickness is missing or one of the values
    # listed below, and the roof is flat or pitched
    epc_data["needs_roof_work"] = epc_data["insulation_thickness"].isin(
        [
            None,
            "100",
            '150',
            '50',
            '75',
            'below average',
            '25',
            '12'
        ]
    ) & (epc_data["is_flat"] | epc_data["is_pitched"])

    # Wall attributes share column names with the roof attributes, so they get a "_wall" suffix
    epc_data = epc_data.merge(
        pd.DataFrame(cleaned["walls-description"]),
        how="left",
        left_on="WALLS_DESCRIPTION",
        right_on="original_description",
        suffixes=("", "_wall")
    )
    epc_data["needs_cavity_done"] = epc_data["is_cavity_wall"] & epc_data["insulation_thickness_wall"].isin(
        ['none', "below average"]
    )

    # Unit costs per m2 of each measure
    loft_insulation_per_m2 = 16.07
    flat_roof_insulation_per_m2 = 195
    cwi_per_m2 = 14.21
    # Multiplier applied to a project's ABS score to get its funding allowance
    gbis_abs = 30

    # We assume the work will take the home from a high D to a low D
    def get_abs(floor_area):
        # NOTE(review): a NaN floor area fails every comparison and lands in the top band - confirm
        # whether homes with a missing floor area should be excluded instead
        if floor_area <= 72:
            return 155
        if floor_area <= 97:
            return 169
        if floor_area <= 199:
            return 196.4
        return 350.1

    estimated_costs = []
    for _, home in epc_data.iterrows():
        project_abs = get_abs(home["TOTAL_FLOOR_AREA"])
        n_floors = estimate_number_of_floors(home["PROPERTY_TYPE"])
        # Fall back to a 2.5m storey height when the EPC does not record one
        floor_height = float(home["FLOOR_HEIGHT"]) if not pd.isnull(home["FLOOR_HEIGHT"]) else 2.5

        # Cavity wall insulation takes priority; roof work is only considered when no cavity
        # work is needed
        if home["needs_cavity_done"]:
            measure = "Cavity Wall Insulation"
            # We estimate the amount of insulation required
            est_perimeter = estimate_perimeter(
                floor_area=float(home["TOTAL_FLOOR_AREA"]) / n_floors,
                num_rooms=float(home["NUMBER_HABITABLE_ROOMS"]) / n_floors
            )
            insulation_needed = estimate_external_wall_area(
                num_floors=n_floors,
                floor_height=floor_height,
                perimeter=est_perimeter,
                built_form=home["BUILT_FORM"],
            )
            cost_of_insulation = insulation_needed * cwi_per_m2
        elif home["needs_roof_work"]:
            # The roof area is approximated as the footprint of a single storey
            roof_area = float(home["TOTAL_FLOOR_AREA"]) / n_floors
            if home["is_pitched"]:
                measure = "Loft Insulation"
                cost_of_insulation = roof_area * loft_insulation_per_m2
            else:
                measure = "Flat Roof Insulation"
                cost_of_insulation = roof_area * flat_roof_insulation_per_m2
        else:
            # Nothing to fund for this home
            continue

        estimated_costs.append(
            {
                "uprn": home["UPRN"],
                "address": home["ADDRESS"],
                "postcode": home["POSTCODE"],
                # Funding cannot exceed what the measure is estimated to cost
                "available_funding": min(project_abs * gbis_abs, cost_of_insulation),
                "measure": measure,
                "project_abs": project_abs
            }
        )

    # Explicit columns keep the groupby / value_counts below working when no home qualifies
    estimated_costs = pd.DataFrame(
        estimated_costs,
        columns=["uprn", "address", "postcode", "available_funding", "measure", "project_abs"]
    )

    # Summaries for interactive inspection only; the results are not stored
    estimated_costs.groupby("measure")["available_funding"].mean()
    estimated_costs["measure"].value_counts()

    estimated_costs.to_csv("/Users/khalimconn-kowlessar/Documents/hestia/Customers/sfr/estimated_costs_gbis.csv")

    # NOTE(review): this path has no "Customers" segment, unlike the export above - confirm intended
    epc_data[["UPRN", "ADDRESS", "POSTCODE"]].to_csv(
        "/Users/khalimconn-kowlessar/Documents/hestia/sfr/council_tax_bands_sample.csv")

    # Count of detached / semi-detached houses; computed for inspection and not used further here
    n_properties_for_ashp = epc_data[
        (epc_data["PROPERTY_TYPE"] == "House") &
        (epc_data["BUILT_FORM"].isin(["Detached", "Semi-Detached"]))
    ].shape[0]