Model/backend/ml_models/api.py
Khalim Conn-Kowlessar 5e5609a7ca making model api async
2024-10-22 11:08:48 +01:00

312 lines
14 KiB
Python

import aiohttp
import asyncio
import pandas as pd
from tqdm import tqdm
import requests
from requests.exceptions import RequestException
from utils.logger import setup_logger
from utils.s3 import save_dataframe_to_s3_parquet, read_dataframe_from_s3_parquet
logger = setup_logger()
class ModelApi:
MODEL_PREFIXES = [
"sap_change_predictions",
"heat_demand_predictions",
"carbon_change_predictions",
# "lighting_cost_predictions",
# "heating_cost_predictions",
# "hot_water_cost_predictions",
]
KWH_MODEL_PREFIXES = ["heating_kwh_predictions", "hotwater_kwh_predictions"]
MODEL_URLS = {
"sap_change_predictions": "sapmodel",
"heat_demand_predictions": "heatmodel",
"carbon_change_predictions": "carbonmodel",
"hotwater_kwh_predictions": "hotwaterkwhmodel",
"heating_kwh_predictions": "heatingkwhmodel",
# "lighting_cost_predictions": "lightingmodel",
# "heating_cost_predictions": "heatingmodel",
# "hot_water_cost_predictions": "hotwatermodel",
}
def __init__(
self,
portfolio_id,
timestamp,
prediction_buckets,
base_url="https://api.dev.hestia.homes",
):
"""
This class handles the communication with the Model APIs. These models include SAP change, heat demain change
and carbon change
property_id (int, optional): :
:param portfolio_id: The portfolio ID to be passed in the request payload. Defaults to 4.
:param timestamp: The creation timestamp to be passed in the request payload. Defaults to None.
:param base_url:
"""
self.base_url = base_url
self.portfolio_id = portfolio_id
self.timestamp = timestamp
self.prediction_buckets = prediction_buckets
@staticmethod
def predictions_template():
return {
"sap_change_predictions": pd.DataFrame(),
"heat_demand_predictions": pd.DataFrame(),
"carbon_change_predictions": pd.DataFrame(),
"hotwater_kwh_predictions": pd.DataFrame(),
"heating_kwh_predictions": pd.DataFrame(),
}
def upload_scoring_data(self, df: pd.DataFrame, bucket: str, model_prefix: str) -> str:
"""
The sap model api needs a scoring data that is sitting in s3 to use as a dataset to score on
This method allows the user to upload a table as a parquet file. This method will return the file
location, which can be used as the file location in the predict() method
:param df: Pandas dataframe with scoring data to be uploaded to s3
:param bucket: Name of the bucket in s3 to upload to
:param model_prefix: The model prefix to be used in the file location
:return:
"""
# if model_prefix not in self.MODEL_PREFIXES:
# raise ValueError(f"Model prefix specified is not in {self.MODEL_PREFIXES}")
# Store parquet file in s3 for scoring
file_location = f"{model_prefix}/{self.portfolio_id}/{self.timestamp}.parquet"
logger.info("Storing scoring data to s3")
save_dataframe_to_s3_parquet(
df=df,
bucket_name=bucket,
file_key=file_location
)
return file_location
def predict(self, file_location, model_prefix: str):
"""Makes a POST request to the SAP Change Model API with the provided parameters.
Args:
file_location (str): The file location to be passed in the request payload.
model_prefix (str): The model prefix to be used in the request URL.
Returns:
dict: The API response as a dictionary if the request was successful, None otherwise.
"""
logger.info(f"Making request to {model_prefix} change api")
url = f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict"
payload = {
"file_location": file_location,
"property_id": "", # This should get removed
"portfolio_id": self.portfolio_id,
"created_at": self.timestamp
}
try:
response = requests.post(url, json=payload, headers={"Content-Type": "application/json"}, timeout=120)
# Check if the response status code is 2xx (success)
response.raise_for_status()
# Return the JSON response as a Python dictionary
return response.json()
except RequestException as e:
logger.error(f"An error occurred: {e}")
# In case of an error, you might want to return None or raise the exception
# depending on how you want to handle errors in your application
return None
async def predict_async(self, file_location, model_prefix: str):
"""Makes an asynchronous POST request to the Model API with the provided parameters."""
logger.info(f"Making request to {model_prefix} change api")
url = f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict"
payload = {
"file_location": file_location,
"property_id": "", # This should get removed
"portfolio_id": self.portfolio_id,
"created_at": self.timestamp
}
async with aiohttp.ClientSession() as session:
try:
async with session.post(
url, json=payload, headers={"Content-Type": "application/json"}, timeout=120
) as response:
response.raise_for_status()
return await response.json()
except aiohttp.ClientError as e:
logger.error(f"An error occurred: {e}")
return None
@staticmethod
def extract_phase(recommendation_id):
if 'phase=' in recommendation_id:
return int(recommendation_id.split('phase=')[1][0])
else:
return None
def predict_all(self, df, bucket, model_prefixes=None, extract_ids=True) -> dict:
"""
For each model prefix, this method will upload the scoring data to s3 and then make a request to the
model api to generate predictions. The predictions will be stored in the predictions bucket.
This method will then fetch the stored predictions and format them, returning all of the predictions as
a dictionary of panaas dataframes
:param df: Pandas dataframe with scoring data to be uploaded to s3
:param bucket: Name of the bucket in s3 to upload to
:param model_prefixes: List of model prefixes to generate predictions for. If None, all model prefixes will be
used
:param extract_ids: Boolean to determine if the property_id and recommendation_id should be extracted from the
id column
:return:
"""
model_prefixes = self.MODEL_PREFIXES if model_prefixes is None else model_prefixes
predictions = {}
for model_prefix in model_prefixes:
logger.info(f"Scoring for model prefix: {model_prefix}")
file_location = self.upload_scoring_data(df, bucket, model_prefix)
response = self.predict(
"s3://{DATA_BUCKET}/".format(DATA_BUCKET=bucket) + file_location, model_prefix
)
predictions_bucket = self.prediction_buckets[model_prefix]
# Retrieve the predictions
predictions_df = pd.DataFrame(
read_dataframe_from_s3_parquet(
bucket_name=predictions_bucket,
file_key=response["storage_filepath"].split(predictions_bucket + "/")[1]
)
)
predictions_df['predictions'] = predictions_df["predictions"].astype(float).round(1)
if extract_ids:
predictions_df[['property_id', 'recommendation_id']] = predictions_df['id'].str.split('+', expand=True)
# To grab the phase, we pull the integer after "phase=" in the recommendation_id. We can do this with a
# string split on phase= and then grab the second element of the resulting list. We could also use a
# regular expression to do this but we use the string split method here, for safety.
# We may not always have a phase to split on, so we need to handle this case. We can do this by using
# the str[1] method to grab the second element of the resulting list. We then grab the first
# character of this
# string to get the phase. We then convert this to an integer.
# Convert back to int
predictions_df['phase'] = predictions_df['recommendation_id'].apply(self.extract_phase)
predictions[model_prefix] = predictions_df
return predictions
async def predict_all_async(self, df, bucket, model_prefixes=None, extract_ids=True) -> dict:
"""Uploads data and makes asynchronous requests to the model APIs for predictions."""
model_prefixes = self.MODEL_PREFIXES if model_prefixes is None else model_prefixes
predictions = {}
tasks = []
async with aiohttp.ClientSession() as session:
for model_prefix in model_prefixes:
logger.info(f"Scoring for model prefix: {model_prefix}")
file_location = self.upload_scoring_data(df, bucket, model_prefix)
# Schedule the prediction request as a coroutine
tasks.append(
self.predict_async(f"s3://{bucket}/" + file_location, model_prefix)
)
# Gather all asynchronous tasks (execute them concurrently)
responses = await asyncio.gather(*tasks, return_exceptions=True)
for model_prefix, response in zip(model_prefixes, responses):
if response:
predictions_bucket = self.prediction_buckets[model_prefix]
predictions_df = pd.DataFrame(
read_dataframe_from_s3_parquet(
bucket_name=predictions_bucket,
file_key=response["storage_filepath"].split(predictions_bucket + "/")[1]
)
)
predictions_df['predictions'] = predictions_df["predictions"].astype(float).round(1)
if extract_ids:
predictions_df[['property_id', 'recommendation_id']] = predictions_df['id'].str.split('+',
expand=True)
predictions_df['phase'] = predictions_df['recommendation_id'].apply(self.extract_phase)
predictions[model_prefix] = predictions_df
return predictions
def paginated_predictions(self, data, bucket, batch_size, model_prefixes=None, extract_ids=True):
all_predictions = self.predictions_template()
to_loop_over = range(0, data.shape[0], batch_size)
for chunk in tqdm(to_loop_over, total=len(to_loop_over)):
predictions_dict = self.predict_all(
df=data.iloc[chunk:chunk + batch_size],
bucket=bucket,
model_prefixes=model_prefixes,
extract_ids=extract_ids
)
# Append the predictions to the predictions dictionary
for key, scored in predictions_dict.items():
all_predictions[key] = pd.concat([all_predictions[key], scored])
return all_predictions
async def async_warm_up_lambdas(self, model_prefies=None):
"""Send asynchronous pre-flight requests to each model endpoint to wake up the cold Lambdas without waiting
for responses."""
logger.info("Asynchronously warming up Lambda functions...")
model_prefixes = self.MODEL_PREFIXES if model_prefies is None else model_prefies
tasks = []
async with aiohttp.ClientSession() as session:
for model_prefix in model_prefixes:
url = f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict"
# Create a coroutine for each warm-up request and add it to the tasks list
tasks.append(self._send_warm_up_request(session, url, model_prefix))
# Run all tasks concurrently but don't wait for the responses to finish
await asyncio.gather(*tasks, return_exceptions=True)
@staticmethod
async def _send_warm_up_request(session, url, model_prefix):
"""Helper method to send a pre-flight request to a given model URL."""
try:
async with session.post(url, json={}, timeout=2) as response:
# Log success for monitoring but do not block on the response
logger.info(f"Warmed up {model_prefix} with status code: {response.status}")
except aiohttp.ClientError as e:
logger.warning(f"Failed to warm up {model_prefix}: {e}")
logger.info("Lambda functions are warmed up and ready to go!")
def async_paginated_predictions(self, data, bucket, batch_size, model_prefixes=None, extract_ids=True):
all_predictions = self.predictions_template()
to_loop_over = range(0, data.shape[0], batch_size)
async def run_batches():
for chunk in tqdm(to_loop_over, total=len(to_loop_over)):
predictions_dict = await self.predict_all_async(
df=data.iloc[chunk:chunk + batch_size],
bucket=bucket,
model_prefixes=model_prefixes,
extract_ids=extract_ids
)
for key, scored in predictions_dict.items():
all_predictions[key] = pd.concat([all_predictions[key], scored])
# Run the async function
asyncio.run(run_batches())
return all_predictions