completed database upload refactor

This commit is contained in:
Khalim Conn-Kowlessar 2025-12-22 18:46:19 +08:00
parent c503da05a7
commit f06676f15a
3 changed files with 262 additions and 62 deletions

View file

@ -1,5 +1,6 @@
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import insert
from backend.app.db.models.funding import FundingPackage, FundingPackageMeasures
@ -69,3 +70,72 @@ def upload_funding(session: Session, p, plan_id, recommendations_to_upload):
session.rollback()
print(f"An error occurred: {e}")
return False
def bulk_upload_funding_packages(
    session: Session,
    funding_payload: list[dict],
):
    """
    Insert funding packages and their measures using multi-row INSERTs.

    Every payload entry must provide the funding_package columns
    (plan_id, scheme, project_funding, total_uplift, full_project_score,
    partial_project_score, uplift_project_score). An optional "measures"
    list on the entry is turned into funding_package_measures rows linked
    to the newly created package.

    No commit or rollback happens here; the caller manages the
    transaction. An empty payload is a no-op. Returns None.
    """
    if not funding_payload:
        return

    package_columns = (
        "plan_id",
        "scheme",
        "project_funding",
        "total_uplift",
        "full_project_score",
        "partial_project_score",
        "uplift_project_score",
    )
    measure_columns = (
        "measure",
        "material_id",
        "innovation_uplift",
        "partial_project_score",
        "uplift_project_score",
    )

    # Split each entry into its package row and its (possibly absent)
    # measures, keeping both lists aligned by position.
    package_rows = []
    measures_per_package = []
    for entry in funding_payload:
        package_rows.append({column: entry[column] for column in package_columns})
        measures_per_package.append(entry.get("measures", []))

    # One INSERT for all packages; RETURNING hands back the generated ids.
    insert_packages = (
        insert(FundingPackage)
        .values(package_rows)
        .returning(FundingPackage.id)
    )
    new_package_ids = [row[0] for row in session.execute(insert_packages)]

    # NOTE(review): ids are paired with measures purely by position, which
    # assumes RETURNING yields rows in the same order as package_rows.
    # Confirm this holds for the target database / driver.
    measure_rows = [
        {
            "funding_package_id": package_id,
            **{column: measure[column] for column in measure_columns},
        }
        for package_id, measures in zip(new_package_ids, measures_per_package)
        for measure in measures
    ]

    # Skip the second INSERT entirely when no package carried measures.
    if not measure_rows:
        return
    session.execute(insert(FundingPackageMeasures).values(measure_rows))

View file

@ -96,6 +96,30 @@ def create_plan(session: Session, plan):
raise e
def bulk_create_plans(session: Session, plans_to_create: list[dict]) -> dict[int, int]:
    """
    Insert all plans in a single multi-row INSERT.

    Each entry of ``plans_to_create`` must have a "property_id" and a
    "plan_data" mapping; the mapping's keys are merged into the row after
    "property_id", so a "property_id" key inside "plan_data" would win.

    Returns a dict mapping property_id -> id of the inserted plan, built
    from the RETURNING rows. If several returned rows share a property_id,
    the last one seen is kept. An empty input returns an empty dict without
    touching the session.
    """
    if not plans_to_create:
        return {}

    rows = []
    for entry in plans_to_create:
        rows.append({"property_id": entry["property_id"], **entry["plan_data"]})

    insert_plans = insert(Plan).values(rows).returning(Plan.id, Plan.property_id)
    inserted = session.execute(insert_plans).all()

    plan_id_by_property = {}
    for record in inserted:
        plan_id_by_property[record.property_id] = record.id
    return plan_id_by_property
def create_scenario(session: Session, scenario: dict) -> int:
existing_scenario = (
session.query(Scenario)
@ -233,6 +257,94 @@ def upload_recommendations(session: Session, recommendations_to_upload, property
return False
def bulk_upload_recommendations_and_materials(
    session: Session,
    recommendation_payload: list[dict],
):
    """
    Insert recommendations, their materials and their plan links in bulk.

    Each payload entry must contain every recommendation column listed in
    ``recommendation_columns`` below, plus:
      - "parts": list of dicts, each becoming a recommendation_materials row
      - "plan_id": the plan the recommendation is linked to

    Three statements are issued at most: one INSERT ... RETURNING for the
    recommendations, one INSERT for materials (skipped when there are no
    parts) and one INSERT for the plan <-> recommendation links. No commit
    or rollback happens here. An empty payload is a no-op. Returns None.
    """
    if not recommendation_payload:
        return

    recommendation_columns = (
        "property_id",
        "type",
        "measure_type",
        "description",
        "estimated_cost",
        "default",
        "starting_u_value",
        "new_u_value",
        "sap_points",
        "heat_demand",
        "kwh_savings",
        "co2_equivalent_savings",
        "energy_savings",
        "energy_cost_savings",
        "total_work_hours",
        "labour_days",
        "already_installed",
    )
    material_columns = (
        "material_id",
        "depth",
        "quantity",
        "quantity_unit",
        "estimated_cost",
    )

    # Three position-aligned lists: the row to insert, its parts, its plan.
    rows = []
    parts_per_recommendation = []
    plan_id_per_recommendation = []
    for entry in recommendation_payload:
        rows.append({column: entry[column] for column in recommendation_columns})
        parts_per_recommendation.append(entry["parts"])
        plan_id_per_recommendation.append(entry["plan_id"])

    # Insert every recommendation at once and collect the generated ids.
    insert_recommendations = (
        insert(Recommendation)
        .values(rows)
        .returning(Recommendation.id)
    )
    new_ids = [row[0] for row in session.execute(insert_recommendations)]

    # NOTE(review): parts and plan ids are matched to the new ids purely by
    # position, which assumes RETURNING yields rows in the same order as
    # `rows`. Confirm this holds for the target database / driver.
    material_rows = [
        {
            "recommendation_id": recommendation_id,
            **{column: part[column] for column in material_columns},
        }
        for recommendation_id, parts in zip(new_ids, parts_per_recommendation)
        for part in parts
    ]
    if material_rows:
        session.execute(insert(RecommendationMaterials).values(material_rows))

    # Link every new recommendation to its plan.
    link_rows = []
    for plan_id, recommendation_id in zip(plan_id_per_recommendation, new_ids):
        link_rows.append({"plan_id": plan_id, "recommendation_id": recommendation_id})
    session.execute(insert(PlanRecommendations).values(link_rows))
def chunked(iterable, size=100):
    """
    Yield successive chunks of at most ``size`` items from ``iterable``.

    Inputs that support ``len()`` are sliced, so each chunk has whatever
    type slicing returns for that input (a list for a list, a str for a
    str, ...). Inputs without ``len()`` (generators, plain iterators) are
    consumed lazily and each chunk is yielded as a list.

    Args:
        iterable: the items to split up.
        size: maximum number of items per chunk; must be >= 1.

    Raises:
        ValueError: if ``size`` is less than 1. Because this is a
            generator, the error surfaces on first iteration, not at call
            time.
    """
    # Function-scope import keeps this helper self-contained.
    from itertools import islice

    # A zero step makes range() fail and a negative step would silently
    # yield nothing (dropping every item), so reject both explicitly.
    if size < 1:
        raise ValueError("size must be a positive integer")

    try:
        total = len(iterable)
    except TypeError:
        # No len(): fall back to pulling `size` items at a time.
        iterator = iter(iterable)
        while batch := list(islice(iterator, size)):
            yield batch
        return

    for start in range(0, total, size):
        yield iterable[start:start + size]

View file

@ -1351,11 +1351,7 @@ async def model_engine(body: PlanTriggerRequest):
}
)
# TODO: New
property_updates, property_epc_details, property_spatial_updates = [], [], []
# plans_to_create = [{property_id, plan_data}]
# recommendations_to_create = [{plan_ref, recommendation_data}]
# funding_to_create = [{plan_ref, funding_data}]
plans_to_create, recommendations_to_create, funding_to_create = [], [], []
# Prepare the data that will need to be uploaded in bulk
@ -1390,7 +1386,69 @@ async def model_engine(body: PlanTriggerRequest):
# store recommendations keyed by property
for r in recommendations_for_property:
recommendations_to_create.append({"property_id": p.id, "data": r})
recommendations_to_create.append({
"property_id": p.id,
# ---- Recommendation core ----
"type": r["type"],
"measure_type": r["measure_type"],
"description": r["description"],
"estimated_cost": float(r["total"]),
"default": r["default"],
"starting_u_value": float(r["starting_u_value"]) if r.get("starting_u_value") else None,
"new_u_value": float(r["new_u_value"]) if r.get("new_u_value") else None,
"sap_points": float(r["sap_points"]),
"energy_savings": float(r["heat_demand"]),
"kwh_savings": float(r["kwh_savings"]),
"co2_equivalent_savings": float(r["co2_equivalent_savings"]),
"total_work_hours": float(r["labour_hours"]),
"energy_cost_savings": float(r["energy_cost_savings"]),
"labour_days": float(r["labour_days"]),
"already_installed": r["already_installed"],
"heat_demand": float(r["heat_demand"]),
# ---- parts ----
"parts": [
{
"material_id": part["id"],
"depth": int(part["depth"]) if part.get("depth") else None,
"quantity": float(part["quantity"]) if part.get("quantity") else None,
"quantity_unit": part.get("quantity_unit"),
"estimated_cost": float(part.get("total", part.get("total_cost"))),
}
for part in r.get("parts", [])
],
})
if recommendations_for_property and p.funded_measures:
funding_to_create.append({
"property_id": p.id,
"scheme": p.scheme,
"project_funding": float(p.project_funding),
"total_uplift": float(p.total_uplift),
"full_project_score": float(p.full_project_score),
"partial_project_score": float(p.partial_project_score),
"uplift_project_score": float(p.uplift_project_score),
"measures": [
{
"measure": (
"cavity_wall_insulation"
if part["type"] == "extension_cavity_wall_insulation"
else "sealing_fireplace"
if part["type"] == "sealing_open_fireplace"
else part["type"]
),
"material_id": (
part["parts"][0]["id"]
if part.get("parts")
else None
),
"innovation_uplift": float(part["innovation_uplift"]),
"partial_project_score": float(part["partial_project_score"]),
"uplift_project_score": float(part["uplift_project_score"]),
}
for part in p.funded_measures
],
})
# Bulk upload property data
logger.info("Uploading property data in bulk")
@ -1398,65 +1456,25 @@ async def model_engine(body: PlanTriggerRequest):
db_funcs.property_functions.bulk_update_properties(session, property_updates)
db_funcs.property_functions.bulk_upsert_property_details_epc(session, property_epc_details)
db_funcs.property_functions.bulk_upsert_property_spatial(session, property_spatial_updates)
# Bulk create plans and map each property_id to its new plan id
plan_id_by_property = db_funcs.recommendations_functions.bulk_create_plans(session, plans_to_create)
recommendation_payload = [
{
"plan_id": plan_id_by_property[r["property_id"]],
**{k: v for k, v in r.items() if k not in ["parts"]},
"parts": r["parts"],
} for r in recommendations_to_create if r["property_id"] in plan_id_by_property
]
# TODO: End New
db_funcs.recommendations_functions.bulk_upload_recommendations_and_materials(
session, recommendation_payload
)
for i in tqdm(
range(0, len(input_properties), BATCH_SIZE), total=int(np.ceil(len(input_properties) / BATCH_SIZE))
):
try:
# Take a slice of the input_properties list to make a batch
batch_properties = input_properties[i:i + BATCH_SIZE]
with db_session() as session:
for p in batch_properties:
recommendations_to_upload = recommendations.get(p.id, [])
default_recommendations = [r for r in recommendations_to_upload if r["default"]]
total_sap_points = sum([r["sap_points"] for r in default_recommendations])
new_sap_points = float(p.data["current-energy-efficiency"]) + total_sap_points
new_epc = sap_to_epc(new_sap_points)
total_cost = sum([r["total"] for r in default_recommendations])
valuations = PropertyValuation.estimate(
property_instance=p, target_epc=new_epc, total_cost=total_cost
)
property_plan_data = db_funcs.recommendations_functions.prepare_plan_data(
p, body, scenario_id, eco_packages, valuations, new_sap_points, new_epc,
default_recommendations
)
property_details_epc = p.get_property_details_epc(portfolio_id=body.portfolio_id)
property_data = p.get_full_property_data(current_valuation=valuations["current_value"])
db_funcs.property_functions.create_property_details_epc(session, property_details_epc)
db_funcs.property_functions.update_or_create_property_spatial_details(
session, p.uprn, p.spatial
)
db_funcs.property_functions.update_property_data(
session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data
)
if not recommendations_to_upload:
continue
new_plan_id = db_funcs.recommendations_functions.create_plan(
session, plan=property_plan_data
)
db_funcs.recommendations_functions.upload_recommendations(
session, recommendations_to_upload, p.id, new_plan_id
)
db_funcs.funding_functions.upload_funding(
session, p, new_plan_id, recommendations_to_upload
)
except Exception as e:
# Rollback the session if an error occurs
logger.warning("Failed i = %s" % str(i))
logger.error(f"An error occurred during batch starting at index {i}: {e}")
logger.error(f"property is uprn {p.uprn} id {p.id} address {p.address}")
funding_payload = [
{"plan_id": plan_id_by_property[f["property_id"]], **{k: v for k, v in f.items() if k != "property_id"}}
for f in funding_to_create if f["property_id"] in plan_id_by_property
]
db_funcs.funding_functions.bulk_upload_funding_packages(session, funding_payload)
logger.info("Work completed, updating log status")