from typing import Any, Dict, List, Optional from sqlalchemy import inspect, text, insert, delete, select, update from sqlalchemy.orm import Session, Mapper from sqlalchemy.exc import SQLAlchemyError from sqlmodel import Session from backend.app.db.models.recommendations import ( PlanModel, Recommendation, RecommendationMaterials, PlanRecommendations, ScenarioModel, ) from backend.app.db.models.portfolio import PropertyModel from backend.app.db.connection import db_session, db_read_session def prepare_plan_data( p, body, scenario_id, eco_packages, valuations, new_sap_points, new_epc, default_recommendations, rebaselining_carbon=0, rebaselining_heat_demand=0, rebaselining_kwh=0, rebaselining_bills=0, ): """ Utility function to prepare the data that goes into the production of a plan. Is a fairly rough and unstructured function that will need improving in the future :param p: Instantiated property :param body: request body, PlanTriggerRequest :param scenario_id: unique identifier for the scenario :param eco_packages: Pre-constructed eco packages for a property :param valuations: valuation improvement data :param new_sap_points: sap points, post default recommendations :param new_epc: new epc rating, post default recommendations :param default_recommendations: list of default recommendations for a property :param rebaselining_carbon: carbon emissions adjustment for rebaselining :param rebaselining_heat_demand: heat demand adjustment for rebaselining :param rebaselining_kwh: kwh consumption adjustment for rebaselining :param rebaselining_bills: energy bill adjustment for rebaselining :return: """ # Plan carbon savings co2_savings = sum( [ r["co2_equivalent_savings"] for r in default_recommendations if not r.get("already_installed", False) ] ) post_co2_emissions = p.energy["co2_emissions"] - rebaselining_carbon - co2_savings # Plan bill savings energy_bill_savings = sum( [ r["energy_cost_savings"] for r in default_recommendations if not r.get("already_installed", False) ] ) post_energy_bill = ( sum(p.current_energy_bill.values()) - rebaselining_bills - energy_bill_savings ) # energy consumption energy_consumption_savings = sum( [ r["kwh_savings"] for r in default_recommendations if not r.get("already_installed", False) ] ) post_energy_consumption = ( p.current_energy_consumption - rebaselining_kwh - energy_consumption_savings ) valuation_post_retrofit, valuation_increase = None, None if valuations["current_value"]: valuation_increase = valuations["average_increase"] valuation_post_retrofit = valuations["average_increased_value"] # plan costing data cost_of_works = sum( [ r["total"] for r in default_recommendations if not r.get("already_installed", False) ] ) contingency_cost = sum( [ r.get("contingency", 0) for r in default_recommendations if not r.get("already_installed", False) ] ) return { "portfolio_id": body.portfolio_id, "property_id": p.id, "scenario_id": scenario_id, "is_default": True if p.is_new else False, "name": body.scenario_name, "valuation_increase_lower_bound": ( valuations["lower_bound_increased_value"] - valuations["current_value"] ), "valuation_increase_upper_bound": ( valuations["upper_bound_increased_value"] - valuations["current_value"] ), "valuation_increase_average": ( valuations["average_increased_value"] - valuations["current_value"] ), "post_sap_points": float(new_sap_points), "post_epc_rating": new_epc, "post_co2_emissions": float(post_co2_emissions), "co2_savings": float(co2_savings), "post_energy_bill": float(post_energy_bill), "energy_bill_savings": float(energy_bill_savings), "post_energy_consumption": float(post_energy_consumption), "energy_consumption_savings": float(energy_consumption_savings), "valuation_post_retrofit": valuation_post_retrofit, "valuation_increase": valuation_increase, "cost_of_works": float(cost_of_works), "contingency_cost": float(contingency_cost), "plan_type": eco_packages.get(p.id, (None, None, None))[2], } def create_plan(session: Session, plan): """ This function will create a record for the plan in the database if it does not exist. :param session: The database session :param plan: dictionary of data representing a plan to be created """ try: new_plan = PlanModel(**plan) session.add(new_plan) session.flush() session.commit() return new_plan.id except SQLAlchemyError as e: session.rollback() raise e def bulk_create_plans(session: Session, plans_to_create: list[dict]) -> dict[int, int]: if not plans_to_create: return {} payload = [ { "property_id": p["property_id"], **p["plan_data"], } for p in plans_to_create ] stmt = ( insert(PlanModel).values(payload).returning(PlanModel.id, PlanModel.property_id) ) result = session.execute(stmt).all() # property_id -> plan_id return {row.property_id: row.id for row in result} def create_scenario(session: Session, scenario: dict) -> int: existing_scenario = ( session.query(ScenarioModel) .filter_by(portfolio_id=scenario["portfolio_id"]) .first() ) scenario["is_default"] = not bool(existing_scenario) new_scenario = ScenarioModel(**scenario) session.add(new_scenario) session.flush() # ensures ID is populated scenario_id = new_scenario.id session.commit() return scenario_id def create_recommendation(session: Session, recommendation): """ This function will create a record for the recommendation in the database if it does not exist. :param session: The database session :param recommendation: dictionary of data representing a recommendation to be created """ try: new_recommendation = Recommendation(**recommendation) session.add(new_recommendation) session.flush() session.commit() return new_recommendation.id except SQLAlchemyError as e: session.rollback() raise e def create_recommendation_material( session: Session, recommendation_id, material_id, depth ): """ This function will create a record for the recommendation_material in the database if it does not exist. :param session: The databse session :param recommendation_id: ID of the recommendation :param material_id: ID of the material :param depth: depth of the material, may be null if a material where depth is not applicable """ new_recommendation_material = RecommendationMaterials( recommendation_id=recommendation_id, material_id=material_id, depth=depth ) session.add(new_recommendation_material) session.flush() return new_recommendation_material.id def create_plan_recommendations(session: Session, plan_id, recommendation_ids): """ This function will create records for the plan_recommendation in the database. :param session: The database session :param plan_id: ID of the plan :param recommendation_ids: list of recommendation IDs """ # Prepare a list of dictionaries for bulk insert data = [ {"plan_id": plan_id, "recommendation_id": rid} for rid in recommendation_ids ] # Bulk insert using SQLAlchemy's core API session.execute(insert(PlanRecommendations).values(data)) def upload_recommendations( session: Session, recommendations_to_upload, property_id, new_plan_id ): try: # Prepare data for bulk insert for Recommendation recommendations_data = [ { "property_id": property_id, "type": rec["type"], "measure_type": rec["measure_type"], "description": rec["description"], "estimated_cost": float(rec["total"]), "default": rec["default"], "starting_u_value": ( float(rec.get("starting_u_value")) if rec.get("starting_u_value") else None ), "new_u_value": ( float(rec.get("new_u_value")) if rec.get("new_u_value") else None ), "sap_points": float(rec["sap_points"]), "energy_savings": float(rec["heat_demand"]), "kwh_savings": float(rec["kwh_savings"]), "co2_equivalent_savings": float(rec["co2_equivalent_savings"]), "total_work_hours": float(rec["labour_hours"]), "energy_cost_savings": float(rec["energy_cost_savings"]), "labour_days": float(rec["labour_days"]), "already_installed": rec["already_installed"], "heat_demand": float(rec["heat_demand"]), } for rec in recommendations_to_upload ] # Insert the recommendations, get back the IDs stmt = ( insert(Recommendation) .returning(Recommendation.id) .values(recommendations_data) ) result = session.execute(stmt) uploaded_recommendation_ids = [row[0] for row in result] # Prepare data for bulk insert for RecommendationMaterials recommendation_materials_data = [ { "recommendation_id": recommendation_id, "material_id": part["id"], "depth": int(part["depth"]) if part["depth"] else None, "quantity": float(part["quantity"]) if part.get("quantity") else None, "quantity_unit": part.get("quantity_unit", None), "estimated_cost": float(part.get("total", part.get("total_cost"))), } for rec, recommendation_id in zip( recommendations_to_upload, uploaded_recommendation_ids ) for part in rec["parts"] ] session.bulk_insert_mappings( RecommendationMaterials, recommendation_materials_data ) # flush the changes to get the newly created IDs session.flush() create_plan_recommendations( session, plan_id=new_plan_id, recommendation_ids=uploaded_recommendation_ids ) # Commit the transaction session.commit() return True except SQLAlchemyError as e: # Rollback the transaction in case of an error session.rollback() print(f"An error occurred: {e}") return False def bulk_upload_recommendations_and_materials( session: Session, recommendation_payload: list[dict], ): if not recommendation_payload: return # --------------------------------------------------------- # 1. Prepare recommendation rows # --------------------------------------------------------- recommendation_rows = [] parts_by_index = [] plan_ids_by_index = [] for rec in recommendation_payload: recommendation_rows.append( { "property_id": rec["property_id"], "type": rec["type"], "measure_type": rec["measure_type"], "description": rec["description"], "estimated_cost": rec["estimated_cost"], "default": rec["default"], "starting_u_value": rec["starting_u_value"], "new_u_value": rec["new_u_value"], "sap_points": rec["sap_points"], "heat_demand": rec["heat_demand"], "kwh_savings": rec["kwh_savings"], "co2_equivalent_savings": rec["co2_equivalent_savings"], "energy_savings": rec["energy_savings"], "energy_cost_savings": rec["energy_cost_savings"], "total_work_hours": rec["total_work_hours"], "labour_days": rec["labour_days"], "already_installed": rec["already_installed"], } ) parts_by_index.append(rec["parts"]) plan_ids_by_index.append(rec["plan_id"]) # --------------------------------------------------------- # 2. Insert recommendations and get IDs # --------------------------------------------------------- result = session.execute( insert(Recommendation).values(recommendation_rows).returning(Recommendation.id) ) recommendation_ids = [row[0] for row in result] # --------------------------------------------------------- # 3. Insert recommendation materials # --------------------------------------------------------- materials_rows = [] for recommendation_id, parts in zip(recommendation_ids, parts_by_index): for part in parts: materials_rows.append( { "recommendation_id": recommendation_id, "material_id": part["material_id"], "depth": part["depth"], "quantity": part["quantity"], "quantity_unit": part["quantity_unit"], "estimated_cost": part["estimated_cost"], } ) if materials_rows: session.execute(insert(RecommendationMaterials).values(materials_rows)) # --------------------------------------------------------- # 4. Insert plan ↔ recommendation links # --------------------------------------------------------- plan_recommendation_rows = [ { "plan_id": plan_id, "recommendation_id": recommendation_id, } for plan_id, recommendation_id in zip(plan_ids_by_index, recommendation_ids) ] session.execute(insert(PlanRecommendations).values(plan_recommendation_rows)) def chunked(iterable, size=100): for i in range(0, len(iterable), size): yield iterable[i : i + size] def get_property_ids(portfolio_id: int) -> list[int]: with db_read_session() as session: return [ pid for (pid,) in session.query(PropertyModel.id) .filter(PropertyModel.portfolio_id == portfolio_id) .all() ] def delete_property_batch(session: Session, property_ids: list[int]): if not property_ids: return params = {"property_ids": property_ids} # -------------------------------------------------- # recommendation_materials (via recommendation) # -------------------------------------------------- session.execute( text( """ DELETE FROM recommendation_materials rm USING recommendation r WHERE rm.recommendation_id = r.id AND r.property_id = ANY(:property_ids) """ ), params, ) # -------------------------------------------------- # plan_recommendations (via plan) # -------------------------------------------------- session.execute( text( """ DELETE FROM plan_recommendations pr USING plan p WHERE pr.plan_id = p.id AND p.property_id = ANY(:property_ids) """ ), params, ) # -------------------------------------------------- # funding_package_measures # -------------------------------------------------- session.execute( text( """ DELETE FROM funding_package_measures fpm USING funding_package fp, plan p WHERE fpm.funding_package_id = fp.id AND fp.plan_id = p.id AND p.property_id = ANY(:property_ids) """ ), params, ) # -------------------------------------------------- # inspections (direct) # -------------------------------------------------- session.execute( text( """ DELETE FROM inspections WHERE property_id = ANY(:property_ids) """ ), params, ) # -------------------------------------------------- # funding_package # -------------------------------------------------- session.execute( text( """ DELETE FROM funding_package fp USING plan p WHERE fp.plan_id = p.id AND p.property_id = ANY(:property_ids) """ ), params, ) # -------------------------------------------------- # recommendation (direct — CRITICAL FIX) # -------------------------------------------------- session.execute( text( """ DELETE FROM recommendation WHERE property_id = ANY(:property_ids) """ ), params, ) # -------------------------------------------------- # plan (direct) # -------------------------------------------------- session.execute( text( """ DELETE FROM plan WHERE property_id = ANY(:property_ids) """ ), params, ) # -------------------------------------------------- # property-scoped tables # -------------------------------------------------- session.execute( text( """ DELETE FROM property_details_epc WHERE property_id = ANY(:property_ids) """ ), params, ) session.execute( text( """ DELETE FROM property_targets WHERE property_id = ANY(:property_ids) """ ), params, ) # -------------------------------------------------- # properties LAST # -------------------------------------------------- session.execute( text( """ DELETE FROM property WHERE id = ANY(:property_ids) """ ), params, ) def portfolio_has_properties(portfolio_id: int) -> bool: with db_read_session() as session: return session.query( session.query(PropertyModel) .filter(PropertyModel.portfolio_id == portfolio_id) .exists() ).scalar() def delete_portfolio_scenarios_if_empty(portfolio_id: int): if portfolio_has_properties(portfolio_id): print("Properties still exist — skipping scenario deletion") return with db_session() as session: session.execute( delete(ScenarioModel).where(ScenarioModel.portfolio_id == portfolio_id) ) print("Deleted scenarios for empty portfolio") def clear_portfolio_in_batches( portfolio_id: int, property_batch_size: int = 25, ): property_ids = get_property_ids(portfolio_id) if not property_ids: print("No properties found.") delete_portfolio_scenarios_if_empty(portfolio_id) return total = (len(property_ids) + property_batch_size - 1) // property_batch_size import time for i, batch in enumerate(chunked(property_ids, property_batch_size), start=1): print(f"Deleting batch {i}/{total} ({len(batch)} properties)") start_time = time.time() with db_session() as session: delete_property_batch(session, batch) finish_time = time.time() print(f"Batch {i} deleted in {finish_time - start_time:.2f} seconds") # scenario deletion happens AFTER all properties are gone delete_portfolio_scenarios_if_empty(portfolio_id) print("Portfolio cleared in batches.") def get_plans_by_portfolio_id(portfolio_id: int) -> List[PlanModel]: stmt = select(PlanModel).where(PlanModel.portfolio_id == portfolio_id) with db_read_session() as session: session_any: Any = session # Typehint as Any to satisfy Pylance... return session_any.exec(stmt).scalars().all() def get_scenario(scenario_id: int) -> Optional[ScenarioModel]: stmt = select(ScenarioModel).where(ScenarioModel.id == scenario_id) with db_read_session() as session: session_any: Any = session # Typehint as Any to satisfy Pylance... return session_any.exec(stmt).scalar_one_or_none() def bulk_update_plans( plan_models: List[PlanModel], scenario_models: List[ScenarioModel], ) -> int: if not plan_models: return 0 with db_read_session() as session: plan_mapper: Mapper[Any] = inspect(PlanModel) scenario_mapper: Mapper[Any] = inspect(ScenarioModel) plan_mappings: List[Dict[str, Any]] = ( [] ) # Typehint as Any to satisfy Pylance... for plan in plan_models: data: Dict[str, Any] = { c.name: getattr(plan, c.name) for c in plan.__table__.columns if c.name != "id" } data["id"] = plan.id plan_mappings.append(data) session.bulk_update_mappings(plan_mapper, plan_mappings) scenario_mappings: List[Dict[str, Any]] = ( [] ) # Typehint as Any to satisfy Pylance... for scenario in scenario_models: data: Dict[str, Any] = { c.name: getattr(scenario, c.name) for c in scenario.__table__.columns if c.name not in {"id", "portfolio_id"} } data["id"] = scenario.id scenario_mappings.append(data) session.bulk_update_mappings(scenario_mapper, scenario_mappings) session.commit() return len(plan_models)