From 28550efbe55d107f0c7b609cbc1ef49e526dc93e Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Fri, 13 Oct 2023 13:24:49 +1100 Subject: [PATCH] completed data upload and added batch uploading to manage large quantities of data --- backend/Property.py | 9 ++- backend/app/db/connection.py | 2 +- backend/app/plan/router.py | 92 +++++++++++++++---------- recommendations/FloorRecommendations.py | 2 - recommendations/recommendation_utils.py | 2 +- 5 files changed, 64 insertions(+), 43 deletions(-) diff --git a/backend/Property.py b/backend/Property.py index be3d8e9b..2b283e36 100644 --- a/backend/Property.py +++ b/backend/Property.py @@ -300,7 +300,14 @@ class Property(Definitions): if len(attributes) == 0: # We attempt to perform the clean on the fly cleaner_cls = all_cleaner_map[description] - attributes = [cleaner_cls(self.data[description]).process()] + cleaner_cls = cleaner_cls(self.data[description]) + processed = { + "original_description": self.data[description], + "clean_description": cleaner_cls.description.replace("(assumed)", "").rstrip().capitalize(), + **cleaner_cls.process() + } + + attributes = [processed] setattr(self, self.ATTRIBUTE_MAP[description], attributes[0]) diff --git a/backend/app/db/connection.py b/backend/app/db/connection.py index 28f06047..9efdfd25 100644 --- a/backend/app/db/connection.py +++ b/backend/app/db/connection.py @@ -11,4 +11,4 @@ db_string = connection_string.format( dbname=get_settings().DB_NAME, ) -db_engine = create_engine(db_string, pool_size=20, max_overflow=5) +db_engine = create_engine(db_string, pool_size=5, max_overflow=5) diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index ea493f73..25855fef 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -36,9 +36,12 @@ from recommendations.optimiser.optimiser_functions import prepare_input_measures from recommendations.WallRecommendations import WallRecommendations from utils.logger import setup_logger from utils.s3 import read_dataframe_from_s3_parquet +from tqdm import tqdm logger = setup_logger() +BATCH_SIZE = 5 + router = APIRouter( prefix="/plan", tags=["plan"], @@ -74,16 +77,16 @@ async def trigger_plan(body: PlanTriggerRequest): session, portfolio_id=body.portfolio_id, address=config['address'], postcode=config['postcode'] ) # if a new record was not created, we don't produduce recommendations - if not is_new: - continue - # TODO: Need to add heat demand target - create_property_targets( - session, - property_id=property_id, - portfolio_id=body.portfolio_id, - epc_target=body.goal_value, - heat_demand_target=None - ) + # if not is_new: + # continue + # # TODO: Need to add heat demand target + # create_property_targets( + # session, + # property_id=property_id, + # portfolio_id=body.portfolio_id, + # epc_target=body.goal_value, + # heat_demand_target=None + # ) input_properties.append( Property( @@ -115,6 +118,11 @@ async def trigger_plan(body: PlanTriggerRequest): # TODO: Move this to a class. We probably want a Recommender class which takes the injects the optimisers # in as a dependency and then the optimisers can take the input measures in as part of the setup() method + + # import pickle + # with open("input_properties.pickle", "rb") as f: + # input_properties = pickle.load(f) + recommendations = {} recommendations_scoring_data = [] @@ -278,40 +286,48 @@ async def trigger_plan(body: PlanTriggerRequest): # 3) the recommendations logger.info("Uploading recommendations to the database") - # Upload property data - for p in input_properties: - property_details_epc = p.get_property_details_epc(portfolio_id=body.portfolio_id, - rating_lookup=rating_lookup) - create_property_details_epc(session, property_details_epc) + for i in tqdm(range(0, len(input_properties), BATCH_SIZE)): + try: + # Take a slice of the input_properties list to make a batch + batch_properties = input_properties[i:i + BATCH_SIZE] - property_data = p.get_full_property_data() - update_property_data(session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data) + for p in batch_properties: - # Upload recommendations - recommendations_to_upload = recommendations.get(p.id, []) + # Your existing operations + property_details_epc = p.get_property_details_epc( + portfolio_id=body.portfolio_id, rating_lookup=rating_lookup + ) + create_property_details_epc(session, property_details_epc) - if not recommendations_to_upload: - continue + property_data = p.get_full_property_data() + update_property_data( + session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data + ) - # Create a plan - new_plan_id = create_plan( - session, - { - "portfolio_id": body.portfolio_id, - "property_id": p.id, - "is_default": True - } - ) + recommendations_to_upload = recommendations.get(p.id, []) + if not recommendations_to_upload: + continue - # Upload recommendations - uploaded_recommendation_ids = upload_recommendations(session, recommendations_to_upload, p.id) + new_plan_id = create_plan(session, { + "portfolio_id": body.portfolio_id, + "property_id": p.id, + "is_default": True + }) - # Finally, match the recommendation to the plan - create_plan_recommendations( - session, - plan_id=new_plan_id, - recommendation_ids=uploaded_recommendation_ids - ) + uploaded_recommendation_ids = upload_recommendations(session, recommendations_to_upload, p.id) + + create_plan_recommendations( + session, plan_id=new_plan_id, recommendation_ids=uploaded_recommendation_ids + ) + + # Commit the session after each batch + session.commit() + + except Exception as e: + # Rollback the session if an error occurs + session.rollback() + print("Failed i = %s" % str(i)) + logger.error(f"An error occurred during batch starting at index {i}: {e}") logger.info("Creating portfolio aggregations") # We implement this in the simplest way possible which will be just to query the database for all diff --git a/recommendations/FloorRecommendations.py b/recommendations/FloorRecommendations.py index 9ee023c4..35e34648 100644 --- a/recommendations/FloorRecommendations.py +++ b/recommendations/FloorRecommendations.py @@ -69,8 +69,6 @@ class FloorRecommendations(Definitions): return if u_value: - if self.property.data["property-type"] != "House": - raise NotImplementedError("Implement me") # By being built more recently than this, it means that the property was likely build with soild # concrete floors with insulation already diff --git a/recommendations/recommendation_utils.py b/recommendations/recommendation_utils.py index 1eef7b05..cd7bb3f8 100644 --- a/recommendations/recommendation_utils.py +++ b/recommendations/recommendation_utils.py @@ -307,7 +307,7 @@ def get_roof_u_value( # Get the U-value from table S10 based on the age band and the determined column u_value = s10.loc[s10['Age_band'].str.contains(age_band), column].values[0] - return u_value + return float(u_value) def estimate_perimeter(floor_area, num_rooms):