handling db bug for trigger api

This commit is contained in:
Khalim Conn-Kowlessar 2026-01-01 14:02:09 +08:00
parent 8b88fad8d4
commit 860645e30e
4 changed files with 143 additions and 302 deletions

View file

@ -1,4 +1,5 @@
from sqlalchemy import create_engine
from contextlib import contextmanager
from backend.app.config import get_settings
from sqlmodel import Session
@ -29,3 +30,25 @@ def get_db_session():
if db_engine is None:
raise RuntimeError("Database is not configured. Set DATABASE_URL in environment variables.")
return Session(db_engine)
@contextmanager
def db_session():
    """Yield a read-write ``Session`` bound to ``db_engine``.

    The session is committed when the ``with`` body finishes normally,
    rolled back if the body raises an ``Exception`` (which is then
    re-raised), and closed in every case.

    Raises:
        RuntimeError: if ``db_engine`` is ``None`` (no database configured).
            This mirrors the check in ``get_db_session`` so that a missing
            configuration is reported up front rather than at the first
            statement executed on an unbound session.
    """
    if db_engine is None:
        raise RuntimeError("Database is not configured. Set DATABASE_URL in environment variables.")
    session = Session(db_engine)
    try:
        yield session
        # Only reached when the caller's block completed without raising.
        session.commit()
    except Exception:
        # Undo any partial work before propagating the caller's error.
        session.rollback()
        raise
    finally:
        session.close()
@contextmanager
def db_read_session():
    """Yield a ``Session`` intended for read-only work.

    The session is created with ``expire_on_commit=False`` and is never
    committed or explicitly rolled back by this helper; it is simply closed
    when the ``with`` block exits, whether normally or via an exception.

    Raises:
        RuntimeError: if ``db_engine`` is ``None`` (no database configured).
            This mirrors the check in ``get_db_session`` so that a missing
            configuration is reported up front rather than at the first
            query executed on an unbound session.
    """
    if db_engine is None:
        raise RuntimeError("Database is not configured. Set DATABASE_URL in environment variables.")
    session = Session(db_engine, expire_on_commit=False)
    try:
        yield session
    finally:
        session.close()

View file

@ -9,6 +9,7 @@ from backend.app.db.models.portfolio import (
)
from backend.app.db.models.funding import FundingPackageMeasures, FundingPackage
from backend.app.db.models.inspections import InspectionModel
from backend.app.db.connection import db_session, db_read_session
def prepare_plan_data(
@ -350,298 +351,143 @@ def chunked(iterable, size=100):
yield iterable[i:i + size]
# def fast_delete_recommendations(session, chunk):
# placeholders = ",".join(["(:p{})".format(i) for i in range(len(chunk))])
# params = {f"p{i}": chunk[i] for i in range(len(chunk))}
#
# sql = text(f"""
# WITH ids(property_id) AS (
# VALUES {placeholders}
# )
# DELETE FROM recommendation r
# USING ids
# WHERE r.property_id = ids.property_id;
# """)
#
# session.execute(sql, params, execution_options={"synchronize_session": False})
def get_property_ids(portfolio_id: int) -> list[int]:
    """Return the ``PropertyModel.id`` of every property in a portfolio.

    Args:
        portfolio_id: value matched against ``PropertyModel.portfolio_id``.

    Returns:
        A list of property ids; empty when no property matches.
    """
    with db_read_session() as session:
        id_query = session.query(PropertyModel.id).filter(
            PropertyModel.portfolio_id == portfolio_id
        )
        # Each result row holds exactly one column (the id); unwrap it.
        rows = id_query.all()
        return [row[0] for row in rows]
def delete_property_batch(session: Session, property_ids: list[int]):
if not property_ids:
return
# --------------------------------------------------
# Shared subqueries (computed once)
# --------------------------------------------------
plan_ids = (
select(Plan.id)
.where(Plan.property_id.in_(property_ids))
)
recommendation_ids = (
select(Recommendation.id)
.where(Recommendation.property_id.in_(property_ids))
)
funding_package_ids = (
select(FundingPackage.id)
.where(FundingPackage.plan_id.in_(plan_ids))
)
# --------------------------------------------------
# Leaf tables FIRST
# --------------------------------------------------
session.execute(
delete(RecommendationMaterials)
.where(RecommendationMaterials.recommendation_id.in_(recommendation_ids))
)
session.execute(
delete(PlanRecommendations)
.where(PlanRecommendations.plan_id.in_(plan_ids))
)
session.execute(
delete(FundingPackageMeasures)
.where(FundingPackageMeasures.funding_package_id.in_(funding_package_ids))
)
session.execute(
delete(InspectionModel)
.where(InspectionModel.property_id.in_(property_ids))
)
# --------------------------------------------------
# Mid-level tables
# --------------------------------------------------
session.execute(
delete(FundingPackage)
.where(FundingPackage.id.in_(funding_package_ids))
)
def fast_delete_recommendations(session, chunk):
session.execute(
delete(Recommendation)
.where(Recommendation.property_id.in_(chunk))
.where(Recommendation.id.in_(recommendation_ids))
)
def clear_portfolio(session: Session, portfolio_id: int, batch_size=100):
def print_progress(prefix, i, total):
print(f"{prefix} ({i}/{total})")
# --------------------------
# Collect IDs up-front
# --------------------------
property_ids = [
p.id for p in session.query(PropertyModel.id)
.filter(PropertyModel.portfolio_id == portfolio_id)
]
recommendation_ids = [
r.id for r in session.query(Recommendation.id)
.filter(Recommendation.property_id.in_(property_ids))
]
plan_ids = [
p.id for p in session.query(Plan.id)
.filter(Plan.portfolio_id == portfolio_id)
]
funding_package_ids = [
fp.id for fp in session.query(FundingPackage.id)
.filter(FundingPackage.plan_id.in_(plan_ids))
]
# ========== BATCH HELPERS ==========
def chunked(lst, n):
for i in range(0, len(lst), n):
yield lst[i:i + n]
# --------------------------
# Deleting RecommendationMaterials
# --------------------------
rm_chunks = list(chunked(recommendation_ids, batch_size))
total = len(rm_chunks)
for i, chunk in enumerate(rm_chunks, start=1):
print_progress("Deleting RecommendationMaterials", i, total)
session.execute(
delete(RecommendationMaterials)
.where(RecommendationMaterials.recommendation_id.in_(chunk))
)
# --------------------------
# PlanRecommendations
# --------------------------
pr_chunks = list(chunked(plan_ids, batch_size))
total = len(pr_chunks)
for i, chunk in enumerate(pr_chunks, start=1):
print_progress("Deleting PlanRecommendations", i, total)
session.execute(
delete(PlanRecommendations)
.where(PlanRecommendations.plan_id.in_(chunk))
)
# --------------------------
# FundingPackageMeasures
# --------------------------
fpm_chunks = list(chunked(funding_package_ids, batch_size))
total = len(fpm_chunks)
for i, chunk in enumerate(fpm_chunks, start=1):
print_progress("Deleting FundingPackageMeasures", i, total)
session.execute(
delete(FundingPackageMeasures)
.where(FundingPackageMeasures.funding_package_id.in_(chunk))
)
# --------------------------
# FundingPackages
# --------------------------
fp_chunks = list(chunked(plan_ids, batch_size))
total = len(fp_chunks)
for i, chunk in enumerate(fp_chunks, start=1):
print_progress("Deleting FundingPackages", i, total)
session.execute(
delete(FundingPackage)
.where(FundingPackage.plan_id.in_(chunk))
)
# --------------------------
# Plans
# --------------------------
plan_chunks = list(chunked(plan_ids, batch_size))
total = len(plan_chunks)
for i, chunk in enumerate(plan_chunks, start=1):
print_progress("Deleting Plans", i, total)
session.execute(
delete(Plan)
.where(Plan.id.in_(chunk))
)
# --------------------------
# Scenarios
# --------------------------
print("Deleting Scenarios…")
session.execute(
delete(Scenario)
.where(Scenario.portfolio_id == portfolio_id)
delete(Plan)
.where(Plan.id.in_(plan_ids))
)
# --------------------------
# Recommendations (fast delete)
# --------------------------
# rec_chunks = list(chunked(property_ids, batch_size * 5)) # larger chunks for fast delete
# total = len(rec_chunks)
# for i, chunk in enumerate(rec_chunks, start=1):
# print_progress("Deleting Recommendations", i, total)
# fast_delete_recommendations(session, chunk)
rec_chunks = list(chunked(recommendation_ids, batch_size))
total = len(rec_chunks)
for i, chunk in enumerate(rec_chunks, start=1):
print_progress("Deleting Recommendations", i, total)
session.execute(
delete(Recommendation)
.where(Recommendation.id.in_(chunk))
)
# --------------------------
# Inspections
# --------------------------
insp_chunks = list(chunked(property_ids, batch_size))
total = len(insp_chunks)
for i, chunk in enumerate(insp_chunks, start=1):
print_progress("Deleting Inspections", i, total)
session.execute(
delete(InspectionModel)
.where(InspectionModel.property_id.in_(chunk))
)
# --------------------------
# PropertyTargetsModel
# --------------------------
print("Deleting PropertyTargetsModel…")
session.execute(
delete(PropertyTargetsModel)
.where(PropertyTargetsModel.portfolio_id == portfolio_id)
)
# --------------------------
# PropertyDetailsEpcModel
# --------------------------
print("Deleting PropertyDetailsEpcModel…")
# --------------------------------------------------
# Property-scoped tables
# --------------------------------------------------
session.execute(
delete(PropertyDetailsEpcModel)
.where(PropertyDetailsEpcModel.portfolio_id == portfolio_id)
.where(PropertyDetailsEpcModel.property_id.in_(property_ids))
)
# --------------------------
# Properties
# --------------------------
prop_chunks = list(chunked(property_ids, batch_size))
total = len(prop_chunks)
for i, chunk in enumerate(prop_chunks, start=1):
print_progress("Deleting Properties", i, total)
session.execute(
delete(PropertyTargetsModel)
.where(PropertyTargetsModel.property_id.in_(property_ids))
)
# --------------------------------------------------
# Properties LAST
# --------------------------------------------------
session.execute(
delete(PropertyModel)
.where(PropertyModel.id.in_(property_ids))
)
def portfolio_has_properties(portfolio_id: int) -> bool:
    """Report whether any ``PropertyModel`` row references the portfolio.

    Args:
        portfolio_id: value matched against ``PropertyModel.portfolio_id``.

    Returns:
        The scalar result of an EXISTS query over the matching properties.
    """
    with db_read_session() as session:
        owned_properties = session.query(PropertyModel).filter(
            PropertyModel.portfolio_id == portfolio_id
        )
        # Wrap the filtered query in EXISTS so no property rows are loaded.
        any_exist = session.query(owned_properties.exists()).scalar()
        return any_exist
def delete_portfolio_scenarios_if_empty(portfolio_id: int):
if portfolio_has_properties(portfolio_id):
print("Properties still exist — skipping scenario deletion")
return
with db_session() as session:
session.execute(
delete(PropertyModel)
.where(PropertyModel.id.in_(chunk))
delete(Scenario)
.where(Scenario.portfolio_id == portfolio_id)
)
session.commit()
print("Portfolio cleared.")
print("Deleted scenarios for empty portfolio")
def clear_portfolio_in_batches(
session: Session,
portfolio_id: int,
property_batch_size: int = 10
property_batch_size: int = 25,
):
# Fetch all property IDs once
property_ids = [
pid for (pid,) in
session.query(PropertyModel.id)
.filter(PropertyModel.portfolio_id == portfolio_id)
.all()
]
property_ids = get_property_ids(portfolio_id)
def delete_for_property_batch(prop_ids):
# ----------------------------
# Recommendations → PlanRecommendations
# ----------------------------
rec_subq = (
select(Recommendation.id)
.where(Recommendation.property_id.in_(prop_ids))
)
if not property_ids:
print("No properties found.")
delete_portfolio_scenarios_if_empty(portfolio_id)
return
session.execute(
delete(PlanRecommendations)
.where(PlanRecommendations.recommendation_id.in_(rec_subq))
)
total = (len(property_ids) + property_batch_size - 1) // property_batch_size
session.execute(
delete(RecommendationMaterials)
.where(RecommendationMaterials.recommendation_id.in_(rec_subq))
)
for i, batch in enumerate(chunked(property_ids, property_batch_size), start=1):
print(f"Deleting batch {i}/{total} ({len(batch)} properties)")
with db_session() as session:
delete_property_batch(session, batch)
session.execute(
delete(Recommendation)
.where(Recommendation.property_id.in_(prop_ids))
)
# ----------------------------
# Inspections
# ----------------------------
session.execute(
delete(InspectionModel)
.where(InspectionModel.property_id.in_(prop_ids))
)
# ----------------------------
# Plans (scoped to these properties)
# ----------------------------
plan_subq = (
select(Plan.id)
.where(Plan.property_id.in_(prop_ids))
)
session.execute(
delete(PlanRecommendations)
.where(PlanRecommendations.plan_id.in_(plan_subq))
)
session.execute(
delete(FundingPackageMeasures)
.where(
FundingPackageMeasures.funding_package_id.in_(
select(FundingPackage.id)
.where(FundingPackage.plan_id.in_(plan_subq))
)
)
)
session.execute(
delete(FundingPackage)
.where(FundingPackage.plan_id.in_(plan_subq))
)
session.execute(
delete(Plan)
.where(Plan.id.in_(plan_subq))
)
# ----------------------------
# Property-scoped auxiliary tables
# ----------------------------
session.execute(
delete(PropertyDetailsEpcModel)
.where(PropertyDetailsEpcModel.property_id.in_(prop_ids))
)
session.execute(
delete(PropertyTargetsModel)
.where(PropertyTargetsModel.property_id.in_(prop_ids))
)
# ----------------------------
# Properties (last)
# ----------------------------
session.execute(
delete(PropertyModel)
.where(PropertyModel.id.in_(prop_ids))
)
# -------- BATCH DELETE LOOP --------
property_chunks = list(chunked(property_ids, property_batch_size))
total_batches = len(property_chunks)
for i, prop_ids in enumerate(property_chunks, start=1):
print(f"Deleting batch {i}/{total_batches} ({len(prop_ids)} properties)")
delete_for_property_batch(prop_ids)
session.commit()
# scenario deletion happens AFTER all properties are gone
delete_portfolio_scenarios_if_empty(portfolio_id)
print("Portfolio cleared in batches.")

View file

@ -73,7 +73,7 @@ async def trigger_plan_entrypoint(body: PlanTriggerRequest):
created_at = datetime.now().isoformat()
with db_session() as session:
# Create a new scenario
new_scenario = create_scenario(
scenario_id = create_scenario(
session=session,
scenario={
"name": body.scenario_name,
@ -91,7 +91,6 @@ async def trigger_plan_entrypoint(body: PlanTriggerRequest):
"multi_plan": body.multi_plan
}
)
scenario_id = new_scenario.id
# Insert the scenario ID into the data payload
data["scenario_id"] = scenario_id

View file

@ -8,10 +8,7 @@ import pandas as pd
import numpy as np
from uuid import UUID
from backend.Funding import Funding
from backend.SearchEpc import SearchEpc
from contextlib import contextmanager
from sqlmodel import Session
from etl.epc.Record import EPCRecord
from sqlalchemy.exc import IntegrityError, OperationalError
@ -19,11 +16,11 @@ from starlette.responses import Response
from backend.app.BatterySapScorer import BatterySAPScorer
from backend.app.config import get_settings, get_prediction_buckets
from backend.app.db.connection import db_engine
from backend.app.db.connection import db_session, db_read_session
import backend.app.db.functions as db_funcs
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES
from backend.app.plan.schemas import PlanTriggerRequest
from backend.app.plan.utils import (
get_cleaned, patch_epc, extract_property_request_data, parse_eco_packages, handle_error, build_cloudwatch_log_url
)
@ -31,6 +28,7 @@ from backend.app.utils import sap_to_epc
import backend.app.assumptions as assumptions
from backend.ml_models.api import ModelApi
from backend.ml_models.Valuation import PropertyValuation
from backend.Property import Property
from backend.apis.GoogleSolarApi import GoogleSolarApi
from backend.addresses.Addresses import Addresses
@ -39,15 +37,12 @@ from recommendations.optimiser.CostOptimiser import CostOptimiser
from recommendations.optimiser.GainOptimiser import GainOptimiser
import recommendations.optimiser.optimiser_functions as optimiser_functions
from recommendations.Recommendations import Recommendations
from backend.ml_models.Valuation import PropertyValuation
from recommendations.optimiser.funding_optimiser import optimise_with_scenarios
from etl.bill_savings.KwhData import KwhData
from etl.spatial.OpenUprnClient import OpenUprnClient
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from recommendations.optimiser.funding_optimiser import optimise_with_funding_paths, optimise_with_scenarios
from recommendations.recommendation_utils import convert_thickness_to_numeric, get_wall_u_value
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3, read_excel_from_s3
@ -530,28 +525,6 @@ def extract_address_data(config, body):
return uprn, address1, full_address
@contextmanager
def db_session():
session = Session(db_engine)
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
@contextmanager
def db_read_session():
session = Session(db_engine, expire_on_commit=False)
try:
yield session
finally:
session.close()
async def model_engine(body: PlanTriggerRequest):
logger.info("Model Engine triggered with body: %s", json.loads(body.model_dump_json()))