from sqlalchemy.orm import Session from sqlalchemy.exc import SQLAlchemyError from sqlalchemy import insert from backend.app.db.models.funding import FundingPackage, FundingPackageMeasures def upload_funding(session: Session, p, plan_id, recommendations_to_upload): try: # Prepare data for bulk insert for Recommendation funding_package_data = { "plan_id": plan_id, "scheme": p.scheme, "project_funding": float(p.project_funding), "total_uplift": float(p.total_uplift), "full_project_score": float(p.full_project_score), "partial_project_score": float(p.partial_project_score), "uplift_project_score": float(p.uplift_project_score) } # upload the funding package data and get back the ID new_funding_package = FundingPackage(**funding_package_data) session.add(new_funding_package) session.flush() session.commit() funding_package_id = new_funding_package.id # We now prepare the list of funding measures to be uploaded funding_measures_data = [] for part in p.funded_measures: recommendation_id = part["id"] recommendation = next( (x for x in recommendations_to_upload if x["recommendation_id"] == recommendation_id), {} ) material_id = None if recommendation["parts"]: material_id = recommendation["parts"][0]["id"] part_type = part["type"] if part_type == "extension_cavity_wall_insulation": part_type = "cavity_wall_insulation" if part_type == "sealing_open_fireplace": part_type = "sealing_fireplace" if part == "low_energy_lighting": part_type = "low_energy_lighting_installation" funding_measures_data.append({ "funding_package_id": funding_package_id, "measure": part_type, "material_id": material_id, "innovation_uplift": float(part["innovation_uplift"]), "partial_project_score": float(part["partial_project_score"]), "uplift_project_score": float(part["uplift_project_score"]) }) # Bulk insert the funding measures data if funding_measures_data: session.bulk_insert_mappings(FundingPackageMeasures, funding_measures_data) # flush the changes to get the newly created IDs session.flush() # Commit the transaction session.commit() return True except SQLAlchemyError as e: # Rollback the transaction in case of an error session.rollback() print(f"An error occurred: {e}") return False def bulk_upload_funding_packages( session: Session, funding_payload: list[dict], ): """ Bulk upload: - funding_package - funding_package_measures Assumes caller manages the transaction. """ if not funding_payload: return # --------------------------------------------------------- # 1. Prepare funding package rows # --------------------------------------------------------- funding_rows = [] measures_by_index = [] for f in funding_payload: funding_rows.append({ "plan_id": f["plan_id"], "scheme": f["scheme"], "project_funding": f["project_funding"], "total_uplift": f["total_uplift"], "full_project_score": f["full_project_score"], "partial_project_score": f["partial_project_score"], "uplift_project_score": f["uplift_project_score"], }) measures_by_index.append(f.get("measures", [])) # --------------------------------------------------------- # 2. Insert funding packages and get IDs # --------------------------------------------------------- result = session.execute( insert(FundingPackage) .values(funding_rows) .returning(FundingPackage.id) ) funding_package_ids = [row[0] for row in result] # --------------------------------------------------------- # 3. Insert funding package measures # --------------------------------------------------------- measures_rows = [] for funding_package_id, measures in zip( funding_package_ids, measures_by_index ): for m in measures: measures_rows.append({ "funding_package_id": funding_package_id, "measure": m["measure"], "material_id": m["material_id"], "innovation_uplift": m["innovation_uplift"], "partial_project_score": m["partial_project_score"], "uplift_project_score": m["uplift_project_score"], }) if measures_rows: session.execute( insert(FundingPackageMeasures).values(measures_rows) )