mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
334 lines
10 KiB
Python
334 lines
10 KiB
Python
"""
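This script prepares the data for the financial model.

For the configured portfolio and scenarios it loads properties, plans and
default recommendations from the database and writes one Excel workbook per
scenario.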
"""
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv(".env.local")
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
from backend.app.utils import sap_to_epc
|
|
from sqlalchemy.orm import sessionmaker
|
|
from backend.app.db.connection import db_engine, db_read_session
|
|
from backend.app.db.models.recommendations import (
|
|
Recommendation,
|
|
Plan,
|
|
PlanRecommendations,
|
|
RecommendationMaterials,
|
|
)
|
|
from backend.app.db.models.portfolio import (
|
|
PropertyModel,
|
|
PropertyDetailsEpcModel,
|
|
PropertyDetailsSpatial,
|
|
)
|
|
from backend.app.db.functions.materials_functions import get_materials
|
|
from collections import defaultdict
|
|
from sqlalchemy import func
|
|
|
|
# Portfolio and scenarios to export.
# Previously used values: PORTFOLIO_ID = 206, SCENARIOS = [389]
PORTFOLIO_ID = 524
SCENARIOS = [1009]

# Human-readable name for each scenario ID (used in the output filename).
scenario_names = {1009: "EPC C; Most Economic"}
|
|
|
|
|
|
def _columns_as_dict(instance, model):
    """Return ``{column name: value}`` for every table column of ``model``, read from ``instance``."""
    return {col.name: getattr(instance, col.name) for col in model.__table__.columns}


def get_data(portfolio_id, scenario_ids):
    """Load properties, latest plans and default recommendations from the database.

    Args:
        portfolio_id: Portfolio whose properties are loaded.
        scenario_ids: Scenario IDs whose plans and recommendations are loaded.

    Returns:
        A tuple ``(properties_data, plans_data, recommendations_data)`` of
        lists of dicts:

        * ``properties_data`` - one dict per property/EPC-details row, holding
          every column of both tables.
        * ``plans_data`` - one dict per plan, keeping only the most recently
          created plan for each (scenario, property) pair.
        * ``recommendations_data`` - one dict per default, not-already-installed
          recommendation attached to those plans, plus ``scenario_id`` and a
          ``materials`` list of material dicts.
    """
    session = sessionmaker(bind=db_engine)()
    try:
        session.begin()

        # --------------------
        # Properties
        # --------------------
        property_rows = (
            session.query(PropertyModel, PropertyDetailsEpcModel)
            .join(
                PropertyDetailsEpcModel,
                PropertyModel.id == PropertyDetailsEpcModel.property_id,
            )
            .filter(PropertyModel.portfolio_id == portfolio_id)
            .all()
        )

        # EPC columns are merged last, so on any column-name clash the EPC
        # value overwrites the property value.
        properties_data = [
            {
                **_columns_as_dict(row.PropertyModel, PropertyModel),
                **_columns_as_dict(
                    row.PropertyDetailsEpcModel, PropertyDetailsEpcModel
                ),
            }
            for row in property_rows
        ]

        # --------------------
        # Plans
        # --------------------
        # Latest creation timestamp per (scenario, property). Plans are
        # selected by scenario only; they are not filtered by portfolio here.
        latest_plans_subq = (
            session.query(
                Plan.scenario_id,
                Plan.property_id,
                func.max(Plan.created_at).label("latest_created_at"),
            )
            .filter(Plan.scenario_id.in_(scenario_ids))
            .group_by(Plan.scenario_id, Plan.property_id)
            .subquery()
        )

        # Join back on scenario AND property so each property keeps its own
        # latest plan rather than only plans matching a scenario-wide maximum.
        plan_rows = (
            session.query(Plan)
            .join(
                latest_plans_subq,
                (Plan.scenario_id == latest_plans_subq.c.scenario_id)
                & (Plan.property_id == latest_plans_subq.c.property_id)
                & (Plan.created_at == latest_plans_subq.c.latest_created_at),
            )
            .all()
        )

        plans_data = [_columns_as_dict(plan, Plan) for plan in plan_rows]
        plan_ids = [plan["id"] for plan in plans_data]

        # --------------------
        # Recommendations
        # --------------------
        # Skip the query entirely when there is nothing to look up, instead
        # of sending an empty IN () to the database.
        recommendation_rows = []
        if plan_ids:
            recommendation_rows = (
                session.query(
                    Recommendation, Plan.scenario_id, PlanRecommendations.plan_id
                )
                .join(
                    PlanRecommendations,
                    Recommendation.id == PlanRecommendations.recommendation_id,
                )
                .join(Plan, Plan.id == PlanRecommendations.plan_id)
                .filter(
                    PlanRecommendations.plan_id.in_(plan_ids),
                    Recommendation.default.is_(True),
                    Recommendation.already_installed.is_(False),
                )
                .all()
            )

        recommendations_data = [
            {
                **_columns_as_dict(row.Recommendation, Recommendation),
                "scenario_id": row.scenario_id,
                "materials": [],  # filled in below
            }
            for row in recommendation_rows
        ]
        recommendation_ids = [rec["id"] for rec in recommendations_data]

        # --------------------
        # Recommendation materials (separate query)
        # --------------------
        material_rows = []
        if recommendation_ids:
            material_rows = (
                session.query(RecommendationMaterials)
                .filter(
                    RecommendationMaterials.recommendation_id.in_(recommendation_ids)
                )
                .all()
            )

        # Group materials by recommendation_id.
        materials_by_recommendation = defaultdict(list)
        for material in material_rows:
            materials_by_recommendation[material.recommendation_id].append(
                {
                    "material_id": material.material_id,
                    "depth": material.depth,
                    "quantity": material.quantity,
                    "quantity_unit": material.quantity_unit,
                    "estimated_cost": material.estimated_cost,
                }
            )

        # .get() avoids inserting empty entries into the defaultdict;
        # recommendations without materials keep an empty list.
        for rec in recommendations_data:
            rec["materials"] = materials_by_recommendation.get(rec["id"], [])

        return properties_data, plans_data, recommendations_data
    finally:
        # Always release the connection, even when a query above fails.
        session.close()
|
|
|
|
|
|
# Pull the raw rows for the configured portfolio and scenarios.
properties_data, plans_data, recommendations_data = get_data(
    portfolio_id=PORTFOLIO_ID,
    scenario_ids=SCENARIOS,
)

# One DataFrame per result list.
properties_df, plans_df, recommendations_df = (
    pd.DataFrame(rows)
    for rows in (properties_data, plans_data, recommendations_data)
)

# Materials catalogue, read through a separate read session.
with db_read_session() as session:
    materials = get_materials(session)

materials = pd.DataFrame(materials)

# material id -> {"type": ..., "includes_battery": ...}
material_lookup = (
    materials.set_index("id")
    .loc[:, ["type", "includes_battery"]]
    .to_dict(orient="index")
)
|
|
|
|
|
|
def has_solar_with_battery(materials_list):
    """Return True if any listed material is a solar PV product that includes a battery.

    ``materials_list`` is a list of dicts carrying a ``material_id`` key (or
    None / empty). Each id is resolved through the module-level
    ``material_lookup``; ids missing from the lookup are ignored.
    """
    resolved = (
        material_lookup.get(entry["material_id"])
        for entry in (materials_list or [])
    )
    return any(
        details["type"] == "solar_pv" and details["includes_battery"]
        for details in resolved
        if details
    )
|
|
|
|
|
|
# Adjust the measure type to indicate if there is a battery included:
# recommendations whose materials contain a solar PV product with a battery
# get a "_with_battery" suffix on their measure_type.
battery_flags = recommendations_df["materials"].apply(has_solar_with_battery)
recommendations_df["has_solar_with_battery"] = battery_flags

original_measure_type = recommendations_df["measure_type"]
recommendations_df["measure_type"] = np.where(
    recommendations_df["has_solar_with_battery"].eq(True),
    original_measure_type + "_with_battery",
    original_measure_type,
)
|
|
|
|
from utils.s3 import read_csv_from_s3, read_excel_from_s3
|
|
|
|
# asset_list = read_excel_from_s3(
|
|
# bucket_name="retrofit-plan-inputs-dev", file_key="2/404/20251211T163200754Z/asset_list.xlsx",
|
|
# header_row=0, sheet_name="Standardised Asset List"
|
|
# )
|
|
|
|
|
|
# For every scenario: build one row per property with the cost of each
# recommended measure, the total retrofit cost and the predicted post-works
# SAP/EPC, then write the table to an Excel workbook.
for scenario_id in SCENARIOS:
    # Default recommendations for this scenario.
    scenario_recs = recommendations_df[
        recommendations_df["scenario_id"] == scenario_id
    ]
    scenario_recs = scenario_recs[scenario_recs["default"]]

    # Total SAP uplift per property.
    # NOTE(review): this sums over ALL default recommendations, including the
    # duplicate (property, measure) rows that are dropped from the cost table
    # below - confirm the double counting is intended.
    post_install_sap = (
        scenario_recs.groupby(["property_id"])[["sap_points"]].sum().reset_index()
    )

    # Cost per (property, measure). Duplicates (caused by an upstream bug)
    # would make pivot() fail, so keep only the first row of each pair.
    # drop_duplicates is a no-op when there are none, so no guard is needed.
    recommended_measures_df = scenario_recs[
        ["property_id", "measure_type", "estimated_cost"]
    ].drop_duplicates(subset=["property_id", "measure_type"], keep="first")

    # One column per measure type, one row per property.
    recommendations_measures_pivot = recommended_measures_df.pivot(
        index="property_id", columns="measure_type", values="estimated_cost"
    ).reset_index()

    # Total cost is the row sum, excluding the property_id column.
    recommendations_measures_pivot["total_retrofit_cost"] = (
        recommendations_measures_pivot.drop(columns=["property_id"]).sum(axis=1)
    )

    df = (
        properties_df[
            [
                "landlord_property_id",
                "property_id",
                "uprn",
                "address",
                "postcode",
                "property_type",
                "walls",
                "roof",
                "heating",
                "windows",
                "current_epc_rating",
                "current_sap_points",
                "total_floor_area",
                "number_of_rooms",
                "id",
            ]
        ]
        .merge(recommendations_measures_pivot, how="left", on="property_id")
        .merge(post_install_sap, how="left", on="property_id")
    )

    # Properties without recommendations get no SAP uplift.
    df["sap_points"] = df["sap_points"].fillna(0)

    df["predicted_post_works_sap"] = df["current_sap_points"] + df["sap_points"]
    df["predicted_post_works_epc"] = df["predicted_post_works_sap"].apply(sap_to_epc)
    df["uprn"] = df["uprn"].astype(str)

    # Sanity check: print the predicted EPC distribution next to the
    # post-works rating stored on the plans for this scenario.
    relevant_plans = plans_df[plans_df["scenario_id"] == scenario_id]
    comparison_df = df.merge(
        relevant_plans[["property_id", "post_sap_points", "post_epc_rating"]],
        how="left",
        on="property_id",
        suffixes=("", "_plan"),
    )
    print(comparison_df["predicted_post_works_epc"].value_counts())
    print(comparison_df["post_epc_rating"].value_counts())

    # Create excel to store to.
    # NOTE(review): the date in the filename is hard-coded - confirm it is
    # still the intended label before re-running.
    filename = f"{scenario_names[scenario_id]} - 20250113 final.xlsx"
    with pd.ExcelWriter(filename) as writer:
        df.to_excel(writer, sheet_name="properties", index=False)
|