mirror of
https://github.com/Hestia-Homes/ML.git
synced 2026-06-08 11:17:25 +00:00
129 lines
4.1 KiB
Python
129 lines
4.1 KiB
Python
"""
|
|
This script is the handler for the lambda prediction function, responsible
|
|
for producting predictions for a model
|
|
"""
|
|
|
|
import boto3
|
|
from botocore.exceptions import NoCredentialsError
|
|
import json
|
|
from io import StringIO
|
|
import os
|
|
import logging
|
|
from generate_predictions import generate_predictions
|
|
from core.MLModels import model_factory
|
|
from config import settings
|
|
from core.DataClient import dataclient_factory
|
|
|
|
logger = logging.getLogger()
|
|
logger.setLevel(logging.INFO)
|
|
|
|
PREDICTIONS_BUCKET = os.getenv("PREDICTIONS_BUCKET", None)
|
|
|
|
|
|
def upload_dataframe_to_s3(df, bucket, s3_file_name):
|
|
"""
|
|
Upload a pandas DataFrame to an S3 bucket as CSV
|
|
|
|
:param df: DataFrame to upload
|
|
:param bucket: Bucket to upload to
|
|
:param s3_file_name: S3 object name
|
|
:return: True if file was uploaded, else False
|
|
"""
|
|
|
|
# Initialize the S3 client
|
|
s3 = boto3.client("s3")
|
|
csv_buffer = StringIO()
|
|
|
|
# Write the DataFrame to the buffer as CSV
|
|
df.to_csv(csv_buffer, index=False)
|
|
|
|
try:
|
|
# Upload the CSV from the buffer to S3
|
|
s3.put_object(Bucket=bucket, Key=s3_file_name, Body=csv_buffer.getvalue())
|
|
print(f"Successfully uploaded DataFrame to {bucket}/{s3_file_name}")
|
|
return True
|
|
except NoCredentialsError:
|
|
print("Credentials not available")
|
|
return False
|
|
|
|
|
|
def handler(event, context):
|
|
"""
|
|
Take in event and trigger the prediction pipeline
|
|
"""
|
|
|
|
logger.info("received event: " + str(event))
|
|
|
|
try:
|
|
body = (
|
|
json.loads(event["body"])
|
|
if not isinstance(event["body"], dict)
|
|
else event["body"]
|
|
)
|
|
|
|
property_id = body["property_id"]
|
|
portfolio_id = body["portfolio_id"]
|
|
created_at = body["created_at"]
|
|
|
|
# TODO: Implement the loading of the model and prediction
|
|
|
|
storage_filepath = f"s3://{PREDICTIONS_BUCKET}/{portfolio_id}/{property_id}/{created_at}.parquet"
|
|
|
|
logger.info("-------------------------")
|
|
logger.info(f"--- Initiate MLModel ---")
|
|
logger.info("-------------------------")
|
|
|
|
build_model_params = settings.build_model
|
|
client_params = settings.client
|
|
feature_process_params = settings.feature_processor
|
|
generate_predictions_params = settings.generate_predictions
|
|
|
|
model = model_factory(build_model_params["model_type"])
|
|
|
|
logger.info("----------------------------")
|
|
logger.info(f"--- Initiate Input DataClient ---")
|
|
logger.info("----------------------------")
|
|
input_dataclient = dataclient_factory(
|
|
dataclient_type="aws-s3",
|
|
dataclient_config=client_params["aws-s3"],
|
|
)
|
|
|
|
logger.info("----------------------------")
|
|
logger.info(f"--- Initiate Output DataClient ---")
|
|
logger.info("----------------------------")
|
|
output_dataclient = dataclient_factory(
|
|
dataclient_type="aws-s3",
|
|
dataclient_config=client_params["aws-s3"],
|
|
)
|
|
|
|
generate_predictions(
|
|
input_dataclient=input_dataclient,
|
|
output_dataclient=output_dataclient,
|
|
model=model,
|
|
target=feature_process_params["feature_processor_config"]["target"],
|
|
model_filepath=build_model_params["model_save_filepath"],
|
|
test_data_filepath=body["file_location"],
|
|
predictions_output_filepath=storage_filepath,
|
|
predictions_column_name=generate_predictions_params[
|
|
"predictions_column_name"
|
|
],
|
|
identifier_column=generate_predictions_params["identifier_column"],
|
|
)
|
|
|
|
return {
|
|
"statusCode": 200,
|
|
"body": json.dumps(
|
|
{
|
|
"message": "Successfully processed input",
|
|
"storage_filepath": storage_filepath,
|
|
}
|
|
),
|
|
}
|
|
|
|
except (Exception, KeyError, ValueError) as e:
|
|
logger.info("Prediction failed")
|
|
logger.info(e)
|
|
return {
|
|
"statusCode": 500,
|
|
"body": json.dumps({"message": "Prediction failed", "error": str(e)}),
|
|
}
|