def _require_db_engine():
    """Return the module-level engine, failing fast when none is configured.

    Raises:
        RuntimeError: if ``db_engine`` is ``None``. The message matches the one
            raised by ``get_db_session`` so callers see a single, consistent
            configuration error.
    """
    if db_engine is None:
        raise RuntimeError("Database is not configured. Set DATABASE_URL in environment variables.")
    return db_engine


@contextmanager
def db_session():
    """Yield a session wrapped in a single unit of work.

    The session is committed when the ``with`` body finishes normally, rolled
    back (and the exception re-raised) when the body raises, and closed in
    every case.

    Raises:
        RuntimeError: if the database engine is not configured.
    """
    # Resolve the engine before entering the try block so a missing
    # configuration surfaces as a clear RuntimeError, not a failure on first use.
    session = Session(_require_db_engine())
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


@contextmanager
def db_read_session():
    """Yield a session that is never committed by this helper.

    The session is created with ``expire_on_commit=False`` and is closed when
    the ``with`` body exits, whether or not it raised.

    Raises:
        RuntimeError: if the database engine is not configured.
    """
    session = Session(_require_db_engine(), expire_on_commit=False)
    try:
        yield session
    finally:
        session.close()
def get_property_ids(portfolio_id: int) -> list[int]:
    """Return the ids of every property that belongs to ``portfolio_id``.

    The ids are read through a short-lived read session, so the result is a
    snapshot taken at call time.
    """
    with db_read_session() as session:
        rows = (
            session.query(PropertyModel.id)
            .filter(PropertyModel.portfolio_id == portfolio_id)
            .all()
        )
        # Each row is a 1-tuple ``(id,)``; unpack it to a plain id.
        return [pid for (pid,) in rows]


def delete_property_batch(session: Session, property_ids: list[int]):
    """Delete the given properties and every row that hangs off them.

    Statements are issued children-first so that a parent row is never removed
    while rows referencing it still exist. Nothing is committed here: the
    caller owns the transaction.

    Args:
        session: open session the DELETE statements are executed on.
        property_ids: ids of the properties to remove; an empty list is a no-op.
    """
    if not property_ids:
        return

    # --------------------------------------------------
    # Reusable subqueries
    # NOTE: these are SELECT constructs, not materialised id lists. They are
    # inlined into each statement below and evaluated by the database every
    # time, so they must only be used while the rows they select still exist.
    # --------------------------------------------------
    plan_ids = (
        select(Plan.id)
        .where(Plan.property_id.in_(property_ids))
    )

    recommendation_ids = (
        select(Recommendation.id)
        .where(Recommendation.property_id.in_(property_ids))
    )

    funding_package_ids = (
        select(FundingPackage.id)
        .where(FundingPackage.plan_id.in_(plan_ids))
    )

    # --------------------------------------------------
    # Leaf tables FIRST
    # --------------------------------------------------
    session.execute(
        delete(RecommendationMaterials)
        .where(RecommendationMaterials.recommendation_id.in_(recommendation_ids))
    )

    # Plan <-> recommendation links are removed from both sides: a link whose
    # plan is outside this batch but whose recommendation is inside it would
    # otherwise survive and still point at a recommendation deleted below.
    session.execute(
        delete(PlanRecommendations)
        .where(PlanRecommendations.recommendation_id.in_(recommendation_ids))
    )

    session.execute(
        delete(PlanRecommendations)
        .where(PlanRecommendations.plan_id.in_(plan_ids))
    )

    session.execute(
        delete(FundingPackageMeasures)
        .where(FundingPackageMeasures.funding_package_id.in_(funding_package_ids))
    )

    session.execute(
        delete(InspectionModel)
        .where(InspectionModel.property_id.in_(property_ids))
    )

    # --------------------------------------------------
    # Mid-level tables
    # Filter on the owning key directly instead of "id IN (SELECT id FROM the
    # same table ...)": it selects the same rows with simpler SQL and avoids
    # backends that refuse a subquery on the table being deleted from.
    # --------------------------------------------------
    session.execute(
        delete(FundingPackage)
        .where(FundingPackage.plan_id.in_(plan_ids))
    )

    session.execute(
        delete(Recommendation)
        .where(Recommendation.property_id.in_(property_ids))
    )

    session.execute(
        delete(Plan)
        .where(Plan.property_id.in_(property_ids))
    )

    # --------------------------------------------------
    # Property-scoped tables
    # --------------------------------------------------
    session.execute(
        delete(PropertyDetailsEpcModel)
        .where(PropertyDetailsEpcModel.property_id.in_(property_ids))
    )

    session.execute(
        delete(PropertyTargetsModel)
        .where(PropertyTargetsModel.property_id.in_(property_ids))
    )

    # --------------------------------------------------
    # Properties LAST
    # --------------------------------------------------
    session.execute(
        delete(PropertyModel)
        .where(PropertyModel.id.in_(property_ids))
    )


def portfolio_has_properties(portfolio_id: int) -> bool:
    """Return whether at least one property still belongs to ``portfolio_id``."""
    with db_read_session() as session:
        has_property = (
            session.query(PropertyModel)
            .filter(PropertyModel.portfolio_id == portfolio_id)
            .exists()
        )
        # Wrapping the EXISTS clause in an outer query yields a single boolean.
        return session.query(has_property).scalar()


def delete_portfolio_scenarios_if_empty(portfolio_id: int):
    """Delete the portfolio's scenarios, but only once it has no properties left.

    NOTE(review): the emptiness check and the delete run in separate sessions,
    so a property inserted between the two steps is not detected.
    """
    if portfolio_has_properties(portfolio_id):
        print("Properties still exist — skipping scenario deletion")
        return

    with db_session() as session:
        session.execute(
            delete(Scenario)
            .where(Scenario.portfolio_id == portfolio_id)
        )

    print("Deleted scenarios for empty portfolio")


def clear_portfolio_in_batches(
    portfolio_id: int,
    property_batch_size: int = 25,
):
    """Delete every property of a portfolio, one transaction per batch.

    Each batch is deleted inside its own ``db_session()`` block, so a failure
    only affects the batch in flight. Scenarios are removed at the end, and
    only if no properties remain.

    Args:
        portfolio_id: portfolio whose properties (and dependants) are removed.
        property_batch_size: number of properties deleted per transaction.

    Raises:
        ValueError: if ``property_batch_size`` is not positive.
    """
    # A zero size would divide by zero in the batch count below; a negative
    # size is rejected as well rather than passed on to chunked().
    if property_batch_size <= 0:
        raise ValueError("property_batch_size must be a positive integer")

    property_ids = get_property_ids(portfolio_id)

    if not property_ids:
        print("No properties found.")
        delete_portfolio_scenarios_if_empty(portfolio_id)
        return

    # Ceiling division: number of batches needed to cover every property.
    total = (len(property_ids) + property_batch_size - 1) // property_batch_size

    for i, batch in enumerate(chunked(property_ids, property_batch_size), start=1):
        print(f"Deleting batch {i}/{total} ({len(batch)} properties)")
        with db_session() as session:
            delete_property_batch(session, batch)

    # scenario deletion happens AFTER all properties are gone
    delete_portfolio_scenarios_if_empty(portfolio_id)

    print("Portfolio cleared in batches.")
UUID -from backend.Funding import Funding from backend.SearchEpc import SearchEpc -from contextlib import contextmanager -from sqlmodel import Session from etl.epc.Record import EPCRecord from sqlalchemy.exc import IntegrityError, OperationalError @@ -19,11 +16,11 @@ from starlette.responses import Response from backend.app.BatterySapScorer import BatterySAPScorer from backend.app.config import get_settings, get_prediction_buckets -from backend.app.db.connection import db_engine +from backend.app.db.connection import db_session, db_read_session import backend.app.db.functions as db_funcs from backend.app.db.functions.tasks.Tasks import SubTaskInterface -from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES +from backend.app.plan.schemas import PlanTriggerRequest from backend.app.plan.utils import ( get_cleaned, patch_epc, extract_property_request_data, parse_eco_packages, handle_error, build_cloudwatch_log_url ) @@ -31,6 +28,7 @@ from backend.app.utils import sap_to_epc import backend.app.assumptions as assumptions from backend.ml_models.api import ModelApi +from backend.ml_models.Valuation import PropertyValuation from backend.Property import Property from backend.apis.GoogleSolarApi import GoogleSolarApi from backend.addresses.Addresses import Addresses @@ -39,15 +37,12 @@ from recommendations.optimiser.CostOptimiser import CostOptimiser from recommendations.optimiser.GainOptimiser import GainOptimiser import recommendations.optimiser.optimiser_functions as optimiser_functions from recommendations.Recommendations import Recommendations -from backend.ml_models.Valuation import PropertyValuation +from recommendations.optimiser.funding_optimiser import optimise_with_scenarios from etl.bill_savings.KwhData import KwhData from etl.spatial.OpenUprnClient import OpenUprnClient from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc -from recommendations.optimiser.funding_optimiser import 
optimise_with_funding_paths, optimise_with_scenarios -from recommendations.recommendation_utils import convert_thickness_to_numeric, get_wall_u_value - from utils.logger import setup_logger from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3, read_excel_from_s3 @@ -530,28 +525,6 @@ def extract_address_data(config, body): return uprn, address1, full_address -@contextmanager -def db_session(): - session = Session(db_engine) - try: - yield session - session.commit() - except Exception: - session.rollback() - raise - finally: - session.close() - - -@contextmanager -def db_read_session(): - session = Session(db_engine, expire_on_commit=False) - try: - yield session - finally: - session.close() - - async def model_engine(body: PlanTriggerRequest): logger.info("Model Engine triggered with body: %s", json.loads(body.model_dump_json()))