mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
622 lines
20 KiB
Python
622 lines
20 KiB
Python
from sqlalchemy import text
|
|
from sqlalchemy import insert, delete
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from backend.app.db.models.recommendations import (
|
|
Plan,
|
|
Recommendation,
|
|
RecommendationMaterials,
|
|
PlanRecommendations,
|
|
Scenario,
|
|
)
|
|
from backend.app.db.models.portfolio import PropertyModel
|
|
from backend.app.db.connection import db_session, db_read_session
|
|
|
|
|
|
def prepare_plan_data(
    p,
    body,
    scenario_id,
    eco_packages,
    valuations,
    new_sap_points,
    new_epc,
    default_recommendations,
    rebaselining_carbon=0,
    rebaselining_heat_demand=0,
    rebaselining_kwh=0,
    rebaselining_bills=0,
):
    """
    Build the dictionary of values from which a plan record is created.

    Savings and costs are summed over the default recommendations that are not flagged
    ``already_installed``. Each "post" figure is the property's current figure minus the
    matching rebaselining adjustment minus those savings.

    :param p: Instantiated property; reads ``id``, ``is_new``, ``energy["co2_emissions"]``,
        ``current_energy_bill`` (a mapping whose values are summed) and
        ``current_energy_consumption``
    :param body: request body, PlanTriggerRequest; reads ``portfolio_id`` and ``scenario_name``
    :param scenario_id: unique identifier for the scenario
    :param eco_packages: Pre-constructed eco packages keyed by property id; the third element
        of the property's entry becomes ``plan_type`` (``None`` when the property has no entry)
    :param valuations: valuation improvement data; reads ``current_value``,
        ``average_increase``, ``average_increased_value``, ``lower_bound_increased_value``
        and ``upper_bound_increased_value``
    :param new_sap_points: sap points, post default recommendations
    :param new_epc: new epc rating, post default recommendations
    :param default_recommendations: iterable of default recommendation dicts for a property
    :param rebaselining_carbon: carbon emissions adjustment for rebaselining
    :param rebaselining_heat_demand: heat demand adjustment for rebaselining (accepted for
        interface compatibility; not used in any calculation here)
    :param rebaselining_kwh: kwh consumption adjustment for rebaselining
    :param rebaselining_bills: energy bill adjustment for rebaselining
    :return: dict of plan fields. The three ``valuation_increase_*`` bounds are ``None`` when
        the current value or the corresponding increased value is ``None``
    """
    # Materialise the recommendations still to be installed once, so that every total is
    # computed over the same rows even when a one-shot iterable is passed in.
    to_install = [
        r for r in default_recommendations if not r.get("already_installed", False)
    ]

    def _total(key):
        # Sum a mandatory numeric field across the recommendations to install.
        return sum(r[key] for r in to_install)

    def _valuation_delta(increased_key):
        # Difference between an increased valuation and the current one. Returns None
        # instead of raising TypeError when either side is unavailable.
        current_value = valuations["current_value"]
        increased_value = valuations[increased_key]
        if current_value is None or increased_value is None:
            return None
        return increased_value - current_value

    # Plan carbon savings
    co2_savings = _total("co2_equivalent_savings")
    post_co2_emissions = p.energy["co2_emissions"] - rebaselining_carbon - co2_savings

    # Plan bill savings
    energy_bill_savings = _total("energy_cost_savings")
    post_energy_bill = (
        sum(p.current_energy_bill.values()) - rebaselining_bills - energy_bill_savings
    )

    # energy consumption
    energy_consumption_savings = _total("kwh_savings")
    post_energy_consumption = (
        p.current_energy_consumption - rebaselining_kwh - energy_consumption_savings
    )

    # The headline valuation figures are only reported when a current value is known.
    valuation_post_retrofit, valuation_increase = None, None
    if valuations["current_value"]:
        valuation_increase = valuations["average_increase"]
        valuation_post_retrofit = valuations["average_increased_value"]

    # plan costing data; contingency is optional per recommendation
    cost_of_works = _total("total")
    contingency_cost = sum(r.get("contingency", 0) for r in to_install)

    return {
        "portfolio_id": body.portfolio_id,
        "property_id": p.id,
        "scenario_id": scenario_id,
        "is_default": bool(p.is_new),
        "name": body.scenario_name,
        "valuation_increase_lower_bound": _valuation_delta(
            "lower_bound_increased_value"
        ),
        "valuation_increase_upper_bound": _valuation_delta(
            "upper_bound_increased_value"
        ),
        "valuation_increase_average": _valuation_delta("average_increased_value"),
        "post_sap_points": float(new_sap_points),
        "post_epc_rating": new_epc,
        "post_co2_emissions": float(post_co2_emissions),
        "co2_savings": float(co2_savings),
        "post_energy_bill": float(post_energy_bill),
        "energy_bill_savings": float(energy_bill_savings),
        "post_energy_consumption": float(post_energy_consumption),
        "energy_consumption_savings": float(energy_consumption_savings),
        "valuation_post_retrofit": valuation_post_retrofit,
        "valuation_increase": valuation_increase,
        "cost_of_works": float(cost_of_works),
        "contingency_cost": float(contingency_cost),
        "plan_type": eco_packages.get(p.id, (None, None, None))[2],
    }
|
|
|
|
|
|
def create_plan(session: Session, plan):
    """
    Insert a new plan record and commit it.

    No existence check is performed: a new row is added on every call.

    :param session: The database session
    :param plan: dictionary of data representing a plan to be created; expanded into
        keyword arguments for ``Plan``
    :return: ID of the newly created plan
    :raises SQLAlchemyError: re-raised unchanged after the session has been rolled back
    """
    try:
        new_plan = Plan(**plan)
        session.add(new_plan)
        # Flush so the ID is populated, and capture it before committing (as
        # create_scenario does): with SQLAlchemy's default expire_on_commit setting the
        # instance is expired by commit() and reading the ID afterwards costs a reload.
        session.flush()
        plan_id = new_plan.id
        session.commit()
        return plan_id
    except SQLAlchemyError:
        session.rollback()
        # Bare raise keeps the original traceback intact.
        raise
|
|
|
|
|
|
def bulk_create_plans(session: Session, plans_to_create: list[dict]) -> dict[int, int]:
    """
    Insert several plans with a single INSERT ... RETURNING statement.

    Nothing is committed here; the statement is only executed on the given session.

    :param session: The database session
    :param plans_to_create: list of dicts, each with a ``property_id`` and a ``plan_data``
        dict of plan columns (keys in ``plan_data`` override ``property_id`` on clash)
    :return: mapping of property_id -> plan_id built from the returned rows; empty when
        there is nothing to create
    """
    if not plans_to_create:
        return {}

    rows = []
    for entry in plans_to_create:
        row = {"property_id": entry["property_id"]}
        row.update(entry["plan_data"])
        rows.append(row)

    insert_stmt = insert(Plan).values(rows).returning(Plan.id, Plan.property_id)
    created = session.execute(insert_stmt).all()

    # property_id -> plan_id
    plan_id_by_property = {}
    for record in created:
        plan_id_by_property[record.property_id] = record.id
    return plan_id_by_property
|
|
|
|
|
|
def create_scenario(session: Session, scenario: dict) -> int:
    """
    Insert a new scenario and commit it.

    The scenario is flagged as the default one when no scenario exists yet for its
    portfolio. Note that ``scenario`` is modified in place: its ``is_default`` key is
    overwritten before the record is built.

    :param session: The database session
    :param scenario: dictionary of scenario columns; must contain ``portfolio_id``
    :return: ID of the newly created scenario
    :raises SQLAlchemyError: re-raised unchanged after the session has been rolled back
    """
    try:
        existing_scenario = (
            session.query(Scenario)
            .filter_by(portfolio_id=scenario["portfolio_id"])
            .first()
        )

        # First scenario of a portfolio becomes its default.
        scenario["is_default"] = not existing_scenario

        new_scenario = Scenario(**scenario)
        session.add(new_scenario)
        session.flush()  # ensures ID is populated

        scenario_id = new_scenario.id
        session.commit()

        return scenario_id
    except SQLAlchemyError:
        # Same failure handling as create_plan / create_recommendation.
        session.rollback()
        raise
|
|
|
|
|
|
def create_recommendation(session: Session, recommendation):
    """
    Insert a new recommendation record and commit it.

    No existence check is performed: a new row is added on every call.

    :param session: The database session
    :param recommendation: dictionary of data representing a recommendation to be created;
        expanded into keyword arguments for ``Recommendation``
    :return: ID of the newly created recommendation
    :raises SQLAlchemyError: re-raised unchanged after the session has been rolled back
    """
    try:
        new_recommendation = Recommendation(**recommendation)
        session.add(new_recommendation)
        # Flush so the ID is populated, and capture it before committing (as
        # create_scenario does): with SQLAlchemy's default expire_on_commit setting the
        # instance is expired by commit() and reading the ID afterwards costs a reload.
        session.flush()
        recommendation_id = new_recommendation.id
        session.commit()
        return recommendation_id
    except SQLAlchemyError:
        session.rollback()
        # Bare raise keeps the original traceback intact.
        raise
|
|
|
|
|
|
def create_recommendation_material(
    session: Session, recommendation_id, material_id, depth
):
    """
    Add a recommendation/material link row to the session and flush it.

    The row is flushed but not committed by this function.

    :param session: The database session
    :param recommendation_id: ID of the recommendation
    :param material_id: ID of the material
    :param depth: depth of the material, may be null if a material where depth is not applicable
    :return: ID of the flushed RecommendationMaterials row
    """
    link = RecommendationMaterials(
        recommendation_id=recommendation_id,
        material_id=material_id,
        depth=depth,
    )
    session.add(link)
    # Flushing sends the INSERT so that the ID is available to return.
    session.flush()
    return link.id
|
|
|
|
|
|
def create_plan_recommendations(session: Session, plan_id, recommendation_ids):
    """
    Create the plan_recommendation link rows for one plan in a single bulk insert.

    Nothing is committed here; the statement is only executed on the given session.
    When ``recommendation_ids`` is empty no statement is executed at all.

    :param session: The database session
    :param plan_id: ID of the plan
    :param recommendation_ids: iterable of recommendation IDs
    """
    # Prepare a list of dictionaries for bulk insert
    data = [
        {"plan_id": plan_id, "recommendation_id": rid} for rid in recommendation_ids
    ]

    # Guard like the other bulk helpers in this module: an empty list handed to
    # .values() is not interpreted by SQLAlchemy as "insert zero rows".
    if not data:
        return

    # Bulk insert using SQLAlchemy's core API
    session.execute(insert(PlanRecommendations).values(data))
|
|
|
|
|
|
def _float_if_truthy(value):
    """Return ``float(value)``, or ``None`` when the value is falsy (None, 0, "")."""
    return float(value) if value else None


def _recommendation_row(rec, property_id):
    """Map one raw recommendation dict onto the Recommendation columns to insert."""
    return {
        "property_id": property_id,
        "type": rec["type"],
        "measure_type": rec["measure_type"],
        "description": rec["description"],
        "estimated_cost": float(rec["total"]),
        "default": rec["default"],
        "starting_u_value": _float_if_truthy(rec.get("starting_u_value")),
        "new_u_value": _float_if_truthy(rec.get("new_u_value")),
        "sap_points": float(rec["sap_points"]),
        # NOTE(review): energy_savings is filled from rec["heat_demand"], the same source
        # as the heat_demand column below - confirm this is intended.
        "energy_savings": float(rec["heat_demand"]),
        "kwh_savings": float(rec["kwh_savings"]),
        "co2_equivalent_savings": float(rec["co2_equivalent_savings"]),
        "total_work_hours": float(rec["labour_hours"]),
        "energy_cost_savings": float(rec["energy_cost_savings"]),
        "labour_days": float(rec["labour_days"]),
        "already_installed": rec["already_installed"],
        "heat_demand": float(rec["heat_demand"]),
    }


def _material_row(part, recommendation_id):
    """Map one part of a recommendation onto the RecommendationMaterials columns."""
    return {
        "recommendation_id": recommendation_id,
        "material_id": part["id"],
        "depth": int(part["depth"]) if part["depth"] else None,
        "quantity": _float_if_truthy(part.get("quantity")),
        "quantity_unit": part.get("quantity_unit", None),
        # The cost is read from "total", falling back to "total_cost".
        "estimated_cost": float(part.get("total", part.get("total_cost"))),
    }


def upload_recommendations(
    session: Session, recommendations_to_upload, property_id, new_plan_id
):
    """
    Store a property's recommendations, their materials and their links to a plan.

    The recommendations are bulk inserted, their parts are inserted as recommendation
    materials, the plan/recommendation links are created and the transaction is committed.

    :param session: The database session
    :param recommendations_to_upload: iterable of recommendation dicts; each must provide
        the keys read by the row builders above plus a ``parts`` list
    :param property_id: ID of the property the recommendations belong to
    :param new_plan_id: ID of the plan the recommendations are linked to
    :return: True on success, or when there is nothing to upload (in which case no
        statement is executed); False when a SQLAlchemyError occurred, after the session
        has been rolled back
    """
    # Materialise once: the input is walked twice (recommendation rows, then parts).
    recommendations = list(recommendations_to_upload)
    if not recommendations:
        # Nothing to insert; an empty multi-row INSERT must not be sent to the database.
        return True

    try:
        # Prepare data for bulk insert for Recommendation
        recommendations_data = [
            _recommendation_row(rec, property_id) for rec in recommendations
        ]

        # Insert the recommendations, get back the IDs
        stmt = (
            insert(Recommendation)
            .returning(Recommendation.id)
            .values(recommendations_data)
        )
        result = session.execute(stmt)
        uploaded_recommendation_ids = [row[0] for row in result]

        # Prepare data for bulk insert for RecommendationMaterials.
        # NOTE(review): pairing by position assumes the returned IDs come back in
        # insertion order - confirm for the database/driver in use.
        recommendation_materials_data = [
            _material_row(part, recommendation_id)
            for rec, recommendation_id in zip(
                recommendations, uploaded_recommendation_ids
            )
            for part in rec["parts"]
        ]

        session.bulk_insert_mappings(
            RecommendationMaterials, recommendation_materials_data
        )

        # Push any pending session state before the link rows are inserted
        session.flush()

        create_plan_recommendations(
            session, plan_id=new_plan_id, recommendation_ids=uploaded_recommendation_ids
        )

        # Commit the transaction
        session.commit()

        return True

    except SQLAlchemyError as e:
        # Rollback the transaction in case of an error
        session.rollback()
        print(f"An error occurred: {e}")
        return False
|
|
|
|
|
|
def bulk_upload_recommendations_and_materials(
    session: Session,
    recommendation_payload: list[dict],
):
    """
    Bulk insert recommendations, their materials and their plan links.

    Each payload entry carries the recommendation columns plus a ``parts`` list and a
    ``plan_id``. Values are inserted as given, without conversion. Nothing is committed
    here; the statements are only executed on the given session.

    :param session: The database session
    :param recommendation_payload: list of recommendation dicts; nothing happens when empty
    """
    if not recommendation_payload:
        return

    recommendation_columns = (
        "property_id",
        "type",
        "measure_type",
        "description",
        "estimated_cost",
        "default",
        "starting_u_value",
        "new_u_value",
        "sap_points",
        "heat_demand",
        "kwh_savings",
        "co2_equivalent_savings",
        "energy_savings",
        "energy_cost_savings",
        "total_work_hours",
        "labour_days",
        "already_installed",
    )
    material_columns = (
        "material_id",
        "depth",
        "quantity",
        "quantity_unit",
        "estimated_cost",
    )

    # 1. Prepare recommendation rows, remembering parts and plan per position
    records = list(recommendation_payload)
    recommendation_rows = [
        {column: record[column] for column in recommendation_columns}
        for record in records
    ]
    parts_per_record = [record["parts"] for record in records]
    plan_id_per_record = [record["plan_id"] for record in records]

    # 2. Insert recommendations and get IDs
    inserted = session.execute(
        insert(Recommendation).values(recommendation_rows).returning(Recommendation.id)
    )
    recommendation_ids = [row[0] for row in inserted]

    # 3. Insert recommendation materials (IDs are paired with parts by position)
    materials_rows = [
        {
            "recommendation_id": recommendation_id,
            **{column: part[column] for column in material_columns},
        }
        for recommendation_id, parts in zip(recommendation_ids, parts_per_record)
        for part in parts
    ]
    if materials_rows:
        session.execute(insert(RecommendationMaterials).values(materials_rows))

    # 4. Insert plan <-> recommendation links
    plan_recommendation_rows = [
        {"plan_id": plan_id, "recommendation_id": recommendation_id}
        for plan_id, recommendation_id in zip(plan_id_per_record, recommendation_ids)
    ]
    session.execute(insert(PlanRecommendations).values(plan_recommendation_rows))
|
|
|
|
|
|
def chunked(iterable, size=100):
    """
    Yield successive chunks of at most ``size`` items from ``iterable``.

    Sized sequences (anything supporting ``len()`` and slicing, e.g. list, tuple, str)
    are sliced, so each chunk has the same type as the input. Any other iterable
    (generator, iterator, ...) is consumed lazily and each chunk is yielded as a list.

    :param iterable: sequence or arbitrary iterable to split
    :param size: maximum number of items per chunk; must be at least 1
    :raises ValueError: when ``size`` is smaller than 1 (raised on first iteration,
        as this is a generator)
    """
    from itertools import islice

    if size < 1:
        # A negative step would silently produce no chunks at all.
        raise ValueError("size must be a positive integer")

    try:
        length = len(iterable)
    except TypeError:
        # No len(): fall back to pulling fixed-size batches from an iterator.
        iterator = iter(iterable)
        while batch := list(islice(iterator, size)):
            yield batch
        return

    for start in range(0, length, size):
        yield iterable[start : start + size]
|
|
|
|
|
|
def get_property_ids(portfolio_id: int) -> list[int]:
    """
    Return the IDs of all properties belonging to a portfolio.

    :param portfolio_id: ID of the portfolio to look up
    :return: list of property IDs (empty when the portfolio has no properties)
    """
    with db_read_session() as session:
        id_rows = (
            session.query(PropertyModel.id)
            .filter(PropertyModel.portfolio_id == portfolio_id)
            .all()
        )
        # Each result row holds a single column: the property id.
        return [row[0] for row in id_rows]
|
|
|
|
|
|
def delete_property_batch(session: Session, property_ids: list[int]):
    """
    Delete a batch of properties together with the rows that reference them.

    The statements run in a fixed order, dependants first and the property table last.
    Nothing is committed here; the statements are only executed on the given session.

    :param session: The database session
    :param property_ids: IDs of the properties to delete; nothing happens when empty
    """
    if not property_ids:
        return

    # Order matters: rows reached through recommendation / plan / funding_package are
    # removed before those tables themselves, and property goes last.
    ordered_deletes = (
        # recommendation_materials (via recommendation)
        """
        DELETE FROM recommendation_materials rm
        USING recommendation r
        WHERE rm.recommendation_id = r.id
        AND r.property_id = ANY(:property_ids)
        """,
        # plan_recommendations (via plan)
        """
        DELETE FROM plan_recommendations pr
        USING plan p
        WHERE pr.plan_id = p.id
        AND p.property_id = ANY(:property_ids)
        """,
        # funding_package_measures
        """
        DELETE FROM funding_package_measures fpm
        USING funding_package fp, plan p
        WHERE fpm.funding_package_id = fp.id
        AND fp.plan_id = p.id
        AND p.property_id = ANY(:property_ids)
        """,
        # inspections (direct)
        """
        DELETE FROM inspections
        WHERE property_id = ANY(:property_ids)
        """,
        # funding_package
        """
        DELETE FROM funding_package fp
        USING plan p
        WHERE fp.plan_id = p.id
        AND p.property_id = ANY(:property_ids)
        """,
        # recommendation (direct)
        """
        DELETE FROM recommendation
        WHERE property_id = ANY(:property_ids)
        """,
        # plan (direct)
        """
        DELETE FROM plan
        WHERE property_id = ANY(:property_ids)
        """,
        # property-scoped tables
        """
        DELETE FROM property_details_epc
        WHERE property_id = ANY(:property_ids)
        """,
        """
        DELETE FROM property_targets
        WHERE property_id = ANY(:property_ids)
        """,
        # properties LAST
        """
        DELETE FROM property
        WHERE id = ANY(:property_ids)
        """,
    )

    bind_params = {"property_ids": property_ids}
    for sql in ordered_deletes:
        session.execute(text(sql), bind_params)
|
|
|
|
|
|
def portfolio_has_properties(portfolio_id: int) -> bool:
    """
    Tell whether at least one property exists for the given portfolio.

    :param portfolio_id: ID of the portfolio to check
    :return: result of an EXISTS query over the portfolio's properties
    """
    with db_read_session() as session:
        any_property = (
            session.query(PropertyModel)
            .filter(PropertyModel.portfolio_id == portfolio_id)
            .exists()
        )
        return session.query(any_property).scalar()
|
|
|
|
|
|
def delete_portfolio_scenarios_if_empty(portfolio_id: int):
    """
    Delete every scenario of a portfolio, but only when it has no properties left.

    :param portfolio_id: ID of the portfolio whose scenarios should be removed
    """
    portfolio_is_empty = not portfolio_has_properties(portfolio_id)

    if portfolio_is_empty:
        # presumably db_session() commits when the block exits - verify in
        # backend.app.db.connection
        with db_session() as session:
            remove_scenarios = delete(Scenario).where(
                Scenario.portfolio_id == portfolio_id
            )
            session.execute(remove_scenarios)

        print("Deleted scenarios for empty portfolio")
    else:
        print("Properties still exist — skipping scenario deletion")
|
|
|
|
|
|
def clear_portfolio_in_batches(
    portfolio_id: int,
    property_batch_size: int = 25,
):
    """
    Delete all properties of a portfolio in batches, then its scenarios.

    Each batch is deleted inside its own ``db_session()`` block and timed. Once every
    batch has been processed (or when the portfolio has no properties at all) the
    portfolio's scenarios are removed, provided no properties remain.

    :param portfolio_id: ID of the portfolio to clear
    :param property_batch_size: number of properties deleted per batch; must be at least 1
    :raises ValueError: when ``property_batch_size`` is smaller than 1
    """
    import time

    # Fail fast: 0 would divide by zero below, and a negative size would produce no
    # batches while still reporting the portfolio as cleared.
    if property_batch_size < 1:
        raise ValueError("property_batch_size must be a positive integer")

    property_ids = get_property_ids(portfolio_id)

    if not property_ids:
        print("No properties found.")
        delete_portfolio_scenarios_if_empty(portfolio_id)
        return

    # Ceiling division: number of batches needed to cover every property.
    total = (len(property_ids) + property_batch_size - 1) // property_batch_size

    for i, batch in enumerate(chunked(property_ids, property_batch_size), start=1):
        print(f"Deleting batch {i}/{total} ({len(batch)} properties)")
        # perf_counter is monotonic, so the duration cannot be skewed by clock changes.
        start_time = time.perf_counter()
        with db_session() as session:
            delete_property_batch(session, batch)
        finish_time = time.perf_counter()
        print(f"Batch {i} deleted in {finish_time - start_time:.2f} seconds")

    # scenario deletion happens AFTER all properties are gone
    delete_portfolio_scenarios_if_empty(portfolio_id)

    print("Portfolio cleared in batches.")
|
|
|
|
|
|
def get_plans_by_portfolio_id(portfolio_id: int) -> list[Plan]:
    """
    Placeholder: not implemented yet, always raises.

    :param portfolio_id: ID of the portfolio (presumably the one whose plans are wanted)
    :raises NotImplementedError: on every call
    """
    raise NotImplementedError()
|
|
|
|
|
|
def get_scenario(scenario_id: int) -> list[Scenario]:
    """
    Placeholder: not implemented yet, always raises.

    :param scenario_id: ID of the scenario (presumably the one to fetch)
    :raises NotImplementedError: on every call
    """
    # NOTE(review): the return annotation is a list although a single ID is passed -
    # confirm the intended return type when implementing.
    raise NotImplementedError()
|
|
|
|
|
|
def set_plan_default(plan_id: int, is_default: bool) -> bool:
    """
    Placeholder: not implemented yet, always raises.

    :param plan_id: ID of the plan (presumably the one whose default flag is to be set)
    :param is_default: the flag value (presumably the new ``is_default`` state)
    :raises NotImplementedError: on every call
    """
    raise NotImplementedError()
|