Model/model_data/simulation_system/handlers/predictions_app.py
2023-09-05 18:29:38 +01:00

94 lines
2.5 KiB
Python

import boto3
from botocore.exceptions import NoCredentialsError
import json
from io import StringIO
import os
import logging
from predictions import prediction
# Module-wide logger: this is the root logger, with INFO and above enabled.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Deployment stage taken from the environment; falls back to "dev" when unset.
RUNTIME_ENVIRONMENT = os.getenv("RUNTIME_ENVIRONMENT", "dev")
def upload_dataframe_to_s3(df, bucket, s3_file_name):
    """
    Serialise a DataFrame to CSV in memory and store it as an S3 object.

    :param df: DataFrame to upload (written without its index)
    :param bucket: Name of the destination bucket
    :param s3_file_name: Key of the S3 object to create
    :return: True once the object has been written; False when boto3 reports
        that no credentials are available. Any other error propagates to
        the caller.
    """
    client = boto3.client('s3')

    # Render the CSV into an in-memory text buffer rather than a file on disk.
    buffer = StringIO()
    df.to_csv(buffer, index=False)
    payload = buffer.getvalue()

    try:
        client.put_object(Bucket=bucket, Key=s3_file_name, Body=payload)
    except NoCredentialsError:
        print("Credentials not available")
        return False
    else:
        print(f"Successfully uploaded DataFrame to {bucket}/{s3_file_name}")
        return True
def handler(event, context):
    """
    Lambda entry point: run the prediction pipeline for one request and store
    the resulting predictions in S3.

    The request body (``event["body"]``, a JSON document) must provide
    ``file_location``, ``property_id``, ``portfolio_id`` and ``created_at``.
    Predictions are written as CSV to the bucket
    ``retrofit-sap-predictions-{RUNTIME_ENVIRONMENT}`` under the key
    ``{portfolio_id}/{property_id}/{created_at}.csv``.

    :param event: Invocation event carrying the JSON request under "body"
    :param context: Lambda context object (unused)
    :return: dict with "statusCode" and a JSON-encoded "body": 200 on
        success, 400 when the request body is missing or malformed, 500 when
        the prediction or the upload fails
    """
    logger.info("received event: %s", event)

    # Validate the request up front so a bad payload produces a clean 400
    # response instead of an unhandled exception escaping the handler.
    try:
        body = json.loads(event["body"])
        data_path = body["file_location"]
        property_id = body["property_id"]
        portfolio_id = body["portfolio_id"]
        created_at = body["created_at"]
    except (KeyError, TypeError, ValueError) as e:
        # ValueError covers json.JSONDecodeError; TypeError covers a body (or
        # event) that is not of the expected type.
        logger.warning("Invalid request: %r", e)
        return {
            "statusCode": 400,
            "body": json.dumps({
                "message": "Invalid request",
                "error": f"{type(e).__name__}: {e}"
            })
        }

    # Store into s3, with key of {portfolio_id}/{property_id}/{created_at}.csv
    storage_filepath = f"{portfolio_id}/{property_id}/{created_at}.csv"

    try:
        # We could fix the model path but for the moment, we just take the
        # best model path based on the registry (model_path=None).
        outputs = prediction(model_path=None, data_path=data_path)
        uploaded = upload_dataframe_to_s3(
            df=outputs,
            bucket=f"retrofit-sap-predictions-{RUNTIME_ENVIRONMENT}",
            s3_file_name=storage_filepath
        )
    except Exception as e:
        # Top-level boundary: log with traceback and report a 500 rather
        # than letting the invocation crash.
        logger.exception("Prediction failed")
        return {
            "statusCode": 500,
            "body": json.dumps({
                "message": "Prediction failed",
                "error": str(e)
            })
        }

    # upload_dataframe_to_s3 signals some failures by returning False rather
    # than raising; do not report success when nothing was stored.
    if not uploaded:
        logger.error("Upload of predictions to %s failed", storage_filepath)
        return {
            "statusCode": 500,
            "body": json.dumps({
                "message": "Prediction failed",
                "error": "Failed to upload predictions to S3"
            })
        }

    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": "Successfully processed input",
            "storage_filepath": storage_filepath
        })
    }
if __name__ == "__main__":
    # Local smoke run. handler() requires an event and a context, so the
    # previous bare handler() call always raised TypeError. Load the event
    # from a JSON file named on the command line and pass None for the
    # context.
    import sys

    if len(sys.argv) != 2:
        sys.exit(f"usage: {sys.argv[0]} EVENT_JSON_FILE")
    with open(sys.argv[1], encoding="utf-8") as event_file:
        print(json.dumps(handler(json.load(event_file), None), indent=2))