ML/deployment/handlers/prediction_app.py
2025-11-04 16:52:24 +00:00

170 lines
5 KiB
Python

"""
This script is the handler for the lambda prediction function, responsible
for producing predictions for a model
"""
import boto3
from botocore.exceptions import NoCredentialsError
import json
from io import StringIO
import os
import logging
from generate_predictions import generate_predictions
from core.MLModels import model_factory
from config import settings
from core.DataClient import dataclient_factory
# Module-wide logger: the root logger, emitting INFO and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Bucket that receives prediction outputs; None when the variable is unset.
PREDICTIONS_BUCKET = os.environ.get("PREDICTIONS_BUCKET")
def upload_dataframe_to_s3(df, bucket, s3_file_name):
    """
    Upload a pandas DataFrame to an S3 bucket as CSV.

    The frame is serialised to CSV in memory (without the index column) and
    sent with a single ``put_object`` call.

    :param df: DataFrame to upload
    :param bucket: Bucket to upload to
    :param s3_file_name: S3 object name
    :return: True if the file was uploaded, False if no AWS credentials
        were available. Only ``NoCredentialsError`` is handled here; any
        other error raised during the upload propagates to the caller.
    """
    # Initialize the S3 client
    s3 = boto3.client("s3")

    # Write the DataFrame to an in-memory buffer as CSV
    csv_buffer = StringIO()
    df.to_csv(csv_buffer, index=False)

    try:
        # Upload the CSV from the buffer to S3
        s3.put_object(Bucket=bucket, Key=s3_file_name, Body=csv_buffer.getvalue())
    except NoCredentialsError:
        # Report through the module logger, like the rest of this module,
        # instead of writing to stdout.
        logger.error("Credentials not available")
        return False

    logger.info("Successfully uploaded DataFrame to %s/%s", bucket, s3_file_name)
    return True
def warming_up_invocation(
    model,
    model_filepath: str,
):
    """
    Handle a warm up invocation by running one synthetic prediction.

    Loads the model from ``model_filepath`` and then predicts on a single
    all-zero row that has one column per entry of
    ``model.model.original_features``. Nothing is returned.
    """
    # pandas / numpy are imported inside the function, as in the original.
    import numpy as np
    import pandas as pd

    # The model must be loaded before its feature list is read.
    model.load_model(model_filepath)

    feature_names = model.model.original_features
    zero_row = np.zeros((1, len(feature_names)))
    synthetic_input = pd.DataFrame(zero_row, columns=feature_names)

    model.predict(data=synthetic_input)
def _parse_body(event):
    """Return the ``body`` of ``event``, decoding it from JSON if needed."""
    raw_body = event["body"]
    # The body is used as-is when it is already a dict; anything else is
    # treated as a JSON document.
    return raw_body if isinstance(raw_body, dict) else json.loads(raw_body)


def _resolve_storage_filepath(body):
    """Return the location the predictions for this request are written to."""
    if "testing" in body:
        logger.info(
            "Testing invocation for CI/CD - save file to same location in S3"
        )
        return body["file_location"].replace(".parquet", "_output.parquet")

    # The identifiers are only needed to build the regular output path, so
    # they are read here rather than for every kind of invocation.
    property_id = body["property_id"]
    portfolio_id = body["portfolio_id"]
    created_at = body["created_at"]
    if not PREDICTIONS_BUCKET:
        # Fail with a clear message instead of building an "s3://None/..." path.
        raise ValueError("PREDICTIONS_BUCKET environment variable is not set")
    return f"s3://{PREDICTIONS_BUCKET}/{portfolio_id}/{property_id}/{created_at}.parquet"


def handler(event, context):
    """
    Take in event and trigger the prediction pipeline.

    ``event["body"]`` is either a dict or a JSON string. Keys read from it:

    * ``warm`` (optional): when present, only a synthetic warm up prediction
      is run and no other key is required.
    * ``testing`` (optional): when present, predictions are written next to
      the input file (``.parquet`` replaced by ``_output.parquet``).
    * ``file_location``: path of the input data to predict on.
    * ``property_id``, ``portfolio_id``, ``created_at``: used to build the
      output path under ``PREDICTIONS_BUCKET`` for regular invocations.

    :param event: Invocation event containing a ``body`` entry
    :param context: Invocation context (unused)
    :return: dict with ``statusCode`` (200 on success, 500 on any failure)
        and a JSON-encoded ``body``; on success of a prediction run the body
        includes ``storage_filepath``.
    """
    logger.info("received event: %s", event)
    try:
        body = _parse_body(event)

        logger.info("--- Initiate MLModel ---")
        build_model_params = settings.build_model
        model = model_factory(build_model_params["model_type"])
        model_filepath = build_model_params["model_save_filepath"]

        if "warm" in body:
            logger.info("Warm up invocation - synthetic prediction")
            warming_up_invocation(model=model, model_filepath=model_filepath)
            return {
                "statusCode": 200,
                "body": json.dumps(
                    {
                        "message": "Successfully warmed up invocation",
                    }
                ),
            }

        storage_filepath = _resolve_storage_filepath(body)

        client_params = settings.client
        feature_process_params = settings.feature_processor
        generate_predictions_params = settings.generate_predictions

        logger.info("--- Initiate Input DataClient ---")
        input_dataclient = dataclient_factory(
            dataclient_type="aws-s3",
            dataclient_config=client_params["aws-s3"],
        )
        logger.info("--- Initiate Output DataClient ---")
        output_dataclient = dataclient_factory(
            dataclient_type="aws-s3",
            dataclient_config=client_params["aws-s3"],
        )

        generate_predictions(
            input_dataclient=input_dataclient,
            output_dataclient=output_dataclient,
            model=model,
            target=feature_process_params["feature_processor_config"]["target"],
            model_filepath=model_filepath,
            test_data_filepath=body["file_location"],
            predictions_output_filepath=storage_filepath,
            predictions_column_name=generate_predictions_params[
                "predictions_column_name"
            ],
            identifier_column=generate_predictions_params["identifier_column"],
        )
        return {
            "statusCode": 200,
            "body": json.dumps(
                {
                    "message": "Successfully processed input",
                    "storage_filepath": storage_filepath,
                }
            ),
        }
    except Exception as e:
        # Top-level boundary: every failure becomes a 500 response. The
        # traceback is logged so the cause can be found afterwards.
        logger.exception("Prediction failed")
        return {
            "statusCode": 500,
            "body": json.dumps({"message": "Prediction failed", "error": str(e)}),
        }