diff --git a/backend/app/db/functions/funding_functions.py b/backend/app/db/functions/funding_functions.py index 51dffa21..df36d308 100644 --- a/backend/app/db/functions/funding_functions.py +++ b/backend/app/db/functions/funding_functions.py @@ -1,5 +1,6 @@ from sqlalchemy.orm import Session from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy import insert from backend.app.db.models.funding import FundingPackage, FundingPackageMeasures @@ -69,3 +70,72 @@ def upload_funding(session: Session, p, plan_id, recommendations_to_upload): session.rollback() print(f"An error occurred: {e}") return False + + +def bulk_upload_funding_packages( + session: Session, + funding_payload: list[dict], +): + """ + Bulk upload: + - funding_package + - funding_package_measures + + Assumes caller manages the transaction. + """ + + if not funding_payload: + return + + # --------------------------------------------------------- + # 1. Prepare funding package rows + # --------------------------------------------------------- + funding_rows = [] + measures_by_index = [] + + for f in funding_payload: + funding_rows.append({ + "plan_id": f["plan_id"], + "scheme": f["scheme"], + "project_funding": f["project_funding"], + "total_uplift": f["total_uplift"], + "full_project_score": f["full_project_score"], + "partial_project_score": f["partial_project_score"], + "uplift_project_score": f["uplift_project_score"], + }) + + measures_by_index.append(f.get("measures", [])) + + # --------------------------------------------------------- + # 2. Insert funding packages and get IDs + # --------------------------------------------------------- + result = session.execute( + insert(FundingPackage) + .values(funding_rows) + .returning(FundingPackage.id) + ) + + funding_package_ids = [row[0] for row in result] + + # --------------------------------------------------------- + # 3. Insert funding package measures + # --------------------------------------------------------- + measures_rows = [] + + for funding_package_id, measures in zip( + funding_package_ids, measures_by_index + ): + for m in measures: + measures_rows.append({ + "funding_package_id": funding_package_id, + "measure": m["measure"], + "material_id": m["material_id"], + "innovation_uplift": m["innovation_uplift"], + "partial_project_score": m["partial_project_score"], + "uplift_project_score": m["uplift_project_score"], + }) + + if measures_rows: + session.execute( + insert(FundingPackageMeasures).values(measures_rows) + ) diff --git a/backend/app/db/functions/recommendations_functions.py b/backend/app/db/functions/recommendations_functions.py index b70111da..5b39f86e 100644 --- a/backend/app/db/functions/recommendations_functions.py +++ b/backend/app/db/functions/recommendations_functions.py @@ -96,6 +96,30 @@ def create_plan(session: Session, plan): raise e +def bulk_create_plans(session: Session, plans_to_create: list[dict]) -> dict[int, int]: + if not plans_to_create: + return {} + + payload = [ + { + "property_id": p["property_id"], + **p["plan_data"], + } + for p in plans_to_create + ] + + stmt = ( + insert(Plan) + .values(payload) + .returning(Plan.id, Plan.property_id) + ) + + result = session.execute(stmt).all() + + # property_id -> plan_id + return {row.property_id: row.id for row in result} + + def create_scenario(session: Session, scenario: dict) -> int: existing_scenario = ( session.query(Scenario) @@ -233,6 +257,94 @@ def upload_recommendations(session: Session, recommendations_to_upload, property return False +def bulk_upload_recommendations_and_materials( + session: Session, + recommendation_payload: list[dict], +): + if not recommendation_payload: + return + + # --------------------------------------------------------- + # 1. Prepare recommendation rows + # --------------------------------------------------------- + recommendation_rows = [] + parts_by_index = [] + plan_ids_by_index = [] + + for rec in recommendation_payload: + recommendation_rows.append({ + "property_id": rec["property_id"], + "type": rec["type"], + "measure_type": rec["measure_type"], + "description": rec["description"], + "estimated_cost": rec["estimated_cost"], + "default": rec["default"], + "starting_u_value": rec["starting_u_value"], + "new_u_value": rec["new_u_value"], + "sap_points": rec["sap_points"], + "heat_demand": rec["heat_demand"], + "kwh_savings": rec["kwh_savings"], + "co2_equivalent_savings": rec["co2_equivalent_savings"], + "energy_savings": rec["energy_savings"], + "energy_cost_savings": rec["energy_cost_savings"], + "total_work_hours": rec["total_work_hours"], + "labour_days": rec["labour_days"], + "already_installed": rec["already_installed"], + }) + + parts_by_index.append(rec["parts"]) + plan_ids_by_index.append(rec["plan_id"]) + + # --------------------------------------------------------- + # 2. Insert recommendations and get IDs + # --------------------------------------------------------- + result = session.execute( + insert(Recommendation) + .values(recommendation_rows) + .returning(Recommendation.id) + ) + + recommendation_ids = [row[0] for row in result] + + # --------------------------------------------------------- + # 3. Insert recommendation materials + # --------------------------------------------------------- + materials_rows = [] + + for recommendation_id, parts in zip(recommendation_ids, parts_by_index): + for part in parts: + materials_rows.append({ + "recommendation_id": recommendation_id, + "material_id": part["material_id"], + "depth": part["depth"], + "quantity": part["quantity"], + "quantity_unit": part["quantity_unit"], + "estimated_cost": part["estimated_cost"], + }) + + if materials_rows: + session.execute( + insert(RecommendationMaterials).values(materials_rows) + ) + + # --------------------------------------------------------- + # 4. Insert plan ↔ recommendation links + # --------------------------------------------------------- + plan_recommendation_rows = [ + { + "plan_id": plan_id, + "recommendation_id": recommendation_id, + } + for plan_id, recommendation_id in zip( + plan_ids_by_index, recommendation_ids + ) + ] + + session.execute( + insert(PlanRecommendations).values(plan_recommendation_rows) + ) + + def chunked(iterable, size=100): for i in range(0, len(iterable), size): yield iterable[i:i + size] diff --git a/backend/engine/engine.py b/backend/engine/engine.py index 5aa82dcd..276444e1 100644 --- a/backend/engine/engine.py +++ b/backend/engine/engine.py @@ -1351,11 +1351,7 @@ async def model_engine(body: PlanTriggerRequest): } ) - # TODO: New property_updates, property_epc_details, property_spatial_updates = [], [], [] - # plans_to_create = [{property_id, plan_data}] - # recommendations_to_create = [{plan_ref, recommendation_data}] - # funding_to_create = [{plan_ref, funding_data}] plans_to_create, recommendations_to_create, funding_to_create = [], [], [] # Prepare the data that will need to be uploaded in bulk @@ -1390,7 +1386,69 @@ async def model_engine(body: PlanTriggerRequest): # store recommendations keyed by property for r in recommendations_for_property: - recommendations_to_create.append({"property_id": p.id, "data": r}) + recommendations_to_create.append({ + "property_id": p.id, + # ---- Recommendation core ---- + "type": r["type"], + "measure_type": r["measure_type"], + "description": r["description"], + "estimated_cost": float(r["total"]), + "default": r["default"], + "starting_u_value": float(r["starting_u_value"]) if r.get("starting_u_value") else None, + "new_u_value": float(r["new_u_value"]) if r.get("new_u_value") else None, + "sap_points": float(r["sap_points"]), + "energy_savings": float(r["heat_demand"]), + "kwh_savings": float(r["kwh_savings"]), + "co2_equivalent_savings": float(r["co2_equivalent_savings"]), + "total_work_hours": float(r["labour_hours"]), + "energy_cost_savings": float(r["energy_cost_savings"]), + "labour_days": float(r["labour_days"]), + "already_installed": r["already_installed"], + "heat_demand": float(r["heat_demand"]), + + # ---- parts ---- + "parts": [ + { + "material_id": part["id"], + "depth": int(part["depth"]) if part.get("depth") else None, + "quantity": float(part["quantity"]) if part.get("quantity") else None, + "quantity_unit": part.get("quantity_unit"), + "estimated_cost": float(part.get("total", part.get("total_cost"))), + } + for part in r.get("parts", []) + ], + }) + + if recommendations_for_property and p.funded_measures: + funding_to_create.append({ + "property_id": p.id, + "scheme": p.scheme, + "project_funding": float(p.project_funding), + "total_uplift": float(p.total_uplift), + "full_project_score": float(p.full_project_score), + "partial_project_score": float(p.partial_project_score), + "uplift_project_score": float(p.uplift_project_score), + "measures": [ + { + "measure": ( + "cavity_wall_insulation" + if part["type"] == "extension_cavity_wall_insulation" + else "sealing_fireplace" + if part["type"] == "sealing_open_fireplace" + else part["type"] + ), + "material_id": ( + part["parts"][0]["id"] + if part.get("parts") + else None + ), + "innovation_uplift": float(part["innovation_uplift"]), + "partial_project_score": float(part["partial_project_score"]), + "uplift_project_score": float(part["uplift_project_score"]), + } + for part in p.funded_measures + ], + }) # Bulk upload property data logger.info("Uploading property data in bulk") @@ -1398,65 +1456,25 @@ async def model_engine(body: PlanTriggerRequest): db_funcs.property_functions.bulk_update_properties(session, property_updates) db_funcs.property_functions.bulk_upsert_property_details_epc(session, property_epc_details) db_funcs.property_functions.bulk_upsert_property_spatial(session, property_spatial_updates) + # # Bulk create plans + plan_id_by_property = db_funcs.recommendations_functions.bulk_create_plans(session, plans_to_create) + recommendation_payload = [ + { + "plan_id": plan_id_by_property[r["property_id"]], + **{k: v for k, v in r.items() if k not in ["parts"]}, + "parts": r["parts"], + } for r in recommendations_to_create if r["property_id"] in plan_id_by_property + ] - # TODO: End New + db_funcs.recommendations_functions.bulk_upload_recommendations_and_materials( + session, recommendation_payload + ) - for i in tqdm( - range(0, len(input_properties), BATCH_SIZE), total=int(np.ceil(len(input_properties) / BATCH_SIZE)) - ): - try: - # Take a slice of the input_properties list to make a batch - batch_properties = input_properties[i:i + BATCH_SIZE] - with db_session() as session: - for p in batch_properties: - recommendations_to_upload = recommendations.get(p.id, []) - default_recommendations = [r for r in recommendations_to_upload if r["default"]] - total_sap_points = sum([r["sap_points"] for r in default_recommendations]) - new_sap_points = float(p.data["current-energy-efficiency"]) + total_sap_points - new_epc = sap_to_epc(new_sap_points) - - total_cost = sum([r["total"] for r in default_recommendations]) - - valuations = PropertyValuation.estimate( - property_instance=p, target_epc=new_epc, total_cost=total_cost - ) - - property_plan_data = db_funcs.recommendations_functions.prepare_plan_data( - p, body, scenario_id, eco_packages, valuations, new_sap_points, new_epc, - default_recommendations - ) - - property_details_epc = p.get_property_details_epc(portfolio_id=body.portfolio_id) - property_data = p.get_full_property_data(current_valuation=valuations["current_value"]) - db_funcs.property_functions.create_property_details_epc(session, property_details_epc) - - db_funcs.property_functions.update_or_create_property_spatial_details( - session, p.uprn, p.spatial - ) - - db_funcs.property_functions.update_property_data( - session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data - ) - - if not recommendations_to_upload: - continue - - new_plan_id = db_funcs.recommendations_functions.create_plan( - session, plan=property_plan_data - ) - - db_funcs.recommendations_functions.upload_recommendations( - session, recommendations_to_upload, p.id, new_plan_id - ) - db_funcs.funding_functions.upload_funding( - session, p, new_plan_id, recommendations_to_upload - ) - - except Exception as e: - # Rollback the session if an error occurs - logger.warning("Failed i = %s" % str(i)) - logger.error(f"An error occurred during batch starting at index {i}: {e}") - logger.error(f"property is uprn {p.uprn} id {p.id} address {p.address}") + funding_payload = [ + {"plan_id": plan_id_by_property[f["property_id"]], **{k: v for k, v in f.items() if k != "property_id"}} + for f in funding_to_create if f["property_id"] in plan_id_by_property + ] + db_funcs.funding_functions.bulk_upload_funding_packages(session, funding_payload) logger.info("Work completed, updating log status")