ML/deployment/handlers/prediction_app.py
2023-10-10 12:35:34 +01:00

129 lines
4.1 KiB
Python

"""
This script is the handler for the lambda prediction function, responsible
for producing predictions from a model
"""
import boto3
from botocore.exceptions import NoCredentialsError
import json
from io import StringIO
import os
import logging
from generate_predictions import generate_predictions
from core.MLModels import model_factory
from config import settings
from core.DataClient import dataclient_factory
# Name of the S3 bucket used when building the predictions output path;
# None when the environment variable is not set.
PREDICTIONS_BUCKET = os.environ.get("PREDICTIONS_BUCKET")

# Use the root logger and let INFO-level (and above) records through.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def upload_dataframe_to_s3(df, bucket: str, s3_file_name: str) -> bool:
    """
    Upload a pandas DataFrame to an S3 bucket as CSV
    :param df: DataFrame to upload (serialised with ``to_csv(index=False)``)
    :param bucket: Bucket to upload to
    :param s3_file_name: S3 object name (key)
    :return: True if the file was uploaded, False if no AWS credentials
        were available
    :raises Exception: any other error raised by ``put_object`` is not
        handled here and propagates to the caller
    """
    # Initialize the S3 client
    s3 = boto3.client("s3")

    # Serialise the DataFrame into an in-memory text buffer as CSV
    csv_buffer = StringIO()
    df.to_csv(csv_buffer, index=False)

    try:
        # Upload the CSV from the buffer to S3
        s3.put_object(Bucket=bucket, Key=s3_file_name, Body=csv_buffer.getvalue())
    except NoCredentialsError:
        # Report through the module logger (not print) so the failure shows
        # up with the same level/format as the rest of the handler's logs.
        logger.error("Credentials not available")
        return False

    logger.info("Successfully uploaded DataFrame to %s/%s", bucket, s3_file_name)
    return True
def _json_response(status_code, payload):
    """
    Build the response dict returned by the handler
    :param status_code: value for the "statusCode" field
    :param payload: JSON-serialisable object, stored JSON-encoded under "body"
    :return: dict with "statusCode" and a JSON string "body"
    """
    return {"statusCode": status_code, "body": json.dumps(payload)}


def _parse_request_body(event):
    """
    Extract and validate the request payload from the event
    :param event: event whose "body" is either a dict or a JSON string
    :return: the payload as a dict containing every required field
    :raises ValueError: if the body is absent, is not valid JSON, is not a
        JSON object, or lacks one of the required fields
    """
    required_fields = ("property_id", "portfolio_id", "created_at", "file_location")

    raw_body = event.get("body") if isinstance(event, dict) else None
    if isinstance(raw_body, dict):
        body = raw_body
    else:
        try:
            body = json.loads(raw_body)
        except (TypeError, ValueError) as exc:
            # TypeError: body is missing / not a string;
            # ValueError: covers json.JSONDecodeError.
            raise ValueError(f"request body is not valid JSON: {exc}") from exc

    if not isinstance(body, dict):
        raise ValueError("request body must be a JSON object")

    missing = [field for field in required_fields if field not in body]
    if missing:
        raise ValueError(f"missing required field(s): {', '.join(missing)}")
    return body


def handler(event, context):
    """
    Take in event and trigger the prediction pipeline
    :param event: event carrying a "body" (dict or JSON string) with the keys
        "property_id", "portfolio_id", "created_at" and "file_location"
    :param context: Lambda context object (unused)
    :return: dict with "statusCode" and a JSON string "body":
        200 with the predictions storage path on success,
        400 when the request body is malformed or incomplete,
        500 when the prediction pipeline itself fails
    """
    logger.info("received event: %s", event)

    # Validate the whole request up front so a bad payload is rejected
    # before any model or data client is constructed.
    try:
        body = _parse_request_body(event)
    except ValueError as exc:
        logger.warning("Rejected invalid request: %s", exc)
        return _json_response(
            400, {"message": "Invalid request", "error": str(exc)}
        )

    try:
        # Without the bucket the output path would silently become
        # "s3://None/...", so fail loudly instead.
        if not PREDICTIONS_BUCKET:
            raise RuntimeError("PREDICTIONS_BUCKET environment variable is not set")

        property_id = body["property_id"]
        portfolio_id = body["portfolio_id"]
        created_at = body["created_at"]
        storage_filepath = f"s3://{PREDICTIONS_BUCKET}/{portfolio_id}/{property_id}/{created_at}.parquet"

        logger.info("-------------------------")
        logger.info("--- Initiate MLModel ---")
        logger.info("-------------------------")
        build_model_params = settings.build_model
        client_params = settings.client
        feature_process_params = settings.feature_processor
        generate_predictions_params = settings.generate_predictions
        model = model_factory(build_model_params["model_type"])

        logger.info("----------------------------")
        logger.info("--- Initiate Input DataClient ---")
        logger.info("----------------------------")
        input_dataclient = dataclient_factory(
            dataclient_type="aws-s3",
            dataclient_config=client_params["aws-s3"],
        )

        logger.info("----------------------------")
        logger.info("--- Initiate Output DataClient ---")
        logger.info("----------------------------")
        output_dataclient = dataclient_factory(
            dataclient_type="aws-s3",
            dataclient_config=client_params["aws-s3"],
        )

        generate_predictions(
            input_dataclient=input_dataclient,
            output_dataclient=output_dataclient,
            model=model,
            target=feature_process_params["feature_processor_config"]["target"],
            model_filepath=build_model_params["model_save_filepath"],
            test_data_filepath=body["file_location"],
            predictions_output_filepath=storage_filepath,
            predictions_column_name=generate_predictions_params[
                "predictions_column_name"
            ],
            identifier_column=generate_predictions_params["identifier_column"],
        )
    except Exception as exc:
        # Top-level boundary: report any pipeline failure to the caller as a
        # 500 and keep the traceback in the logs for debugging.
        logger.exception("Prediction failed")
        return _json_response(
            500, {"message": "Prediction failed", "error": str(exc)}
        )

    return _json_response(
        200,
        {
            "message": "Successfully processed input",
            "storage_filepath": storage_filepath,
        },
    )