Generalising api class for prediction in router:

This commit is contained in:
Khalim Conn-Kowlessar 2023-11-27 16:35:56 +00:00
parent abcf65158a
commit 0f3e325f5b
3 changed files with 66 additions and 22 deletions

View file

@ -25,7 +25,7 @@ from backend.app.plan.utils import (
) )
from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, read_parquet_from_s3 from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, read_parquet_from_s3
from backend.ml_models.sap_change_model.api import SAPChangeModelAPI from backend.ml_models.api import ModelApi
from backend.Property import Property from backend.Property import Property
from etl.epc.DataProcessor import DataProcessor from etl.epc.DataProcessor import DataProcessor
from etl.epc.settings import COLUMNS_TO_MERGE_ON from etl.epc.settings import COLUMNS_TO_MERGE_ON
@ -234,30 +234,19 @@ async def trigger_plan(body: PlanTriggerRequest):
recommendations_scoring_data = DataProcessor.clean_efficiency_variables(recommendations_scoring_data) recommendations_scoring_data = DataProcessor.clean_efficiency_variables(recommendations_scoring_data)
sap_change_model_api = SAPChangeModelAPI(portfolio_id=body.portfolio_id, timestamp=created_at) model_api = ModelApi(portfolio_id=body.portfolio_id, timestamp=created_at)
file_location = sap_change_model_api.upload_scoring_data( all_predictions = model_api.predict_all(
df=recommendations_scoring_data, bucket=get_settings().DATA_BUCKET df=recommendations_scoring_data,
bucket=get_settings().DATA_BUCKET,
predictions_bucket=get_settings().PREDICTIONS_BUCKET
) )
response = sap_change_model_api.predict(
file_location="s3://{DATA_BUCKET}/".format(DATA_BUCKET=get_settings().DATA_BUCKET) + file_location,
)
# Retrieve the predictions
predictions = pd.DataFrame(
read_parquet_from_s3(
bucket_name=get_settings().PREDICTIONS_BUCKET,
file_key=response["storage_filepath"].split(get_settings().PREDICTIONS_BUCKET + "/")[1]
)
)
predictions["predictions"] = predictions["predictions"].astype(float).round(1)
predictions[['property_id', 'recommendation_id']] = predictions['id'].str.split('+', expand=True)
# Insert the predictions into the recommendations and run the optimiser # Insert the predictions into the recommendations and run the optimiser
logger.info("Optimising recommendations") logger.info("Optimising recommendations")
for property_id in recommendations.keys(): for property_id in recommendations.keys():
property = [p for p in input_properties if p.id == property_id][0] property = [p for p in input_properties if p.id == property_id][0]
predictions = all_predictions["sap_change_predictions"]
property_predictions = predictions[predictions["property_id"] == str(property_id)] property_predictions = predictions[predictions["property_id"] == str(property_id)]
for recommendations_by_type in recommendations[property_id]: for recommendations_by_type in recommendations[property_id]:

View file

@ -3,11 +3,24 @@ import requests
from requests.exceptions import RequestException from requests.exceptions import RequestException
from utils.logger import setup_logger from utils.logger import setup_logger
from utils.s3 import save_dataframe_to_s3_parquet from utils.s3 import save_dataframe_to_s3_parquet
from backend.app.utils import read_parquet_from_s3
logger = setup_logger() logger = setup_logger()
class SAPChangeModelAPI: class ModelApi:
MODEL_PREFIXES = [
"sap_change_predictions",
"heat_demand_predictions",
"carbon_change_predictions"
]
MODEL_URLS = {
"sap_change_predictions": "sapmodel",
"heat_demand_predictions": "heatmodel",
"carbon_change_predictions": "carbonmodel"
}
def __init__( def __init__(
self, self,
portfolio_id, portfolio_id,
@ -15,6 +28,9 @@ class SAPChangeModelAPI:
base_url="https://api.dev.hestia.homes", base_url="https://api.dev.hestia.homes",
): ):
""" """
This class handles the communication with the Model APIs. These models include SAP change, heat demain change
and carbon change
property_id (int, optional): : property_id (int, optional): :
:param portfolio_id: The portfolio ID to be passed in the request payload. Defaults to 4. :param portfolio_id: The portfolio ID to be passed in the request payload. Defaults to 4.
:param timestamp: The creation timestamp to be passed in the request payload. Defaults to None. :param timestamp: The creation timestamp to be passed in the request payload. Defaults to None.
@ -24,7 +40,7 @@ class SAPChangeModelAPI:
self.portfolio_id = portfolio_id self.portfolio_id = portfolio_id
self.timestamp = timestamp self.timestamp = timestamp
def upload_scoring_data(self, df: pd.DataFrame, bucket: str) -> str: def upload_scoring_data(self, df: pd.DataFrame, bucket: str, model_prefix: str) -> str:
""" """
The sap model api needs a scoring data that is sitting in s3 to use as a dataset to score on The sap model api needs a scoring data that is sitting in s3 to use as a dataset to score on
This method allows the user to upload a table as a parquet file. This method will return the file This method allows the user to upload a table as a parquet file. This method will return the file
@ -32,9 +48,13 @@ class SAPChangeModelAPI:
:param df: Pandas dataframe with scoring data to be uploaded to s3 :param df: Pandas dataframe with scoring data to be uploaded to s3
:param bucket: Name of the bucket in s3 to upload to :param bucket: Name of the bucket in s3 to upload to
:param model_prefix: The model prefix to be used in the file location
:return: :return:
""" """
if model_prefix not in self.MODEL_PREFIXES:
raise ValueError(f"Model prefix specified is not in {self.MODEL_PREFIXES}")
# Store parquet file in s3 for scoring # Store parquet file in s3 for scoring
file_location = "sap_change_predictions/{portfolio_id}/{timestamp}.parquet".format( file_location = "sap_change_predictions/{portfolio_id}/{timestamp}.parquet".format(
portfolio_id=self.portfolio_id, portfolio_id=self.portfolio_id,
@ -50,17 +70,18 @@ class SAPChangeModelAPI:
return file_location return file_location
def predict(self, file_location): def predict(self, file_location, model_prefix: str):
"""Makes a POST request to the SAP Change Model API with the provided parameters. """Makes a POST request to the SAP Change Model API with the provided parameters.
Args: Args:
file_location (str): The file location to be passed in the request payload. file_location (str): The file location to be passed in the request payload.
model_prefix (str): The model prefix to be used in the request URL.
Returns: Returns:
dict: The API response as a dictionary if the request was successful, None otherwise. dict: The API response as a dictionary if the request was successful, None otherwise.
""" """
logger.info("Making request to sap change api") logger.info("Making request to sap change api")
url = f"{self.base_url}/sapmodel/predict" url = f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict"
payload = { payload = {
"file_location": file_location, "file_location": file_location,
"property_id": "", # This should get removed "property_id": "", # This should get removed
@ -81,3 +102,37 @@ class SAPChangeModelAPI:
# In case of an error, you might want to return None or raise the exception # In case of an error, you might want to return None or raise the exception
# depending on how you want to handle errors in your application # depending on how you want to handle errors in your application
return None return None
def predict_all(self, df, bucket, predictions_bucket) -> dict:
"""
For each model prefix, this method will upload the scoring data to s3 and then make a request to the
model api to generate predictions. The predictions will be stored in the predictions bucket.
This method will then fetch the stored predictions and format them, returning all of the predictions as
a dictionary of panaas dataframes
:param df: Pandas dataframe with scoring data to be uploaded to s3
:param bucket: Name of the bucket in s3 to upload to
:param predictions_bucket: Name of the bucket in s3 to store predictions
:return:
"""
predictions = {}
for model_prefix in self.MODEL_PREFIXES:
logger.info(f"Scoring for model prefix: {model_prefix}")
file_location = self.upload_scoring_data(df, bucket, model_prefix)
response = self.predict(file_location, model_prefix)
# Retrieve the predictions
predictions_df = pd.DataFrame(
read_parquet_from_s3(
bucket_name=predictions_bucket,
file_key=response["storage_filepath"].split(predictions_bucket + "/")[1]
)
)
predictions_df["predictions"] = predictions_df["predictions"].astype(float).round(1)
predictions_df[['property_id', 'recommendation_id']] = predictions_df['id'].str.split('+', expand=True)
predictions[model_prefix] = predictions_df
return predictions