mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
Merge pull request #626 from Hestia-Homes/eco-eligiblity-bug
handling db bug for trigger api
This commit is contained in:
commit
33e61507c4
4 changed files with 143 additions and 302 deletions
|
|
@ -1,4 +1,5 @@
|
|||
from sqlalchemy import create_engine
|
||||
from contextlib import contextmanager
|
||||
from backend.app.config import get_settings
|
||||
from sqlmodel import Session
|
||||
|
||||
|
|
@ -29,3 +30,25 @@ def get_db_session():
|
|||
if db_engine is None:
|
||||
raise RuntimeError("Database is not configured. Set DATABASE_URL in environment variables.")
|
||||
return Session(db_engine)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def db_session():
|
||||
session = Session(db_engine)
|
||||
try:
|
||||
yield session
|
||||
session.commit()
|
||||
except Exception:
|
||||
session.rollback()
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
@contextmanager
|
||||
def db_read_session():
|
||||
session = Session(db_engine, expire_on_commit=False)
|
||||
try:
|
||||
yield session
|
||||
finally:
|
||||
session.close()
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ from backend.app.db.models.portfolio import (
|
|||
)
|
||||
from backend.app.db.models.funding import FundingPackageMeasures, FundingPackage
|
||||
from backend.app.db.models.inspections import InspectionModel
|
||||
from backend.app.db.connection import db_session, db_read_session
|
||||
|
||||
|
||||
def prepare_plan_data(
|
||||
|
|
@ -350,298 +351,143 @@ def chunked(iterable, size=100):
|
|||
yield iterable[i:i + size]
|
||||
|
||||
|
||||
# def fast_delete_recommendations(session, chunk):
|
||||
# placeholders = ",".join(["(:p{})".format(i) for i in range(len(chunk))])
|
||||
# params = {f"p{i}": chunk[i] for i in range(len(chunk))}
|
||||
#
|
||||
# sql = text(f"""
|
||||
# WITH ids(property_id) AS (
|
||||
# VALUES {placeholders}
|
||||
# )
|
||||
# DELETE FROM recommendation r
|
||||
# USING ids
|
||||
# WHERE r.property_id = ids.property_id;
|
||||
# """)
|
||||
#
|
||||
# session.execute(sql, params, execution_options={"synchronize_session": False})
|
||||
def get_property_ids(portfolio_id: int) -> list[int]:
|
||||
with db_read_session() as session:
|
||||
return [
|
||||
pid for (pid,) in
|
||||
session.query(PropertyModel.id)
|
||||
.filter(PropertyModel.portfolio_id == portfolio_id)
|
||||
.all()
|
||||
]
|
||||
|
||||
|
||||
def delete_property_batch(session: Session, property_ids: list[int]):
|
||||
if not property_ids:
|
||||
return
|
||||
|
||||
# --------------------------------------------------
|
||||
# Shared subqueries (computed once)
|
||||
# --------------------------------------------------
|
||||
plan_ids = (
|
||||
select(Plan.id)
|
||||
.where(Plan.property_id.in_(property_ids))
|
||||
)
|
||||
|
||||
recommendation_ids = (
|
||||
select(Recommendation.id)
|
||||
.where(Recommendation.property_id.in_(property_ids))
|
||||
)
|
||||
|
||||
funding_package_ids = (
|
||||
select(FundingPackage.id)
|
||||
.where(FundingPackage.plan_id.in_(plan_ids))
|
||||
)
|
||||
|
||||
# --------------------------------------------------
|
||||
# Leaf tables FIRST
|
||||
# --------------------------------------------------
|
||||
session.execute(
|
||||
delete(RecommendationMaterials)
|
||||
.where(RecommendationMaterials.recommendation_id.in_(recommendation_ids))
|
||||
)
|
||||
|
||||
session.execute(
|
||||
delete(PlanRecommendations)
|
||||
.where(PlanRecommendations.plan_id.in_(plan_ids))
|
||||
)
|
||||
|
||||
session.execute(
|
||||
delete(FundingPackageMeasures)
|
||||
.where(FundingPackageMeasures.funding_package_id.in_(funding_package_ids))
|
||||
)
|
||||
|
||||
session.execute(
|
||||
delete(InspectionModel)
|
||||
.where(InspectionModel.property_id.in_(property_ids))
|
||||
)
|
||||
|
||||
# --------------------------------------------------
|
||||
# Mid-level tables
|
||||
# --------------------------------------------------
|
||||
session.execute(
|
||||
delete(FundingPackage)
|
||||
.where(FundingPackage.id.in_(funding_package_ids))
|
||||
)
|
||||
|
||||
def fast_delete_recommendations(session, chunk):
|
||||
session.execute(
|
||||
delete(Recommendation)
|
||||
.where(Recommendation.property_id.in_(chunk))
|
||||
.where(Recommendation.id.in_(recommendation_ids))
|
||||
)
|
||||
|
||||
|
||||
def clear_portfolio(session: Session, portfolio_id: int, batch_size=100):
|
||||
def print_progress(prefix, i, total):
|
||||
print(f"{prefix} ({i}/{total})")
|
||||
|
||||
# --------------------------
|
||||
# Collect IDs up-front
|
||||
# --------------------------
|
||||
property_ids = [
|
||||
p.id for p in session.query(PropertyModel.id)
|
||||
.filter(PropertyModel.portfolio_id == portfolio_id)
|
||||
]
|
||||
|
||||
recommendation_ids = [
|
||||
r.id for r in session.query(Recommendation.id)
|
||||
.filter(Recommendation.property_id.in_(property_ids))
|
||||
]
|
||||
|
||||
plan_ids = [
|
||||
p.id for p in session.query(Plan.id)
|
||||
.filter(Plan.portfolio_id == portfolio_id)
|
||||
]
|
||||
|
||||
funding_package_ids = [
|
||||
fp.id for fp in session.query(FundingPackage.id)
|
||||
.filter(FundingPackage.plan_id.in_(plan_ids))
|
||||
]
|
||||
|
||||
# ========== BATCH HELPERS ==========
|
||||
def chunked(lst, n):
|
||||
for i in range(0, len(lst), n):
|
||||
yield lst[i:i + n]
|
||||
|
||||
# --------------------------
|
||||
# Deleting RecommendationMaterials
|
||||
# --------------------------
|
||||
rm_chunks = list(chunked(recommendation_ids, batch_size))
|
||||
total = len(rm_chunks)
|
||||
for i, chunk in enumerate(rm_chunks, start=1):
|
||||
print_progress("Deleting RecommendationMaterials", i, total)
|
||||
session.execute(
|
||||
delete(RecommendationMaterials)
|
||||
.where(RecommendationMaterials.recommendation_id.in_(chunk))
|
||||
)
|
||||
|
||||
# --------------------------
|
||||
# PlanRecommendations
|
||||
# --------------------------
|
||||
pr_chunks = list(chunked(plan_ids, batch_size))
|
||||
total = len(pr_chunks)
|
||||
for i, chunk in enumerate(pr_chunks, start=1):
|
||||
print_progress("Deleting PlanRecommendations", i, total)
|
||||
session.execute(
|
||||
delete(PlanRecommendations)
|
||||
.where(PlanRecommendations.plan_id.in_(chunk))
|
||||
)
|
||||
|
||||
# --------------------------
|
||||
# FundingPackageMeasures
|
||||
# --------------------------
|
||||
fpm_chunks = list(chunked(funding_package_ids, batch_size))
|
||||
total = len(fpm_chunks)
|
||||
for i, chunk in enumerate(fpm_chunks, start=1):
|
||||
print_progress("Deleting FundingPackageMeasures", i, total)
|
||||
session.execute(
|
||||
delete(FundingPackageMeasures)
|
||||
.where(FundingPackageMeasures.funding_package_id.in_(chunk))
|
||||
)
|
||||
|
||||
# --------------------------
|
||||
# FundingPackages
|
||||
# --------------------------
|
||||
fp_chunks = list(chunked(plan_ids, batch_size))
|
||||
total = len(fp_chunks)
|
||||
for i, chunk in enumerate(fp_chunks, start=1):
|
||||
print_progress("Deleting FundingPackages", i, total)
|
||||
session.execute(
|
||||
delete(FundingPackage)
|
||||
.where(FundingPackage.plan_id.in_(chunk))
|
||||
)
|
||||
|
||||
# --------------------------
|
||||
# Plans
|
||||
# --------------------------
|
||||
plan_chunks = list(chunked(plan_ids, batch_size))
|
||||
total = len(plan_chunks)
|
||||
for i, chunk in enumerate(plan_chunks, start=1):
|
||||
print_progress("Deleting Plans", i, total)
|
||||
session.execute(
|
||||
delete(Plan)
|
||||
.where(Plan.id.in_(chunk))
|
||||
)
|
||||
|
||||
# --------------------------
|
||||
# Scenarios
|
||||
# --------------------------
|
||||
print("Deleting Scenarios…")
|
||||
session.execute(
|
||||
delete(Scenario)
|
||||
.where(Scenario.portfolio_id == portfolio_id)
|
||||
delete(Plan)
|
||||
.where(Plan.id.in_(plan_ids))
|
||||
)
|
||||
|
||||
# --------------------------
|
||||
# Recommendations (fast delete)
|
||||
# --------------------------
|
||||
# rec_chunks = list(chunked(property_ids, batch_size * 5)) # larger chunks for fast delete
|
||||
# total = len(rec_chunks)
|
||||
# for i, chunk in enumerate(rec_chunks, start=1):
|
||||
# print_progress("Deleting Recommendations", i, total)
|
||||
# fast_delete_recommendations(session, chunk)
|
||||
rec_chunks = list(chunked(recommendation_ids, batch_size))
|
||||
total = len(rec_chunks)
|
||||
for i, chunk in enumerate(rec_chunks, start=1):
|
||||
print_progress("Deleting Recommendations", i, total)
|
||||
session.execute(
|
||||
delete(Recommendation)
|
||||
.where(Recommendation.id.in_(chunk))
|
||||
)
|
||||
|
||||
# --------------------------
|
||||
# Inspections
|
||||
# --------------------------
|
||||
insp_chunks = list(chunked(property_ids, batch_size))
|
||||
total = len(insp_chunks)
|
||||
for i, chunk in enumerate(insp_chunks, start=1):
|
||||
print_progress("Deleting Inspections", i, total)
|
||||
session.execute(
|
||||
delete(InspectionModel)
|
||||
.where(InspectionModel.property_id.in_(chunk))
|
||||
)
|
||||
|
||||
# --------------------------
|
||||
# PropertyTargetsModel
|
||||
# --------------------------
|
||||
print("Deleting PropertyTargetsModel…")
|
||||
session.execute(
|
||||
delete(PropertyTargetsModel)
|
||||
.where(PropertyTargetsModel.portfolio_id == portfolio_id)
|
||||
)
|
||||
|
||||
# --------------------------
|
||||
# PropertyDetailsEpcModel
|
||||
# --------------------------
|
||||
print("Deleting PropertyDetailsEpcModel…")
|
||||
# --------------------------------------------------
|
||||
# Property-scoped tables
|
||||
# --------------------------------------------------
|
||||
session.execute(
|
||||
delete(PropertyDetailsEpcModel)
|
||||
.where(PropertyDetailsEpcModel.portfolio_id == portfolio_id)
|
||||
.where(PropertyDetailsEpcModel.property_id.in_(property_ids))
|
||||
)
|
||||
|
||||
# --------------------------
|
||||
# Properties
|
||||
# --------------------------
|
||||
prop_chunks = list(chunked(property_ids, batch_size))
|
||||
total = len(prop_chunks)
|
||||
for i, chunk in enumerate(prop_chunks, start=1):
|
||||
print_progress("Deleting Properties", i, total)
|
||||
session.execute(
|
||||
delete(PropertyTargetsModel)
|
||||
.where(PropertyTargetsModel.property_id.in_(property_ids))
|
||||
)
|
||||
|
||||
# --------------------------------------------------
|
||||
# Properties LAST
|
||||
# --------------------------------------------------
|
||||
session.execute(
|
||||
delete(PropertyModel)
|
||||
.where(PropertyModel.id.in_(property_ids))
|
||||
)
|
||||
|
||||
|
||||
def portfolio_has_properties(portfolio_id: int) -> bool:
|
||||
with db_read_session() as session:
|
||||
return session.query(
|
||||
session.query(PropertyModel)
|
||||
.filter(PropertyModel.portfolio_id == portfolio_id)
|
||||
.exists()
|
||||
).scalar()
|
||||
|
||||
|
||||
def delete_portfolio_scenarios_if_empty(portfolio_id: int):
|
||||
if portfolio_has_properties(portfolio_id):
|
||||
print("Properties still exist — skipping scenario deletion")
|
||||
return
|
||||
|
||||
with db_session() as session:
|
||||
session.execute(
|
||||
delete(PropertyModel)
|
||||
.where(PropertyModel.id.in_(chunk))
|
||||
delete(Scenario)
|
||||
.where(Scenario.portfolio_id == portfolio_id)
|
||||
)
|
||||
|
||||
session.commit()
|
||||
print("Portfolio cleared.")
|
||||
print("Deleted scenarios for empty portfolio")
|
||||
|
||||
|
||||
def clear_portfolio_in_batches(
|
||||
session: Session,
|
||||
portfolio_id: int,
|
||||
property_batch_size: int = 10
|
||||
property_batch_size: int = 25,
|
||||
):
|
||||
# Fetch all property IDs once
|
||||
property_ids = [
|
||||
pid for (pid,) in
|
||||
session.query(PropertyModel.id)
|
||||
.filter(PropertyModel.portfolio_id == portfolio_id)
|
||||
.all()
|
||||
]
|
||||
property_ids = get_property_ids(portfolio_id)
|
||||
|
||||
def delete_for_property_batch(prop_ids):
|
||||
# ----------------------------
|
||||
# Recommendations → PlanRecommendations
|
||||
# ----------------------------
|
||||
rec_subq = (
|
||||
select(Recommendation.id)
|
||||
.where(Recommendation.property_id.in_(prop_ids))
|
||||
)
|
||||
if not property_ids:
|
||||
print("No properties found.")
|
||||
delete_portfolio_scenarios_if_empty(portfolio_id)
|
||||
return
|
||||
|
||||
session.execute(
|
||||
delete(PlanRecommendations)
|
||||
.where(PlanRecommendations.recommendation_id.in_(rec_subq))
|
||||
)
|
||||
total = (len(property_ids) + property_batch_size - 1) // property_batch_size
|
||||
|
||||
session.execute(
|
||||
delete(RecommendationMaterials)
|
||||
.where(RecommendationMaterials.recommendation_id.in_(rec_subq))
|
||||
)
|
||||
for i, batch in enumerate(chunked(property_ids, property_batch_size), start=1):
|
||||
print(f"Deleting batch {i}/{total} ({len(batch)} properties)")
|
||||
with db_session() as session:
|
||||
delete_property_batch(session, batch)
|
||||
|
||||
session.execute(
|
||||
delete(Recommendation)
|
||||
.where(Recommendation.property_id.in_(prop_ids))
|
||||
)
|
||||
|
||||
# ----------------------------
|
||||
# Inspections
|
||||
# ----------------------------
|
||||
session.execute(
|
||||
delete(InspectionModel)
|
||||
.where(InspectionModel.property_id.in_(prop_ids))
|
||||
)
|
||||
|
||||
# ----------------------------
|
||||
# Plans (scoped to these properties)
|
||||
# ----------------------------
|
||||
plan_subq = (
|
||||
select(Plan.id)
|
||||
.where(Plan.property_id.in_(prop_ids))
|
||||
)
|
||||
|
||||
session.execute(
|
||||
delete(PlanRecommendations)
|
||||
.where(PlanRecommendations.plan_id.in_(plan_subq))
|
||||
)
|
||||
|
||||
session.execute(
|
||||
delete(FundingPackageMeasures)
|
||||
.where(
|
||||
FundingPackageMeasures.funding_package_id.in_(
|
||||
select(FundingPackage.id)
|
||||
.where(FundingPackage.plan_id.in_(plan_subq))
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
session.execute(
|
||||
delete(FundingPackage)
|
||||
.where(FundingPackage.plan_id.in_(plan_subq))
|
||||
)
|
||||
|
||||
session.execute(
|
||||
delete(Plan)
|
||||
.where(Plan.id.in_(plan_subq))
|
||||
)
|
||||
|
||||
# ----------------------------
|
||||
# Property-scoped auxiliary tables
|
||||
# ----------------------------
|
||||
session.execute(
|
||||
delete(PropertyDetailsEpcModel)
|
||||
.where(PropertyDetailsEpcModel.property_id.in_(prop_ids))
|
||||
)
|
||||
|
||||
session.execute(
|
||||
delete(PropertyTargetsModel)
|
||||
.where(PropertyTargetsModel.property_id.in_(prop_ids))
|
||||
)
|
||||
|
||||
# ----------------------------
|
||||
# Properties (last)
|
||||
# ----------------------------
|
||||
session.execute(
|
||||
delete(PropertyModel)
|
||||
.where(PropertyModel.id.in_(prop_ids))
|
||||
)
|
||||
|
||||
# -------- BATCH DELETE LOOP --------
|
||||
property_chunks = list(chunked(property_ids, property_batch_size))
|
||||
total_batches = len(property_chunks)
|
||||
|
||||
for i, prop_ids in enumerate(property_chunks, start=1):
|
||||
print(f"Deleting batch {i}/{total_batches} ({len(prop_ids)} properties)")
|
||||
delete_for_property_batch(prop_ids)
|
||||
session.commit()
|
||||
# scenario deletion happens AFTER all properties are gone
|
||||
delete_portfolio_scenarios_if_empty(portfolio_id)
|
||||
|
||||
print("Portfolio cleared in batches.")
|
||||
|
|
|
|||
|
|
@ -73,7 +73,7 @@ async def trigger_plan_entrypoint(body: PlanTriggerRequest):
|
|||
created_at = datetime.now().isoformat()
|
||||
with db_session() as session:
|
||||
# Create a new scenario
|
||||
new_scenario = create_scenario(
|
||||
scenario_id = create_scenario(
|
||||
session=session,
|
||||
scenario={
|
||||
"name": body.scenario_name,
|
||||
|
|
@ -91,7 +91,6 @@ async def trigger_plan_entrypoint(body: PlanTriggerRequest):
|
|||
"multi_plan": body.multi_plan
|
||||
}
|
||||
)
|
||||
scenario_id = new_scenario.id
|
||||
# Insert the scenario ID into the data payload
|
||||
data["scenario_id"] = scenario_id
|
||||
|
||||
|
|
|
|||
|
|
@ -8,10 +8,7 @@ import pandas as pd
|
|||
import numpy as np
|
||||
from uuid import UUID
|
||||
|
||||
from backend.Funding import Funding
|
||||
from backend.SearchEpc import SearchEpc
|
||||
from contextlib import contextmanager
|
||||
from sqlmodel import Session
|
||||
|
||||
from etl.epc.Record import EPCRecord
|
||||
from sqlalchemy.exc import IntegrityError, OperationalError
|
||||
|
|
@ -19,11 +16,11 @@ from starlette.responses import Response
|
|||
from backend.app.BatterySapScorer import BatterySAPScorer
|
||||
|
||||
from backend.app.config import get_settings, get_prediction_buckets
|
||||
from backend.app.db.connection import db_engine
|
||||
from backend.app.db.connection import db_session, db_read_session
|
||||
import backend.app.db.functions as db_funcs
|
||||
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
|
||||
|
||||
from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES
|
||||
from backend.app.plan.schemas import PlanTriggerRequest
|
||||
from backend.app.plan.utils import (
|
||||
get_cleaned, patch_epc, extract_property_request_data, parse_eco_packages, handle_error, build_cloudwatch_log_url
|
||||
)
|
||||
|
|
@ -31,6 +28,7 @@ from backend.app.utils import sap_to_epc
|
|||
import backend.app.assumptions as assumptions
|
||||
|
||||
from backend.ml_models.api import ModelApi
|
||||
from backend.ml_models.Valuation import PropertyValuation
|
||||
from backend.Property import Property
|
||||
from backend.apis.GoogleSolarApi import GoogleSolarApi
|
||||
from backend.addresses.Addresses import Addresses
|
||||
|
|
@ -39,15 +37,12 @@ from recommendations.optimiser.CostOptimiser import CostOptimiser
|
|||
from recommendations.optimiser.GainOptimiser import GainOptimiser
|
||||
import recommendations.optimiser.optimiser_functions as optimiser_functions
|
||||
from recommendations.Recommendations import Recommendations
|
||||
from backend.ml_models.Valuation import PropertyValuation
|
||||
from recommendations.optimiser.funding_optimiser import optimise_with_scenarios
|
||||
|
||||
from etl.bill_savings.KwhData import KwhData
|
||||
from etl.spatial.OpenUprnClient import OpenUprnClient
|
||||
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
|
||||
|
||||
from recommendations.optimiser.funding_optimiser import optimise_with_funding_paths, optimise_with_scenarios
|
||||
from recommendations.recommendation_utils import convert_thickness_to_numeric, get_wall_u_value
|
||||
|
||||
from utils.logger import setup_logger
|
||||
from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3, read_excel_from_s3
|
||||
|
||||
|
|
@ -530,28 +525,6 @@ def extract_address_data(config, body):
|
|||
return uprn, address1, full_address
|
||||
|
||||
|
||||
@contextmanager
|
||||
def db_session():
|
||||
session = Session(db_engine)
|
||||
try:
|
||||
yield session
|
||||
session.commit()
|
||||
except Exception:
|
||||
session.rollback()
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
@contextmanager
|
||||
def db_read_session():
|
||||
session = Session(db_engine, expire_on_commit=False)
|
||||
try:
|
||||
yield session
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
async def model_engine(body: PlanTriggerRequest):
|
||||
logger.info("Model Engine triggered with body: %s", json.loads(body.model_dump_json()))
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue