completed data upload and added batch uploading to manage large quantities of data

This commit is contained in:
Khalim Conn-Kowlessar 2023-10-13 13:24:49 +11:00
parent 042fbea083
commit 28550efbe5
5 changed files with 64 additions and 43 deletions

View file

@ -300,7 +300,14 @@ class Property(Definitions):
if len(attributes) == 0:
# We attempt to perform the clean on the fly
cleaner_cls = all_cleaner_map[description]
attributes = [cleaner_cls(self.data[description]).process()]
cleaner_cls = cleaner_cls(self.data[description])
processed = {
"original_description": self.data[description],
"clean_description": cleaner_cls.description.replace("(assumed)", "").rstrip().capitalize(),
**cleaner_cls.process()
}
attributes = [processed]
setattr(self, self.ATTRIBUTE_MAP[description], attributes[0])

View file

@ -11,4 +11,4 @@ db_string = connection_string.format(
dbname=get_settings().DB_NAME,
)
db_engine = create_engine(db_string, pool_size=20, max_overflow=5)
db_engine = create_engine(db_string, pool_size=5, max_overflow=5)

View file

@ -36,9 +36,12 @@ from recommendations.optimiser.optimiser_functions import prepare_input_measures
from recommendations.WallRecommendations import WallRecommendations
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet
from tqdm import tqdm
logger = setup_logger()
BATCH_SIZE = 5
router = APIRouter(
prefix="/plan",
tags=["plan"],
@ -74,16 +77,16 @@ async def trigger_plan(body: PlanTriggerRequest):
session, portfolio_id=body.portfolio_id, address=config['address'], postcode=config['postcode']
)
# if a new record was not created, we don't produce recommendations
if not is_new:
continue
# TODO: Need to add heat demand target
create_property_targets(
session,
property_id=property_id,
portfolio_id=body.portfolio_id,
epc_target=body.goal_value,
heat_demand_target=None
)
# if not is_new:
# continue
# # TODO: Need to add heat demand target
# create_property_targets(
# session,
# property_id=property_id,
# portfolio_id=body.portfolio_id,
# epc_target=body.goal_value,
# heat_demand_target=None
# )
input_properties.append(
Property(
@ -115,6 +118,11 @@ async def trigger_plan(body: PlanTriggerRequest):
# TODO: Move this to a class. We probably want a Recommender class which takes the optimisers in as an
# injected dependency, and then the optimisers can take the input measures in as part of the setup() method
# import pickle
# with open("input_properties.pickle", "rb") as f:
# input_properties = pickle.load(f)
recommendations = {}
recommendations_scoring_data = []
@ -278,40 +286,48 @@ async def trigger_plan(body: PlanTriggerRequest):
# 3) the recommendations
logger.info("Uploading recommendations to the database")
# Upload property data
for p in input_properties:
property_details_epc = p.get_property_details_epc(portfolio_id=body.portfolio_id,
rating_lookup=rating_lookup)
create_property_details_epc(session, property_details_epc)
for i in tqdm(range(0, len(input_properties), BATCH_SIZE)):
try:
# Take a slice of the input_properties list to make a batch
batch_properties = input_properties[i:i + BATCH_SIZE]
property_data = p.get_full_property_data()
update_property_data(session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data)
for p in batch_properties:
# Upload recommendations
recommendations_to_upload = recommendations.get(p.id, [])
# Your existing operations
property_details_epc = p.get_property_details_epc(
portfolio_id=body.portfolio_id, rating_lookup=rating_lookup
)
create_property_details_epc(session, property_details_epc)
if not recommendations_to_upload:
continue
property_data = p.get_full_property_data()
update_property_data(
session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data
)
# Create a plan
new_plan_id = create_plan(
session,
{
"portfolio_id": body.portfolio_id,
"property_id": p.id,
"is_default": True
}
)
recommendations_to_upload = recommendations.get(p.id, [])
if not recommendations_to_upload:
continue
# Upload recommendations
uploaded_recommendation_ids = upload_recommendations(session, recommendations_to_upload, p.id)
new_plan_id = create_plan(session, {
"portfolio_id": body.portfolio_id,
"property_id": p.id,
"is_default": True
})
# Finally, match the recommendation to the plan
create_plan_recommendations(
session,
plan_id=new_plan_id,
recommendation_ids=uploaded_recommendation_ids
)
uploaded_recommendation_ids = upload_recommendations(session, recommendations_to_upload, p.id)
create_plan_recommendations(
session, plan_id=new_plan_id, recommendation_ids=uploaded_recommendation_ids
)
# Commit the session after each batch
session.commit()
except Exception as e:
# Rollback the session if an error occurs
session.rollback()
print("Failed i = %s" % str(i))
logger.error(f"An error occurred during batch starting at index {i}: {e}")
logger.info("Creating portfolio aggregations")
# We implement this in the simplest way possible which will be just to query the database for all

View file

@ -69,8 +69,6 @@ class FloorRecommendations(Definitions):
return
if u_value:
if self.property.data["property-type"] != "House":
raise NotImplementedError("Implement me")
# Being built more recently than this means that the property was likely built with solid
# concrete floors that already have insulation

View file

@ -307,7 +307,7 @@ def get_roof_u_value(
# Get the U-value from table S10 based on the age band and the determined column
u_value = s10.loc[s10['Age_band'].str.contains(age_band), column].values[0]
return u_value
return float(u_value)
def estimate_perimeter(floor_area, num_rooms):