Model/backend/ml_models/api.py
2024-02-15 19:32:33 +00:00

144 lines
5.9 KiB
Python

import pandas as pd
import requests
from requests.exceptions import RequestException
from utils.logger import setup_logger
from utils.s3 import save_dataframe_to_s3_parquet, read_dataframe_from_s3_parquet
# Module-level logger used by ModelApi for progress (info) and request-failure (error) messages.
logger = setup_logger()
class ModelApi:
    """Client for the SAP change, heat demand change and carbon change model APIs.

    For each model, scoring data is uploaded to S3 as parquet, the model API is asked to score
    it, and the predictions parquet referenced by the API response's ``storage_filepath`` is
    read back and formatted.
    """

    # Model prefixes accepted by upload_scoring_data and iterated (in this order) by predict_all.
    MODEL_PREFIXES = [
        "sap_change_predictions",
        "heat_demand_predictions",
        "carbon_change_predictions",
    ]
    # Maps each model prefix to the URL path segment of its API: {base_url}/{segment}/predict.
    MODEL_URLS = {
        "sap_change_predictions": "sapmodel",
        "heat_demand_predictions": "heatmodel",
        "carbon_change_predictions": "carbonmodel",
    }

    def __init__(
        self,
        portfolio_id,
        timestamp,
        base_url="https://api.dev.hestia.homes",
    ):
        """
        Handle communication with the model APIs (SAP change, heat demand change and carbon change).

        :param portfolio_id: Portfolio ID sent as ``portfolio_id`` in the request payload and used
            in the S3 key of the uploaded scoring data.
        :param timestamp: Creation timestamp sent as ``created_at`` in the request payload and used
            as the file name of the uploaded scoring data.
        :param base_url: Root URL of the model APIs. Defaults to ``https://api.dev.hestia.homes``.
        """
        self.base_url = base_url
        self.portfolio_id = portfolio_id
        self.timestamp = timestamp

    def upload_scoring_data(self, df: pd.DataFrame, bucket: str, model_prefix: str) -> str:
        """
        Upload a scoring table to S3 as a parquet file for a model API to score.

        :param df: Pandas dataframe with scoring data to be uploaded to s3
        :param bucket: Name of the bucket in s3 to upload to
        :param model_prefix: One of MODEL_PREFIXES; used as the first segment of the file key
        :return: The S3 file key ``{model_prefix}/{portfolio_id}/{timestamp}.parquet`` (without the
            bucket), which predict_all turns into an ``s3://`` URI for predict()
        :raises ValueError: If model_prefix is not in MODEL_PREFIXES
        """
        if model_prefix not in self.MODEL_PREFIXES:
            raise ValueError(f"Model prefix specified is not in {self.MODEL_PREFIXES}")
        # Store parquet file in s3 for scoring
        file_location = f"{model_prefix}/{self.portfolio_id}/{self.timestamp}.parquet"
        logger.info("Storing scoring data to s3")
        save_dataframe_to_s3_parquet(
            df=df,
            bucket_name=bucket,
            file_key=file_location,
        )
        return file_location

    def predict(self, file_location, model_prefix: str):
        """Make a POST request to the model API selected by ``model_prefix``.

        :param file_location: Location of the scoring data, sent as ``file_location`` in the
            payload (predict_all passes a full ``s3://bucket/key`` URI).
        :param model_prefix: Key of MODEL_URLS selecting which model API to call.
        :return: The decoded JSON response as a dict on success, or None if the request failed
            (the error is logged rather than raised).
        :raises KeyError: If model_prefix is not a key of MODEL_URLS.
        """
        logger.info(f"Making request to {model_prefix} change api")
        url = f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict"
        payload = {
            "file_location": file_location,
            "property_id": "",  # This should get removed
            "portfolio_id": self.portfolio_id,
            "created_at": self.timestamp,
        }
        try:
            response = requests.post(
                url, json=payload, headers={"Content-Type": "application/json"}, timeout=120
            )
            # Turn non-2xx status codes into a RequestException handled below.
            response.raise_for_status()
            return response.json()
        except RequestException as e:
            # Best-effort contract: callers get None instead of an exception.
            logger.error(f"An error occurred: {e}")
            return None

    def predict_all(self, df, bucket, prediction_buckets) -> dict:
        """
        Score ``df`` with every model and return the formatted predictions.

        For each model prefix, the scoring data is uploaded to s3, the model API is called, and
        the predictions parquet referenced by the response's ``storage_filepath`` is read from
        that model's predictions bucket and formatted (see _format_predictions).

        :param df: Pandas dataframe with scoring data to be uploaded to s3
        :param bucket: Name of the bucket in s3 to upload to
        :param prediction_buckets: Dictionary mapping each model prefix to its predictions bucket
        :return: Dictionary mapping each model prefix to a pandas dataframe of its predictions
        :raises RuntimeError: If a model API request fails (predict() returned None)
        :raises ValueError: If a returned storage path does not contain the predictions bucket
        """
        predictions = {}
        for model_prefix in self.MODEL_PREFIXES:
            logger.info(f"Scoring for model prefix: {model_prefix}")
            file_location = self.upload_scoring_data(df, bucket, model_prefix)
            response = self.predict(f"s3://{bucket}/{file_location}", model_prefix)
            if response is None:
                # predict() has already logged the underlying error; fail loudly here instead of
                # crashing later on a None subscript.
                raise RuntimeError(
                    f"Prediction request for {model_prefix} failed; see the logged error for details"
                )
            predictions_bucket = prediction_buckets[model_prefix]
            # Retrieve the predictions
            predictions_df = pd.DataFrame(
                read_dataframe_from_s3_parquet(
                    bucket_name=predictions_bucket,
                    file_key=self._storage_key(response["storage_filepath"], predictions_bucket),
                )
            )
            predictions[model_prefix] = self._format_predictions(predictions_df)
        return predictions

    @staticmethod
    def _storage_key(storage_filepath: str, bucket: str) -> str:
        """Return the object key following the first ``{bucket}/`` in ``storage_filepath``.

        Partitioning on the first occurrence keeps the full key even when the bucket name also
        appears inside the key path (e.g. ``s3://preds/preds/x.parquet`` -> ``preds/x.parquet``).

        :raises ValueError: If ``{bucket}/`` does not occur in storage_filepath.
        """
        _, separator, key = storage_filepath.partition(f"{bucket}/")
        if not separator:
            raise ValueError(
                f"Storage path {storage_filepath!r} does not contain bucket {bucket!r}"
            )
        return key

    @staticmethod
    def _format_predictions(predictions_df: pd.DataFrame) -> pd.DataFrame:
        """Round predictions and derive property_id, recommendation_id and phase from ``id``.

        ``id`` is expected to look like ``<property_id>+<recommendation_id>``, where the
        recommendation_id contains ``phase=<integer>``. The frame is modified in place and
        returned.
        """
        predictions_df["predictions"] = predictions_df["predictions"].astype(float).round(1)
        # Split on the first '+' only, so a '+' inside the recommendation_id cannot produce extra
        # columns and break the two-column assignment.
        predictions_df[["property_id", "recommendation_id"]] = predictions_df["id"].str.split(
            "+", n=1, expand=True
        )
        # Capture every digit after "phase=" (not just the first character) so phases >= 10
        # are parsed correctly, then convert back to int.
        phase = predictions_df["recommendation_id"].str.extract(r"phase=(\d+)", expand=False)
        predictions_df["phase"] = phase.astype(int)
        return predictions_df