diff --git a/backend/engine/engine.py b/backend/engine/engine.py index 26205328..8968eb0e 100644 --- a/backend/engine/engine.py +++ b/backend/engine/engine.py @@ -517,13 +517,6 @@ async def model_engine(body: PlanTriggerRequest): input_properties = [] for config in tqdm(plan_input): - if config["landlord_property_id"] in ["LE113NWIC95", "NG241FBCT", "NG51BNIC"]: - continue - - if not pd.isnull(config.get("uprn")): - if int(float(config.get("uprn"))) < 0: - continue - # We validate each record in the file. If the record is NOT valid, we need to handle this accordingly uprn = config.get("uprn", None) if pd.isnull(uprn): @@ -532,7 +525,7 @@ async def model_engine(body: PlanTriggerRequest): uprn = int(float(uprn)) epc_searcher = SearchEpc( - address1=config["address"], + address1=str(config["address"]), postcode=config["postcode"], uprn=uprn, auth_token=get_settings().EPC_AUTH_TOKEN, diff --git a/backend/ml_models/api.py b/backend/ml_models/api.py index ddfa7684..149cac49 100644 --- a/backend/ml_models/api.py +++ b/backend/ml_models/api.py @@ -1,3 +1,4 @@ +import json import aiohttp import asyncio import pandas as pd @@ -11,6 +12,8 @@ logger = setup_logger() class ModelApi: + _aiohttp_session: aiohttp.ClientSession = None + MODEL_PREFIXES = [ "sap_change_predictions", "heat_demand_predictions", @@ -54,44 +57,30 @@ class ModelApi: self.portfolio_id = portfolio_id self.timestamp = timestamp self.prediction_buckets = prediction_buckets - self.max_retries = max_retries + self.semaphore = asyncio.Semaphore(5) + + @staticmethod + def get_aiohttp_session(): + if ModelApi._aiohttp_session is None or ModelApi._aiohttp_session.closed: + connector = aiohttp.TCPConnector(limit=10) + ModelApi._aiohttp_session = aiohttp.ClientSession(connector=connector) + return ModelApi._aiohttp_session + + @staticmethod + async def close_aiohttp_session(): + if ModelApi._aiohttp_session and not ModelApi._aiohttp_session.closed: + await ModelApi._aiohttp_session.close() + ModelApi._aiohttp_session = None @staticmethod def predictions_template(): - return { - "sap_change_predictions": pd.DataFrame(), - "heat_demand_predictions": pd.DataFrame(), - "carbon_change_predictions": pd.DataFrame(), - "hotwater_kwh_predictions": pd.DataFrame(), - "heating_kwh_predictions": pd.DataFrame(), - } + return {key: pd.DataFrame() for key in ModelApi.MODEL_URLS.keys()} def upload_scoring_data(self, df: pd.DataFrame, bucket: str, model_prefix: str) -> str: - """ - The sap model api needs a scoring data that is sitting in s3 to use as a dataset to score on - This method allows the user to upload a table as a parquet file. This method will return the file - location, which can be used as the file location in the predict() method - - :param df: Pandas dataframe with scoring data to be uploaded to s3 - :param bucket: Name of the bucket in s3 to upload to - :param model_prefix: The model prefix to be used in the file location - :return: - """ - - # if model_prefix not in self.MODEL_PREFIXES: - # raise ValueError(f"Model prefix specified is not in {self.MODEL_PREFIXES}") - - # Store parquet file in s3 for scoring file_location = f"{model_prefix}/{self.portfolio_id}/{self.timestamp}.parquet" - - logger.info("Storing scoring data to s3") - save_dataframe_to_s3_parquet( - df=df, - bucket_name=bucket, - file_key=file_location - ) - + logger.info(f"Storing scoring data to s3: {file_location}") + save_dataframe_to_s3_parquet(df=df, bucket_name=bucket, file_key=file_location) return file_location def predict(self, file_location, model_prefix: str): @@ -127,25 +116,25 @@ class ModelApi: # depending on how you want to handle errors in your application return None - async def predict_async(self, file_location, model_prefix: str): - """Makes an asynchronous POST request to the Model API with the provided parameters.""" - logger.info(f"Making request to {model_prefix} model API") + async def predict_async(self, file_location, model_prefix: str, session: aiohttp.ClientSession): url = f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict" payload = { "file_location": file_location, - "property_id": "", # This should get removed + "property_id": "", "portfolio_id": self.portfolio_id, "created_at": self.timestamp } - async with aiohttp.ClientSession() as session: - try: - async with session.post( - url, json=payload, headers={"Content-Type": "application/json"}, timeout=120 - ) as response: - text = await response.text() + headers = { + "Content-Type": "application/json", + "User-Agent": "model-engine-lambda/1.0" + } + async with self.semaphore: + try: + async with session.post(url, json=payload, headers=headers, timeout=120) as response: if response.status != 200: + text = await response.text() logger.error( f"\n--- API ERROR [{model_prefix}] ---\n" f"Status Code: {response.status}\n" @@ -155,10 +144,7 @@ class ModelApi: f"----------------------------" ) return None - - # Successful response - return aiohttp.helpers.json.loads(text) - + return await response.json() except aiohttp.ClientError as e: logger.error(f"[{model_prefix}] HTTP error: {e}") return None @@ -227,41 +213,35 @@ class ModelApi: return predictions async def predict_all_async(self, df, bucket, model_prefixes=None, extract_ids=True) -> dict: - """Uploads data and makes asynchronous requests to the model APIs for predictions.""" model_prefixes = self.MODEL_PREFIXES if model_prefixes is None else model_prefixes - predictions = {} tasks = [] - async with aiohttp.ClientSession() as session: - for model_prefix in model_prefixes: - logger.info(f"Scoring for model prefix: {model_prefix}") - logger.info("Uploading scoring data to S3") - file_location = self.upload_scoring_data(df, bucket, model_prefix) - logger.info("Data uploaded to S3, now making prediction request") - # Schedule the prediction request as a coroutine - tasks.append( - self.predict_async(f"s3://{bucket}/" + file_location, model_prefix) - ) - # Gather all asynchronous tasks (execute them concurrently) - responses = await asyncio.gather(*tasks, return_exceptions=True) + session = self.get_aiohttp_session() - for model_prefix, response in zip(model_prefixes, responses): - if response: - predictions_bucket = self.prediction_buckets[model_prefix] - predictions_df = pd.DataFrame( - read_dataframe_from_s3_parquet( - bucket_name=predictions_bucket, - file_key=response["storage_filepath"].split(predictions_bucket + "/")[1] - ) + for model_prefix in model_prefixes: + logger.info(f"Scoring for model prefix: {model_prefix}") + file_location = self.upload_scoring_data(df, bucket, model_prefix) + tasks.append(self.predict_async(f"s3://{bucket}/" + file_location, model_prefix, session=session)) + + responses = await asyncio.gather(*tasks, return_exceptions=True) + + for model_prefix, response in zip(model_prefixes, responses): + if response: + predictions_bucket = self.prediction_buckets[model_prefix] + predictions_df = pd.DataFrame( + read_dataframe_from_s3_parquet( + bucket_name=predictions_bucket, + file_key=response["storage_filepath"].split(predictions_bucket + "/")[1] ) - predictions_df['predictions'] = predictions_df["predictions"].astype(float).round(1) - if extract_ids: - predictions_df[['property_id', 'recommendation_id']] = predictions_df['id'].str.split('+', - expand=True) - predictions_df['phase'] = predictions_df['recommendation_id'].apply(self.extract_phase) + ) + predictions_df['predictions'] = predictions_df["predictions"].astype(float).round(1) + if extract_ids: + predictions_df[['property_id', 'recommendation_id']] = predictions_df['id'].str.split('+', + expand=True) + predictions_df['phase'] = predictions_df['recommendation_id'].apply(self.extract_phase) - predictions[model_prefix] = predictions_df + predictions[model_prefix] = predictions_df return predictions @@ -283,75 +263,57 @@ class ModelApi: return all_predictions async def async_warm_up_lambdas(self, model_prefies=None): - """Send asynchronous pre-flight requests to each model endpoint to wake up the cold Lambdas without waiting - for responses.""" logger.info("Asynchronously warming up Lambda functions...") - model_prefixes = self.MODEL_PREFIXES if model_prefies is None else model_prefies - - tasks = [] - async with aiohttp.ClientSession() as session: - for model_prefix in model_prefixes: - url = f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict" - # Create a coroutine for each warm-up request and add it to the tasks list - tasks.append(self._send_warm_up_request(session, url, model_prefix)) - - # Run all tasks concurrently but don't wait for the responses to finish - await asyncio.gather(*tasks, return_exceptions=True) + session = self.get_aiohttp_session() + tasks = [ + self._send_warm_up_request(session, f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict", + model_prefix) + for model_prefix in model_prefixes + ] + await asyncio.gather(*tasks, return_exceptions=True) + logger.info("Lambda functions are warmed up and ready to go!") @staticmethod async def _send_warm_up_request(session, url, model_prefix): - """Helper method to send a pre-flight request to a given model URL.""" try: - async with session.post(url, json={}, timeout=2) as response: - # Log success for monitoring but do not block on the response - logger.info(f"Warmed up {model_prefix} with status code: {response.status}") + json_payload = { + "file_location": "s3://warm-up-placeholder", + "portfolio_id": 0, + "property_id": "", + "created_at": "2020-01-01T00:00:00" + } + async with session.post(url, json=json_payload, timeout=10) as response: + text = await response.text() + if response.status != 200: + logger.warning( + f"[{model_prefix}] Warm-up failed. Status: {response.status} | Body:\n{text}" + ) + else: + logger.info(f"[{model_prefix}] Warmed up with status {response.status}") except aiohttp.ClientError as e: - logger.warning(f"Failed to warm up {model_prefix}: {e}") - - logger.info("Lambda functions are warmed up and ready to go!") + logger.warning(f"[{model_prefix}] Warm-up ClientError: {e}") + except Exception as e: + logger.warning(f"[{model_prefix}] Warm-up Exception: {e}") async def async_paginated_predictions(self, data, bucket, batch_size, model_prefixes=None, extract_ids=True): all_predictions = self.predictions_template() - to_loop_over = range(0, data.shape[0], batch_size) - - async def run_batches(): - for chunk in tqdm(to_loop_over, total=len(to_loop_over)): - - attempts = 0 - success = False - while attempts <= self.max_retries and not success: - try: - predictions_dict = await self.predict_all_async( - df=data.iloc[chunk:chunk + batch_size], - bucket=bucket, - model_prefixes=model_prefixes, - extract_ids=extract_ids - ) - - for key, scored in predictions_dict.items(): - all_predictions[key] = pd.concat([all_predictions[key], scored]) - - success = True - except Exception as e: - attempts += 1 - logger.error( - f"Batch {chunk}-{chunk + batch_size} failed (Attempt {attempts}/{self.max_retries}). " - f"Error: {e}" - ) - - if attempts > self.max_retries: - logger.error( - f"Skipping batch {chunk}-{chunk + batch_size} after {self.max_retries} failed attempts." - ) - - # Check if there is an existing event loop - try: - # If there is an existing event loop, await the coroutine directly - loop = asyncio.get_running_loop() - await run_batches() - except RuntimeError: # No running event loop - # If no event loop is running, use asyncio.run() - asyncio.run(run_batches()) - + for chunk in tqdm(range(0, data.shape[0], batch_size)): + attempts = 0 + while attempts <= self.max_retries: + try: + predictions_dict = await self.predict_all_async( + df=data.iloc[chunk:chunk + batch_size], + bucket=bucket, + model_prefixes=model_prefixes, + extract_ids=extract_ids + ) + for key, scored in predictions_dict.items(): + all_predictions[key] = pd.concat([all_predictions[key], scored]) + break + except Exception as e: + attempts += 1 + logger.error(f"Batch {chunk}-{chunk + batch_size} failed (Attempt {attempts}): {e}") + await asyncio.sleep(2 ** attempts) # exponential backoff + await self.close_aiohttp_session() return all_predictions diff --git a/serverless.yml b/serverless.yml index 96b7eafa..1f029795 100644 --- a/serverless.yml +++ b/serverless.yml @@ -60,7 +60,7 @@ functions: image: uri: ${env:ECR_URI}:${env:GITHUB_SHA} timeout: 900 - memorySize: 2048 + memorySize: 4096 role: EngineLambdaRole events: - sqs: