Model/backend/app/db/functions/recommendations_functions.py
2026-02-20 17:10:53 +00:00

730 lines
23 KiB
Python

from typing import Any, Dict, List, Tuple
from sqlalchemy import inspect, text, insert, delete, select
from sqlalchemy.orm import Session, Mapper
from sqlalchemy.exc import SQLAlchemyError
from sqlmodel import Session
from backend.app.db.models.recommendations import (
PlanModel,
Recommendation,
RecommendationMaterials,
PlanRecommendations,
ScenarioModel,
)
from backend.app.db.models.portfolio import PropertyModel
from backend.app.db.connection import db_session, db_read_session
def prepare_plan_data(
    p,
    body,
    scenario_id,
    eco_packages,
    valuations,
    new_sap_points,
    new_epc,
    default_recommendations,
    rebaselining_carbon=0,
    rebaselining_heat_demand=0,
    rebaselining_kwh=0,
    rebaselining_bills=0,
):
    """
    Assemble the field values for a plan from a property and its default recommendations.

    Savings and costs are summed over the recommendations that are not flagged
    ``already_installed``. Each post-retrofit figure is the property's current figure minus
    the matching rebaselining adjustment minus those savings.

    :param p: Instantiated property. Reads ``id``, ``is_new``, ``energy["co2_emissions"]``,
        ``current_energy_bill`` (a mapping whose values are summed) and
        ``current_energy_consumption``
    :param body: request body, PlanTriggerRequest. Reads ``portfolio_id`` and ``scenario_name``
    :param scenario_id: unique identifier for the scenario
    :param eco_packages: Pre-constructed eco packages keyed by property id; the third element
        of the property's entry becomes ``plan_type`` (None when the property has no entry)
    :param valuations: valuation improvement data. Reads ``current_value``,
        ``average_increase``, ``average_increased_value``, ``lower_bound_increased_value``
        and ``upper_bound_increased_value``
    :param new_sap_points: sap points, post default recommendations
    :param new_epc: new epc rating, post default recommendations
    :param default_recommendations: list of default recommendations (dicts) for a property
    :param rebaselining_carbon: carbon emissions adjustment for rebaselining
    :param rebaselining_heat_demand: heat demand adjustment for rebaselining (accepted for
        interface compatibility; not used in this function)
    :param rebaselining_kwh: kwh consumption adjustment for rebaselining
    :param rebaselining_bills: energy bill adjustment for rebaselining
    :return: dictionary of plan fields
    """
    # Measures that are already installed contribute neither savings nor cost.
    to_install = [
        r for r in default_recommendations if not r.get("already_installed", False)
    ]

    # Plan carbon savings
    co2_savings = sum(r["co2_equivalent_savings"] for r in to_install)
    post_co2_emissions = p.energy["co2_emissions"] - rebaselining_carbon - co2_savings

    # Plan bill savings
    energy_bill_savings = sum(r["energy_cost_savings"] for r in to_install)
    post_energy_bill = (
        sum(p.current_energy_bill.values()) - rebaselining_bills - energy_bill_savings
    )

    # Energy consumption
    energy_consumption_savings = sum(r["kwh_savings"] for r in to_install)
    post_energy_consumption = (
        p.current_energy_consumption - rebaselining_kwh - energy_consumption_savings
    )

    # Plan costing data; contingency is optional on a recommendation
    cost_of_works = sum(r["total"] for r in to_install)
    contingency_cost = sum(r.get("contingency", 0) for r in to_install)

    # Valuation data
    current_value = valuations["current_value"]
    valuation_post_retrofit, valuation_increase = None, None
    if current_value:
        valuation_increase = valuations["average_increase"]
        valuation_post_retrofit = valuations["average_increased_value"]

    def _increase_over_current(key):
        # A missing valuation gives None instead of a TypeError on ``None - None``.
        increased_value = valuations[key]
        if current_value is None or increased_value is None:
            return None
        return increased_value - current_value

    return {
        "portfolio_id": body.portfolio_id,
        "property_id": p.id,
        "scenario_id": scenario_id,
        "is_default": bool(p.is_new),
        "name": body.scenario_name,
        "valuation_increase_lower_bound": _increase_over_current(
            "lower_bound_increased_value"
        ),
        "valuation_increase_upper_bound": _increase_over_current(
            "upper_bound_increased_value"
        ),
        "valuation_increase_average": _increase_over_current(
            "average_increased_value"
        ),
        "post_sap_points": float(new_sap_points),
        "post_epc_rating": new_epc,
        "post_co2_emissions": float(post_co2_emissions),
        "co2_savings": float(co2_savings),
        "post_energy_bill": float(post_energy_bill),
        "energy_bill_savings": float(energy_bill_savings),
        "post_energy_consumption": float(post_energy_consumption),
        "energy_consumption_savings": float(energy_consumption_savings),
        "valuation_post_retrofit": valuation_post_retrofit,
        "valuation_increase": valuation_increase,
        "cost_of_works": float(cost_of_works),
        "contingency_cost": float(contingency_cost),
        "plan_type": eco_packages.get(p.id, (None, None, None))[2],
    }
def create_plan(session: Session, plan):
    """
    Insert a new plan record and commit it.

    A new row is always added; no check for an existing plan is made here.

    :param session: The database session
    :param plan: dictionary of data representing a plan to be created
    :return: ID of the newly created plan
    :raises SQLAlchemyError: re-raised after the session has been rolled back
    """
    try:
        new_plan = PlanModel(**plan)
        session.add(new_plan)
        session.flush()  # ensures ID is populated
        # Read the ID before committing so no reload of the instance is needed afterwards.
        plan_id = new_plan.id
        session.commit()
    except SQLAlchemyError:
        session.rollback()
        raise  # bare raise keeps the original traceback
    return plan_id
def bulk_create_plans(session: Session, plans_to_create: list[dict]) -> dict[int, int]:
    """
    Insert several plans with one INSERT ... RETURNING statement.

    Nothing is committed here.

    :param session: The database session
    :param plans_to_create: list of dictionaries, each holding a "property_id" and a
        "plan_data" dictionary of further plan values (merged over "property_id")
    :return: mapping of property_id -> ID of the inserted plan; empty if there was nothing
        to insert
    """
    if not plans_to_create:
        return {}

    rows = []
    for entry in plans_to_create:
        row = {"property_id": entry["property_id"]}
        row.update(entry["plan_data"])
        rows.append(row)

    insert_stmt = insert(PlanModel).values(rows)
    insert_stmt = insert_stmt.returning(PlanModel.id, PlanModel.property_id)
    inserted = session.execute(insert_stmt).all()

    # property_id -> plan_id
    plan_id_by_property = {}
    for record in inserted:
        plan_id_by_property[record.property_id] = record.id
    return plan_id_by_property
def create_scenario(session: Session, scenario: dict) -> int:
    """
    Insert a new scenario record and commit it.

    The scenario is flagged as default when the portfolio has no scenario yet.

    NOTE: ``scenario`` is modified in place; its "is_default" entry is overwritten.

    :param session: The database session
    :param scenario: dictionary of data representing a scenario to be created; must contain
        "portfolio_id"
    :return: ID of the newly created scenario
    :raises SQLAlchemyError: re-raised after the session has been rolled back
    """
    try:
        existing_scenario = (
            session.query(ScenarioModel)
            .filter_by(portfolio_id=scenario["portfolio_id"])
            .first()
        )
        scenario["is_default"] = not existing_scenario

        new_scenario = ScenarioModel(**scenario)
        session.add(new_scenario)
        session.flush()  # ensures ID is populated
        scenario_id = new_scenario.id
        session.commit()
    except SQLAlchemyError:
        # Same error handling as the other create_* functions in this module.
        session.rollback()
        raise
    return scenario_id
def create_recommendation(session: Session, recommendation):
    """
    Insert a new recommendation record and commit it.

    A new row is always added; no check for an existing recommendation is made here.

    :param session: The database session
    :param recommendation: dictionary of data representing a recommendation to be created
    :return: ID of the newly created recommendation
    :raises SQLAlchemyError: re-raised after the session has been rolled back
    """
    try:
        new_recommendation = Recommendation(**recommendation)
        session.add(new_recommendation)
        session.flush()  # ensures ID is populated
        # Read the ID before committing so no reload of the instance is needed afterwards.
        recommendation_id = new_recommendation.id
        session.commit()
    except SQLAlchemyError:
        session.rollback()
        raise  # bare raise keeps the original traceback
    return recommendation_id
def create_recommendation_material(
    session: Session, recommendation_id, material_id, depth
):
    """
    Add a recommendation_material record to the session and flush it.

    A new row is always added, and nothing is committed here.

    :param session: The database session
    :param recommendation_id: ID of the recommendation
    :param material_id: ID of the material
    :param depth: depth of the material, may be null if a material where depth is not applicable
    :return: ID of the new recommendation_material record
    """
    link_fields = {
        "recommendation_id": recommendation_id,
        "material_id": material_id,
        "depth": depth,
    }
    material_link = RecommendationMaterials(**link_fields)
    session.add(material_link)
    session.flush()  # ensures ID is populated
    return material_link.id
def create_plan_recommendations(session: Session, plan_id, recommendation_ids):
    """
    Create the plan_recommendation records linking one plan to its recommendations.

    Does nothing when there are no recommendation IDs. Nothing is committed here.

    :param session: The database session
    :param plan_id: ID of the plan
    :param recommendation_ids: iterable of recommendation IDs
    """
    # Prepare a list of dictionaries for bulk insert
    data = [
        {"plan_id": plan_id, "recommendation_id": rid} for rid in recommendation_ids
    ]
    if not data:
        # insert().values([]) is not a no-op, so skip the statement entirely.
        return
    # Bulk insert using SQLAlchemy's core API
    session.execute(insert(PlanRecommendations).values(data))
def _float_or_none(value):
    """Convert a truthy value to float; any falsy value (None, 0, "") becomes None."""
    return float(value) if value else None


def _recommendation_row(rec, property_id):
    """Build the Recommendation row values for one recommendation dictionary."""
    return {
        "property_id": property_id,
        "type": rec["type"],
        "measure_type": rec["measure_type"],
        "description": rec["description"],
        "estimated_cost": float(rec["total"]),
        "default": rec["default"],
        "starting_u_value": _float_or_none(rec.get("starting_u_value")),
        "new_u_value": _float_or_none(rec.get("new_u_value")),
        "sap_points": float(rec["sap_points"]),
        # NOTE(review): energy_savings is filled from "heat_demand", the same source as
        # the heat_demand column below - confirm this is intended.
        "energy_savings": float(rec["heat_demand"]),
        "kwh_savings": float(rec["kwh_savings"]),
        "co2_equivalent_savings": float(rec["co2_equivalent_savings"]),
        "total_work_hours": float(rec["labour_hours"]),
        "energy_cost_savings": float(rec["energy_cost_savings"]),
        "labour_days": float(rec["labour_days"]),
        "already_installed": rec["already_installed"],
        "heat_demand": float(rec["heat_demand"]),
    }


def _material_row(part, recommendation_id):
    """Build the RecommendationMaterials row values for one part of a recommendation."""
    return {
        "recommendation_id": recommendation_id,
        "material_id": part["id"],
        "depth": int(part["depth"]) if part["depth"] else None,
        "quantity": _float_or_none(part.get("quantity")),
        "quantity_unit": part.get("quantity_unit"),
        # The cost is read from "total", falling back to "total_cost"
        "estimated_cost": float(part.get("total", part.get("total_cost"))),
    }


def upload_recommendations(
    session: Session, recommendations_to_upload, property_id, new_plan_id
):
    """
    Store a property's recommendations, their materials and the links to a plan, then commit.

    With no recommendations, no rows are inserted and the session is only committed.

    :param session: The database session
    :param recommendations_to_upload: list of recommendation dictionaries, each with a
        "parts" list describing its materials
    :param property_id: ID of the property the recommendations belong to
    :param new_plan_id: ID of the plan the recommendations are linked to
    :return: True on success, False if a SQLAlchemyError occurred (the session is rolled
        back and the error is printed)
    """
    try:
        # Prepare data for bulk insert for Recommendation
        recommendations_data = [
            _recommendation_row(rec, property_id) for rec in recommendations_to_upload
        ]

        # insert().values([]) is not a no-op, so only touch the tables when there are rows.
        if recommendations_data:
            # Insert the recommendations, get back the IDs
            stmt = (
                insert(Recommendation)
                .returning(Recommendation.id)
                .values(recommendations_data)
            )
            uploaded_recommendation_ids = [row[0] for row in session.execute(stmt)]

            # Returned IDs are paired with the input recommendations by position
            recommendation_materials_data = [
                _material_row(part, recommendation_id)
                for rec, recommendation_id in zip(
                    recommendations_to_upload, uploaded_recommendation_ids
                )
                for part in rec["parts"]
            ]
            session.bulk_insert_mappings(
                RecommendationMaterials, recommendation_materials_data
            )
            session.flush()

            create_plan_recommendations(
                session,
                plan_id=new_plan_id,
                recommendation_ids=uploaded_recommendation_ids,
            )

        # Commit the transaction
        session.commit()
        return True
    except SQLAlchemyError as e:
        # Rollback the transaction in case of an error
        session.rollback()
        print(f"An error occurred: {e}")
        return False
def bulk_upload_recommendations_and_materials(
    session: Session,
    recommendation_payload: list[dict],
):
    """
    Insert recommendations, their materials and their plan links using bulk statements.

    Returns immediately when the payload is empty. Nothing is committed here.

    :param session: The database session
    :param recommendation_payload: list of dictionaries, each holding the recommendation
        values, a "parts" list of material dictionaries and the "plan_id" to link to
    """
    if not recommendation_payload:
        return

    recommendation_columns = (
        "property_id",
        "type",
        "measure_type",
        "description",
        "estimated_cost",
        "default",
        "starting_u_value",
        "new_u_value",
        "sap_points",
        "heat_demand",
        "kwh_savings",
        "co2_equivalent_savings",
        "energy_savings",
        "energy_cost_savings",
        "total_work_hours",
        "labour_days",
        "already_installed",
    )
    material_columns = (
        "material_id",
        "depth",
        "quantity",
        "quantity_unit",
        "estimated_cost",
    )

    # 1. Recommendation rows, with parts and plan IDs kept in matching positions
    recommendation_rows, parts_per_rec, plan_id_per_rec = [], [], []
    for entry in recommendation_payload:
        recommendation_rows.append(
            {column: entry[column] for column in recommendation_columns}
        )
        parts_per_rec.append(entry["parts"])
        plan_id_per_rec.append(entry["plan_id"])

    # 2. Insert the recommendations and collect the generated IDs
    insert_recommendations = (
        insert(Recommendation).values(recommendation_rows).returning(Recommendation.id)
    )
    new_ids = [row[0] for row in session.execute(insert_recommendations)]

    # 3. Materials; the returned IDs are paired with the payload by position
    materials_rows = [
        {
            "recommendation_id": new_id,
            **{column: part[column] for column in material_columns},
        }
        for new_id, parts in zip(new_ids, parts_per_rec)
        for part in parts
    ]
    if materials_rows:
        session.execute(insert(RecommendationMaterials).values(materials_rows))

    # 4. Plan <-> recommendation links
    link_rows = [
        {"plan_id": plan_id, "recommendation_id": new_id}
        for plan_id, new_id in zip(plan_id_per_rec, new_ids)
    ]
    session.execute(insert(PlanRecommendations).values(link_rows))
def chunked(iterable, size=100):
    """
    Yield successive chunks of at most ``size`` items from ``iterable``.

    Objects that support ``len()`` and indexing are sliced, so each chunk has whatever type
    slicing returns (a list for a list, a tuple for a tuple). Any other iterable, such as
    a generator, is consumed lazily and each chunk is yielded as a list.

    :param iterable: the items to split up
    :param size: maximum number of items per chunk, must be at least 1
    :raises ValueError: if ``size`` is smaller than 1 (raised when iteration starts)
    """
    from itertools import islice

    if size < 1:
        # A negative size would otherwise silently produce no chunks at all.
        raise ValueError("size must be a positive integer")

    if hasattr(iterable, "__len__") and hasattr(iterable, "__getitem__"):
        for start in range(0, len(iterable), size):
            yield iterable[start : start + size]
        return

    iterator = iter(iterable)
    while chunk := list(islice(iterator, size)):
        yield chunk
def get_property_ids(portfolio_id: int) -> list[int]:
    """
    Fetch the IDs of all properties belonging to a portfolio, using a read session.

    :param portfolio_id: ID of the portfolio
    :return: list of property IDs
    """
    with db_read_session() as session:
        id_query = session.query(PropertyModel.id).filter(
            PropertyModel.portfolio_id == portfolio_id
        )
        # Each result row holds a single column, the property ID
        return [row[0] for row in id_query.all()]
def delete_property_batch(session: Session, property_ids: list[int]):
    """
    Delete a batch of properties together with the rows of the related tables.

    The DELETE statements run on the given session in the order listed below, with the
    property rows last. Nothing is committed here.

    NOTE(review): the order presumably ensures that referencing rows are removed before
    the rows they reference - keep it unchanged.

    :param session: The database session
    :param property_ids: IDs of the properties to delete; nothing happens when empty
    """
    if not property_ids:
        return

    ordered_statements = (
        # recommendation_materials (via recommendation)
        """
        DELETE FROM recommendation_materials rm
        USING recommendation r
        WHERE rm.recommendation_id = r.id
        AND r.property_id = ANY(:property_ids)
        """,
        # plan_recommendations (via plan)
        """
        DELETE FROM plan_recommendations pr
        USING plan p
        WHERE pr.plan_id = p.id
        AND p.property_id = ANY(:property_ids)
        """,
        # funding_package_measures (via funding_package and plan)
        """
        DELETE FROM funding_package_measures fpm
        USING funding_package fp, plan p
        WHERE fpm.funding_package_id = fp.id
        AND fp.plan_id = p.id
        AND p.property_id = ANY(:property_ids)
        """,
        # inspections (direct)
        """
        DELETE FROM inspections
        WHERE property_id = ANY(:property_ids)
        """,
        # funding_package (via plan)
        """
        DELETE FROM funding_package fp
        USING plan p
        WHERE fp.plan_id = p.id
        AND p.property_id = ANY(:property_ids)
        """,
        # recommendation (direct)
        """
        DELETE FROM recommendation
        WHERE property_id = ANY(:property_ids)
        """,
        # plan (direct)
        """
        DELETE FROM plan
        WHERE property_id = ANY(:property_ids)
        """,
        # property-scoped tables
        """
        DELETE FROM property_details_epc
        WHERE property_id = ANY(:property_ids)
        """,
        """
        DELETE FROM property_targets
        WHERE property_id = ANY(:property_ids)
        """,
        # properties LAST
        """
        DELETE FROM property
        WHERE id = ANY(:property_ids)
        """,
    )

    # Every statement binds the same list of IDs
    bind_params = {"property_ids": property_ids}
    for sql in ordered_statements:
        session.execute(text(sql), bind_params)
def portfolio_has_properties(portfolio_id: int) -> bool:
    """
    Check, via an EXISTS query on a read session, whether a portfolio has any property.

    :param portfolio_id: ID of the portfolio
    :return: result of the EXISTS query
    """
    with db_read_session() as session:
        in_portfolio = PropertyModel.portfolio_id == portfolio_id
        exists_clause = session.query(PropertyModel).filter(in_portfolio).exists()
        return session.query(exists_clause).scalar()
def delete_portfolio_scenarios_if_empty(portfolio_id: int):
    """
    Delete all scenarios of a portfolio, but only if it no longer has any properties.

    No explicit commit is issued here; presumably the ``db_session`` context manager
    commits on exit - confirm against its implementation.

    :param portfolio_id: ID of the portfolio
    """
    if portfolio_has_properties(portfolio_id):
        print("Properties still exist — skipping scenario deletion")
        return

    remove_scenarios = delete(ScenarioModel).where(
        ScenarioModel.portfolio_id == portfolio_id
    )
    with db_session() as session:
        session.execute(remove_scenarios)
    print("Deleted scenarios for empty portfolio")
def clear_portfolio_in_batches(
    portfolio_id: int,
    property_batch_size: int = 25,
):
    """
    Delete all properties of a portfolio in batches, then its scenarios.

    Each batch is deleted inside its own ``db_session`` block, and progress and timing are
    printed per batch. The scenarios are removed once no properties are left.

    :param portfolio_id: ID of the portfolio to clear
    :param property_batch_size: number of properties deleted per batch, must be at least 1
    :raises ValueError: if there are properties to delete and ``property_batch_size`` is
        smaller than 1
    """
    import time

    property_ids = get_property_ids(portfolio_id)
    if not property_ids:
        print("No properties found.")
        delete_portfolio_scenarios_if_empty(portfolio_id)
        return

    if property_batch_size < 1:
        # 0 would divide by zero below; a negative size would delete nothing and still
        # report success.
        raise ValueError("property_batch_size must be a positive integer")

    # Number of batches, rounded up
    total = (len(property_ids) + property_batch_size - 1) // property_batch_size

    for i, batch in enumerate(chunked(property_ids, property_batch_size), start=1):
        print(f"Deleting batch {i}/{total} ({len(batch)} properties)")
        # perf_counter is monotonic, so the duration is immune to system clock changes
        start_time = time.perf_counter()
        with db_session() as session:
            delete_property_batch(session, batch)
        elapsed = time.perf_counter() - start_time
        print(f"Batch {i} deleted in {elapsed:.2f} seconds")

    # scenario deletion happens AFTER all properties are gone
    delete_portfolio_scenarios_if_empty(portfolio_id)
    print("Portfolio cleared in batches.")
def get_plans_by_scenario_ids(ids: List[int]) -> List[PlanModel]:
    """
    Fetch all plans whose scenario_id is one of the given IDs, using a read session.

    :param ids: scenario IDs to look up
    :return: list of matching plans
    """
    in_requested_scenarios = PlanModel.scenario_id.in_(ids)
    query = select(PlanModel).where(in_requested_scenarios)
    with db_read_session() as session:
        untyped_session: Any = session  # Any keeps Pylance quiet about .exec()
        result = untyped_session.exec(query)
        return result.scalars().all()
def get_most_recent_plans_by_portfolio_id(portfolio_id: int) -> List[PlanModel]:
    """
    Fetch the most recent plan per property and scenario within a portfolio.

    For each (property_id, scenario_id) pair the plan with the latest created_at is kept,
    ties being broken by the highest id.

    :param portfolio_id: ID of the portfolio
    :return: list of plans, one per property per scenario
    """
    # NOTE: Postgres only - DISTINCT with column arguments is rendered as DISTINCT ON
    one_per = (PlanModel.property_id, PlanModel.scenario_id)
    newest_first = (PlanModel.created_at.desc(), PlanModel.id.desc())
    query = (
        select(PlanModel)
        .where(PlanModel.portfolio_id == portfolio_id)
        .distinct(*one_per)
        .order_by(*one_per, *newest_first)
    )
    with db_read_session() as session:
        untyped_session: Any = session  # Any keeps Pylance quiet about .exec()
        result = untyped_session.exec(query)
        return result.scalars().all()
def get_most_recent_plans_by_scenario_ids(scenario_ids: List[int]) -> List[PlanModel]:
    """
    Fetch the most recent plan per property for each of the given scenarios.

    For each (property_id, scenario_id) pair the plan with the latest created_at is kept,
    ties being broken by the highest id.

    :param scenario_ids: IDs of the scenarios to look up
    :return: list of plans, one per property per scenario
    """
    # NOTE: Postgres only - DISTINCT with column arguments is rendered as DISTINCT ON
    one_per = (PlanModel.property_id, PlanModel.scenario_id)
    newest_first = (PlanModel.created_at.desc(), PlanModel.id.desc())
    query = (
        select(PlanModel)
        .where(PlanModel.scenario_id.in_(scenario_ids))
        .distinct(*one_per)
        .order_by(*one_per, *newest_first)
    )
    with db_read_session() as session:
        untyped_session: Any = session  # Any keeps Pylance quiet about .exec()
        result = untyped_session.exec(query)
        return result.scalars().all()
def get_scenarios_by_portfolio_id(portfolio_id: int) -> List[ScenarioModel]:
    """
    Fetch all scenarios of a portfolio, using a read session.

    :param portfolio_id: ID of the portfolio
    :return: list of the portfolio's scenarios
    """
    in_portfolio = ScenarioModel.portfolio_id == portfolio_id
    query = select(ScenarioModel).where(in_portfolio)
    with db_read_session() as session:
        untyped_session: Any = session  # Any keeps Pylance quiet about .exec()
        result = untyped_session.exec(query)
        return result.scalars().all()
def get_default_plans(
    portfolio_id: int,
) -> List[PlanModel]:
    """
    Fetch the plans of a portfolio that are flagged as default, using a read session.

    :param portfolio_id: ID of the portfolio
    :return: list of default plans
    """
    # Multiple criteria passed to where() are joined with AND
    query = select(PlanModel).where(
        PlanModel.portfolio_id == portfolio_id,
        PlanModel.is_default == True,  # noqa: E712 - builds a SQL expression
    )
    with db_read_session() as session:
        untyped_session: Any = session  # Any keeps Pylance quiet about .exec()
        default_plans: List[PlanModel] = untyped_session.exec(query).scalars().all()
        return default_plans
def bulk_update_plans(
    plan_models: List[PlanModel],
    scenario_models: List[ScenarioModel],
) -> int:
    """
    Write the current column values of the given plans and scenarios back to the database.

    Rows are matched by ``id``. For scenarios the ``portfolio_id`` column is left untouched.
    The updates are committed before returning.

    NOTE(review): when ``plan_models`` is empty nothing is written at all, so any
    ``scenario_models`` passed along are skipped too - confirm this is intended.

    :param plan_models: plans to update
    :param scenario_models: scenarios to update
    :return: number of plans passed in (0 when there were none)
    """
    if not plan_models:
        return 0

    def _column_values(model: Any, skipped: set) -> Dict[str, Any]:
        # Snapshot of the model's column values keyed by column name, always including id
        values: Dict[str, Any] = {
            c.name: getattr(model, c.name)
            for c in model.__table__.columns
            if c.name not in skipped
        }
        values["id"] = model.id
        return values

    plan_mappings = [_column_values(plan, {"id"}) for plan in plan_models]
    scenario_mappings = [
        _column_values(scenario, {"id", "portfolio_id"})
        for scenario in scenario_models
    ]

    plan_mapper: Mapper[Any] = inspect(PlanModel)
    scenario_mapper: Mapper[Any] = inspect(ScenarioModel)

    # This function writes, so it uses the read/write session like the other writers in
    # this module, not the read session.
    with db_session() as session:
        session.bulk_update_mappings(plan_mapper, plan_mappings)
        session.bulk_update_mappings(scenario_mapper, scenario_mappings)
        session.commit()

    return len(plan_models)