Model/sfr/principal_pitch/2_export_data.py
2026-01-16 14:17:46 +00:00

387 lines
12 KiB
Python

"""
This script prepares the data for the financial model
"""
import pandas as pd
import numpy as np
from backend.app.utils import sap_to_epc
from sqlalchemy.orm import sessionmaker
from backend.app.db.connection import db_engine, db_read_session
from backend.app.db.models.recommendations import Recommendation, Plan, PlanRecommendations, RecommendationMaterials
from backend.app.db.models.portfolio import PropertyModel, PropertyDetailsEpcModel, PropertyDetailsSpatial
from backend.app.db.functions.materials_functions import get_materials
from collections import defaultdict
# PORTFOLIO_ID = 206
# SCENARIOS = [389]
PORTFOLIO_ID = 449 # Peabody
SCENARIOS = [
922
]
scenario_names = {
922: "EPC C",
}
def get_data(portfolio_id, scenario_ids):
session = sessionmaker(bind=db_engine)()
session.begin()
# --------------------
# Properties
# --------------------
properties_query = session.query(
PropertyModel,
PropertyDetailsEpcModel
).join(
PropertyDetailsEpcModel,
PropertyModel.id == PropertyDetailsEpcModel.property_id
).filter(
PropertyModel.portfolio_id == portfolio_id
).all()
properties_data = [
{
**{col.name: getattr(p.PropertyModel, col.name)
for col in PropertyModel.__table__.columns},
**{col.name: getattr(p.PropertyDetailsEpcModel, col.name)
for col in PropertyDetailsEpcModel.__table__.columns},
}
for p in properties_query
]
# --------------------
# Plans
# --------------------
plans_query = session.query(Plan).filter(
Plan.scenario_id.in_(scenario_ids)
).all()
plans_data = [
{col.name: getattr(plan, col.name) for col in Plan.__table__.columns}
for plan in plans_query
]
plan_ids = [p["id"] for p in plans_data]
# --------------------
# Recommendations (NO materials yet)
# --------------------
recommendations_query = session.query(
Recommendation,
Plan.scenario_id
).join(
PlanRecommendations,
Recommendation.id == PlanRecommendations.recommendation_id
).join(
Plan,
Plan.id == PlanRecommendations.plan_id
).filter(
PlanRecommendations.plan_id.in_(plan_ids),
Recommendation.default.is_(True),
Recommendation.already_installed.is_(False)
).all()
recommendations_data = [
{
**{col.name: getattr(r.Recommendation, col.name)
for col in Recommendation.__table__.columns},
"scenario_id": r.scenario_id,
"materials": [] # placeholder
}
for r in recommendations_query
]
recommendation_ids = [r["id"] for r in recommendations_data]
# --------------------
# Recommendation materials (SEPARATE QUERY)
# --------------------
materials_query = session.query(
RecommendationMaterials
).filter(
RecommendationMaterials.recommendation_id.in_(recommendation_ids)
).all()
# Group materials by recommendation_id
materials_by_recommendation = defaultdict(list)
for m in materials_query:
materials_by_recommendation[m.recommendation_id].append({
"material_id": m.material_id,
"depth": m.depth,
"quantity": m.quantity,
"quantity_unit": m.quantity_unit,
"estimated_cost": m.estimated_cost,
})
# Attach materials safely (no filtering side effects)
for r in recommendations_data:
r["materials"] = materials_by_recommendation.get(r["id"], [])
session.close()
return properties_data, plans_data, recommendations_data
properties_data, plans_data, recommendations_data = get_data(portfolio_id=PORTFOLIO_ID, scenario_ids=SCENARIOS)
properties_df = pd.DataFrame(properties_data)
plans_df = pd.DataFrame(plans_data)
recommendations_df = pd.DataFrame(recommendations_data)
with db_read_session() as session:
materials = get_materials(session)
materials = pd.DataFrame(materials)
material_lookup = (
materials
.set_index("id")[["type", "includes_battery"]]
.to_dict("index")
)
def has_solar_with_battery(materials_list):
for m in materials_list or []:
mat = material_lookup.get(m["material_id"])
if not mat:
continue
if mat["type"] == "solar_pv" and mat["includes_battery"]:
return True
return False
recommendations_df["has_solar_with_battery"] = (
recommendations_df["materials"].apply(has_solar_with_battery)
)
recommendations_df["measure_type"] = np.where(
recommendations_df["has_solar_with_battery"] == True,
recommendations_df["measure_type"] + "_with_battery",
recommendations_df["measure_type"]
)
# Adjust material type to indicate if there is a battery included
from utils.s3 import read_csv_from_s3, read_excel_from_s3
# asset_list = read_excel_from_s3(
# bucket_name="retrofit-plan-inputs-dev", file_key="2/404/20251211T163200754Z/asset_list.xlsx",
# header_row=0, sheet_name="Standardised Asset List"
# )
for scenario_id in SCENARIOS:
# Get recs for this scenario
recommended_measures_df = recommendations_df[recommendations_df["scenario_id"] == scenario_id][
["property_id", "measure_type", "estimated_cost", "default"]
]
recommended_measures_df = recommended_measures_df[recommended_measures_df["default"]]
recommended_measures_df = recommended_measures_df.drop(columns=["default"])
post_install_sap = recommendations_df[recommendations_df["scenario_id"] == scenario_id][
["property_id", "default", "sap_points"]]
post_install_sap = post_install_sap[post_install_sap["default"]]
# Sum up the sap points by property id
post_install_sap = post_install_sap.groupby(["property_id"])[["sap_points"]].sum().reset_index()
# Find dupes by property id and measure type
dupes = recommended_measures_df.duplicated(subset=["property_id", "measure_type"], keep=False)
dupe_df = recommended_measures_df[dupes]
if dupe_df.shape:
# Drop dupes - happened due to a funny bug
recommended_measures_df = recommended_measures_df.drop_duplicates(
subset=["property_id", "measure_type"], keep='first'
)
recommendations_measures_pivot = recommended_measures_df.pivot(
index='property_id',
columns='measure_type',
values='estimated_cost'
)
recommendations_measures_pivot = recommendations_measures_pivot.reset_index()
# Total cost is the row sum, excluding the property_id column
recommendations_measures_pivot["total_retrofit_cost"] = recommendations_measures_pivot.drop(
columns=["property_id"]
).sum(axis=1)
df = properties_df[
[
"landlord_property_id", "property_id", "uprn", "address", "postcode", "property_type", "walls", "roof",
"heating", "windows", "current_epc_rating", "current_sap_points", "total_floor_area", "number_of_rooms",
]
].merge(
recommendations_measures_pivot, how="left", on="property_id"
).merge(
post_install_sap, how="left", on="property_id"
)
df = df.drop(columns=["property_id"])
df["sap_points"] = df["sap_points"].fillna(0)
df["predicted_post_works_sap"] = df["current_sap_points"] + df["sap_points"]
df["predicted_post_works_sap"] = df["predicted_post_works_sap"].round()
df["predicted_post_works_epc"] = df["predicted_post_works_sap"].apply(lambda x: sap_to_epc(x))
df["uprn"] = df["uprn"].astype(str)
# Create excel to store to
filename = (f"{scenario_names[scenario_id]}-clarion.xlsx")
with pd.ExcelWriter(filename) as writer:
df.to_excel(writer, sheet_name="properties", index=False)
#( Junte) don't need anything below this
# asset_list = pd.DataFrame(asset_list)
# asset_list = asset_list.rename(
# columns={
# "postcode": "domna_postcode"
# }
# )
# if "domna_full_address":
# # For Peabody
# asset_list["domna_full_address"] = asset_list["domna_address_1"]
#
# asset_list = asset_list[["domna_full_address", "domna_postcode", "epc_os_uprn", ]].copy()
# asset_list = asset_list.rename(columns={"epc_os_uprn": "uprn"})
# asset_list["uprn"] = asset_list["uprn"].astype("Int64").astype(str)
# asset_list = asset_list.merge(
# df.drop(columns=["address", "postcode", "property_type", "total_floor_area"]),
# how="left",
# on="uprn"
# )
# Get conservation area data from property details spatial. based on the UPRNs
def get_conservation_area_data(uprns):
session = sessionmaker(bind=db_engine)()
session.begin()
# Query to get conservation area data
spatial_query = session.query(
PropertyDetailsSpatial
).filter(
PropertyDetailsSpatial.uprn.in_(uprns) # Filter by UPRNs
).all()
# Transform spatial data to include all fields dynamically
spatial_data = [
{col.name: getattr(spatial, col.name) for col in PropertyDetailsSpatial.__table__.columns}
for spatial in spatial_query
]
session.close()
return pd.DataFrame(spatial_data)
uprns = asset_list[
~pd.isna(asset_list["uprn"]) & (asset_list["uprn"] != "<NA>")
]["uprn"].astype(int).unique().tolist()
conservation_area_data = get_conservation_area_data(uprns)
conservation_area_data["uprn"] = conservation_area_data["uprn"].astype(str)
asset_list = asset_list.merge(
conservation_area_data[["uprn", "conservation_status", "is_listed_building", "is_heritage_building"]],
how="left",
on="uprn"
)
# For exporting
df.to_excel(
"EPC C -without floors proposed measures - "
"with ID.xlsx",
index=False
)
# asset_list.to_excel(
# "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Lincs Rural/epc_measures.xlsx",
# index=False
# )
condition_costs = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/sfr/Spring JV/Condition costs.xlsx",
sheet_name="Prices - Khalim",
header=35
)
# Remove unnamed columns and reset index
condition_costs = condition_costs.loc[:, ~condition_costs.columns.str.contains('^Unnamed')]
condition_costs = condition_costs.reset_index(drop=True)
# We now estimate condition cost
def simulate_condition(asset_list, condition_costs):
"""
This function is for testing, and will simulate condition cost from 1-10 for each property to see what the
costing array looks like.
:param df:
:return:
"""
condition_df = []
for _, row in asset_list.iterrows():
n_bathrooms = row["bathrooms"]
conditions = {}
for condition in reversed(range(1, 11)):
condition_cost = condition_costs[
condition_costs["Condition"] == condition
].drop(columns=["Condition"]).iloc[0]
# Each cost is scaled by floor area
condition_cost = condition_cost * row["total_floor_area"]
condition_cost["Bathroom"] = condition_cost["Bathroom"] * n_bathrooms
total_condition_cost = condition_cost.sum()
conditions["Condition " + str(condition)] = (total_condition_cost)
condition_df.append(
{
"uprn": row["uprn"],
**conditions
}
)
condition_df = pd.DataFrame(condition_df)
asset_list = asset_list.merge(
condition_df,
how="left",
on="uprn"
)
return asset_list
# asset_list = simulate_condition(asset_list, condition_costs)
# We calculate the condition cost based on the condition
for _, row in asset_list.iterrows():
condition = row["condition_score"]
if condition in [None, ""]:
continue
condition = int(float(condition))
condition_cost = condition_costs[
condition_costs["Condition"] == condition
].drop(columns=["Condition"]).iloc[0]
# Each cost is scaled by floor area
condition_cost = condition_cost * float(row["total_floor_area"])
n_bathrooms = row["n_bathrooms"]
condition_cost["Bathroom"] = condition_cost["Bathroom"] * float(n_bathrooms)
total_condition_cost = condition_cost.sum()
asset_list.loc[asset_list["uprn"] == row["uprn"], "domna_condition_cost"] = total_condition_cost
# Store output
asset_list.to_excel(
"fabric_clarian_packages.xlsx",
index=False
)
condition_cost_comparison = asset_list[
["condition_score", "decoration_sum_min ", "decoration_sum_max", "domna_condition_cost"]
]