import re

import pandas as pd
import requests
from requests.exceptions import RequestException
from tqdm import tqdm

from utils.logger import setup_logger
from utils.s3 import save_dataframe_to_s3_parquet, read_dataframe_from_s3_parquet

logger = setup_logger()

# Captures every digit following "phase=" so multi-digit phases parse correctly.
_PHASE_PATTERN = re.compile(r"phase=(\d+)")


class ModelApi:
    """Client for the prediction model APIs.

    For each model prefix the client uploads scoring data to S3, asks the
    corresponding model endpoint to score it, and reads the stored predictions
    back from the configured predictions bucket.
    """

    # Model prefixes scored by default when the caller does not pass any.
    MODEL_PREFIXES = [
        "sap_change_predictions",
        "heat_demand_predictions",
        "carbon_change_predictions",
        # "lighting_cost_predictions",
        # "heating_cost_predictions",
        # "hot_water_cost_predictions",
    ]

    # Maps a model prefix to the URL path segment of its API.
    MODEL_URLS = {
        "sap_change_predictions": "sapmodel",
        "heat_demand_predictions": "heatmodel",
        "carbon_change_predictions": "carbonmodel",
        "hotwater_kwh_predictions": "hotwaterkwhmodel",
        "heating_kwh_predictions": "heatingkwhmodel",
        # "lighting_cost_predictions": "lightingmodel",
        # "heating_cost_predictions": "heatingmodel",
        # "hot_water_cost_predictions": "hotwatermodel",
    }

    def __init__(
        self,
        portfolio_id,
        timestamp,
        prediction_buckets,
        base_url="https://api.dev.hestia.homes",
    ):
        """
        Store the settings shared by every request made through this client.

        :param portfolio_id: The portfolio ID sent in the request payload and used in the
            S3 key of the uploaded scoring data.
        :param timestamp: The creation timestamp sent in the request payload (as
            ``created_at``) and used as the file name of the uploaded scoring data.
        :param prediction_buckets: Mapping from model prefix to the name of the bucket the
            predictions for that model are read from.
        :param base_url: Base URL that the model path from ``MODEL_URLS`` is appended to.
        """
        self.base_url = base_url
        self.portfolio_id = portfolio_id
        self.timestamp = timestamp
        self.prediction_buckets = prediction_buckets

    @staticmethod
    def predictions_template():
        """Return a dict with one fresh, empty DataFrame per key of ``MODEL_URLS``' models."""
        return {
            "sap_change_predictions": pd.DataFrame(),
            "heat_demand_predictions": pd.DataFrame(),
            "carbon_change_predictions": pd.DataFrame(),
            "hotwater_kwh_predictions": pd.DataFrame(),
            "heating_kwh_predictions": pd.DataFrame(),
        }

    def upload_scoring_data(self, df: pd.DataFrame, bucket: str, model_prefix: str) -> str:
        """
        Upload the scoring data to S3 as a parquet file.

        The returned key (relative to ``bucket``) is what ``predict()`` expects, once
        prefixed with the bucket, as its ``file_location``.

        :param df: Pandas dataframe with scoring data to be uploaded to s3
        :param bucket: Name of the bucket in s3 to upload to
        :param model_prefix: The model prefix to be used in the file location
        :return: The S3 key the data was written to:
            ``{model_prefix}/{portfolio_id}/{timestamp}.parquet``
        """
        # if model_prefix not in self.MODEL_PREFIXES:
        #     raise ValueError(f"Model prefix specified is not in {self.MODEL_PREFIXES}")

        # Store parquet file in s3 for scoring
        file_location = f"{model_prefix}/{self.portfolio_id}/{self.timestamp}.parquet"
        logger.info("Storing scoring data to s3")
        save_dataframe_to_s3_parquet(df=df, bucket_name=bucket, file_key=file_location)
        return file_location

    def predict(self, file_location, model_prefix: str, timeout=120):
        """Make a POST request to the model API registered for ``model_prefix``.

        Args:
            file_location (str): The file location to be passed in the request payload.
            model_prefix (str): Key of ``MODEL_URLS`` selecting the endpoint to call.
            timeout (float): Seconds to wait for the request before giving up.

        Returns:
            dict: The decoded JSON response if the request was successful, None otherwise
            (the error is logged, not raised).

        Raises:
            KeyError: If ``model_prefix`` is not a key of ``MODEL_URLS``.
        """
        logger.info(f"Making request to {model_prefix} change api")
        url = f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict"
        payload = {
            "file_location": file_location,
            "property_id": "",  # This should get removed
            "portfolio_id": self.portfolio_id,
            "created_at": self.timestamp,
        }

        try:
            response = requests.post(
                url,
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=timeout,
            )
            # Raise for any non-2xx status so it is handled like a transport error.
            response.raise_for_status()
            return response.json()
        except RequestException as e:
            logger.error(f"An error occurred: {e}")
            # Callers must check for None; predict_all() converts it into an exception.
            return None

    @staticmethod
    def extract_phase(recommendation_id):
        """Return the integer that follows ``phase=`` in ``recommendation_id``.

        All consecutive digits are read, so ``phase=12`` yields 12. Returns None when
        the value is not a string (e.g. a missing id) or contains no ``phase=<digits>``.
        """
        if not isinstance(recommendation_id, str):
            return None
        match = _PHASE_PATTERN.search(recommendation_id)
        return int(match.group(1)) if match else None

    def predict_all(self, df, bucket, model_prefixes=None, extract_ids=True) -> dict:
        """
        Generate predictions for each model prefix.

        For every prefix the scoring data is uploaded to s3, the model API is asked to
        score it, and the stored predictions are read back from the predictions bucket
        configured for that prefix. The ``predictions`` column is cast to float and
        rounded to one decimal place.

        :param df: Pandas dataframe with scoring data to be uploaded to s3
        :param bucket: Name of the bucket in s3 to upload to
        :param model_prefixes: List of model prefixes to generate predictions for.
            If None, ``MODEL_PREFIXES`` is used
        :param extract_ids: If True, split the ``id`` column on the first ``+`` into
            ``property_id`` and ``recommendation_id`` and derive a ``phase`` column
        :return: Dictionary mapping each model prefix to its predictions dataframe
        :raises RuntimeError: If the request to a model API fails
        """
        model_prefixes = self.MODEL_PREFIXES if model_prefixes is None else model_prefixes

        predictions = {}
        for model_prefix in model_prefixes:
            logger.info(f"Scoring for model prefix: {model_prefix}")
            file_location = self.upload_scoring_data(df, bucket, model_prefix)

            response = self.predict(f"s3://{bucket}/{file_location}", model_prefix)
            if response is None:
                # predict() swallows request errors and returns None; fail with a clear
                # message instead of an opaque TypeError on the subscript below.
                raise RuntimeError(
                    f"Prediction request failed for model prefix '{model_prefix}'; "
                    "see the logged error for details"
                )

            predictions_bucket = self.prediction_buckets[model_prefix]
            # The key is everything after the first "<bucket>/" in the returned path;
            # maxsplit=1 keeps the key intact if that token recurs further along.
            file_key = response["storage_filepath"].split(predictions_bucket + "/", 1)[1]

            # Retrieve the predictions
            predictions_df = pd.DataFrame(
                read_dataframe_from_s3_parquet(
                    bucket_name=predictions_bucket,
                    file_key=file_key,
                )
            )
            predictions_df["predictions"] = predictions_df["predictions"].astype(float).round(1)

            if extract_ids:
                # Split on the first "+" only, so a recommendation id that itself
                # contains "+" still yields exactly two columns.
                predictions_df[["property_id", "recommendation_id"]] = (
                    predictions_df["id"].str.split("+", n=1, expand=True)
                )
                # The phase is the integer after "phase=" in the recommendation id;
                # ids without a phase get None.
                predictions_df["phase"] = predictions_df["recommendation_id"].apply(
                    self.extract_phase
                )

            predictions[model_prefix] = predictions_df

        return predictions

    def paginated_predictions(self, data, bucket, batch_size, model_prefixes=None, extract_ids=True):
        """
        Score ``data`` in batches of ``batch_size`` rows and combine the results.

        :param data: Pandas dataframe with the scoring data
        :param bucket: Name of the bucket in s3 to upload each batch to
        :param batch_size: Number of rows per batch; must be at least 1
        :param model_prefixes: Passed through to ``predict_all``
        :param extract_ids: Passed through to ``predict_all``
        :return: Dictionary of dataframes, one per key of ``predictions_template()``
            (plus any other scored prefix); keys with no predictions stay empty
        :raises ValueError: If ``batch_size`` is smaller than 1
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")

        # Collect the per-batch frames and concatenate once per key at the end, rather
        # than re-copying the growing result on every batch.
        collected = {key: [] for key in self.predictions_template()}

        batch_starts = range(0, data.shape[0], batch_size)
        for start in tqdm(batch_starts, total=len(batch_starts)):
            batch_predictions = self.predict_all(
                df=data.iloc[start:start + batch_size],
                bucket=bucket,
                model_prefixes=model_prefixes,
                extract_ids=extract_ids,
            )
            for key, scored in batch_predictions.items():
                collected.setdefault(key, []).append(scored)

        all_predictions = self.predictions_template()
        for key, frames in collected.items():
            if frames:
                all_predictions[key] = pd.concat(frames)

        return all_predictions