diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index a20369cc..025b252e 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -25,7 +25,7 @@ from backend.app.plan.utils import ( ) from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, read_parquet_from_s3 -from backend.ml_models.sap_change_model.api import SAPChangeModelAPI +from backend.ml_models.api import ModelApi from backend.Property import Property from etl.epc.DataProcessor import DataProcessor from etl.epc.settings import COLUMNS_TO_MERGE_ON @@ -234,30 +234,19 @@ async def trigger_plan(body: PlanTriggerRequest): recommendations_scoring_data = DataProcessor.clean_efficiency_variables(recommendations_scoring_data) - sap_change_model_api = SAPChangeModelAPI(portfolio_id=body.portfolio_id, timestamp=created_at) - file_location = sap_change_model_api.upload_scoring_data( - df=recommendations_scoring_data, bucket=get_settings().DATA_BUCKET + model_api = ModelApi(portfolio_id=body.portfolio_id, timestamp=created_at) + all_predictions = model_api.predict_all( + df=recommendations_scoring_data, + bucket=get_settings().DATA_BUCKET, + predictions_bucket=get_settings().PREDICTIONS_BUCKET ) - response = sap_change_model_api.predict( - file_location="s3://{DATA_BUCKET}/".format(DATA_BUCKET=get_settings().DATA_BUCKET) + file_location, - ) - - # Retrieve the predictions - predictions = pd.DataFrame( - read_parquet_from_s3( - bucket_name=get_settings().PREDICTIONS_BUCKET, - file_key=response["storage_filepath"].split(get_settings().PREDICTIONS_BUCKET + "/")[1] - ) - ) - - predictions["predictions"] = predictions["predictions"].astype(float).round(1) - predictions[['property_id', 'recommendation_id']] = predictions['id'].str.split('+', expand=True) # Insert the predictions into the recommendations and run the optimiser logger.info("Optimising recommendations") for property_id in recommendations.keys(): property = [p for p in input_properties if p.id == property_id][0] + predictions = all_predictions["sap_change_predictions"] property_predictions = predictions[predictions["property_id"] == str(property_id)] for recommendations_by_type in recommendations[property_id]: diff --git a/backend/ml_models/sap_change_model/__init__.py b/backend/ml_models/__init__.py similarity index 100% rename from backend/ml_models/sap_change_model/__init__.py rename to backend/ml_models/__init__.py diff --git a/backend/ml_models/sap_change_model/api.py b/backend/ml_models/api.py similarity index 52% rename from backend/ml_models/sap_change_model/api.py rename to backend/ml_models/api.py index 2eb7d706..6c92df2a 100644 --- a/backend/ml_models/sap_change_model/api.py +++ b/backend/ml_models/api.py @@ -3,11 +3,24 @@ import requests from requests.exceptions import RequestException from utils.logger import setup_logger from utils.s3 import save_dataframe_to_s3_parquet +from backend.app.utils import read_parquet_from_s3 logger = setup_logger() -class SAPChangeModelAPI: +class ModelApi: + MODEL_PREFIXES = [ + "sap_change_predictions", + "heat_demand_predictions", + "carbon_change_predictions" + ] + + MODEL_URLS = { + "sap_change_predictions": "sapmodel", + "heat_demand_predictions": "heatmodel", + "carbon_change_predictions": "carbonmodel" + } + def __init__( self, portfolio_id, @@ -15,6 +28,9 @@ class SAPChangeModelAPI: base_url="https://api.dev.hestia.homes", ): """ + This class handles the communication with the Model APIs. These models include SAP change, heat demain change + and carbon change + property_id (int, optional): : :param portfolio_id: The portfolio ID to be passed in the request payload. Defaults to 4. :param timestamp: The creation timestamp to be passed in the request payload. Defaults to None. @@ -24,7 +40,7 @@ class SAPChangeModelAPI: self.portfolio_id = portfolio_id self.timestamp = timestamp - def upload_scoring_data(self, df: pd.DataFrame, bucket: str) -> str: + def upload_scoring_data(self, df: pd.DataFrame, bucket: str, model_prefix: str) -> str: """ The sap model api needs a scoring data that is sitting in s3 to use as a dataset to score on This method allows the user to upload a table as a parquet file. This method will return the file @@ -32,9 +48,13 @@ class SAPChangeModelAPI: :param df: Pandas dataframe with scoring data to be uploaded to s3 :param bucket: Name of the bucket in s3 to upload to + :param model_prefix: The model prefix to be used in the file location :return: """ + if model_prefix not in self.MODEL_PREFIXES: + raise ValueError(f"Model prefix specified is not in {self.MODEL_PREFIXES}") + # Store parquet file in s3 for scoring file_location = "sap_change_predictions/{portfolio_id}/{timestamp}.parquet".format( portfolio_id=self.portfolio_id, @@ -50,17 +70,18 @@ class SAPChangeModelAPI: return file_location - def predict(self, file_location): + def predict(self, file_location, model_prefix: str): """Makes a POST request to the SAP Change Model API with the provided parameters. Args: file_location (str): The file location to be passed in the request payload. + model_prefix (str): The model prefix to be used in the request URL. Returns: dict: The API response as a dictionary if the request was successful, None otherwise. """ logger.info("Making request to sap change api") - url = f"{self.base_url}/sapmodel/predict" + url = f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict" payload = { "file_location": file_location, "property_id": "", # This should get removed @@ -81,3 +102,37 @@ class SAPChangeModelAPI: # In case of an error, you might want to return None or raise the exception # depending on how you want to handle errors in your application return None + + def predict_all(self, df, bucket, predictions_bucket) -> dict: + + """ + For each model prefix, this method will upload the scoring data to s3 and then make a request to the + model api to generate predictions. The predictions will be stored in the predictions bucket. + This method will then fetch the stored predictions and format them, returning all of the predictions as + a dictionary of panaas dataframes + :param df: Pandas dataframe with scoring data to be uploaded to s3 + :param bucket: Name of the bucket in s3 to upload to + :param predictions_bucket: Name of the bucket in s3 to store predictions + :return: + """ + + predictions = {} + for model_prefix in self.MODEL_PREFIXES: + logger.info(f"Scoring for model prefix: {model_prefix}") + file_location = self.upload_scoring_data(df, bucket, model_prefix) + response = self.predict(file_location, model_prefix) + + # Retrieve the predictions + predictions_df = pd.DataFrame( + read_parquet_from_s3( + bucket_name=predictions_bucket, + file_key=response["storage_filepath"].split(predictions_bucket + "/")[1] + ) + ) + + predictions_df["predictions"] = predictions_df["predictions"].astype(float).round(1) + predictions_df[['property_id', 'recommendation_id']] = predictions_df['id'].str.split('+', expand=True) + + predictions[model_prefix] = predictions_df + + return predictions