mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
714 lines
23 KiB
Python
714 lines
23 KiB
Python
import pandas as pd
|
|
from sqlalchemy.orm import sessionmaker
|
|
from backend.app.db.connection import db_engine, db_read_session, db_session
|
|
from backend.app.db.models.recommendations import Recommendation, Plan, PlanRecommendations, RecommendationMaterials, \
|
|
InstalledMeasure
|
|
from backend.app.db.models.portfolio import PropertyModel, PropertyDetailsEpcModel
|
|
from sqlalchemy import func
|
|
from backend.app.utils import sap_to_epc
|
|
from typing import Dict, List, Set
|
|
from recommendations.Costs import Costs
|
|
from backend.app.db.models.portfolio import Epc
|
|
|
|
|
|
def get_all_data(portfolio_id, scenario_ids):
    """Load properties, plans and recommendations as lists of plain dicts.

    Args:
        portfolio_id: Value matched against ``PropertyModel.portfolio_id``.
        scenario_ids: Iterable of ``Plan.scenario_id`` values. Plans are
            selected by scenario only (not by portfolio), and recommendations
            are those linked to the selected plans via ``PlanRecommendations``.

    Returns:
        Tuple ``(properties_data, plans_data, recommendations_data)``:

        * ``properties_data`` - one dict per (property, EPC details) pair with
          every column of both tables keyed by column name.
        * ``plans_data`` - one dict per plan with every ``Plan`` column.
        * ``recommendations_data`` - one dict per (recommendation, plan) link
          with every ``Recommendation`` column plus ``"scenario_id"`` and an
          empty ``"materials"`` list as a placeholder.
    """

    def _columns_as_dict(instance, model):
        # Snapshot every mapped table column of `instance` into a plain dict.
        return {
            col.name: getattr(instance, col.name)
            for col in model.__table__.columns
        }

    session = sessionmaker(bind=db_engine)()
    try:
        session.begin()

        # --------------------
        # Properties
        # --------------------
        properties_query = session.query(
            PropertyModel,
            PropertyDetailsEpcModel
        ).join(
            PropertyDetailsEpcModel,
            PropertyModel.id == PropertyDetailsEpcModel.property_id
        ).filter(
            PropertyModel.portfolio_id == portfolio_id
        ).all()

        # The EPC columns are unpacked last, so when both tables share a column
        # name the EPC value wins.
        # NOTE(review): if both models define e.g. an ``id`` column, the
        # property id is overwritten here - confirm against the models.
        properties_data = [
            {
                **_columns_as_dict(p.PropertyModel, PropertyModel),
                **_columns_as_dict(p.PropertyDetailsEpcModel, PropertyDetailsEpcModel),
            }
            for p in properties_query
        ]

        # --------------------
        # Plans
        # --------------------
        plans_query = session.query(Plan).filter(
            Plan.scenario_id.in_(scenario_ids)
        ).all()

        plans_data = [_columns_as_dict(plan, Plan) for plan in plans_query]

        plan_ids = [p["id"] for p in plans_data]

        # --------------------
        # Recommendations (NO materials yet)
        # --------------------
        recommendations_query = session.query(
            Recommendation,
            Plan.scenario_id
        ).join(
            PlanRecommendations,
            Recommendation.id == PlanRecommendations.recommendation_id
        ).join(
            Plan,
            Plan.id == PlanRecommendations.plan_id
        ).filter(
            PlanRecommendations.plan_id.in_(plan_ids),
        ).all()

        recommendations_data = [
            {
                **_columns_as_dict(r.Recommendation, Recommendation),
                "scenario_id": r.scenario_id,
                "materials": []  # placeholder
            }
            for r in recommendations_query
        ]
    finally:
        # Always release the connection, even when one of the queries raises.
        session.close()

    return properties_data, plans_data, recommendations_data
|
|
|
|
|
|
# Portfolio and scenario selection for the (currently commented-out) get_all_data export below.
# NOTE: PORTFOLIO_ID is reassigned further down, in the rebaselining execution section.
PORTFOLIO_ID = 419  # Peabody
# Only the uncommented scenario ids are active; the others are kept for reference.
SCENARIOS = [
    # 871,  # EPC C - fabric first, no solid floor, ashp 3.0
    # 863,  # EPC B, No EWI/IWI, No Solid Floor, ASHP 3.0 COP
    # 862,  # EPC B - No solid floor, ASHP COP 3.0
    # 861,  # EPC C, No EWI/IWI, No Solid Floor, ASHP 3.0 COP
    # 859,  # EPC C - no solid floor, ashp 3.0
    885,  # EPC B - fabric first, no solid floor, ashp 3.0
]
|
|
|
|
# properties_data, plans_data, recommendations_data = get_all_data(portfolio_id=PORTFOLIO_ID, scenario_ids=SCENARIOS)
|
|
# # Store this data as dataframes for analysis
|
|
# properties_df = pd.DataFrame(properties_data)
|
|
# plans_df = pd.DataFrame(plans_data)
|
|
# recommendations_df = pd.DataFrame(recommendations_data)
|
|
|
|
# Save CSVs
|
|
# properties_df.to_csv(
|
|
# "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/"
|
|
# "f_peabody_properties_data_20260108.csv",
|
|
# index=False
|
|
# )
|
|
# plans_df.to_csv(
|
|
# "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/"
|
|
# "f_peabody_plans_data_20260108.csv",
|
|
# index=False
|
|
# )
|
|
# recommendations_df.to_csv(
|
|
# "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/"
|
|
# "f_peabody_recommendations_data_20260108.csv",
|
|
# index=False
|
|
# )
|
|
# Read csvs
# Single definition of the local export directory (trailing slash included) so
# relocating the data files is a one-line change.
PEABODY_DATA_DIR = (
    "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/"
    "Nov 2025 Consulting Project/"
)

# Exports written by the commented-out get_all_data block above.
properties_df = pd.read_csv(PEABODY_DATA_DIR + "f_peabody_properties_data_20260108.csv")
plans_df = pd.read_csv(PEABODY_DATA_DIR + "f_peabody_plans_data_20260108.csv")
recommendations_df = pd.read_csv(PEABODY_DATA_DIR + "f_peabody_recommendations_data_20260108.csv")

# "Sustainability" sheet of the customer workbook; the has_* flags below are derived from it.
sustainability_data = pd.read_excel(
    PEABODY_DATA_DIR + "2025_11_11 - Peabody - Data Extracts for Domna.xlsx",
    sheet_name="Sustainability",
)
|
|
|
|
# recommendations_df = pd.read_excel(
|
|
# "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/EPC B, "
|
|
# "No solid floor, ASHP COP 3.0.xlsx"
|
|
# )
|
|
|
|
# recommendations_df2 = recommendations_df2.merge(
|
|
# properties_df[["id", "uprn"]],
|
|
# left_on="property_id",
|
|
# right_on="id",
|
|
# how="left"
|
|
# ).rename(columns={"id_x": "id"}).drop(columns=["id_y"])
|
|
# recommendations_df["uprn"] = recommendations_df["uprn"].astype(int).astype(str)
|
|
|
|
# One row per (property, measure type) that appears in the recommendations,
# tagged with a constant True so it can be pivoted into indicator columns.
recommendation_measure_types = (
    recommendations_df[["property_id", "measure_type"]]
    .drop_duplicates()
    .assign(flag=True)
)

# Wide layout: one row per property, one column per measure type.
# A cell is True when that measure is recommended and NaN otherwise.
recommendations_measures_pivot = recommendation_measure_types.pivot(
    index='property_id', columns='measure_type', values='flag'
).reset_index()

# Total estimated cost of all recommendation rows, per property.
recommendations_total_cost = (
    recommendations_df.groupby("property_id")["estimated_cost"].sum().reset_index()
)
recommendations_measures_pivot = pd.merge(
    recommendations_measures_pivot,
    recommendations_total_cost,
    how="left",
    on="property_id",
)

# The property column "solar_pv" is renamed before the join
# (presumably to avoid clashing with a "solar_pv" measure-type column - confirm).
properties_to_recs = pd.merge(
    properties_df.rename(columns={"solar_pv": "solar_data"}),
    recommendations_measures_pivot,
    how="left",
    left_on="id",
    right_on="property_id",
)
# Properties with no recommendation rows have no cost after the left join; report them as 0.
properties_to_recs["estimated_cost"] = properties_to_recs["estimated_cost"].fillna(0)
|
|
|
|
# Derive boolean "already has this measure" flags from the customer's categorical columns.
# Columns are appended in the order listed; each lambda receives the frame being built.
sustainability_data = sustainability_data.assign(
    has_cavity_insulation=lambda df: df["Wall Insulation"].isin(
        ["FilledCavity", "FilledCavityPlusInternal", "FilledCavityPlusExternal"]
    ),
    has_iwi=lambda df: df["Wall Insulation"].isin(["Internal", "FilledCavityPlusInternal"]),
    has_ewi=lambda df: df["Wall Insulation"].isin(["External", "FilledCavityPlusExternal"]),
    has_loft_insulation=lambda df: df["Roof Insulation"].isin(["mm300", "mm250"]),
    has_glazing=lambda df: df["Glazing"].isin(
        [
            "Double 2002 or later",
            "Double but age unknown",
            "Triple",
            "DoubleKnownData",
            "Secondary",
            "TripleKnownData",
        ]
    ),
    has_floor_insulation=lambda df: df["Floor Insulation"].isin(["RetroFitted"]),
    # A boiler counts as efficient only when the heating is a boiler AND its efficiency is "A".
    has_efficient_boiler=lambda df: (
        df["Heating"].isin(["Boilers"]) & df["Boiler Efficiency"].isin(["A"])
    ),
    has_ashp=lambda df: df["Heating"].isin(["Heat pumps (wet)"]),
    has_top_heat_controls=lambda df: df["Controls Adequacy"].isin(["Top Spec"]),
    has_optimal_heat_controls=lambda df: df["Controls Adequacy"].isin(["Optimal"]),
    # Flat roofs are flagged as insulated at the 50/100/150 mm categories.
    has_flat_roof_insulation=lambda df: (
        (df["Roof Construction"] == "Flat")
        & df["Roof Insulation"].isin(["mm50", "mm150", "mm100"])
    ),
)
|
|
|
|
# Join the customer's data to our per-property recommendation indicators on UPRN.
# NOTE(review): only our side is cast to str here; this assumes
# sustainability_data["UPRN"] is already string-typed with the same formatting - confirm.
properties_to_recs["uprn"] = properties_to_recs["uprn"].astype(str)
comparison = pd.merge(
    sustainability_data,
    properties_to_recs[
        [
            "uprn",
            "cavity_wall_insulation",
            "external_wall_insulation",
            "internal_wall_insulation",
            "loft_insulation",
            "double_glazing",
            "secondary_glazing",
            "suspended_floor_insulation",
            "boiler_upgrade",
            "air_source_heat_pump",
            "time_temperature_zone_control",
            "roomstat_programmer_trvs",
            "flat_roof_insulation",
            "room_roof_insulation",
        ]
    ],
    how="left",  # keep every customer row, matched or not
    left_on="UPRN",
    right_on="uprn",
)
|
|
|
|
# Flag entries where we've been told that walls are already insulated, but we have recommendations for wall insulation
# ------------ Walls ------------
# A measure column is non-null only when that measure was recommended for the property,
# so "customer says installed" AND "column not null" marks a conflict.
comparison["conflict_cavity_wall_insulation"] = (
    comparison["has_cavity_insulation"]
    & comparison["cavity_wall_insulation"].notna()
)
comparison["conflict_iwi_wall_insulation"] = (
    comparison["has_iwi"]
    & comparison["internal_wall_insulation"].notna()
)
# Fixed: this previously tested internal_wall_insulation (copy-paste from the IWI check),
# so EWI conflicts were computed against the wrong recommendation column.
comparison["conflict_ewi_wall_insulation"] = (
    comparison["has_ewi"]
    & comparison["external_wall_insulation"].notna()
)

cwi_conflicting = comparison[comparison["conflict_cavity_wall_insulation"]]
iwi_conflicting = comparison[comparison["conflict_iwi_wall_insulation"]]
ewi_conflicting = comparison[comparison["conflict_ewi_wall_insulation"]]
|
|
|
|
# Each section pairs a customer "already has X" flag with our recommendation indicator for X.
# A conflict is flagged when the customer reports the measure and we still recommend it.

# ------------ Roof ------------
comparison["conflict_roof_insulation"] = (
    comparison["has_loft_insulation"] & comparison["loft_insulation"].notna()
)
loft_conflicting = comparison[comparison["conflict_roof_insulation"]]

# ------------ Windows ------------
# Either a double-glazing or a secondary-glazing recommendation counts as a glazing recommendation.
comparison["conflict_double_glazing"] = comparison["has_glazing"] & (
    comparison["double_glazing"].notna() | comparison["secondary_glazing"].notna()
)
windows_conflicting = comparison[comparison["conflict_double_glazing"]]

# ------------ Floors ------------
comparison["conflict_suspended_floor_insulation"] = (
    comparison["has_floor_insulation"] & comparison["suspended_floor_insulation"].notna()
)
floors_conflicting = comparison[comparison["conflict_suspended_floor_insulation"]]

# ------------ Boiler Upgrade ------------
comparison["conflict_boiler_upgrade"] = (
    comparison["has_efficient_boiler"] & comparison["boiler_upgrade"].notna()
)
boiler_conflicting = comparison[comparison["conflict_boiler_upgrade"]]

# ------------ ASHP ------------
comparison["conflict_air_source_heat_pump"] = (
    comparison["has_ashp"] & comparison["air_source_heat_pump"].notna()
)
ashp_conflicting = comparison[comparison["conflict_air_source_heat_pump"]]

# ------------ heat controls ------------
comparison["conflict_time_temperature_zone_control"] = (
    comparison["has_top_heat_controls"] & comparison["time_temperature_zone_control"].notna()
)
comparison["conflict_roomstat_programmer_trvs"] = (
    comparison["has_optimal_heat_controls"] & comparison["roomstat_programmer_trvs"].notna()
)
ttzc_conflicting = comparison[comparison["conflict_time_temperature_zone_control"]]
rst_conflicting = comparison[comparison["conflict_roomstat_programmer_trvs"]]

# ------------ Flat Roof Insulation -----------
comparison["conflict_flat_roof_insulation"] = (
    comparison["has_flat_roof_insulation"] & comparison["flat_roof_insulation"].notna()
)
flat_roof_conflicting = comparison[comparison["conflict_flat_roof_insulation"]]
|
|
|
|
# All properties with conflicts
# Rows are stacked per conflict type, so a property with several conflicts appears several times.
all_conflicts = pd.concat(
    objs=[
        cwi_conflicting,
        iwi_conflicting,
        ewi_conflicting,
        loft_conflicting,
        windows_conflicting,
        floors_conflicting,
        boiler_conflicting,
        ashp_conflicting,
        ttzc_conflicting,
        rst_conflicting,
        flat_roof_conflicting,
    ]
)

# Count of distinct properties with at least one conflict.
# Bare expression: the value is only shown in an interactive session.
all_conflicts["UPRN"].nunique()
|
|
|
|
|
|
# What do I need to do:
|
|
# TODO: need to get a view of "all" measures for the property, not just the recommended ones. We can do this by
#  looking at just one scenario
|
|
# 1) I should store the current recommendations table, for the portfolio as a backup
|
|
# 2) I need a total of already installed SAP points for each property. This should probably be stored on the
# property_details_epc table
|
|
# 3) For anything already installed, I should mark already installed as True, and set the cost to zero
|
|
# 4) I need to update the plan cost to remove the cost of the installed measures
|
|
|
|
|
|
### Rebaselining
|
|
|
|
|
|
def get_installed_sap_adjustments_by_uprn_for_portfolio(
    session,
    portfolio_id: int,
) -> Dict[int, float]:
    """Total the SAP points of active installed measures per UPRN in a portfolio.

    Args:
        session: SQLAlchemy session used to run the query.
        portfolio_id: Portfolio whose properties' UPRNs are considered.

    Returns:
        ``{uprn: total_sap_delta}``. Only UPRNs with at least one active
        ``InstalledMeasure`` row appear; callers must default the rest.
    """
    # UPRNs of the portfolio's properties, ignoring properties without a UPRN.
    portfolio_uprns = (
        session.query(PropertyModel.uprn)
        .filter(
            PropertyModel.portfolio_id == portfolio_id,
            PropertyModel.uprn.isnot(None),
        )
        .subquery()
    )

    # COALESCE turns a NULL sum (every sap_points value NULL) into 0.0.
    summed_sap_points = func.coalesce(func.sum(InstalledMeasure.sap_points), 0.0)

    grouped_query = (
        session.query(InstalledMeasure.uprn, summed_sap_points)
        .filter(
            InstalledMeasure.is_active.is_(True),
            InstalledMeasure.uprn.in_(portfolio_uprns),
        )
        .group_by(InstalledMeasure.uprn)
    )

    totals_by_uprn = {}
    for measure_uprn, total in grouped_query.all():
        totals_by_uprn[measure_uprn] = float(total)
    return totals_by_uprn
|
|
|
|
|
|
def get_installed_measure_types_by_uprn(
    session,
    uprn: int,
) -> Set[str]:
    """Return the distinct measure types of active installed measures for one UPRN.

    Args:
        session: SQLAlchemy session used to run the query.
        uprn: UPRN to look up in ``InstalledMeasure``.

    Returns:
        Set of measure types. Values exposing a ``.value`` attribute (enum
        members) are unwrapped to that value; anything else is kept as-is.
    """
    active_measures = session.query(InstalledMeasure.measure_type).filter(
        InstalledMeasure.uprn == uprn,
        InstalledMeasure.is_active.is_(True),
    )

    installed = set()
    for row in active_measures.all():
        measure_type = row[0]
        # Convert enums -> strings
        installed.add(getattr(measure_type, "value", measure_type))
    return installed
|
|
|
|
|
|
# ------------------------------------------------------------
|
|
# PROPERTY REBASING (READ-ONLY)
|
|
# ------------------------------------------------------------
|
|
|
|
def compute_property_sap_updates(
    properties: List[PropertyModel],
    sap_adjustments: Dict[int, float],
) -> List[dict]:
    """Compute rebased SAP scores for properties without touching the inputs.

    Args:
        properties: Objects exposing ``id``, ``uprn`` and ``original_sap_points``.
        sap_adjustments: ``{uprn: sap_delta}``; a missing UPRN means a 0.0 delta.

    Returns:
        One dict per property that has both a UPRN and an original SAP score,
        with keys ``property_id``, ``uprn``, ``original_sap_points``,
        ``installed_sap_delta``, ``new_sap_points`` and ``is_adjusted``.
        Properties missing either value are skipped.
    """

    def _rebase(prop):
        # Build the result record for one eligible property.
        delta = sap_adjustments.get(prop.uprn, 0.0)
        return {
            "property_id": prop.id,
            "uprn": prop.uprn,
            "original_sap_points": prop.original_sap_points,
            "installed_sap_delta": delta,
            "new_sap_points": prop.original_sap_points + delta,
            "is_adjusted": delta != 0,
        }

    return [
        _rebase(prop)
        for prop in properties
        if prop.uprn is not None and prop.original_sap_points is not None
    ]
|
|
|
|
|
|
# ------------------------------------------------------------
|
|
# PLAN RECOMPUTATION HELPERS
|
|
# ------------------------------------------------------------
|
|
|
|
def get_effective_plan_recommendations(
    session,
    plan_id: int,
    excluded_measure_types: Set[str],
) -> List[Recommendation]:
    """Fetch a plan's default recommendations, minus excluded measure types.

    Args:
        session: SQLAlchemy session used to run the query.
        plan_id: Plan whose linked recommendations are fetched.
        excluded_measure_types: Measure types to leave out. When empty, no
            exclusion filter is applied.

    Returns:
        ``Recommendation`` rows linked to the plan through
        ``PlanRecommendations`` whose ``default`` flag is true.
    """
    criteria = [
        PlanRecommendations.plan_id == plan_id,
        Recommendation.default.is_(True),
    ]

    # Only add the NOT IN clause when there is something to exclude.
    if excluded_measure_types:
        criteria.append(~Recommendation.measure_type.in_(excluded_measure_types))

    return (
        session.query(Recommendation)
        .join(PlanRecommendations)
        .filter(*criteria)
        .all()
    )
|
|
|
|
|
|
def aggregate_plan_metrics(recommendations: list[Recommendation]):
    """Sum per-recommendation figures into plan-level totals.

    Args:
        recommendations: Recommendations to total. Missing (``None``) values
            count as 0.0.

    Returns:
        Dict with float totals under ``sap_points``, ``co2_savings``,
        ``energy_bill_savings``, ``energy_consumption_savings``,
        ``valuation_increase``, ``cost_of_works`` and ``contingency_cost``.
        ``contingency_cost`` comes from ``calculate_contingency_for_recommendation``.
    """
    # (output key, Recommendation attribute) pairs that are summed directly.
    summed_fields = (
        ("sap_points", "sap_points"),
        ("co2_savings", "co2_equivalent_savings"),
        ("energy_bill_savings", "energy_cost_savings"),
        ("energy_consumption_savings", "energy_savings"),
        ("valuation_increase", "property_valuation_increase"),
        ("cost_of_works", "estimated_cost"),
    )

    totals = {metric: 0.0 for metric, _ in summed_fields}
    totals["contingency_cost"] = 0.0

    for rec in recommendations:
        for metric, attribute in summed_fields:
            totals[metric] += getattr(rec, attribute) or 0.0
        totals["contingency_cost"] += calculate_contingency_for_recommendation(rec)

    return totals
|
|
|
|
|
|
# ------------------------------------------------------------
|
|
# PLAN REBASING (READ-ONLY)
|
|
# ------------------------------------------------------------
|
|
|
|
def compute_plan_updates(
    session,
    plans: List[Plan],
    properties_by_id: Dict[int, PropertyModel],
    epcs_by_property_id: Dict[int, PropertyDetailsEpcModel],
    property_sap_updates: Dict[int, dict],
) -> List[dict]:
    """Compute plan metrics assuming properties are already rebased.

    Measures already installed at the property's UPRN are excluded from the
    plan, and the remaining default recommendations are totalled.

    Args:
        session: SQLAlchemy session used for the per-plan lookups.
        plans: Plans to recompute.
        properties_by_id: ``{property_id: property}``.
        epcs_by_property_id: ``{property_id: EPC details}``.
        property_sap_updates: ``{property_id: update dict}``; each dict must
            carry ``"new_sap_points"``.

    Returns:
        One dict per plan whose property, EPC details and SAP update are all
        available; other plans are skipped. Does not modify any input object.
    """

    updates = []

    # Several plans can target the same property, and installed measures depend
    # only on the UPRN, so look each UPRN up once instead of once per plan.
    installed_types_by_uprn = {}

    for plan in plans:
        prop = properties_by_id.get(plan.property_id)
        epc = epcs_by_property_id.get(plan.property_id)
        prop_update = property_sap_updates.get(plan.property_id)

        if not prop or not epc or not prop_update:
            continue

        if prop.uprn not in installed_types_by_uprn:
            installed_types_by_uprn[prop.uprn] = get_installed_measure_types_by_uprn(
                session, prop.uprn
            )
        installed_types = installed_types_by_uprn[prop.uprn]

        future_recs = get_effective_plan_recommendations(
            session,
            plan.id,
            installed_types,
        )

        metrics = aggregate_plan_metrics(future_recs)

        # NOTE(review): unlike co2_emissions and current_valuation below, the
        # bill components and primary_energy_consumption are not guarded
        # against None; a missing value raises TypeError. Confirm these columns
        # are always populated.
        baseline_bill = (
            epc.heating_cost_current
            + epc.hot_water_cost_current
            + epc.lighting_cost_current
            + epc.appliances_cost_current
            + epc.gas_standing_charge
            + epc.electricity_standing_charge
        )

        # Rebased current SAP plus the SAP gain of the remaining recommendations.
        post_sap = prop_update["new_sap_points"] + metrics["sap_points"]

        updates.append({
            "plan_id": plan.id,
            "property_id": plan.property_id,

            # SAP / EPC
            "post_sap_points": post_sap,
            "post_epc_rating": sap_to_epc(post_sap),

            # Carbon
            "co2_savings": metrics["co2_savings"],
            "post_co2_emissions": (
                epc.co2_emissions - metrics["co2_savings"]
                if epc.co2_emissions is not None
                else None
            ),

            # Energy bills
            "energy_bill_savings": metrics["energy_bill_savings"],
            "post_energy_bill": baseline_bill - metrics["energy_bill_savings"],

            # Energy consumption
            "energy_consumption_savings": metrics["energy_consumption_savings"],
            "post_energy_consumption": (
                epc.primary_energy_consumption
                - metrics["energy_consumption_savings"]
            ),

            # Valuation
            "valuation_increase": metrics["valuation_increase"],
            "valuation_post_retrofit": (
                prop.current_valuation + metrics["valuation_increase"]
                if prop.current_valuation is not None
                else None
            ),

            # Costs
            "cost_of_works": metrics["cost_of_works"],
            "contingency_cost": metrics["contingency_cost"],
        })

    return updates
|
|
|
|
|
|
def calculate_contingency_for_recommendation(
    recommendation,
) -> float:
    """Recompute the contingency amount for one recommendation.

    Intended to mirror the costing engine's logic.

    Assumptions:
    - recommendation.estimated_cost is the 'total' cost
    - contingency is a percentage of total

    Args:
        recommendation: Object exposing ``estimated_cost`` and ``measure_type``.

    Returns:
        ``estimated_cost`` multiplied by the contingency rate, or 0.0 when no
        cost is recorded.
    """
    cost = recommendation.estimated_cost
    if cost is None:
        return 0.0

    # Normalise measure_type (Enum -> str)
    raw_type = recommendation.measure_type
    type_key = raw_type.value if hasattr(raw_type, "value") else raw_type

    # Measure-specific contingency if defined, else global fallback
    global_rate = Costs.CONTINGENCY  # default (e.g. 10%)
    rate = Costs.CONTINGENCIES.get(type_key, global_rate)

    return cost * rate
|
|
|
|
|
|
def persist_property_sap_updates(
    property_updates_by_id: dict[int, dict],
):
    """Write adjusted SAP values back to the property table.

    Safe to re-run.

    Args:
        property_updates_by_id: ``{property_id: update}`` where each update
            carries ``installed_sap_delta``, ``is_adjusted`` and
            ``new_sap_points``.
    """
    # NOTE(review): no explicit commit here - presumably db_session() commits on exit; confirm.
    with db_session() as session:
        target_ids = property_updates_by_id.keys()
        matched = (
            session.query(PropertyModel)
            .filter(PropertyModel.id.in_(target_ids))
            .all()
        )

        for record in matched:
            payload = property_updates_by_id[record.id]
            rebased_sap = payload["new_sap_points"]

            record.installed_measures_sap_point_adjustment = payload["installed_sap_delta"]
            record.is_sap_points_adjusted_for_installed_measures = payload["is_adjusted"]
            record.current_sap_points = rebased_sap
            # Keep the EPC band in step with the rebased SAP score.
            record.current_epc_rating = sap_to_epc(rebased_sap)

    print(f"✅ Updated {len(matched)} properties")
|
|
|
|
|
|
def persist_plan_updates(plan_updates: list[dict]):
    """Write recalculated plan metrics onto the matching ``Plan`` rows.

    Safe to re-run.

    Args:
        plan_updates: Update dicts carrying ``plan_id`` plus the metric keys
            copied below. Updates whose plan is not found are skipped.
    """
    # Keys copied verbatim from the update dict onto the Plan attribute of the same name.
    copied_fields = (
        # Carbon
        "co2_savings",
        "post_co2_emissions",
        # Energy
        "energy_bill_savings",
        "post_energy_bill",
        "energy_consumption_savings",
        "post_energy_consumption",
        # Valuation
        "valuation_increase",
        "valuation_post_retrofit",
        # Costs
        "cost_of_works",
        "contingency_cost",
    )

    # NOTE(review): no explicit commit here - presumably db_session() commits on exit; confirm.
    with db_session() as session:
        wanted_ids = [entry["plan_id"] for entry in plan_updates]
        plans = session.query(Plan).filter(Plan.id.in_(wanted_ids)).all()

        plans_by_id = {}
        for loaded in plans:
            plans_by_id[loaded.id] = loaded

        for entry in plan_updates:
            target = plans_by_id.get(entry["plan_id"])
            if not target:
                continue

            # SAP / EPC
            target.post_sap_points = entry["post_sap_points"]
            # The rating is converted to the Epc enum before being stored.
            target.post_epc_rating = Epc(entry["post_epc_rating"])

            for field in copied_fields:
                setattr(target, field, entry[field])

    print(f"✅ Updated {len(plans)} plans")
|
|
|
|
|
|
# ------------------------------------------------------------
|
|
# EXECUTION (DRY RUN)
|
|
# ------------------------------------------------------------
|
|
|
|
# NOTE: this overrides the PORTFOLIO_ID defined near the top of the file.
PORTFOLIO_ID = 430
# TODO - run the original sap points update on the peabody portfolio

# NOTE(review): the section header says "DRY RUN", but the persist_* calls at the bottom
# run unconditionally when this module executes. Confirm that is intended.

# Everything needed for the recomputation is loaded and computed inside one read-only session.
with db_read_session() as session:
    properties = session.query(PropertyModel).filter(
        PropertyModel.portfolio_id == PORTFOLIO_ID
    ).all()

    plans = session.query(Plan).filter(
        Plan.portfolio_id == PORTFOLIO_ID
    ).all()

    # EPC details keyed by the property they belong to.
    epcs = {
        e.property_id: e
        for e in session.query(PropertyDetailsEpcModel)
        .join(PropertyModel)
        .filter(PropertyModel.portfolio_id == PORTFOLIO_ID)
        .all()
    }

    sap_adjustments = get_installed_sap_adjustments_by_uprn_for_portfolio(session, PORTFOLIO_ID)

    property_updates = compute_property_sap_updates(properties, sap_adjustments)

    property_updates_by_id = {u["property_id"]: u for u in property_updates}

    properties_by_id = {p.id: p for p in properties}

    plan_updates = compute_plan_updates(
        session,
        plans,
        properties_by_id,
        epcs,
        property_updates_by_id,
    )

# When ready to run!
# Each persist_* call opens its own session via db_session().
persist_property_sap_updates(property_updates_by_id)
persist_plan_updates(plan_updates)
|