diff --git a/backend/apis/GoogleSolarApi.py b/backend/apis/GoogleSolarApi.py index ed7b7422..75f28ceb 100644 --- a/backend/apis/GoogleSolarApi.py +++ b/backend/apis/GoogleSolarApi.py @@ -589,14 +589,61 @@ class GoogleSolarApi: self.double_property = True @staticmethod + def calculate_percentage_decrease(start_efficiency, end_efficiency, consumption_averages): + """ + Calculate the percentage decrease in consumption between two energy efficiency ratings. + :param start_efficiency: The starting energy efficiency rating. + :param end_efficiency: The ending energy efficiency rating. + :param consumption_averages: The DataFrame containing the consumption averages. + :return: + """ + + start_consumption = consumption_averages.loc[ + consumption_averages["current-energy-efficiency"].astype(str) == str(start_efficiency), "total_consumption" + ].values[0] + + end_consumption = consumption_averages.loc[ + consumption_averages["current-energy-efficiency"].astype(str) == str(end_efficiency), "total_consumption" + ].values[0] + + percentage_decrease = ((start_consumption - end_consumption) / start_consumption) * 100 + # percentage_decrease cannot be nehative + if percentage_decrease < 0: + percentage_decrease = 0 + return percentage_decrease + + @classmethod + def estimate_new_consumption( + cls, current_energy_efficiency, target_efficiency, current_consumption, ofgem_consumption_averages + ): + """ + Given then consumption_averages dataset, which is produced as a result of the training_data.py script, + for the energy kwh models, this function will estimate the new consumption based on the current consumption, + based on the expected reduction in consumption from the current rating to the target rating. + :param current_energy_efficiency: The current energy efficiency rating + :param target_efficiency: The target energy efficiency rating + :param current_consumption: The current consumption of the property + :param ofgem_consumption_averages: DataFrame of the Ofgem consumption averages + :return: + """ + percentage_decrease = cls.calculate_percentage_decrease( + start_efficiency=current_energy_efficiency, + end_efficiency=target_efficiency, + consumption_averages=ofgem_consumption_averages + ) + new_consumption = current_consumption * (1 - percentage_decrease / 100) + return new_consumption + + @classmethod def prepare_input_data( + cls, input_properties: List[Property], - energy_consumption_client: EnergyConsumptionModel, + ofgem_consumption_averages: pd.DataFrame, body: PlanTriggerRequest ): """ :param input_properties: List of properties - :param energy_consumption_client: EnergyConsumptionModel instance + :param ofgem_consumption_averages: DataFrame of the Ofgem consumption averages :param body: PlanTriggerRequest instance This sets up the data required to make the solar api request :return: @@ -610,12 +657,13 @@ class GoogleSolarApi: # Energy consumption is adjusted for the property's expected post retrofit state # We set the target rating to EPC C, which is the typical EPC rating we would expect the # property to achieve post retrofit of just the fabric - "energy_consumption": energy_consumption_client.estimate_new_consumption( + "energy_consumption": cls.estimate_new_consumption( current_energy_efficiency=p.data["current-energy-efficiency"], target_efficiency="69", current_consumption=p.estimate_electrical_consumption( assumed_ashp_efficiency=assumptions.AVERAGE_ASHP_EFFICIENCY, exclusions=body.exclusions - ) + ), + ofgem_consumption_averages=ofgem_consumption_averages ), "property_id": p.id, "uprn": p.uprn @@ -628,12 +676,13 @@ class GoogleSolarApi: # Energy consumption is adjusted for the property's expected post retrofit state # We set the target rating to EPC C, which is the typical EPC rating we would expect the # property to achieve post retrofit of just the fabric - "energy_consumption": energy_consumption_client.estimate_new_consumption( + "energy_consumption": cls.estimate_new_consumption( current_energy_efficiency=p.data["current-energy-efficiency"], target_efficiency="69", current_consumption=p.estimate_electrical_consumption( assumed_ashp_efficiency=assumptions.AVERAGE_ASHP_EFFICIENCY, exclusions=body.exclusions ), + ofgem_consumption_averages=ofgem_consumption_averages ), "property_id": p.id, "uprn": p.uprn diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index 074b5b75..26c84c81 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -40,7 +40,6 @@ from utils.logger import setup_logger from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3 from backend.ml_models.Valuation import PropertyValuation -from etl.bill_savings.EnergyConsumptionModel import EnergyConsumptionModel from etl.bill_savings.KwhData import KwhData from etl.spatial.OpenUprnClient import OpenUprnClient @@ -486,6 +485,16 @@ async def trigger_plan(body: PlanTriggerRequest): if not input_properties: return Response(status_code=204) + # Set up model api and warm up the lambdas + model_api = ModelApi( + portfolio_id=body.portfolio_id, + timestamp=created_at, + prediction_buckets=get_prediction_buckets() + ) + await model_api.async_warm_up_lambdas( + model_prefies=model_api.KWH_MODEL_PREFIXES + model_api.MODEL_PREFIXES + ) + # The materials data could be cached or local so we don't need to make # consistent requests to the backend for # the same data @@ -493,32 +502,14 @@ async def trigger_plan(body: PlanTriggerRequest): materials = get_materials(session) cleaned = get_cleaned() - dataset_version = "2024-07-08" - energy_consumption_client = EnergyConsumptionModel( - model_paths={ - "heating_kwh": f"model_directory/energy_consumption_model/heating_kwh_{dataset_version}.pkl", - "hot_water_kwh": f"model_directory/energy_consumption_model/hot_water_kwh_{dataset_version}.pkl" - }, - dummy_schema_path=f"model_directory/energy_consumption_model/{dataset_version}_dummy_schema.pkl", - consumption_average_path=f"energy_consumption/{dataset_version}/consumption_averages.parquet", - cleaned=cleaned, - environment=get_settings().ENVIRONMENT - ) - kwh_client = KwhData(bucket=get_settings().DATA_BUCKET, read_consumption_data=True) - model_api = ModelApi( - portfolio_id=body.portfolio_id, - timestamp=created_at, - prediction_buckets=get_prediction_buckets() - ) - epcs_for_scoring = kwh_client.transform(data=kwh_client.prepare_epc(input_properties), cleaned=cleaned) - kwh_preds = model_api.paginated_predictions( + kwh_preds = await model_api.async_paginated_predictions( data=epcs_for_scoring, bucket=get_settings().DATA_BUCKET, - model_prefixes=["heating_kwh_predictions", "hotwater_kwh_predictions"], + model_prefixes=model_api.KWH_MODEL_PREFIXES, extract_ids=False, batch_size=SCORING_BATCH_SIZE ) @@ -534,9 +525,15 @@ async def trigger_plan(body: PlanTriggerRequest): # extensions, since it doesn't seem to do a great job logger.info("Performing solar analysis") + + ofgem_consumption_averages = read_dataframe_from_s3_parquet( + bucket_name=get_settings().DATA_BUCKET, + file_key=f"energy_consumption/2024-07-08/consumption_averages.parquet" + ) + building_solar_config, unit_solar_config = GoogleSolarApi.prepare_input_data( input_properties=input_properties, - energy_consumption_client=energy_consumption_client, + ofgem_consumption_averages=ofgem_consumption_averages, body=body ) @@ -591,7 +588,7 @@ async def trigger_plan(body: PlanTriggerRequest): "carbon_ending"] ) - all_predictions = model_api.paginated_predictions( + all_predictions = await model_api.async_paginated_predictions( data=recommendations_scoring_data, bucket=get_settings().DATA_BUCKET, batch_size=SCORING_BATCH_SIZE @@ -620,10 +617,10 @@ async def trigger_plan(body: PlanTriggerRequest): scoring_epcs = pd.DataFrame(scoring_epcs) scoring_epcs = kwh_client.transform(data=scoring_epcs, cleaned=cleaned) - kwh_simulation_predictions = model_api.paginated_predictions( + kwh_simulation_predictions = await model_api.async_paginated_predictions( data=scoring_epcs, bucket=get_settings().DATA_BUCKET, - model_prefixes=["heating_kwh_predictions", "hotwater_kwh_predictions"], + model_prefixes=model_api.KWH_MODEL_PREFIXES, batch_size=SCORING_BATCH_SIZE ) @@ -774,7 +771,7 @@ async def trigger_plan(body: PlanTriggerRequest): update_or_create_property_spatial_details(session, p.uprn, p.spatial) property_data = p.get_full_property_data(current_valuation=valuations["current_value"]) - + update_property_data( session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data ) diff --git a/backend/docker/Dockerfile b/backend/docker/Dockerfile index 5302476d..006f088a 100644 --- a/backend/docker/Dockerfile +++ b/backend/docker/Dockerfile @@ -40,6 +40,7 @@ COPY ./etl/epc/ ./etl/epc/ COPY ./etl/epc_clean/ ./etl/epc_clean/ COPY ./etl/bill_savings/ ./etl/bill_savings/ COPY ./etl/spatial/ ./etl/spatial/ +COPY ./datatypes/ ./datatypes/ # Set the ENTRYPOINT to the AWS Lambda RIC and CMD to your function handler ENTRYPOINT [ "/usr/local/bin/python", "-m", "awslambdaric" ] diff --git a/backend/docker/lambda.Dockerfile b/backend/docker/lambda.Dockerfile index 763a24b2..1c079981 100644 --- a/backend/docker/lambda.Dockerfile +++ b/backend/docker/lambda.Dockerfile @@ -44,6 +44,7 @@ COPY ./etl/epc_clean/ ./etl/epc_clean/ COPY ./etl/bill_savings/ ./etl/bill_savings/ COPY ./etl/spatial/ ./etl/spatial/ COPY ./BaseUtility.py ./BaseUtility.py +COPY ./datatypes/ ./datatypes/ # Set the ENTRYPOINT to the AWS Lambda RIC and CMD to your function handler diff --git a/backend/ml_models/api.py b/backend/ml_models/api.py index e922d7fc..6e0a4162 100644 --- a/backend/ml_models/api.py +++ b/backend/ml_models/api.py @@ -1,3 +1,5 @@ +import aiohttp +import asyncio import pandas as pd from tqdm import tqdm import requests @@ -18,6 +20,8 @@ class ModelApi: # "hot_water_cost_predictions", ] + KWH_MODEL_PREFIXES = ["heating_kwh_predictions", "hotwater_kwh_predictions"] + MODEL_URLS = { "sap_change_predictions": "sapmodel", "heat_demand_predictions": "heatmodel", @@ -120,6 +124,28 @@ class ModelApi: # depending on how you want to handle errors in your application return None + async def predict_async(self, file_location, model_prefix: str): + """Makes an asynchronous POST request to the Model API with the provided parameters.""" + logger.info(f"Making request to {model_prefix} change api") + url = f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict" + payload = { + "file_location": file_location, + "property_id": "", # This should get removed + "portfolio_id": self.portfolio_id, + "created_at": self.timestamp + } + + async with aiohttp.ClientSession() as session: + try: + async with session.post( + url, json=payload, headers={"Content-Type": "application/json"}, timeout=120 + ) as response: + response.raise_for_status() + return await response.json() + except aiohttp.ClientError as e: + logger.error(f"An error occurred: {e}") + return None + @staticmethod def extract_phase(recommendation_id): if 'phase=' in recommendation_id: @@ -180,6 +206,43 @@ class ModelApi: return predictions + async def predict_all_async(self, df, bucket, model_prefixes=None, extract_ids=True) -> dict: + """Uploads data and makes asynchronous requests to the model APIs for predictions.""" + model_prefixes = self.MODEL_PREFIXES if model_prefixes is None else model_prefixes + + predictions = {} + tasks = [] + async with aiohttp.ClientSession() as session: + for model_prefix in model_prefixes: + logger.info(f"Scoring for model prefix: {model_prefix}") + file_location = self.upload_scoring_data(df, bucket, model_prefix) + # Schedule the prediction request as a coroutine + tasks.append( + self.predict_async(f"s3://{bucket}/" + file_location, model_prefix) + ) + + # Gather all asynchronous tasks (execute them concurrently) + responses = await asyncio.gather(*tasks, return_exceptions=True) + + for model_prefix, response in zip(model_prefixes, responses): + if response: + predictions_bucket = self.prediction_buckets[model_prefix] + predictions_df = pd.DataFrame( + read_dataframe_from_s3_parquet( + bucket_name=predictions_bucket, + file_key=response["storage_filepath"].split(predictions_bucket + "/")[1] + ) + ) + predictions_df['predictions'] = predictions_df["predictions"].astype(float).round(1) + if extract_ids: + predictions_df[['property_id', 'recommendation_id']] = predictions_df['id'].str.split('+', + expand=True) + predictions_df['phase'] = predictions_df['recommendation_id'].apply(self.extract_phase) + + predictions[model_prefix] = predictions_df + + return predictions + def paginated_predictions(self, data, bucket, batch_size, model_prefixes=None, extract_ids=True): all_predictions = self.predictions_template() to_loop_over = range(0, data.shape[0], batch_size) @@ -196,3 +259,54 @@ class ModelApi: all_predictions[key] = pd.concat([all_predictions[key], scored]) return all_predictions + + async def async_warm_up_lambdas(self, model_prefies=None): + """Send asynchronous pre-flight requests to each model endpoint to wake up the cold Lambdas without waiting + for responses.""" + logger.info("Asynchronously warming up Lambda functions...") + + model_prefixes = self.MODEL_PREFIXES if model_prefies is None else model_prefies + + tasks = [] + async with aiohttp.ClientSession() as session: + for model_prefix in model_prefixes: + url = f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict" + # Create a coroutine for each warm-up request and add it to the tasks list + tasks.append(self._send_warm_up_request(session, url, model_prefix)) + + # Run all tasks concurrently but don't wait for the responses to finish + await asyncio.gather(*tasks, return_exceptions=True) + + @staticmethod + async def _send_warm_up_request(session, url, model_prefix): + """Helper method to send a pre-flight request to a given model URL.""" + try: + async with session.post(url, json={}, timeout=2) as response: + # Log success for monitoring but do not block on the response + logger.info(f"Warmed up {model_prefix} with status code: {response.status}") + except aiohttp.ClientError as e: + logger.warning(f"Failed to warm up {model_prefix}: {e}") + + logger.info("Lambda functions are warmed up and ready to go!") + + def async_paginated_predictions(self, data, bucket, batch_size, model_prefixes=None, extract_ids=True): + + all_predictions = self.predictions_template() + to_loop_over = range(0, data.shape[0], batch_size) + + async def run_batches(): + for chunk in tqdm(to_loop_over, total=len(to_loop_over)): + predictions_dict = await self.predict_all_async( + df=data.iloc[chunk:chunk + batch_size], + bucket=bucket, + model_prefixes=model_prefixes, + extract_ids=extract_ids + ) + + for key, scored in predictions_dict.items(): + all_predictions[key] = pd.concat([all_predictions[key], scored]) + + # Run the async function + asyncio.run(run_batches()) + + return all_predictions diff --git a/backend/requirements/requirements.txt b/backend/requirements/requirements.txt index 11f29183..dd5c34ca 100644 --- a/backend/requirements/requirements.txt +++ b/backend/requirements/requirements.txt @@ -12,9 +12,10 @@ pydantic-settings==2.6.0 psycopg2-binary==2.9.10 python-jose==3.3.0 cryptography==43.0.3 +mangum==0.19.0 # AWS boto3==1.35.44 -# ML, Data, Data Science +# ML, Data Science usaddress==0.5.11 epc-api-python==1.0.2 fuzzywuzzy==0.18.0 @@ -24,6 +25,7 @@ msgpack==1.1.0 scikit-learn==1.5.2 cffi==1.15.1 mip==1.15.0 +# Data pyarrow==17.0.0 fastparquet==2024.5.0 - +aiohttp==3.10.10