From 6d381c2b658d031db5bacefb211230a740d30333 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Mon, 21 Aug 2023 11:03:54 +0100 Subject: [PATCH] refactored database upload so it's more efficient --- .../app/db/functions/materials_functions.py | 9 +- .../app/db/functions/portfolio_functions.py | 56 +++--- .../app/db/functions/property_functions.py | 160 +++++++++--------- .../db/functions/recommendations_functions.py | 104 ++++++++---- backend/app/plan/router.py | 53 +++--- 5 files changed, 194 insertions(+), 188 deletions(-) diff --git a/backend/app/db/functions/materials_functions.py b/backend/app/db/functions/materials_functions.py index dd67bc4b..a9995cf4 100644 --- a/backend/app/db/functions/materials_functions.py +++ b/backend/app/db/functions/materials_functions.py @@ -1,15 +1,12 @@ -from backend.app.db.connection import db_engine from backend.app.db.models.materials import Material -from sqlalchemy.orm import sessionmaker -def get_materials(): +def get_materials(session): """ This function will retrieve all materials from the database. :return: A list of Material objects if successful, an empty list otherwise. """ - Session = sessionmaker(bind=db_engine) - with Session() as session: - materials = session.query(Material).filter(Material.is_active).all() + + materials = session.query(Material).filter(Material.is_active).all() return materials if materials else [] diff --git a/backend/app/db/functions/portfolio_functions.py b/backend/app/db/functions/portfolio_functions.py index 9da590dd..e3ca001a 100644 --- a/backend/app/db/functions/portfolio_functions.py +++ b/backend/app/db/functions/portfolio_functions.py @@ -1,39 +1,35 @@ -from sqlalchemy.orm import sessionmaker from sqlalchemy import func -from backend.app.db.connection import db_engine from backend.app.db.models.recommendations import Plan, PlanRecommendations, Recommendation from backend.app.db.models.portfolio import Portfolio -def aggregate_portfolio_recommendations(portfolio_id: int): - Session = sessionmaker(bind=db_engine) - with Session() as session: - # Aggregate multiple fields - aggregates = ( - session.query( - func.sum(Recommendation.estimated_cost).label("cost"), - # For future usage we will aggregate multiple fields in this step - # func.sum(Recommendation.heat_demand).label("total_heat_demand"), - # func.sum(Recommendation.energy_savings).label("total_energy_savings") - ) - .join(PlanRecommendations, PlanRecommendations.recommendation_id == Recommendation.id) - .join(Plan, Plan.id == PlanRecommendations.plan_id) - .filter(Plan.portfolio_id == portfolio_id, Plan.is_default == True, Recommendation.default == True) - .one() +def aggregate_portfolio_recommendations(session, portfolio_id: int): + # Aggregate multiple fields + aggregates = ( + session.query( + func.sum(Recommendation.estimated_cost).label("cost"), + # For future usage we will aggregate multiple fields in this step + # func.sum(Recommendation.heat_demand).label("total_heat_demand"), + # func.sum(Recommendation.energy_savings).label("total_energy_savings") ) + .join(PlanRecommendations, PlanRecommendations.recommendation_id == Recommendation.id) + .join(Plan, Plan.id == PlanRecommendations.plan_id) + .filter(Plan.portfolio_id == portfolio_id, Plan.is_default == True, Recommendation.default == True) + .one() + ) - aggregates_dict = { - "cost": aggregates.cost or 0, - # "total_heat_demand": aggregates.total_heat_demand or 0, - # "total_energy_savings": aggregates.total_energy_savings or 0 - } + aggregates_dict = { + "cost": aggregates.cost or 0, + # "total_heat_demand": aggregates.total_heat_demand or 0, + # "total_energy_savings": aggregates.total_energy_savings or 0 + } - # Get the portfolio and update the fields - portfolio = session.query(Portfolio).filter_by(id=portfolio_id).one() - # Update the data - for key, value in aggregates_dict.items(): - setattr(portfolio, key, value) + # Get the portfolio and update the fields + portfolio = session.query(Portfolio).filter_by(id=portfolio_id).one() + # Update the data + for key, value in aggregates_dict.items(): + setattr(portfolio, key, value) - # Merge the updated portfolio back into the session - session.merge(portfolio) - session.commit() + # Merge the updated portfolio back into the session + session.merge(portfolio) + session.commit() diff --git a/backend/app/db/functions/property_functions.py b/backend/app/db/functions/property_functions.py index 79317b91..9fe7c5d5 100644 --- a/backend/app/db/functions/property_functions.py +++ b/backend/app/db/functions/property_functions.py @@ -3,132 +3,128 @@ ### import datetime import pytz -from sqlalchemy.orm import sessionmaker from backend.app.db.models.portfolio import ( PropertyModel, PropertyCreationStatus, PortfolioStatus, PropertyTargetsModel, PropertyDetailsEpcModel ) -from backend.app.db.connection import db_engine from sqlalchemy.orm.exc import NoResultFound -def create_property(portfolio_id: int, address: str, postcode: str) -> (int, bool): +def create_property(session, portfolio_id: int, address: str, postcode: str) -> (int, bool): """ This function will create a record for the property in the database if it does not exist. If it does exist, it will just update the updated_at field. + :param session: The database session :param portfolio_id: The ID of the portfolio the property belongs to :param address: The address of the property :param postcode: The postcode of the property :return: The ID of the property and a boolean indicating whether it was created or not """ - Session = sessionmaker(bind=db_engine) - with Session() as session: - try: - # Attempt to fetch the existing property - existing_property = session.query(PropertyModel).filter_by( - address=address, postcode=postcode, portfolio_id=portfolio_id - ).one() + try: + # Attempt to fetch the existing property + existing_property = session.query(PropertyModel).filter_by( + address=address, postcode=postcode, portfolio_id=portfolio_id + ).one() - # Update the 'updated_at' field - existing_property.updated_at = datetime.datetime.now(pytz.utc) + # Update the 'updated_at' field + existing_property.updated_at = datetime.datetime.now(pytz.utc) - # Merge the updated property back into the session - session.merge(existing_property) - session.commit() + # Merge the updated property back into the session + session.merge(existing_property) + session.commit() - return existing_property.id, False + return existing_property.id, False - except NoResultFound: - # Property doesn't exist, create a new one - new_property = PropertyModel( - address=address, - postcode=postcode, - portfolio_id=portfolio_id, - creation_status=PropertyCreationStatus.LOADING, - status=PortfolioStatus.ASSESSMENT.value, - has_pre_condition_report=False, - has_recommendations=False - ) + except NoResultFound: + # Property doesn't exist, create a new one + new_property = PropertyModel( + address=address, + postcode=postcode, + portfolio_id=portfolio_id, + creation_status=PropertyCreationStatus.LOADING, + status=PortfolioStatus.ASSESSMENT.value, + has_pre_condition_report=False, + has_recommendations=False + ) - # Add the new property to the session - session.add(new_property) + # Add the new property to the session + session.add(new_property) - session.commit() + session.commit() - return new_property.id, True + return new_property.id, True -def create_property_targets(property_id: int, portfolio_id: int, epc_target=None, heat_demand_target=None): +def create_property_targets(session, property_id: int, portfolio_id: int, epc_target=None, heat_demand_target=None): """ This function will create a record for the property targets in the database if it does not exist. + :param session: The database session :param property_id: The ID of the property the targets belong to :param portfolio_id: The ID of the portfolio the property belongs to :param epc_target: Goal EPC value for the property :param heat_demand_target: Heat demand target for the property in kwh/m^2/year :return: """ - Session = sessionmaker(bind=db_engine) - with Session() as session: - new_target = PropertyTargetsModel( - property_id=property_id, - portfolio_id=portfolio_id, - epc=epc_target, - heat_demand=heat_demand_target - ) - session.add(new_target) + + new_target = PropertyTargetsModel( + property_id=property_id, + portfolio_id=portfolio_id, + epc=epc_target, + heat_demand=heat_demand_target + ) + session.add(new_target) + session.commit() + + return True + + +def update_property_data(session, property_id: int, portfolio_id: int, property_data: dict): + now = datetime.datetime.now(pytz.utc) + + try: + # Attempt to fetch the existing property + existing_property = session.query(PropertyModel).filter_by( + id=property_id, portfolio_id=portfolio_id + ).one() + + # Update the fields with the data in property_data + for key, value in property_data.items(): + setattr(existing_property, key, value) + + existing_property.updated_at = now + + # Merge the updated property back into the session and commit + session.merge(existing_property) session.commit() - return True - - -def update_property_data(property_id: int, portfolio_id: int, property_data: dict): - Session = sessionmaker(bind=db_engine) - now = datetime.datetime.now(pytz.utc) - with Session() as session: - try: - # Attempt to fetch the existing property - existing_property = session.query(PropertyModel).filter_by( - id=property_id, portfolio_id=portfolio_id - ).one() - - # Update the fields with the data in property_data - for key, value in property_data.items(): - setattr(existing_property, key, value) - - existing_property.updated_at = now - - # Merge the updated property back into the session and commit - session.merge(existing_property) - session.commit() - - except NoResultFound: - raise Exception(f"Property with property_id {property_id} and portfolio_id {portfolio_id} not found") + except NoResultFound: + raise Exception(f"Property with property_id {property_id} and portfolio_id {portfolio_id} not found") return True -def create_property_details_epc(property_details_epc: dict): +def create_property_details_epc(session, property_details_epc: dict): """ This function will create or update a record for the property details EPC in the database. + :param session: The database session :param property_details_epc: A dictionary containing details about the property EPC. :return: True if successful, False otherwise. """ - Session = sessionmaker(bind=db_engine) - with Session() as session: - existing_record = session.query(PropertyDetailsEpcModel).filter_by( - portfolio_id=property_details_epc["portfolio_id"], - property_id=property_details_epc["property_id"] - ).first() - if existing_record: - # If the record exists, update its fields - for key, value in property_details_epc.items(): - setattr(existing_record, key, value) - else: - # If the record doesn't exist, create a new one - new_property_details_epc = PropertyDetailsEpcModel(**property_details_epc) - session.add(new_property_details_epc) + existing_record = session.query(PropertyDetailsEpcModel).filter_by( + portfolio_id=property_details_epc["portfolio_id"], + property_id=property_details_epc["property_id"] + ).first() - session.commit() + if existing_record: + # If the record exists, update its fields + for key, value in property_details_epc.items(): + setattr(existing_record, key, value) + else: + # If the record doesn't exist, create a new one + new_property_details_epc = PropertyDetailsEpcModel(**property_details_epc) + session.add(new_property_details_epc) + + session.commit() return True diff --git a/backend/app/db/functions/recommendations_functions.py b/backend/app/db/functions/recommendations_functions.py index e4f22fd2..ae5d0a5a 100644 --- a/backend/app/db/functions/recommendations_functions.py +++ b/backend/app/db/functions/recommendations_functions.py @@ -1,73 +1,103 @@ from sqlalchemy import text -from sqlalchemy.orm import sessionmaker -from backend.app.db.connection import db_engine from backend.app.db.models.recommendations import Plan, Recommendation, RecommendationMaterials -def create_plan(plan): +def create_plan(session, plan): """ This function will create a record for the plan in the database if it does not exist. :param plan: dictionary of data representing a plan to be created """ - Session = sessionmaker(bind=db_engine) - with Session() as session: - new_plan = Plan(**plan) - session.add(new_plan) - session.commit() + new_plan = Plan(**plan) + session.add(new_plan) + session.commit() - return new_plan.id + return new_plan.id -def create_recommendation(recommendation): +def create_recommendation(session, recommendation): """ This function will create a record for the recommendation in the database if it does not exist. + :param session: The database session :param recommendation: dictionary of data representing a recommendation to be created """ - Session = sessionmaker(bind=db_engine) - with Session() as session: - new_recommendation = Recommendation(**recommendation) - session.add(new_recommendation) - session.commit() + new_recommendation = Recommendation(**recommendation) + session.add(new_recommendation) + session.commit() - return new_recommendation.id + return new_recommendation.id -def create_recommendation_material(recommendation_id, material_id, depth): +def create_recommendation_material(session, recommendation_id, material_id, depth): """ This function will create a record for the recommendation_material in the database if it does not exist. + :param session: The databse session :param recommendation_id: ID of the recommendation :param material_id: ID of the material :param depth: depth of the material, may be null if a material where depth is not applicable """ - Session = sessionmaker(bind=db_engine) - with Session() as session: - new_recommendation_material = RecommendationMaterials( - recommendation_id=recommendation_id, - material_id=material_id, - depth=depth - ) - session.add(new_recommendation_material) - session.commit() + new_recommendation_material = RecommendationMaterials( + recommendation_id=recommendation_id, + material_id=material_id, + depth=depth + ) + session.add(new_recommendation_material) + session.commit() - return new_recommendation_material.id + return new_recommendation_material.id -def create_plan_recommendations(plan_id, recommendation_ids): +def create_plan_recommendations(session, plan_id, recommendation_ids): """ This function will create a record for the plan_recommendation in the database if it does not exist. :param plan_id: ID of the plan :param recommendation_ids: list of recommendation IDs """ - Session = sessionmaker(bind=db_engine) - with Session() as session: - for recommendation_id in recommendation_ids: - session.execute( - text( - 'INSERT INTO plan_recommendations (plan_id, recommendation_id) VALUES (:plan_id, ' - ':recommendation_id)'), - {'plan_id': plan_id, 'recommendation_id': recommendation_id} + + for recommendation_id in recommendation_ids: + session.execute( + text( + 'INSERT INTO plan_recommendations (plan_id, recommendation_id) VALUES (:plan_id, ' + ':recommendation_id)'), + {'plan_id': plan_id, 'recommendation_id': recommendation_id} + ) + session.commit() + + +def upload_recommendations(session, recommendations_to_upload, property_id): + uploaded_recommendation_ids = [] + + for rec in recommendations_to_upload: + new_recommendation = Recommendation( + property_id=property_id, + type=rec["type"], + description=rec["description"], + estimated_cost=rec["cost"], + default=rec["default"], + starting_u_value=rec.get("starting_u_value"), + new_u_value=rec.get("new_u_value"), + sap_points=rec["sap_points"] + ) + session.add(new_recommendation) + uploaded_recommendation_ids.append(new_recommendation) + + # Flush to get the IDs for the newly created recommendations + session.flush() + + # Create recommendation-materials + recommendation_materials = [] + for rec, recommendation in zip(recommendations_to_upload, uploaded_recommendation_ids): + for part in rec["parts"]: + recommendation_material = RecommendationMaterials( + recommendation_id=recommendation.id, + material_id=part["id"], + depth=part["depths"][0] if part["depths"] else None, ) - session.commit() + recommendation_materials.append(recommendation_material) + + session.add_all(recommendation_materials) + session.commit() + + return [rec.id for rec in uploaded_recommendation_ids] diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index 3b9c5242..3806b92a 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -13,6 +13,7 @@ from recommendations.WallRecommendations import WallRecommendations from utils.uvalue_estimates import classify_decile_newvalues from backend.app.db.utils import row2dict from starlette.responses import Response +from sqlalchemy.orm import sessionmaker # database interaction functions from backend.app.db.functions.property_functions import ( @@ -20,9 +21,11 @@ from backend.app.db.functions.property_functions import ( ) from backend.app.db.functions.materials_functions import get_materials from backend.app.db.functions.recommendations_functions import ( - create_plan, create_recommendation, create_recommendation_material, create_plan_recommendations + create_plan, create_recommendation, create_recommendation_material, create_plan_recommendations, + upload_recommendations ) from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations +from backend.app.db.connection import db_engine from model_data.optimiser.GainOptimiser import GainOptimiser from model_data.optimiser.CostOptimiser import CostOptimiser @@ -127,6 +130,10 @@ def insert_temp_recommendation_id(property_recommendations): @router.post("/trigger") async def trigger_plan(body: PlanTriggerRequest): + logger.info("Connecting to db") + Session = sessionmaker(bind=db_engine) + session = Session() + logger.info("Getting the inputs") # Read in the trigger file from s3 bucket_name = get_settings().PLAN_TRIGGER_BUCKET @@ -141,7 +148,7 @@ async def trigger_plan(body: PlanTriggerRequest): # Create a record in db property_id, is_new = create_property( - portfolio_id=body.portfolio_id, address=config['address'], postcode=config['postcode'] + session, portfolio_id=body.portfolio_id, address=config['address'], postcode=config['postcode'] ) # if a new record was not created, we don't produduce recommendations @@ -150,6 +157,7 @@ async def trigger_plan(body: PlanTriggerRequest): # TODO: Need to add heat demand target create_property_targets( + session, property_id=property_id, portfolio_id=body.portfolio_id, epc_target=body.goal_value, @@ -193,7 +201,7 @@ async def trigger_plan(body: PlanTriggerRequest): # table probably won't be very large and won't be updated that often. It might be better to # store this data in s3 load it into memory when the app starts up. We will test this - materials = get_materials() + materials = get_materials(session) materials_by_type = filter_materials(materials) logger.info("Getting components and properties recommendations") @@ -319,10 +327,10 @@ async def trigger_plan(body: PlanTriggerRequest): # Upload property data for p in input_properties: property_details_epc = p.get_property_details_epc(portfolio_id=body.portfolio_id, rating_lookup=rating_lookup) - create_property_details_epc(property_details_epc) + create_property_details_epc(session, property_details_epc) property_data = p.get_full_property_data() - update_property_data(property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data) + update_property_data(session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data) # Upload recommendations recommendations_to_upload = recommendations.get(p.id, []) @@ -332,6 +340,7 @@ async def trigger_plan(body: PlanTriggerRequest): # Create a plan new_plan_id = create_plan( + session, { "portfolio_id": body.portfolio_id, "property_id": p.id, @@ -339,36 +348,12 @@ async def trigger_plan(body: PlanTriggerRequest): } ) - # upload recommendations - uploaded_recommendation_ids = [] - for rec in recommendations_to_upload: - - recommendation_id = create_recommendation( - { - "property_id": p.id, - "type": rec["type"], - "description": rec["description"], - "estimated_cost": rec["cost"], - "default": rec["default"], - "starting_u_value": rec.get("starting_u_value"), - "new_u_value": rec.get("new_u_value"), - # TODO: Placeholder for SAP points in place - "sap_points": rec["sap_points"] - # Remaining outputs yet to be handled - } - ) - uploaded_recommendation_ids.append(recommendation_id) - - # create the bridging between the recommendation and the materials - for part in rec["parts"]: - create_recommendation_material( - recommendation_id=recommendation_id, - material_id=part["id"], - depth=part["depths"][0] if part["depths"] else None, - ) + # Upload recommendations + uploaded_recommendation_ids = upload_recommendations(session, recommendations_to_upload, p.id) # Finally, match the recommendation to the plan create_plan_recommendations( + session, plan_id=new_plan_id, recommendation_ids=uploaded_recommendation_ids ) @@ -379,6 +364,8 @@ async def trigger_plan(body: PlanTriggerRequest): # way to do this, but it's the simplest and will be a process that we can re-use since when we change a # recommendation from being default to not default, we'll need to re-run this process to re-calculate the # the portfolion level impact - aggregate_portfolio_recommendations(portfolio_id=body.portfolio_id) + aggregate_portfolio_recommendations(session, portfolio_id=body.portfolio_id) + + session.close() return Response(status_code=200)