""" This script is the handler for the lambda prediction function, responsible for producting predictions for a model """ import boto3 from botocore.exceptions import NoCredentialsError import json from io import StringIO import os import logging from generate_predictions import generate_predictions from core.MLModels import model_factory from config import settings from core.DataClient import dataclient_factory logger = logging.getLogger() logger.setLevel(logging.INFO) PREDICTIONS_BUCKET = os.getenv("PREDICTIONS_BUCKET", None) def upload_dataframe_to_s3(df, bucket, s3_file_name): """ Upload a pandas DataFrame to an S3 bucket as CSV :param df: DataFrame to upload :param bucket: Bucket to upload to :param s3_file_name: S3 object name :return: True if file was uploaded, else False """ # Initialize the S3 client s3 = boto3.client("s3") csv_buffer = StringIO() # Write the DataFrame to the buffer as CSV df.to_csv(csv_buffer, index=False) try: # Upload the CSV from the buffer to S3 s3.put_object(Bucket=bucket, Key=s3_file_name, Body=csv_buffer.getvalue()) print(f"Successfully uploaded DataFrame to {bucket}/{s3_file_name}") return True except NoCredentialsError: print("Credentials not available") return False def handler(event, context): """ Take in event and trigger the prediction pipeline """ logger.info("received event: " + str(event)) try: body = ( json.loads(event["body"]) if not isinstance(event["body"], dict) else event["body"] ) property_id = body["property_id"] portfolio_id = body["portfolio_id"] created_at = body["created_at"] # TODO: Implement the loading of the model and prediction storage_filepath = f"s3://{PREDICTIONS_BUCKET}/{portfolio_id}/{property_id}/{created_at}.parquet" logger.info(f"--- Initiate MLModel ---") build_model_params = settings.build_model client_params = settings.client feature_process_params = settings.feature_processor generate_predictions_params = settings.generate_predictions model = model_factory(build_model_params["model_type"]) logger.info(f"--- Initiate Input DataClient ---") input_dataclient = dataclient_factory( dataclient_type="aws-s3", dataclient_config=client_params["aws-s3"], ) logger.info(f"--- Initiate Output DataClient ---") output_dataclient = dataclient_factory( dataclient_type="aws-s3", dataclient_config=client_params["aws-s3"], ) generate_predictions( input_dataclient=input_dataclient, output_dataclient=output_dataclient, model=model, target=feature_process_params["feature_processor_config"]["target"], model_filepath=build_model_params["model_save_filepath"], test_data_filepath=body["file_location"], predictions_output_filepath=storage_filepath, predictions_column_name=generate_predictions_params[ "predictions_column_name" ], identifier_column=generate_predictions_params["identifier_column"], ) return { "statusCode": 200, "body": json.dumps( { "message": "Successfully processed input", "storage_filepath": storage_filepath, } ), } except (Exception, KeyError, ValueError) as e: logger.info("Prediction failed") logger.info(e) return { "statusCode": 500, "body": json.dumps({"message": "Prediction failed", "error": str(e)}), }