from typing import Any, Dict, List, Optional from sqlalchemy import ( ColumnElement, and_, func, inspect, text, insert, delete, select, ) from sqlalchemy.orm import Session, Mapper from sqlalchemy.exc import SQLAlchemyError from sqlmodel import Session from backend.app.db.models.recommendations import ( PlanModel, Recommendation, RecommendationMaterials, PlanRecommendations, ScenarioModel, ) from backend.app.db.models.portfolio import PropertyModel from backend.app.db.connection import db_session, db_read_session def prepare_plan_data( p, body, scenario_id, eco_packages, valuations, new_sap_points, new_epc, default_recommendations, rebaselining_carbon=0, rebaselining_heat_demand=0, rebaselining_kwh=0, rebaselining_bills=0, ): """ Utility function to prepare the data that goes into the production of a plan. Is a fairly rough and unstructured function that will need improving in the future :param p: Instantiated property :param body: request body, PlanTriggerRequest :param scenario_id: unique identifier for the scenario :param eco_packages: Pre-constructed eco packages for a property :param valuations: valuation improvement data :param new_sap_points: sap points, post default recommendations :param new_epc: new epc rating, post default recommendations :param default_recommendations: list of default recommendations for a property :param rebaselining_carbon: carbon emissions adjustment for rebaselining :param rebaselining_heat_demand: heat demand adjustment for rebaselining :param rebaselining_kwh: kwh consumption adjustment for rebaselining :param rebaselining_bills: energy bill adjustment for rebaselining :return: """ # Plan carbon savings co2_savings = sum( [ r["co2_equivalent_savings"] for r in default_recommendations if not r.get("already_installed", False) ] ) post_co2_emissions = p.energy["co2_emissions"] - rebaselining_carbon - co2_savings # Plan bill savings energy_bill_savings = sum( [ r["energy_cost_savings"] for r in default_recommendations if not r.get("already_installed", False) ] ) post_energy_bill = ( sum(p.current_energy_bill.values()) - rebaselining_bills - energy_bill_savings ) # energy consumption energy_consumption_savings = sum( [ r["kwh_savings"] for r in default_recommendations if not r.get("already_installed", False) ] ) post_energy_consumption = ( p.current_energy_consumption - rebaselining_kwh - energy_consumption_savings ) valuation_post_retrofit, valuation_increase = None, None if valuations["current_value"]: valuation_increase = valuations["average_increase"] valuation_post_retrofit = valuations["average_increased_value"] # plan costing data cost_of_works = sum( [ r["total"] for r in default_recommendations if not r.get("already_installed", False) ] ) contingency_cost = sum( [ r.get("contingency", 0) for r in default_recommendations if not r.get("already_installed", False) ] ) return { "portfolio_id": body.portfolio_id, "property_id": p.id, "scenario_id": scenario_id, "is_default": True if p.is_new else False, "name": body.scenario_name, "valuation_increase_lower_bound": ( valuations["lower_bound_increased_value"] - valuations["current_value"] ), "valuation_increase_upper_bound": ( valuations["upper_bound_increased_value"] - valuations["current_value"] ), "valuation_increase_average": ( valuations["average_increased_value"] - valuations["current_value"] ), "post_sap_points": float(new_sap_points), "post_epc_rating": new_epc, "post_co2_emissions": float(post_co2_emissions), "co2_savings": float(co2_savings), "post_energy_bill": float(post_energy_bill), "energy_bill_savings": float(energy_bill_savings), "post_energy_consumption": float(post_energy_consumption), "energy_consumption_savings": float(energy_consumption_savings), "valuation_post_retrofit": valuation_post_retrofit, "valuation_increase": valuation_increase, "cost_of_works": float(cost_of_works), "contingency_cost": float(contingency_cost), "plan_type": eco_packages.get(p.id, (None, None, None))[2], } def create_plan(session: Session, plan): """ This function will create a record for the plan in the database if it does not exist. :param session: The database session :param plan: dictionary of data representing a plan to be created """ try: new_plan = PlanModel(**plan) session.add(new_plan) session.flush() session.commit() return new_plan.id except SQLAlchemyError as e: session.rollback() raise e def bulk_create_plans(session: Session, plans_to_create: list[dict]) -> dict[int, int]: if not plans_to_create: return {} payload = [ { "property_id": p["property_id"], **p["plan_data"], } for p in plans_to_create ] stmt = ( insert(PlanModel).values(payload).returning(PlanModel.id, PlanModel.property_id) ) result = session.execute(stmt).all() # property_id -> plan_id return {row.property_id: row.id for row in result} def create_scenario(session: Session, scenario: dict) -> int: existing_scenario = ( session.query(ScenarioModel) .filter_by(portfolio_id=scenario["portfolio_id"]) .first() ) scenario["is_default"] = not bool(existing_scenario) new_scenario = ScenarioModel(**scenario) session.add(new_scenario) session.flush() # ensures ID is populated scenario_id = new_scenario.id session.commit() return scenario_id def create_recommendation(session: Session, recommendation): """ This function will create a record for the recommendation in the database if it does not exist. :param session: The database session :param recommendation: dictionary of data representing a recommendation to be created """ try: new_recommendation = Recommendation(**recommendation) session.add(new_recommendation) session.flush() session.commit() return new_recommendation.id except SQLAlchemyError as e: session.rollback() raise e def create_recommendation_material( session: Session, recommendation_id, material_id, depth ): """ This function will create a record for the recommendation_material in the database if it does not exist. :param session: The databse session :param recommendation_id: ID of the recommendation :param material_id: ID of the material :param depth: depth of the material, may be null if a material where depth is not applicable """ new_recommendation_material = RecommendationMaterials( recommendation_id=recommendation_id, material_id=material_id, depth=depth ) session.add(new_recommendation_material) session.flush() return new_recommendation_material.id def create_plan_recommendations(session: Session, plan_id, recommendation_ids): """ This function will create records for the plan_recommendation in the database. :param session: The database session :param plan_id: ID of the plan :param recommendation_ids: list of recommendation IDs """ # Prepare a list of dictionaries for bulk insert data = [ {"plan_id": plan_id, "recommendation_id": rid} for rid in recommendation_ids ] # Bulk insert using SQLAlchemy's core API session.execute(insert(PlanRecommendations).values(data)) def upload_recommendations( session: Session, recommendations_to_upload, property_id, new_plan_id ): try: # Prepare data for bulk insert for Recommendation recommendations_data = [ { "property_id": property_id, "type": rec["type"], "measure_type": rec["measure_type"], "description": rec["description"], "estimated_cost": float(rec["total"]), "default": rec["default"], "starting_u_value": ( float(rec.get("starting_u_value")) if rec.get("starting_u_value") else None ), "new_u_value": ( float(rec.get("new_u_value")) if rec.get("new_u_value") else None ), "sap_points": float(rec["sap_points"]), "energy_savings": float(rec["heat_demand"]), "kwh_savings": float(rec["kwh_savings"]), "co2_equivalent_savings": float(rec["co2_equivalent_savings"]), "total_work_hours": float(rec["labour_hours"]), "energy_cost_savings": float(rec["energy_cost_savings"]), "labour_days": float(rec["labour_days"]), "already_installed": rec["already_installed"], "heat_demand": float(rec["heat_demand"]), } for rec in recommendations_to_upload ] # Insert the recommendations, get back the IDs stmt = ( insert(Recommendation) .returning(Recommendation.id) .values(recommendations_data) ) result = session.execute(stmt) uploaded_recommendation_ids = [row[0] for row in result] # Prepare data for bulk insert for RecommendationMaterials recommendation_materials_data = [ { "recommendation_id": recommendation_id, "material_id": part["id"], "depth": int(part["depth"]) if part["depth"] else None, "quantity": float(part["quantity"]) if part.get("quantity") else None, "quantity_unit": part.get("quantity_unit", None), "estimated_cost": float(part.get("total", part.get("total_cost"))), } for rec, recommendation_id in zip( recommendations_to_upload, uploaded_recommendation_ids ) for part in rec["parts"] ] session.bulk_insert_mappings( RecommendationMaterials, recommendation_materials_data ) # flush the changes to get the newly created IDs session.flush() create_plan_recommendations( session, plan_id=new_plan_id, recommendation_ids=uploaded_recommendation_ids ) # Commit the transaction session.commit() return True except SQLAlchemyError as e: # Rollback the transaction in case of an error session.rollback() print(f"An error occurred: {e}") return False def bulk_upload_recommendations_and_materials( session: Session, recommendation_payload: list[dict], ): if not recommendation_payload: return # --------------------------------------------------------- # 1. Prepare recommendation rows # --------------------------------------------------------- recommendation_rows = [] parts_by_index = [] plan_ids_by_index = [] for rec in recommendation_payload: recommendation_rows.append( { "property_id": rec["property_id"], "type": rec["type"], "measure_type": rec["measure_type"], "description": rec["description"], "estimated_cost": rec["estimated_cost"], "default": rec["default"], "starting_u_value": rec["starting_u_value"], "new_u_value": rec["new_u_value"], "sap_points": rec["sap_points"], "heat_demand": rec["heat_demand"], "kwh_savings": rec["kwh_savings"], "co2_equivalent_savings": rec["co2_equivalent_savings"], "energy_savings": rec["energy_savings"], "energy_cost_savings": rec["energy_cost_savings"], "total_work_hours": rec["total_work_hours"], "labour_days": rec["labour_days"], "already_installed": rec["already_installed"], } ) parts_by_index.append(rec["parts"]) plan_ids_by_index.append(rec["plan_id"]) # --------------------------------------------------------- # 2. Insert recommendations and get IDs # --------------------------------------------------------- result = session.execute( insert(Recommendation).values(recommendation_rows).returning(Recommendation.id) ) recommendation_ids = [row[0] for row in result] # --------------------------------------------------------- # 3. Insert recommendation materials # --------------------------------------------------------- materials_rows = [] for recommendation_id, parts in zip(recommendation_ids, parts_by_index): for part in parts: materials_rows.append( { "recommendation_id": recommendation_id, "material_id": part["material_id"], "depth": part["depth"], "quantity": part["quantity"], "quantity_unit": part["quantity_unit"], "estimated_cost": part["estimated_cost"], } ) if materials_rows: session.execute(insert(RecommendationMaterials).values(materials_rows)) # --------------------------------------------------------- # 4. Insert plan ↔ recommendation links # --------------------------------------------------------- plan_recommendation_rows = [ { "plan_id": plan_id, "recommendation_id": recommendation_id, } for plan_id, recommendation_id in zip(plan_ids_by_index, recommendation_ids) ] session.execute(insert(PlanRecommendations).values(plan_recommendation_rows)) def chunked(iterable, size=100): for i in range(0, len(iterable), size): yield iterable[i : i + size] def get_property_ids(portfolio_id: int) -> list[int]: with db_read_session() as session: return [ pid for (pid,) in session.query(PropertyModel.id) .filter(PropertyModel.portfolio_id == portfolio_id) .all() ] def delete_property_batch(session: Session, property_ids: list[int]): if not property_ids: return params = {"property_ids": property_ids} # -------------------------------------------------- # recommendation_materials (via recommendation) # -------------------------------------------------- session.execute( text( """ DELETE FROM recommendation_materials rm USING recommendation r WHERE rm.recommendation_id = r.id AND r.property_id = ANY(:property_ids) """ ), params, ) # -------------------------------------------------- # plan_recommendations (via plan) # -------------------------------------------------- session.execute( text( """ DELETE FROM plan_recommendations pr USING plan p WHERE pr.plan_id = p.id AND p.property_id = ANY(:property_ids) """ ), params, ) # -------------------------------------------------- # funding_package_measures # -------------------------------------------------- session.execute( text( """ DELETE FROM funding_package_measures fpm USING funding_package fp, plan p WHERE fpm.funding_package_id = fp.id AND fp.plan_id = p.id AND p.property_id = ANY(:property_ids) """ ), params, ) # -------------------------------------------------- # inspections (direct) # -------------------------------------------------- session.execute( text( """ DELETE FROM inspections WHERE property_id = ANY(:property_ids) """ ), params, ) # -------------------------------------------------- # funding_package # -------------------------------------------------- session.execute( text( """ DELETE FROM funding_package fp USING plan p WHERE fp.plan_id = p.id AND p.property_id = ANY(:property_ids) """ ), params, ) # -------------------------------------------------- # recommendation (direct — CRITICAL FIX) # -------------------------------------------------- session.execute( text( """ DELETE FROM recommendation WHERE property_id = ANY(:property_ids) """ ), params, ) # -------------------------------------------------- # plan (direct) # -------------------------------------------------- session.execute( text( """ DELETE FROM plan WHERE property_id = ANY(:property_ids) """ ), params, ) # -------------------------------------------------- # property-scoped tables # -------------------------------------------------- session.execute( text( """ DELETE FROM property_details_epc WHERE property_id = ANY(:property_ids) """ ), params, ) session.execute( text( """ DELETE FROM property_targets WHERE property_id = ANY(:property_ids) """ ), params, ) # -------------------------------------------------- # properties LAST # -------------------------------------------------- session.execute( text( """ DELETE FROM property WHERE id = ANY(:property_ids) """ ), params, ) def portfolio_has_properties(portfolio_id: int) -> bool: with db_read_session() as session: return session.query( session.query(PropertyModel) .filter(PropertyModel.portfolio_id == portfolio_id) .exists() ).scalar() def delete_portfolio_scenarios_if_empty(portfolio_id: int): if portfolio_has_properties(portfolio_id): print("Properties still exist — skipping scenario deletion") return with db_session() as session: session.execute( delete(ScenarioModel).where(ScenarioModel.portfolio_id == portfolio_id) ) print("Deleted scenarios for empty portfolio") def clear_portfolio_in_batches( portfolio_id: int, property_batch_size: int = 25, ): property_ids = get_property_ids(portfolio_id) if not property_ids: print("No properties found.") delete_portfolio_scenarios_if_empty(portfolio_id) return total = (len(property_ids) + property_batch_size - 1) // property_batch_size import time for i, batch in enumerate(chunked(property_ids, property_batch_size), start=1): print(f"Deleting batch {i}/{total} ({len(batch)} properties)") start_time = time.time() with db_session() as session: delete_property_batch(session, batch) finish_time = time.time() print(f"Batch {i} deleted in {finish_time - start_time:.2f} seconds") # scenario deletion happens AFTER all properties are gone delete_portfolio_scenarios_if_empty(portfolio_id) print("Portfolio cleared in batches.") def get_plans_by_scenario_ids(ids: List[int]) -> List[PlanModel]: stmt = select(PlanModel).where(PlanModel.scenario_id.in_(ids)) with db_read_session() as session: session_any: Any = session # Typehint as Any to satisfy Pylance... return session_any.exec(stmt).scalars().all() def get_most_recent_plans_by_portfolio_id( portfolio_id: int, min_property_id: Optional[int] = None, max_property_id: Optional[int] = None, ) -> List[PlanModel]: filters = [PlanModel.portfolio_id == portfolio_id] if min_property_id is not None: filters.append(PlanModel.property_id >= min_property_id) if max_property_id is not None: filters.append(PlanModel.property_id <= max_property_id) # NOTE: This statement works for Postgres only, because of the Distinct stmt = ( select(PlanModel) .where(and_(*filters)) .distinct( PlanModel.property_id, PlanModel.scenario_id ) # one plan per property per scenario .order_by( PlanModel.property_id, PlanModel.scenario_id, PlanModel.created_at.desc(), PlanModel.id.desc(), ) ) with db_read_session() as session: session_any: Any = session # Typehint as Any to satisfy Pylance... return session_any.exec(stmt).scalars().all() def get_most_recent_plans_by_scenario_ids( scenario_ids: List[int], min_property_id: Optional[int] = None, max_property_id: Optional[int] = None, ) -> List[PlanModel]: if not scenario_ids: return [] # Base filter: scenario_id in provided list filters: List[ColumnElement[bool]] = [PlanModel.scenario_id.in_(scenario_ids)] # Add optional property ID range filters if min_property_id is not None: filters.append(PlanModel.property_id >= min_property_id) if max_property_id is not None: filters.append(PlanModel.property_id <= max_property_id) # NOTE: This statement works for Postgres only, because of the Distinct stmt = ( select(PlanModel) .where(and_(*filters)) .distinct( PlanModel.property_id, PlanModel.scenario_id ) # one plan per property per scenario .order_by( PlanModel.property_id, PlanModel.scenario_id, PlanModel.created_at.desc(), PlanModel.id.desc(), ) ) with db_read_session() as session: session_any: Any = session # Typehint as Any to satisfy Pylance return session_any.exec(stmt).scalars().all() def get_scenarios_by_portfolio_id(portfolio_id: int) -> List[ScenarioModel]: stmt = select(ScenarioModel).where(ScenarioModel.portfolio_id == portfolio_id) with db_read_session() as session: session_any: Any = session # Typehint as Any to satisfy Pylance... return session_any.exec(stmt).scalars().all() def get_scenarios_count_by_portfolio_id(portfolio_id: int) -> int: stmt = ( select(func.count()) .select_from(ScenarioModel) .where(ScenarioModel.portfolio_id == portfolio_id) ) with db_read_session() as session: session_any: Any = session # Typehint as Any to satisfy Pylance... return session_any.exec(stmt).scalar_one() def get_default_plans( portfolio_id: int, min_property_id: Optional[int] = None, max_property_id: Optional[int] = None, ) -> List[PlanModel]: filters: List[ColumnElement[bool]] = [ PlanModel.portfolio_id == portfolio_id, PlanModel.is_default.is_(True), ] if min_property_id is not None: filters.append(PlanModel.property_id >= min_property_id) if max_property_id is not None: filters.append(PlanModel.property_id <= max_property_id) stmt = select(PlanModel).where(and_(*filters)) with db_read_session() as session: session_any: Any = session # Typehint as Any to satisfy Pylance... plans: List[PlanModel] = session_any.exec(stmt).scalars().all() return plans def bulk_update_plans( plan_models: List[PlanModel], scenario_models: List[ScenarioModel], ) -> int: if not plan_models: return 0 with db_read_session() as session: plan_mapper: Mapper[Any] = inspect(PlanModel) scenario_mapper: Mapper[Any] = inspect(ScenarioModel) plan_mappings: List[Dict[str, Any]] = ( [] ) # Typehint as Any to satisfy Pylance... for plan in plan_models: data: Dict[str, Any] = { c.name: getattr(plan, c.name) for c in plan.__table__.columns if c.name != "id" } data["id"] = plan.id plan_mappings.append(data) session.bulk_update_mappings(plan_mapper, plan_mappings) scenario_mappings: List[Dict[str, Any]] = ( [] ) # Typehint as Any to satisfy Pylance... for scenario in scenario_models: data: Dict[str, Any] = { c.name: getattr(scenario, c.name) for c in scenario.__table__.columns if c.name not in {"id", "portfolio_id"} } data["id"] = scenario.id scenario_mappings.append(data) session.bulk_update_mappings(scenario_mapper, scenario_mappings) session.commit() return len(plan_models)