"""Lambda entry point: run the SAP prediction pipeline and store results in S3."""
import json
import logging
import os
import sys
from io import StringIO

import boto3
from botocore.exceptions import BotoCoreError, ClientError, NoCredentialsError

from predictions import prediction

# Root logger is configured (rather than a module logger) presumably so that
# records reach the Lambda runtime's log handler -- confirm for other hosts.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev")
PREDICTIONS_BUCKET = f"retrofit-sap-predictions-{RUNTIME_ENVIRONMENT}"

# Keys every request body must provide.
_REQUIRED_FIELDS = ("file_location", "property_id", "portfolio_id", "created_at")


def upload_dataframe_to_s3(df, bucket: str, s3_file_name: str) -> bool:
    """
    Upload a pandas DataFrame to an S3 bucket as CSV.

    :param df: DataFrame to upload (anything exposing pandas' ``to_csv``)
    :param bucket: Bucket to upload to
    :param s3_file_name: S3 object name (key)
    :return: True if the object was uploaded, else False. AWS-side failures
        (missing credentials, botocore/client errors) are logged and reported
        as False rather than raised.
    """
    csv_buffer = StringIO()
    # Serialise the whole frame in memory; index is dropped from the CSV.
    df.to_csv(csv_buffer, index=False)

    try:
        s3 = boto3.client("s3")
        s3.put_object(Bucket=bucket, Key=s3_file_name, Body=csv_buffer.getvalue())
    except NoCredentialsError:
        # Must precede BotoCoreError: NoCredentialsError is a subclass of it.
        logger.error("Credentials not available")
        return False
    except (BotoCoreError, ClientError):
        logger.exception("Failed to upload DataFrame to %s/%s", bucket, s3_file_name)
        return False

    logger.info("Successfully uploaded DataFrame to %s/%s", bucket, s3_file_name)
    return True


def _parse_request(event):
    """Extract (file_location, property_id, portfolio_id, created_at) from *event*.

    ``event["body"]`` may be a JSON string/bytes or an already-decoded dict.

    :raises KeyError: if *event* has no ``"body"``
    :raises TypeError: if *event* is not subscriptable
    :raises ValueError: if the body is not valid JSON, not a JSON object,
        or lacks a required field (``json.JSONDecodeError`` is a ValueError)
    """
    raw_body = event["body"]
    if isinstance(raw_body, (str, bytes, bytearray)):
        body = json.loads(raw_body)
    else:
        body = raw_body

    if not isinstance(body, dict):
        raise ValueError("request body must be a JSON object")

    missing = [name for name in _REQUIRED_FIELDS if name not in body]
    if missing:
        raise ValueError("missing required field(s): " + ", ".join(missing))

    return (
        body["file_location"],
        body["property_id"],
        body["portfolio_id"],
        body["created_at"],
    )


def _response(status_code: int, payload: dict) -> dict:
    """Build the ``{"statusCode", "body"}`` response dict with a JSON body."""
    return {"statusCode": status_code, "body": json.dumps(payload)}


def handler(event, context):
    """
    Take in an event and trigger the prediction pipeline.

    The event's ``body`` must carry ``file_location``, ``property_id``,
    ``portfolio_id`` and ``created_at`` (presumably an API Gateway proxy
    event -- confirm against the deployed trigger). Predictions are written to
    ``PREDICTIONS_BUCKET`` under ``{portfolio_id}/{property_id}/{created_at}.csv``.

    :param event: invocation event
    :param context: runtime context (unused)
    :return: dict with ``statusCode`` (200 success, 400 malformed request,
        500 prediction or upload failure) and a JSON-encoded ``body``
    """
    logger.info("received event: %s", event)

    # NOTE(review): assumes the input file already exists at `file_location`
    # and that a trained model is available to `prediction`.
    try:
        data_path, property_id, portfolio_id, created_at = _parse_request(event)
    except (KeyError, TypeError, ValueError) as e:
        logger.warning("Invalid request: %s", e)
        return _response(400, {"message": "Invalid request", "error": str(e)})

    storage_filepath = f"{portfolio_id}/{property_id}/{created_at}.csv"

    try:
        # model_path=None: per the original author, prediction() then takes the
        # best model path from the registry (not visible in this file).
        outputs = prediction(model_path=None, data_path=data_path)
        uploaded = upload_dataframe_to_s3(
            df=outputs,
            bucket=PREDICTIONS_BUCKET,
            s3_file_name=storage_filepath,
        )
    except Exception as e:  # top-level boundary: report any pipeline failure
        logger.exception("Prediction failed")
        return _response(500, {"message": "Prediction failed", "error": str(e)})

    if not uploaded:
        # Previously this path was reported as a 200 success.
        return _response(500, {
            "message": "Failed to upload predictions",
            "storage_filepath": storage_filepath,
        })

    return _response(200, {
        "message": "Successfully processed input",
        "storage_filepath": storage_filepath,
    })


def _main(argv):
    """Run the handler locally on an event read from a JSON file (argv[1]) or stdin."""
    if len(argv) > 1:
        with open(argv[1], encoding="utf-8") as fh:
            event = json.load(fh)
    else:
        event = json.load(sys.stdin)
    print(json.dumps(handler(event, None), indent=2))


if __name__ == "__main__":
    _main(sys.argv)