Model/backend/app/db/functions/recommendations_functions.py
2026-01-10 18:52:26 +00:00

544 lines
19 KiB
Python

from sqlalchemy import text
from sqlalchemy import insert, delete
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
from backend.app.db.models.recommendations import (
Plan, Recommendation, RecommendationMaterials, PlanRecommendations, Scenario
)
from backend.app.db.models.portfolio import PropertyModel
from backend.app.db.connection import db_session, db_read_session
def prepare_plan_data(
    p, body, scenario_id, eco_packages, valuations, new_sap_points, new_epc, default_recommendations,
    rebaselining_carbon=0, rebaselining_heat_demand=0, rebaselining_kwh=0, rebaselining_bills=0,
):
    """
    Utility function to prepare the dictionary of values that goes into the production of a plan.

    Savings and costs are summed over the default recommendations that are not flagged
    ``already_installed``; the "post" figures are the property's current figures minus the
    rebaselining adjustment minus those savings.

    :param p: Instantiated property (reads ``id``, ``is_new``, ``energy``, ``current_energy_bill`` and
        ``current_energy_consumption``)
    :param body: request body, PlanTriggerRequest (reads ``portfolio_id`` and ``scenario_name``)
    :param scenario_id: unique identifier for the scenario
    :param eco_packages: Pre-constructed eco packages keyed by property id; the third element of the
        entry is used as the plan type
    :param valuations: valuation improvement data
    :param new_sap_points: sap points, post default recommendations
    :param new_epc: new epc rating, post default recommendations
    :param default_recommendations: list of default recommendations for a property
    :param rebaselining_carbon: carbon emissions adjustment for rebaselining
    :param rebaselining_heat_demand: heat demand adjustment for rebaselining (accepted but not used here)
    :param rebaselining_kwh: kwh consumption adjustment for rebaselining
    :param rebaselining_bills: energy bill adjustment for rebaselining
    :return: dictionary of plan fields; the valuation fields are None when no valuation is available
    """
    # Measures that are already installed contribute neither savings nor cost to the plan.
    to_install = [r for r in default_recommendations if not r.get("already_installed", False)]

    # Plan carbon savings
    co2_savings = sum(r["co2_equivalent_savings"] for r in to_install)
    post_co2_emissions = p.energy["co2_emissions"] - rebaselining_carbon - co2_savings

    # Plan bill savings
    energy_bill_savings = sum(r["energy_cost_savings"] for r in to_install)
    post_energy_bill = sum(p.current_energy_bill.values()) - rebaselining_bills - energy_bill_savings

    # energy consumption
    energy_consumption_savings = sum(r["kwh_savings"] for r in to_install)
    post_energy_consumption = p.current_energy_consumption - rebaselining_kwh - energy_consumption_savings

    # valuation data is only reported when the property has a current valuation
    current_value = valuations["current_value"]
    valuation_post_retrofit, valuation_increase = None, None
    if current_value:
        valuation_increase = valuations["average_increase"]
        valuation_post_retrofit = valuations["average_increased_value"]

    def _uplift(key):
        # A missing valuation yields None instead of failing on ``None - None``.
        increased_value = valuations[key]
        if current_value is None or increased_value is None:
            return None
        return increased_value - current_value

    # plan costing data
    cost_of_works = sum(r["total"] for r in to_install)
    contingency_cost = sum(r.get("contingency", 0) for r in to_install)

    return {
        "portfolio_id": body.portfolio_id,
        "property_id": p.id,
        "scenario_id": scenario_id,
        "is_default": bool(p.is_new),
        "name": body.scenario_name,
        "valuation_increase_lower_bound": _uplift("lower_bound_increased_value"),
        "valuation_increase_upper_bound": _uplift("upper_bound_increased_value"),
        "valuation_increase_average": _uplift("average_increased_value"),
        "post_sap_points": float(new_sap_points),
        "post_epc_rating": new_epc,
        "post_co2_emissions": float(post_co2_emissions),
        "co2_savings": float(co2_savings),
        "post_energy_bill": float(post_energy_bill),
        "energy_bill_savings": float(energy_bill_savings),
        "post_energy_consumption": float(post_energy_consumption),
        "energy_consumption_savings": float(energy_consumption_savings),
        "valuation_post_retrofit": valuation_post_retrofit,
        "valuation_increase": valuation_increase,
        "cost_of_works": float(cost_of_works),
        "contingency_cost": float(contingency_cost),
        "plan_type": eco_packages.get(p.id, (None, None, None))[2],
    }
def create_plan(session: Session, plan):
    """
    Insert a single plan record and commit it.

    No existence check is performed: every call adds a new ``Plan`` row.
    :param session: The database session
    :param plan: dictionary of column values used to construct the plan
    :return: ID of the newly created plan
    :raises SQLAlchemyError: re-raised after the session has been rolled back
    """
    try:
        plan_record = Plan(**plan)
        session.add(plan_record)
        # flush first so the generated primary key is assigned before the commit
        session.flush()
        session.commit()
        return plan_record.id
    except SQLAlchemyError:
        # leave the session usable for the caller, then surface the failure
        session.rollback()
        raise
def bulk_create_plans(session: Session, plans_to_create: list[dict]) -> dict[int, int]:
if not plans_to_create:
return {}
payload = [
{
"property_id": p["property_id"],
**p["plan_data"],
}
for p in plans_to_create
]
stmt = (
insert(Plan)
.values(payload)
.returning(Plan.id, Plan.property_id)
)
result = session.execute(stmt).all()
# property_id -> plan_id
return {row.property_id: row.id for row in result}
def create_scenario(session: Session, scenario: dict) -> int:
    """
    Create a scenario record and commit it.

    The scenario is flagged as the default when the portfolio has no scenario yet. Note that
    ``scenario`` is updated in place with the resulting ``is_default`` value.
    :param session: The database session
    :param scenario: dictionary of column values for the scenario; must contain ``portfolio_id``
    :return: ID of the newly created scenario
    :raises SQLAlchemyError: re-raised after the session has been rolled back
    """
    try:
        existing_scenario = (
            session.query(Scenario)
            .filter_by(portfolio_id=scenario["portfolio_id"])
            .first()
        )
        # the first scenario of a portfolio becomes its default
        scenario["is_default"] = not existing_scenario

        new_scenario = Scenario(**scenario)
        session.add(new_scenario)
        session.flush()  # ensures ID is populated
        scenario_id = new_scenario.id
        session.commit()
        return scenario_id
    except SQLAlchemyError:
        # same recovery as create_plan / create_recommendation: roll back, then surface the failure
        session.rollback()
        raise
def create_recommendation(session: Session, recommendation):
    """
    Insert a single recommendation record and commit it.

    No existence check is performed: every call adds a new ``Recommendation`` row.
    :param session: The database session
    :param recommendation: dictionary of column values used to construct the recommendation
    :return: ID of the newly created recommendation
    :raises SQLAlchemyError: re-raised after the session has been rolled back
    """
    try:
        recommendation_record = Recommendation(**recommendation)
        session.add(recommendation_record)
        # flush first so the generated primary key is assigned before the commit
        session.flush()
        session.commit()
        return recommendation_record.id
    except SQLAlchemyError:
        # leave the session usable for the caller, then surface the failure
        session.rollback()
        raise
def create_recommendation_material(session: Session, recommendation_id, material_id, depth):
    """
    Add a recommendation_material record linking a material to a recommendation.

    The record is added and flushed so that its ID is available; it is not committed here.
    :param session: The database session
    :param recommendation_id: ID of the recommendation
    :param material_id: ID of the material
    :param depth: depth of the material, may be null if a material where depth is not applicable
    :return: ID of the newly created recommendation_material record
    """
    column_values = {
        "recommendation_id": recommendation_id,
        "material_id": material_id,
        "depth": depth,
    }
    material_link = RecommendationMaterials(**column_values)
    session.add(material_link)
    # flushing assigns the primary key without ending the transaction
    session.flush()
    return material_link.id
def create_plan_recommendations(session: Session, plan_id, recommendation_ids):
"""
This function will create records for the plan_recommendation in the database.
:param session: The database session
:param plan_id: ID of the plan
:param recommendation_ids: list of recommendation IDs
"""
# Prepare a list of dictionaries for bulk insert
data = [{"plan_id": plan_id, "recommendation_id": rid} for rid in recommendation_ids]
# Bulk insert using SQLAlchemy's core API
session.execute(insert(PlanRecommendations).values(data))
def upload_recommendations(session: Session, recommendations_to_upload, property_id, new_plan_id):
try:
# Prepare data for bulk insert for Recommendation
recommendations_data = [
{
"property_id": property_id,
"type": rec["type"],
"measure_type": rec["measure_type"],
"description": rec["description"],
"estimated_cost": float(rec["total"]),
"default": rec["default"],
"starting_u_value": float(rec.get("starting_u_value")) if rec.get("starting_u_value") else None,
"new_u_value": float(rec.get("new_u_value")) if rec.get("new_u_value") else None,
"sap_points": float(rec["sap_points"]),
"energy_savings": float(rec["heat_demand"]),
"kwh_savings": float(rec["kwh_savings"]),
"co2_equivalent_savings": float(rec["co2_equivalent_savings"]),
"total_work_hours": float(rec["labour_hours"]),
"energy_cost_savings": float(rec["energy_cost_savings"]),
"labour_days": float(rec["labour_days"]),
"already_installed": rec["already_installed"],
"heat_demand": float(rec["heat_demand"])
}
for rec in recommendations_to_upload
]
# Insert the recommendations, get back the IDs
stmt = insert(Recommendation).returning(Recommendation.id).values(recommendations_data)
result = session.execute(stmt)
uploaded_recommendation_ids = [row[0] for row in result]
# Prepare data for bulk insert for RecommendationMaterials
recommendation_materials_data = [
{
"recommendation_id": recommendation_id,
"material_id": part["id"],
"depth": int(part["depth"]) if part["depth"] else None,
"quantity": float(part["quantity"]) if part.get("quantity") else None,
"quantity_unit": part.get("quantity_unit", None),
"estimated_cost": float(part.get("total", part.get("total_cost"))),
}
for rec, recommendation_id in zip(recommendations_to_upload, uploaded_recommendation_ids)
for part in rec["parts"]
]
session.bulk_insert_mappings(RecommendationMaterials, recommendation_materials_data)
# flush the changes to get the newly created IDs
session.flush()
create_plan_recommendations(
session, plan_id=new_plan_id, recommendation_ids=uploaded_recommendation_ids
)
# Commit the transaction
session.commit()
return True
except SQLAlchemyError as e:
# Rollback the transaction in case of an error
session.rollback()
print(f"An error occurred: {e}")
return False
def bulk_upload_recommendations_and_materials(
session: Session,
recommendation_payload: list[dict],
):
if not recommendation_payload:
return
# ---------------------------------------------------------
# 1. Prepare recommendation rows
# ---------------------------------------------------------
recommendation_rows = []
parts_by_index = []
plan_ids_by_index = []
for rec in recommendation_payload:
recommendation_rows.append({
"property_id": rec["property_id"],
"type": rec["type"],
"measure_type": rec["measure_type"],
"description": rec["description"],
"estimated_cost": rec["estimated_cost"],
"default": rec["default"],
"starting_u_value": rec["starting_u_value"],
"new_u_value": rec["new_u_value"],
"sap_points": rec["sap_points"],
"heat_demand": rec["heat_demand"],
"kwh_savings": rec["kwh_savings"],
"co2_equivalent_savings": rec["co2_equivalent_savings"],
"energy_savings": rec["energy_savings"],
"energy_cost_savings": rec["energy_cost_savings"],
"total_work_hours": rec["total_work_hours"],
"labour_days": rec["labour_days"],
"already_installed": rec["already_installed"],
})
parts_by_index.append(rec["parts"])
plan_ids_by_index.append(rec["plan_id"])
# ---------------------------------------------------------
# 2. Insert recommendations and get IDs
# ---------------------------------------------------------
result = session.execute(
insert(Recommendation)
.values(recommendation_rows)
.returning(Recommendation.id)
)
recommendation_ids = [row[0] for row in result]
# ---------------------------------------------------------
# 3. Insert recommendation materials
# ---------------------------------------------------------
materials_rows = []
for recommendation_id, parts in zip(recommendation_ids, parts_by_index):
for part in parts:
materials_rows.append({
"recommendation_id": recommendation_id,
"material_id": part["material_id"],
"depth": part["depth"],
"quantity": part["quantity"],
"quantity_unit": part["quantity_unit"],
"estimated_cost": part["estimated_cost"],
})
if materials_rows:
session.execute(
insert(RecommendationMaterials).values(materials_rows)
)
# ---------------------------------------------------------
# 4. Insert plan ↔ recommendation links
# ---------------------------------------------------------
plan_recommendation_rows = [
{
"plan_id": plan_id,
"recommendation_id": recommendation_id,
}
for plan_id, recommendation_id in zip(
plan_ids_by_index, recommendation_ids
)
]
session.execute(
insert(PlanRecommendations).values(plan_recommendation_rows)
)
def chunked(iterable, size=100):
    """
    Yield successive chunks of at most ``size`` items from ``iterable``.

    Sized, sliceable inputs (lists, tuples, strings, ...) are chunked by slicing, so each chunk has
    the type that slicing the input produces. Any other iterable (e.g. a generator) is consumed
    lazily and yielded as lists.
    :param iterable: the items to split into chunks
    :param size: maximum number of items per chunk, must be at least 1
    :raises ValueError: when ``size`` is smaller than 1 (raised on first iteration)
    """
    from itertools import islice

    if size < 1:
        # a negative step would otherwise silently yield nothing
        raise ValueError(f"size must be a positive integer, got {size!r}")

    if hasattr(iterable, "__len__") and hasattr(iterable, "__getitem__"):
        for start in range(0, len(iterable), size):
            yield iterable[start:start + size]
        return

    iterator = iter(iterable)
    while batch := list(islice(iterator, size)):
        yield batch
def get_property_ids(portfolio_id: int) -> list[int]:
    """
    Fetch the IDs of every property that belongs to a portfolio.

    :param portfolio_id: ID of the portfolio
    :return: list of property IDs (empty when the portfolio has no properties)
    """
    with db_read_session() as session:
        id_query = session.query(PropertyModel.id).filter(
            PropertyModel.portfolio_id == portfolio_id
        )
        # each result row holds a single column: the property ID
        return [row[0] for row in id_query.all()]
def delete_property_batch(session: Session, property_ids: list[int]):
if not property_ids:
return
params = {"property_ids": property_ids}
# --------------------------------------------------
# recommendation_materials (via recommendation)
# --------------------------------------------------
session.execute(
text("""
DELETE FROM recommendation_materials rm
USING recommendation r
WHERE rm.recommendation_id = r.id
AND r.property_id = ANY(:property_ids)
"""),
params,
)
# --------------------------------------------------
# plan_recommendations (via plan)
# --------------------------------------------------
session.execute(
text("""
DELETE FROM plan_recommendations pr
USING plan p
WHERE pr.plan_id = p.id
AND p.property_id = ANY(:property_ids)
"""),
params,
)
# --------------------------------------------------
# funding_package_measures
# --------------------------------------------------
session.execute(
text("""
DELETE FROM funding_package_measures fpm
USING funding_package fp, plan p
WHERE fpm.funding_package_id = fp.id
AND fp.plan_id = p.id
AND p.property_id = ANY(:property_ids)
"""),
params,
)
# --------------------------------------------------
# inspections (direct)
# --------------------------------------------------
session.execute(
text("""
DELETE FROM inspections
WHERE property_id = ANY(:property_ids)
"""),
params,
)
# --------------------------------------------------
# funding_package
# --------------------------------------------------
session.execute(
text("""
DELETE FROM funding_package fp
USING plan p
WHERE fp.plan_id = p.id
AND p.property_id = ANY(:property_ids)
"""),
params,
)
# --------------------------------------------------
# recommendation (direct — CRITICAL FIX)
# --------------------------------------------------
session.execute(
text("""
DELETE FROM recommendation
WHERE property_id = ANY(:property_ids)
"""),
params,
)
# --------------------------------------------------
# plan (direct)
# --------------------------------------------------
session.execute(
text("""
DELETE FROM plan
WHERE property_id = ANY(:property_ids)
"""),
params,
)
# --------------------------------------------------
# property-scoped tables
# --------------------------------------------------
session.execute(
text("""
DELETE FROM property_details_epc
WHERE property_id = ANY(:property_ids)
"""),
params,
)
session.execute(
text("""
DELETE FROM property_targets
WHERE property_id = ANY(:property_ids)
"""),
params,
)
# --------------------------------------------------
# properties LAST
# --------------------------------------------------
session.execute(
text("""
DELETE FROM property
WHERE id = ANY(:property_ids)
"""),
params,
)
def portfolio_has_properties(portfolio_id: int) -> bool:
    """
    Check whether at least one property belongs to a portfolio.

    :param portfolio_id: ID of the portfolio
    :return: result of an EXISTS query over the portfolio's properties
    """
    with db_read_session() as session:
        properties_in_portfolio = session.query(PropertyModel).filter(
            PropertyModel.portfolio_id == portfolio_id
        )
        # wrap the filtered query in EXISTS so no property rows are loaded
        return session.query(properties_in_portfolio.exists()).scalar()
def delete_portfolio_scenarios_if_empty(portfolio_id: int):
    """
    Delete every scenario of a portfolio, but only once the portfolio has no properties left.

    :param portfolio_id: ID of the portfolio
    """
    if not portfolio_has_properties(portfolio_id):
        with db_session() as session:
            remove_scenarios = delete(Scenario).where(Scenario.portfolio_id == portfolio_id)
            session.execute(remove_scenarios)
        print("Deleted scenarios for empty portfolio")
    else:
        # scenarios are kept while any property still exists in the portfolio
        print("Properties still exist — skipping scenario deletion")
def clear_portfolio_in_batches(
    portfolio_id: int,
    property_batch_size: int = 25,
):
    """
    Delete every property of a portfolio in batches, then delete the portfolio's scenarios.

    Each batch is deleted inside its own ``db_session`` block, and progress and timing are printed
    per batch. A portfolio without properties only has its scenarios deleted.
    :param portfolio_id: ID of the portfolio to clear
    :param property_batch_size: number of properties deleted per batch, must be at least 1
    :raises ValueError: when there are properties to delete and ``property_batch_size`` is below 1
    """
    import time

    property_ids = get_property_ids(portfolio_id)
    if not property_ids:
        print("No properties found.")
        delete_portfolio_scenarios_if_empty(portfolio_id)
        return

    if property_batch_size < 1:
        # fail before deleting anything: a non-positive size would either divide by zero below or
        # produce no batches at all while still reporting success
        raise ValueError(
            f"property_batch_size must be a positive integer, got {property_batch_size!r}"
        )

    # ceiling division: number of batches needed to cover every property
    total = (len(property_ids) + property_batch_size - 1) // property_batch_size

    for i, batch in enumerate(chunked(property_ids, property_batch_size), start=1):
        print(f"Deleting batch {i}/{total} ({len(batch)} properties)")
        # perf_counter is monotonic, so the duration cannot be skewed by system clock changes
        start_time = time.perf_counter()
        with db_session() as session:
            delete_property_batch(session, batch)
        elapsed = time.perf_counter() - start_time
        print(f"Batch {i} deleted in {elapsed:.2f} seconds")

    # scenario deletion happens AFTER all properties are gone
    delete_portfolio_scenarios_if_empty(portfolio_id)
    print("Portfolio cleared in batches.")