import pandas as pd import requests from requests.exceptions import RequestException from utils.logger import setup_logger from utils.s3 import save_dataframe_to_s3_parquet from backend.app.utils import read_parquet_from_s3 logger = setup_logger() class ModelApi: MODEL_PREFIXES = [ "sap_change_predictions", "heat_demand_predictions", "carbon_change_predictions" ] MODEL_URLS = { "sap_change_predictions": "sapmodel", "heat_demand_predictions": "heatmodel", "carbon_change_predictions": "carbonmodel" } def __init__( self, portfolio_id, timestamp, base_url="https://api.dev.hestia.homes", ): """ This class handles the communication with the Model APIs. These models include SAP change, heat demain change and carbon change property_id (int, optional): : :param portfolio_id: The portfolio ID to be passed in the request payload. Defaults to 4. :param timestamp: The creation timestamp to be passed in the request payload. Defaults to None. :param base_url: """ self.base_url = base_url self.portfolio_id = portfolio_id self.timestamp = timestamp def upload_scoring_data(self, df: pd.DataFrame, bucket: str, model_prefix: str) -> str: """ The sap model api needs a scoring data that is sitting in s3 to use as a dataset to score on This method allows the user to upload a table as a parquet file. This method will return the file location, which can be used as the file location in the predict() method :param df: Pandas dataframe with scoring data to be uploaded to s3 :param bucket: Name of the bucket in s3 to upload to :param model_prefix: The model prefix to be used in the file location :return: """ if model_prefix not in self.MODEL_PREFIXES: raise ValueError(f"Model prefix specified is not in {self.MODEL_PREFIXES}") # Store parquet file in s3 for scoring file_location = f"{model_prefix}/{self.portfolio_id}/{self.timestamp}.parquet" logger.info("Storing scoring data to s3") save_dataframe_to_s3_parquet( df=df, bucket_name=bucket, file_key=file_location ) return file_location def predict(self, file_location, model_prefix: str): """Makes a POST request to the SAP Change Model API with the provided parameters. Args: file_location (str): The file location to be passed in the request payload. model_prefix (str): The model prefix to be used in the request URL. Returns: dict: The API response as a dictionary if the request was successful, None otherwise. """ logger.info(f"Making request to {model_prefix} change api") url = f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict" payload = { "file_location": file_location, "property_id": "", # This should get removed "portfolio_id": self.portfolio_id, "created_at": self.timestamp } try: response = requests.post(url, json=payload, headers={"Content-Type": "application/json"}, timeout=120) # Check if the response status code is 2xx (success) response.raise_for_status() # Return the JSON response as a Python dictionary return response.json() except RequestException as e: logger.error(f"An error occurred: {e}") # In case of an error, you might want to return None or raise the exception # depending on how you want to handle errors in your application return None def predict_all(self, df, bucket, prediction_buckets) -> dict: """ For each model prefix, this method will upload the scoring data to s3 and then make a request to the model api to generate predictions. The predictions will be stored in the predictions bucket. This method will then fetch the stored predictions and format them, returning all of the predictions as a dictionary of panaas dataframes :param df: Pandas dataframe with scoring data to be uploaded to s3 :param bucket: Name of the bucket in s3 to upload to :param prediction_buckets: Dictionary containing the prediction buckets for each model prefix :return: """ predictions = {} for model_prefix in self.MODEL_PREFIXES: logger.info(f"Scoring for model prefix: {model_prefix}") file_location = self.upload_scoring_data(df, bucket, model_prefix) response = self.predict( "s3://{DATA_BUCKET}/".format(DATA_BUCKET=bucket) + file_location, model_prefix ) predictions_bucket = prediction_buckets[model_prefix] # Retrieve the predictions predictions_df = pd.DataFrame( read_parquet_from_s3( bucket_name=predictions_bucket, file_key=response["storage_filepath"].split(predictions_bucket + "/")[1] ) ) predictions_df["predictions"] = predictions_df["predictions"].astype(float).round(1) predictions_df[['property_id', 'recommendation_id']] = predictions_df['id'].str.split('+', expand=True) predictions[model_prefix] = predictions_df return predictions