Model/backend/app/db/functions/recommendations_functions.py
2024-10-02 20:19:25 +01:00

205 lines
8.1 KiB
Python

from sqlalchemy import insert, delete
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
from backend.app.db.models.recommendations import (
Plan, Recommendation, RecommendationMaterials, PlanRecommendations, Scenario
)
from backend.app.db.models.portfolio import (
PropertyModel, PropertyTargetsModel, PropertyDetailsMeter, PropertyDetailsEpcModel
)
def create_plan(session: Session, plan):
    """
    Insert a new plan row and commit it.

    No existence check is made: every call adds a new ``Plan`` built from the supplied data.
    :param session: The database session
    :param plan: dictionary of values passed as keyword arguments to ``Plan``
    :return: the ``id`` of the newly created plan
    :raises SQLAlchemyError: re-raised after the session has been rolled back
    """
    try:
        plan_row = Plan(**plan)
        session.add(plan_row)
        # Send the INSERT to the database, then make it permanent.
        session.flush()
        session.commit()
        plan_id = plan_row.id
    except SQLAlchemyError:
        # Leave the session usable for the caller before propagating the error.
        session.rollback()
        raise
    return plan_id
def create_scenario(session: Session, scenario):
    """
    Insert a new scenario row and commit it.

    The first scenario stored for a portfolio is flagged as the default one; any later
    scenario for the same portfolio is not. The ``is_default`` value is written into the
    ``scenario`` dictionary passed in before the row is built.
    :param session: The database session
    :param scenario: dictionary of values for the new ``Scenario``; must contain ``portfolio_id``
    :return: the newly created ``Scenario`` object
    :raises SQLAlchemyError: re-raised after the session has been rolled back
    """
    try:
        # Look for any scenario already attached to this portfolio: if one exists,
        # the scenario being created now must NOT become the default.
        portfolio_scenarios = session.query(Scenario).filter_by(portfolio_id=scenario["portfolio_id"])
        already_present = portfolio_scenarios.first()
        scenario["is_default"] = not already_present

        scenario_row = Scenario(**scenario)
        session.add(scenario_row)
        session.flush()
        session.commit()
    except SQLAlchemyError:
        # Leave the session usable for the caller before propagating the error.
        session.rollback()
        raise
    return scenario_row
def create_recommendation(session: Session, recommendation):
    """
    Insert a new recommendation row and commit it.

    No existence check is made: every call adds a new ``Recommendation``.
    :param session: The database session
    :param recommendation: dictionary of values passed as keyword arguments to ``Recommendation``
    :return: the ``id`` of the newly created recommendation
    :raises SQLAlchemyError: re-raised after the session has been rolled back
    """
    try:
        recommendation_row = Recommendation(**recommendation)
        session.add(recommendation_row)
        # Send the INSERT to the database, then make it permanent.
        session.flush()
        session.commit()
        recommendation_id = recommendation_row.id
    except SQLAlchemyError:
        # Leave the session usable for the caller before propagating the error.
        session.rollback()
        raise
    return recommendation_id
def create_recommendation_material(session: Session, recommendation_id, material_id, depth):
    """
    Add a row linking a recommendation to a material and flush it.

    The row is flushed so that its ``id`` is available, but this function does not commit.
    :param session: The database session
    :param recommendation_id: ID of the recommendation
    :param material_id: ID of the material
    :param depth: depth of the material, may be null if a material where depth is not applicable
    :return: the ``id`` of the newly added ``RecommendationMaterials`` row
    """
    link_fields = {
        "recommendation_id": recommendation_id,
        "material_id": material_id,
        "depth": depth,
    }
    material_link = RecommendationMaterials(**link_fields)
    session.add(material_link)
    session.flush()
    return material_link.id
def create_plan_recommendations(session: Session, plan_id, recommendation_ids):
    """
    Link one plan to a set of recommendations with a single bulk insert.

    The statement is executed on the session but not committed by this function.
    When there are no recommendation IDs nothing is executed at all.
    :param session: The database session
    :param plan_id: ID of the plan
    :param recommendation_ids: iterable of recommendation IDs (may be empty)
    """
    # Materialise the rows first so that any iterable (including a generator) is accepted
    # and so that emptiness can be checked reliably.
    rows = [{"plan_id": plan_id, "recommendation_id": rid} for rid in recommendation_ids]
    if not rows:
        # An empty list must not reach insert().values(): it is not interpreted as a
        # multi-row parameter set, so there would be no meaningful rows to insert.
        return
    # Bulk insert using SQLAlchemy's core API
    session.execute(insert(PlanRecommendations).values(rows))
def upload_recommendations(session: Session, recommendations_to_upload, property_id, new_plan_id):
    """
    Bulk-insert the recommendations for a property, their materials, and their links to a plan.

    All rows are written inside one transaction that is committed at the end. Every insert
    payload is built before the first statement is executed, so malformed input (a missing
    key, a non-numeric depth) raises before anything has been written.
    :param session: The database session
    :param recommendations_to_upload: iterable of recommendation dictionaries; each one must
        provide the keys read below, including a ``"parts"`` list of material dictionaries
    :param property_id: ID of the property the recommendations belong to
    :param new_plan_id: ID of the plan the new recommendations are linked to
    :return: True on success (also when there is nothing to upload); False when a database
        error occurred, in which case the transaction has been rolled back
    """
    recommendations = list(recommendations_to_upload)
    if not recommendations:
        # Nothing to insert: an empty list must not be handed to insert().values(),
        # and there is nothing for this function to commit.
        return True

    # Prepare data for bulk insert for Recommendation
    # NOTE(review): "head_demand" looks like a typo for "heat_demand", and "energy_savings" is
    # also fed from rec["heat_demand"]; both keys must match the Recommendation columns, which
    # are not visible here, so they are left unchanged -- confirm against the model.
    recommendations_data = [
        {
            "property_id": property_id,
            "type": rec["type"],
            "description": rec["description"],
            "estimated_cost": rec["total"],
            "default": rec["default"],
            "starting_u_value": rec.get("starting_u_value"),
            "new_u_value": rec.get("new_u_value"),
            "sap_points": rec["sap_points"],
            "energy_savings": rec["heat_demand"],
            "kwh_savings": rec["kwh_savings"],
            "co2_equivalent_savings": rec["co2_equivalent_savings"],
            "total_work_hours": rec["labour_hours"],
            "energy_cost_savings": rec["energy_cost_savings"],
            "labour_days": rec["labour_days"],
            "already_installed": rec["already_installed"],
            "head_demand": rec["heat_demand"],
        }
        for rec in recommendations
    ]

    # Prepare the material rows per recommendation up front (the recommendation ID is added
    # once it is known), so that bad "parts" data cannot fail halfway through the transaction.
    parts_per_recommendation = [
        [
            {
                "material_id": part["id"],
                # A falsy depth (None, "", 0) is stored as NULL.
                "depth": int(part["depth"]) if part["depth"] else None,
                "quantity": part["quantity"],
                "quantity_unit": part["quantity_unit"],
                "estimated_cost": part["total"],
            }
            for part in rec["parts"]
        ]
        for rec in recommendations
    ]

    try:
        # Insert the recommendations, get back the IDs
        stmt = insert(Recommendation).returning(Recommendation.id).values(recommendations_data)
        result = session.execute(stmt)
        uploaded_recommendation_ids = [row[0] for row in result]

        # NOTE(review): pairing IDs with their parts by position assumes RETURNING yields the
        # IDs in the same order as the inserted rows -- confirm for the database in use.
        recommendation_materials_data = [
            {"recommendation_id": recommendation_id, **part_row}
            for recommendation_id, part_rows in zip(uploaded_recommendation_ids, parts_per_recommendation)
            for part_row in part_rows
        ]
        session.bulk_insert_mappings(RecommendationMaterials, recommendation_materials_data)
        session.flush()

        create_plan_recommendations(
            session, plan_id=new_plan_id, recommendation_ids=uploaded_recommendation_ids
        )

        # Commit the transaction
        session.commit()
        return True
    except SQLAlchemyError as e:
        # Rollback the transaction in case of an error
        session.rollback()
        print(f"An error occurred: {e}")
        return False
def clear_portfolio(session: Session, portfolio_id: int):
    """
    Delete a portfolio's properties together with the plans, scenarios and recommendations
    attached to them, then commit.

    Rows are removed in dependency order (link tables first, then plans/scenarios/
    recommendations, then the property tables). If any statement fails, the session is
    rolled back and the error is re-raised, so no partial deletion is left pending.
    :param session: The database session
    :param portfolio_id: ID of the portfolio to clear
    :raises SQLAlchemyError: re-raised after the session has been rolled back
    """
    try:
        # Fetch all property IDs associated with the given portfolio
        property_rows = session.query(PropertyModel.id).filter(PropertyModel.portfolio_id == portfolio_id).all()
        property_ids = [p.id for p in property_rows]

        # Fetch all recommendation IDs associated with the properties
        recommendation_rows = (
            session.query(Recommendation.id).filter(Recommendation.property_id.in_(property_ids)).all()
        )
        recommendation_ids = [r.id for r in recommendation_rows]

        # Delete all entries from RecommendationMaterials for these recommendations
        session.execute(
            delete(RecommendationMaterials).where(RecommendationMaterials.recommendation_id.in_(recommendation_ids))
        )

        # Delete all entries from PlanRecommendations that reference plans in the portfolio
        # NOTE(review): Subquery.as_scalar() is deprecated since SQLAlchemy 1.4 in favour of
        # Query.scalar_subquery(); kept as-is because the project's SQLAlchemy version is not
        # visible here.
        session.execute(delete(PlanRecommendations).where(PlanRecommendations.plan_id.in_(
            session.query(Plan.id).filter(Plan.portfolio_id == portfolio_id).subquery().as_scalar()
        )))

        # Delete all Plans associated with the portfolio
        session.execute(delete(Plan).where(Plan.portfolio_id == portfolio_id))

        # Delete all Scenarios associated with the portfolio
        session.execute(delete(Scenario).where(Scenario.portfolio_id == portfolio_id))

        # Delete all Recommendations associated with the properties
        session.execute(delete(Recommendation).where(Recommendation.property_id.in_(property_ids)))

        # Now, delete the property tables: PropertyTargetsModel, PropertyDetailsEpcModel and PropertyModel.
        # NOTE: PropertyDetailsMeter rows are NOT deleted here; a delete keyed on
        # PropertyDetailsMeter.uprn.in_(property_ids) was previously disabled.
        session.execute(delete(PropertyTargetsModel).where(PropertyTargetsModel.portfolio_id == portfolio_id))
        session.execute(delete(PropertyDetailsEpcModel).where(PropertyDetailsEpcModel.portfolio_id == portfolio_id))
        session.execute(delete(PropertyModel).where(PropertyModel.portfolio_id == portfolio_id))

        # Commit the changes
        session.commit()
    except SQLAlchemyError:
        # Same error handling as the create_* helpers: undo the partial work, then propagate.
        session.rollback()
        raise