import asyncio
from typing import List, Dict

import aiohttp
import pandas as pd
import requests
from requests.exceptions import RequestException
from tqdm import tqdm

from utils.logger import setup_logger
from utils.s3 import save_dataframe_to_s3_parquet, read_dataframe_from_s3_parquet

logger = setup_logger()


class ModelApi:
    """Client for the model prediction APIs.

    For each model, scoring data is uploaded to S3, the model endpoint is asked to score it,
    and the resulting predictions parquet is read back from that model's predictions bucket.
    """

    # Shared by all instances; created lazily by get_aiohttp_session().
    _aiohttp_session: aiohttp.ClientSession | None = None
    # Event loop the shared session was created on. aiohttp sessions are bound to the loop
    # they were created on, so the session is recreated if the running loop changes.
    _aiohttp_session_loop: asyncio.AbstractEventLoop | None = None

    # Timeout, in seconds, for a scoring request (sync and async).
    REQUEST_TIMEOUT_SECONDS = 120
    # Timeout, in seconds, for a warm-up request.
    WARM_UP_TIMEOUT_SECONDS = 10

    MODEL_PREFIXES = [
        "sap_change_predictions",
        "heat_demand_predictions",
        "carbon_change_predictions",
    ]
    KWH_MODEL_PREFIXES = ["heating_kwh_predictions", "hotwater_kwh_predictions"]
    BASELINE_MODEL_PREFIXES = [
        "retrofit_sap_baseline_predictions",
        "retrofit_heat_baseline_predictions",
        "retrofit_carbon_baseline_predictions",
    ]
    # Model prefix -> path segment of that model's endpoint under base_url.
    MODEL_URLS: Dict[str, str] = {
        "sap_change_predictions": "sapmodel",
        "heat_demand_predictions": "heatmodel",
        "carbon_change_predictions": "carbonmodel",
        "hotwater_kwh_predictions": "hotwaterkwhmodel",
        "heating_kwh_predictions": "heatingkwhmodel",
        # Baseline prediction models
        "retrofit_sap_baseline_predictions": "sapbaselinemodel",
        "retrofit_heat_baseline_predictions": "heatbaselinemodel",
        "retrofit_carbon_baseline_predictions": "carbonbaselinemodel",
    }

    def __init__(
        self,
        portfolio_id,
        timestamp,
        prediction_buckets,
        base_url="https://api.dev.hestia.homes",
        max_retries=2,
    ):
        """
        :param portfolio_id: Sent with every prediction request and used in the scoring-data S3 key
        :param timestamp: Sent as ``created_at`` and used as the scoring-data file name
        :param prediction_buckets: Mapping of model prefix -> S3 bucket the predictions are read from
        :param base_url: Base URL of the model APIs
        :param max_retries: Extra attempts per batch in async_paginated_predictions
        """
        self.base_url = base_url
        self.portfolio_id = portfolio_id
        self.timestamp = timestamp
        self.prediction_buckets = prediction_buckets
        self.max_retries = max_retries
        # Caps the number of concurrent async prediction requests made through this instance.
        self.semaphore = asyncio.Semaphore(3)

    @staticmethod
    def get_aiohttp_session():
        """Return the shared aiohttp session, creating it if missing, closed, or on another loop."""
        try:
            current_loop = asyncio.get_running_loop()
        except RuntimeError:
            current_loop = None
        session = ModelApi._aiohttp_session
        if session is None or session.closed or ModelApi._aiohttp_session_loop is not current_loop:
            connector = aiohttp.TCPConnector(limit=10)
            ModelApi._aiohttp_session = aiohttp.ClientSession(connector=connector)
            ModelApi._aiohttp_session_loop = current_loop
        return ModelApi._aiohttp_session

    @staticmethod
    async def close_aiohttp_session():
        """Close the shared aiohttp session (if open) and forget it."""
        if ModelApi._aiohttp_session and not ModelApi._aiohttp_session.closed:
            await ModelApi._aiohttp_session.close()
        ModelApi._aiohttp_session = None
        ModelApi._aiohttp_session_loop = None

    @staticmethod
    def predictions_template():
        """Return a dict mapping every known model prefix to an empty DataFrame."""
        return {key: pd.DataFrame() for key in ModelApi.MODEL_URLS.keys()}

    def upload_scoring_data(self, df: pd.DataFrame, bucket: str, model_prefix: str) -> str:
        """Upload *df* as parquet to ``<model_prefix>/<portfolio_id>/<timestamp>.parquet`` in *bucket*.

        :return: The S3 key the data was written to (without the ``s3://bucket/`` prefix)
        """
        file_location = f"{model_prefix}/{self.portfolio_id}/{self.timestamp}.parquet"
        logger.info(f"Storing scoring data to s3: {file_location}")
        save_dataframe_to_s3_parquet(df=df, bucket_name=bucket, file_key=file_location)
        return file_location

    def predict(self, file_location, model_prefix: str):
        """Make a blocking POST request to the prediction endpoint of the given model.

        Args:
            file_location (str): Full S3 URI of the scoring data, passed in the request payload.
            model_prefix (str): Model prefix; selects the endpoint via MODEL_URLS.

        Returns:
            dict: The decoded JSON response on success, or None if the request failed
            (the error is logged).
        """
        logger.info(f"Making request to {model_prefix} change api")
        url = f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict"

        payload = {
            "file_location": file_location,
            "property_id": "",  # This should get removed
            "portfolio_id": self.portfolio_id,
            "created_at": self.timestamp,
        }

        try:
            response = requests.post(
                url,
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
            # Raise for 4xx/5xx responses.
            response.raise_for_status()
            return response.json()
        except RequestException as e:
            logger.error(f"An error occurred: {e}")
            return None

    async def predict_async(self, file_location, model_prefix: str, session: aiohttp.ClientSession):
        """Async counterpart of predict(); concurrency is limited by ``self.semaphore``.

        :return: The decoded JSON response for a 200 status, otherwise None (the error is logged)
        """
        url = f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict"
        payload = {
            "file_location": file_location,
            "property_id": "",
            "portfolio_id": self.portfolio_id,
            "created_at": self.timestamp,
        }
        headers = {
            "Content-Type": "application/json",
            "User-Agent": "model-engine-lambda/1.0",
        }
        timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT_SECONDS)

        async with self.semaphore:
            try:
                async with session.post(url, json=payload, headers=headers, timeout=timeout) as response:
                    if response.status != 200:
                        text = await response.text()
                        logger.error(
                            f"\n--- API ERROR [{model_prefix}] ---\n"
                            f"Status Code: {response.status}\n"
                            f"URL: {url}\n"
                            f"Payload: {payload}\n"
                            f"Response Text:\n{text}\n"
                            f"----------------------------"
                        )
                        return None
                    return await response.json()
            except aiohttp.ClientError as e:
                logger.error(f"[{model_prefix}] HTTP error: {e}")
                return None
            except asyncio.TimeoutError:
                logger.error(f"[{model_prefix}] Request timed out after {self.REQUEST_TIMEOUT_SECONDS}s")
                return None
            except Exception as e:
                logger.exception(f"[{model_prefix}] Unexpected error: {e}")
                return None

    @staticmethod
    def extract_phase(recommendation_id):
        """Return the integer following ``phase=`` in *recommendation_id*, or None if there is none.

        Non-string values (e.g. None, produced when an ``id`` had no ``+`` separator) yield None.
        Raises ValueError if the text after ``phase=`` is not an integer.
        """
        if not isinstance(recommendation_id, str) or "phase=" not in recommendation_id:
            return None
        return int(recommendation_id.split("phase=")[1].strip())

    def _load_predictions(self, response: dict, model_prefix: str, extract_ids: bool) -> pd.DataFrame:
        """Read the predictions parquet referenced by an API *response* and normalise its columns."""
        predictions_bucket = self.prediction_buckets[model_prefix]
        # The API returns a path that contains the bucket name; the S3 helper wants only the key.
        # maxsplit=1 so a key that itself contains "<bucket>/" is not truncated.
        file_key = response["storage_filepath"].split(predictions_bucket + "/", 1)[1]
        predictions_df = pd.DataFrame(
            read_dataframe_from_s3_parquet(bucket_name=predictions_bucket, file_key=file_key)
        )
        predictions_df["predictions"] = predictions_df["predictions"].astype(float).round(1)

        if extract_ids:
            # ids look like "<property_id>+<recommendation_id>"; n=1 keeps any further "+" in the
            # recommendation_id instead of producing extra columns.
            predictions_df[["property_id", "recommendation_id"]] = predictions_df["id"].str.split(
                "+", n=1, expand=True
            )
            # The phase is the integer after "phase=" in the recommendation_id (None when absent).
            predictions_df["phase"] = predictions_df["recommendation_id"].apply(self.extract_phase)

        return predictions_df

    @staticmethod
    def _concat_frames(collected: Dict[str, list]) -> dict:
        """Concatenate each list of frames once; keys with no frames map to an empty DataFrame."""
        return {key: pd.concat(frames) if frames else pd.DataFrame() for key, frames in collected.items()}

    def predict_all(
        self,
        df: pd.DataFrame,
        bucket: str,
        model_prefixes: List[str] | None = None,
        extract_ids: bool = True,
        extract_uprn: bool = False,
    ) -> dict:
        """
        For each model prefix, upload the scoring data to S3 and ask the model API to generate
        predictions. The API stores the predictions in the model's predictions bucket; they are
        then fetched, formatted and returned as a dictionary of pandas dataframes.

        :param df: Pandas dataframe with scoring data to be uploaded to s3
        :param bucket: Name of the bucket in s3 to upload to
        :param model_prefixes: List of model prefixes to generate predictions for. If None, all
            model prefixes in MODEL_PREFIXES will be used
        :param extract_ids: Whether to split the ``id`` column into property_id / recommendation_id
            and derive ``phase`` from the recommendation_id
        :param extract_uprn: Whether to copy the ``uprn`` column from the scoring data (if present)
        :return: Dict mapping model prefix -> predictions dataframe
        :raises RuntimeError: If a prediction request fails
        """
        model_prefixes = self.MODEL_PREFIXES if model_prefixes is None else model_prefixes

        predictions = {}
        for model_prefix in model_prefixes:
            logger.info(f"Scoring for model prefix: {model_prefix}")
            file_location = self.upload_scoring_data(df, bucket, model_prefix)
            response = self.predict(f"s3://{bucket}/" + file_location, model_prefix)
            if response is None:
                raise RuntimeError(f"Prediction request failed for model prefix {model_prefix!r}")

            predictions_df = self._load_predictions(response, model_prefix, extract_ids)

            if extract_uprn and "uprn" in df.columns:
                # NOTE(review): assumes predictions come back in the same row order and count as
                # the scoring data — confirm against the model API.
                predictions_df["uprn"] = df["uprn"].values

            predictions[model_prefix] = predictions_df

        return predictions

    async def predict_all_async(self, df, bucket, model_prefixes=None, extract_ids=True) -> dict:
        """Async counterpart of predict_all() that sends the model requests concurrently.

        Models whose request fails (predict_async returned no response) are logged and left out of
        the result. If any model task raises, the first exception is re-raised so callers such as
        async_paginated_predictions can retry the batch.
        """
        model_prefixes = list(self.MODEL_PREFIXES if model_prefixes is None else model_prefixes)
        session = self.get_aiohttp_session()

        async def run_model(model_prefix):
            logger.info(f"Scoring for model prefix: {model_prefix}")
            # NOTE: upload_scoring_data is synchronous and blocks the event loop while it runs;
            # only the awaited HTTP requests overlap across models.
            file_location = self.upload_scoring_data(df, bucket, model_prefix)
            return await self.predict_async(f"s3://{bucket}/" + file_location, model_prefix, session=session)

        # Run all model calls concurrently
        results = await asyncio.gather(
            *(run_model(mp) for mp in model_prefixes),
            return_exceptions=True,
        )

        # With return_exceptions=True, exceptions are returned in place of results.
        failures = [(mp, r) for mp, r in zip(model_prefixes, results) if isinstance(r, BaseException)]
        for model_prefix, error in failures:
            logger.error(f"[{model_prefix}] Scoring failed: {error!r}")
        if failures:
            raise failures[0][1]

        predictions = {}
        for model_prefix, response in zip(model_prefixes, results):
            if not response:
                logger.warning(f"[{model_prefix}] No prediction response; skipping this model for the batch")
                continue
            predictions[model_prefix] = self._load_predictions(response, model_prefix, extract_ids)
        return predictions

    def paginated_predictions(self, data, bucket, batch_size, model_prefixes=None, extract_ids=True):
        """Score *data* in batches of *batch_size* rows via predict_all() and concatenate the results."""
        collected = {key: [] for key in self.predictions_template()}
        batch_starts = range(0, data.shape[0], batch_size)
        for start in tqdm(batch_starts, total=len(batch_starts)):
            predictions_dict = self.predict_all(
                df=data.iloc[start:start + batch_size],
                bucket=bucket,
                model_prefixes=model_prefixes,
                extract_ids=extract_ids,
            )
            for key, scored in predictions_dict.items():
                collected[key].append(scored)
        return self._concat_frames(collected)

    async def async_warm_up_lambdas(self, model_prefies=None, *, model_prefixes=None):
        """Send a warm-up request to each model endpoint concurrently.

        :param model_prefies: Model prefixes to warm up (misspelt name kept for backwards compatibility)
        :param model_prefixes: Preferred keyword spelling; takes precedence over ``model_prefies``.
            If neither is given, MODEL_PREFIXES is used.
        """
        logger.info("Asynchronously warming up Lambda functions...")
        if model_prefixes is None:
            model_prefixes = model_prefies
        if model_prefixes is None:
            model_prefixes = self.MODEL_PREFIXES
        # The shared session is left open so a following scoring run on the same loop can reuse it.
        session = self.get_aiohttp_session()
        tasks = [
            self._send_warm_up_request(
                session,
                f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict",
                model_prefix,
            )
            for model_prefix in model_prefixes
        ]
        await asyncio.gather(*tasks, return_exceptions=True)
        logger.info("Lambda functions are warmed up and ready to go!")

    @staticmethod
    async def _send_warm_up_request(session, url, model_prefix):
        """POST a warm-up payload to *url*; failures are logged as warnings, never raised."""
        try:
            json_payload = {
                "file_location": "s3://warm-up-placeholder",
                "portfolio_id": 0,
                "property_id": "",
                "created_at": "2020-01-01T00:00:00",
                # The presence of this key sends the API down a dedicated warm-up route that calls
                # prediction and loads the font manager, a key bottleneck for cold starts.
                "warm": True,
            }
            timeout = aiohttp.ClientTimeout(total=ModelApi.WARM_UP_TIMEOUT_SECONDS)
            async with session.post(url, json=json_payload, timeout=timeout) as response:
                text = await response.text()
                if response.status != 200:
                    logger.warning(
                        f"[{model_prefix}] Warm-up failed. Status: {response.status} | Body:\n{text}"
                    )
                else:
                    logger.info(f"[{model_prefix}] Warmed up with status {response.status}")
        except aiohttp.ClientError as e:
            logger.warning(f"[{model_prefix}] Warm-up ClientError: {e}")
        except Exception as e:
            logger.warning(f"[{model_prefix}] Warm-up Exception: {e}")

    async def async_paginated_predictions(self, data, bucket, batch_size, model_prefixes=None, extract_ids=True):
        """Score *data* in batches via predict_all_async(), retrying failed batches with backoff.

        Each batch gets up to ``max_retries + 1`` attempts; a batch that still fails is logged and
        dropped. The shared aiohttp session is always closed afterwards. Models with no predictions
        get an empty DataFrame with a stable set of columns.
        """
        collected = {key: [] for key in self.predictions_template()}
        try:
            for start in tqdm(range(0, data.shape[0], batch_size)):
                batch = data.iloc[start:start + batch_size]
                for attempt in range(1, self.max_retries + 2):
                    try:
                        predictions_dict = await self.predict_all_async(
                            df=batch,
                            bucket=bucket,
                            model_prefixes=model_prefixes,
                            extract_ids=extract_ids,
                        )
                    except Exception as e:
                        logger.error(f"Batch {start}-{start + batch_size} failed (Attempt {attempt}): {e}")
                        if attempt <= self.max_retries:
                            await asyncio.sleep(2 ** attempt)  # exponential backoff
                        else:
                            logger.error(f"Batch {start}-{start + batch_size} dropped after {attempt} attempts")
                        continue
                    for key, scored in predictions_dict.items():
                        collected[key].append(scored)
                    break
        finally:
            await self.close_aiohttp_session()

        all_predictions = self._concat_frames(collected)

        # Ensure a stable output structure so downstream code can rely on the columns existing.
        col_template = (
            ["id", "predictions", "property_id", "recommendation_id", "phase"]
            if extract_ids
            else ["id", "predictions"]
        )
        return {
            key: pd.DataFrame(columns=col_template) if frame.empty else frame
            for key, frame in all_predictions.items()
        }

    @property
    def models_for_warm_up(self):
        """All model prefixes that should be warmed up: kWh, core and baseline models."""
        return self.KWH_MODEL_PREFIXES + self.MODEL_PREFIXES + self.BASELINE_MODEL_PREFIXES