Model/backend/onboarders/handler.py
2026-02-05 14:07:43 +00:00

48 lines
1.8 KiB
Python

import json
from pydantic import BaseModel, Field
from typing import Optional, Literal
from onboarders.factory import OnboarderFactory
from utils.logger import setup_logger
# Module-level logger, created once at import time and used by handler() below.
logger = setup_logger()
class OnboardingEvent(BaseModel):
    # Schema of one onboarding request. handler() builds an instance from the
    # JSON-decoded SQS message body, so a payload with a missing required
    # field or a value outside the allowed literals fails validation there.

    # Where the raw input file lives.
    s3_uri: str = Field(..., description="S3 URI of the raw ARA input file")

    # Passed to OnboarderFactory.create_onboarder by handler().
    system: Literal["parity", "generic"] = Field(..., description="Onboarding system identifier")

    # Input file type; required, restricted to the two listed values.
    format: Literal["csv", "xlsx"] = Field(...)

    # Optional sheet name; None when the payload does not provide one.
    sheet_name: Optional[str] = Field(default=None)
def handler(event, context):
    """
    Lambda handler that triggers the model engine for each SQS message.

    For every record the ``body`` is JSON-decoded, validated as an
    ``OnboardingEvent`` and handed to the onboarder class returned by
    ``OnboarderFactory.create_onboarder``; the onboarder then transforms the
    input and writes the result. A record that fails is logged with its
    traceback and does not stop the remaining records from being processed.

    Example message body::

        {
            "s3_uri": "s3://retrofit-data-dev/ara_raw_inputs/peabody/2025_11_11 - Peabody - Data Extracts for Domna.xlsx",
            "system": "parity",
            "format": "xlsx",
            "sheet_name": "Sustainability"
        }

    Args:
        event: Lambda event; records are read from its ``"Records"`` list
            (a missing key is treated as an empty batch).
        context: Lambda context object (unused).

    Returns:
        dict: ``{"batchItemFailures": [{"itemIdentifier": <messageId>}, ...]}``
        listing the records that failed. When the SQS event source mapping has
        ``ReportBatchItemFailures`` enabled, Lambda retries only those
        messages; otherwise the return value is ignored.
    """
    failed_items = []

    for record in event.get("Records", []):
        try:
            event_body = json.loads(record["body"])
            logger.info("Processing record with body: %s", event_body)

            validated_event = OnboardingEvent(**event_body)
            Onboarder = OnboarderFactory.create_onboarder(validated_event.system)
            # NOTE(review): the same value is passed as both ``format`` and
            # ``file_format``; kept as in the original call - confirm which
            # keyword the onboarder constructors actually consume.
            onboarder = Onboarder(
                fileuri=validated_event.s3_uri,
                format=validated_event.format,
                sheet_name=validated_event.sheet_name,
                file_format=validated_event.format,
            )

            logger.info("Transforming data")
            onboarder.transform()

            logger.info(
                "Writing data to %s, bucket: %s",
                onboarder.output_file_name,
                onboarder.bucket_name,
            )
            onboarder.write()
        except Exception as e:
            # Top-level boundary: keep processing the rest of the batch, but
            # record the traceback and report the failure instead of silently
            # letting the message be treated as successfully handled.
            logger.exception("Failed to process record: %s", e)
            message_id = record.get("messageId") if isinstance(record, dict) else None
            # Only report identifiable records; an empty identifier would make
            # Lambda treat the entire batch as failed.
            if message_id:
                failed_items.append({"itemIdentifier": message_id})

    return {"batchItemFailures": failed_items}