Model/sfr/principal_pitch/2_export_data.py
Khalim Conn-Kowlessar 87afac8631 minor exporting data
2026-04-30 09:58:24 +01:00

334 lines
10 KiB
Python

"""
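This script prepares the data for the financial model: for each scenario in
SCENARIOS it writes an Excel workbook of the portfolio's properties with
per-measure retrofit costs and the predicted post-works SAP score and EPC band.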
"""
from dotenv import load_dotenv
load_dotenv(".env.local")
import pandas as pd
import numpy as np
from backend.app.utils import sap_to_epc
from sqlalchemy.orm import sessionmaker
from backend.app.db.connection import db_engine, db_read_session
from backend.app.db.models.recommendations import (
Recommendation,
PlanModel,
PlanRecommendations,
RecommendationMaterials,
)
from backend.app.db.models.portfolio import (
PropertyModel,
PropertyDetailsEpcModel,
PropertyDetailsSpatial,
)
from backend.app.db.functions.materials_functions import get_materials
from collections import defaultdict
from sqlalchemy import func
# Run configuration for this export.
# Portfolio whose properties are loaded (passed to get_data as portfolio_id).
PORTFOLIO_ID = 632
# Scenario ids to export; one Excel workbook is written per entry.
SCENARIOS = [1144]
# Display name per scenario id, used as the first part of the output filename.
# Every id in SCENARIOS needs an entry here.
scenario_names = {
1144: "EPC C",
}
# Project label, used as the second part of the output filename.
project_name = "Calico"
def get_data(portfolio_id, scenario_ids):
    """
    Load the raw rows needed for the export from the database.

    Args:
        portfolio_id: Id of the portfolio whose properties are loaded.
        scenario_ids: Scenario ids whose plans and recommendations are loaded.

    Returns:
        A tuple ``(properties_data, plans_data, recommendations_data)`` of
        lists of plain dicts:

        - ``properties_data``: one dict per property joined to its EPC
          details, keyed by the column names of both tables. Where both
          tables share a column name, the EPC details value wins.
        - ``plans_data``: the most recently created plan for each
          (scenario, property) pair.
        - ``recommendations_data``: the default, not-already-installed
          recommendations attached to those plans, each carrying its
          ``scenario_id`` and a ``materials`` list of dicts.
    """
    session = sessionmaker(bind=db_engine)()
    # try/finally so the connection is returned to the pool even if a query fails.
    try:
        session.begin()

        # --------------------
        # Properties
        # --------------------
        properties_query = (
            session.query(PropertyModel, PropertyDetailsEpcModel)
            .join(
                PropertyDetailsEpcModel,
                PropertyModel.id == PropertyDetailsEpcModel.property_id,
            )
            .filter(PropertyModel.portfolio_id == portfolio_id)
            .all()
        )
        properties_data = [
            {
                **{
                    col.name: getattr(p.PropertyModel, col.name)
                    for col in PropertyModel.__table__.columns
                },
                # Later keys override earlier ones, so EPC columns win on a clash.
                **{
                    col.name: getattr(p.PropertyDetailsEpcModel, col.name)
                    for col in PropertyDetailsEpcModel.__table__.columns
                },
            }
            for p in properties_query
        ]

        # --------------------
        # Plans
        # --------------------
        # Latest creation timestamp per (scenario, property); joining back on
        # all three columns keeps only the newest plan for each pair.
        latest_plans_subq = (
            session.query(
                PlanModel.scenario_id,
                PlanModel.property_id,
                func.max(PlanModel.created_at).label("latest_created_at"),
            )
            .filter(PlanModel.scenario_id.in_(scenario_ids))
            .group_by(PlanModel.scenario_id, PlanModel.property_id)
            .subquery()
        )
        plans_query = (
            session.query(PlanModel)
            .join(
                latest_plans_subq,
                (PlanModel.scenario_id == latest_plans_subq.c.scenario_id)
                & (PlanModel.property_id == latest_plans_subq.c.property_id)
                & (PlanModel.created_at == latest_plans_subq.c.latest_created_at),
            )
            .all()
        )
        plans_data = [
            {col.name: getattr(plan, col.name) for col in PlanModel.__table__.columns}
            for plan in plans_query
        ]
        plan_ids = [p["id"] for p in plans_data]

        # --------------------
        # Recommendations (NO materials yet)
        # --------------------
        recommendations_query = (
            session.query(
                Recommendation, PlanModel.scenario_id, PlanRecommendations.plan_id
            )
            .join(
                PlanRecommendations,
                Recommendation.id == PlanRecommendations.recommendation_id,
            )
            .join(PlanModel, PlanModel.id == PlanRecommendations.plan_id)
            .filter(
                PlanRecommendations.plan_id.in_(plan_ids),
                Recommendation.default.is_(True),
                Recommendation.already_installed.is_(False),
            )
            .all()
        )
        recommendations_data = [
            {
                **{
                    col.name: getattr(r.Recommendation, col.name)
                    for col in Recommendation.__table__.columns
                },
                "scenario_id": r.scenario_id,
                "materials": [],  # placeholder, filled in below
            }
            for r in recommendations_query
        ]
        recommendation_ids = [r["id"] for r in recommendations_data]

        # --------------------
        # Recommendation materials (SEPARATE QUERY)
        # --------------------
        materials_query = (
            session.query(RecommendationMaterials)
            .filter(RecommendationMaterials.recommendation_id.in_(recommendation_ids))
            .all()
        )

        # Group materials by recommendation_id
        materials_by_recommendation = defaultdict(list)
        for m in materials_query:
            materials_by_recommendation[m.recommendation_id].append(
                {
                    "material_id": m.material_id,
                    "depth": m.depth,
                    "quantity": m.quantity,
                    "quantity_unit": m.quantity_unit,
                    "estimated_cost": m.estimated_cost,
                }
            )

        # Attach materials; .get() avoids inserting empty entries into the defaultdict.
        for r in recommendations_data:
            r["materials"] = materials_by_recommendation.get(r["id"], [])

        return properties_data, plans_data, recommendations_data
    finally:
        session.close()
# Pull the raw rows for the configured portfolio and scenarios.
properties_data, plans_data, recommendations_data = get_data(
    scenario_ids=SCENARIOS,
    portfolio_id=PORTFOLIO_ID,
)

# One DataFrame per row set, in the same order as get_data returns them.
properties_df, plans_df, recommendations_df = (
    pd.DataFrame(rows)
    for rows in (properties_data, plans_data, recommendations_data)
)

# Materials catalogue, loaded through a read-only session.
with db_read_session() as session:
    materials = pd.DataFrame(get_materials(session))

# material id -> {"type": ..., "includes_battery": ...}
lookup_columns = ["type", "includes_battery"]
material_lookup = materials.set_index("id")[lookup_columns].to_dict("index")
def has_solar_with_battery(materials_list, lookup=None):
    """
    Return True if any listed material is a solar PV product that includes a battery.

    Args:
        materials_list: Iterable of dicts each holding a "material_id" key,
            or None/empty for a recommendation without materials.
        lookup: Optional mapping of material id to a dict with "type" and
            "includes_battery" keys. Defaults to the module-level
            ``material_lookup``.

    Returns:
        bool: True as soon as one matching material is found, otherwise False.
        Material ids missing from the lookup are ignored.
    """
    if lookup is None:
        lookup = material_lookup
    for entry in materials_list or []:
        mat = lookup.get(entry["material_id"])
        if not mat or mat["type"] != "solar_pv":
            continue
        includes_battery = mat["includes_battery"]
        # A missing flag may surface as NaN, which is truthy in Python;
        # treat it as "no battery" rather than a match.
        if pd.notna(includes_battery) and bool(includes_battery):
            return True
    return False
# Flag every recommendation whose materials include a solar PV product with a battery.
recommendations_df["has_solar_with_battery"] = recommendations_df["materials"].apply(
    has_solar_with_battery
)
# Adjust the measure type to indicate if there is a battery included, so that
# such recommendations end up under their own "<measure_type>_with_battery" name.
recommendations_df["measure_type"] = np.where(
    recommendations_df["has_solar_with_battery"],
    recommendations_df["measure_type"] + "_with_battery",
    recommendations_df["measure_type"],
)
from utils.s3 import read_csv_from_s3, read_excel_from_s3
# asset_list = read_excel_from_s3(
# bucket_name="retrofit-plan-inputs-dev", file_key="2/404/20251211T163200754Z/asset_list.xlsx",
# header_row=0, sheet_name="Standardised Asset List"
# )
# Property columns carried into every export sheet.
property_columns = [
    "landlord_property_id",
    "property_id",
    "uprn",
    "address",
    "postcode",
    "property_type",
    "walls",
    "roof",
    "heating",
    "windows",
    "current_epc_rating",
    "current_sap_points",
    "original_sap_points",
    "total_floor_area",
    "number_of_rooms",
    "lodgement_date",
    "is_expired",
    "id",
]

# Measure columns the sheet must always contain, whether or not any property
# in the scenario has that measure recommended.
expected_columns = [
    "suspended_floor_insulation",
    "solid_floor_insulation",
    "external_wall_insulation",
    "internal_wall_insulation",
    "cavity_wall_insulation",
    "loft_insulation",
    "flat_roof_insulation",
    "room_roof_insulation",
    "secondary_glazing",
    "double_glazing",
    "solar_pv",
    "high_heat_retention_storage_heaters",
    "air_source_heat_pump",
    "boiler_upgrade",
    "roomstat_programmer_trvs",
    "time_temperature_zone_control",
]

# Write one workbook per scenario: property details, one cost column per
# measure type, the total retrofit cost and the predicted post-works SAP/EPC.
for scenario_id in SCENARIOS:
    # Default recommendations for this scenario
    scenario_recs = recommendations_df[
        recommendations_df["scenario_id"] == scenario_id
    ]
    scenario_recs = scenario_recs[scenario_recs["default"]]

    recommended_measures_df = scenario_recs[
        ["property_id", "measure_type", "estimated_cost"]
    ]

    # Sum up the sap points by property id
    # NOTE(review): this sum is taken before the duplicate rows are dropped
    # below, so duplicated recommendations may count twice here - confirm
    # whether that is intended.
    post_install_sap = (
        scenario_recs.groupby(["property_id"])[["sap_points"]].sum().reset_index()
    )

    # Find dupes by property id and measure type
    dupes = recommended_measures_df.duplicated(
        subset=["property_id", "measure_type"], keep=False
    )
    if dupes.any():
        # Drop dupes - happened due to a funny bug. pivot() below raises on
        # duplicate (index, column) pairs, so they must be gone before it runs.
        recommended_measures_df = recommended_measures_df.drop_duplicates(
            subset=["property_id", "measure_type"], keep="first"
        )

    # One row per property, one cost column per measure type
    recommendations_measures_pivot = recommended_measures_df.pivot(
        index="property_id", columns="measure_type", values="estimated_cost"
    ).reset_index()

    # Total cost is the row sum, excluding the property_id column
    recommendations_measures_pivot["total_retrofit_cost"] = (
        recommendations_measures_pivot.drop(columns=["property_id"]).sum(axis=1)
    )

    # Left merges keep every property, including those with no recommendations
    df = (
        properties_df[property_columns]
        .merge(recommendations_measures_pivot, how="left", on="property_id")
        .merge(post_install_sap, how="left", on="property_id")
    )

    # Properties without recommendations gain no SAP points
    df["sap_points"] = df["sap_points"].fillna(0)
    df["predicted_post_works_sap"] = df["current_sap_points"] + df["sap_points"]
    df["predicted_post_works_epc"] = df["predicted_post_works_sap"].apply(sap_to_epc)

    df["uprn"] = df["uprn"].astype(str)

    # Add missing columns with default values
    for col in expected_columns:
        if col not in df.columns:
            df[col] = ""

    # Create excel to store to
    filename = f"{scenario_names[scenario_id]} - {project_name}.xlsx"
    with pd.ExcelWriter(filename) as writer:
        df.to_excel(writer, sheet_name="properties", index=False)