mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
36 lines
1.2 KiB
Python
36 lines
1.2 KiB
Python
from typing import Any
|
|
|
|
from backend.app.config import get_settings
|
|
from backend.magic_plan.magic_plan_client import MagicPlanClient
|
|
from backend.magic_plan.magic_plan_service import MagicPlanService
|
|
from backend.magic_plan.magic_plan_trigger_request import MagicPlanTriggerRequest
|
|
from datatypes.magicplan.domain.models import Plan
|
|
from backend.utils.subtasks import task_handler
|
|
from utils.logger import setup_logger
|
|
|
|
# Module-level logger; used by handler() below to report the resulting plan uid.
logger = setup_logger()
|
|
|
|
|
|
@task_handler()
def handler(body: dict[str, Any], context: Any) -> str:
    """Run the MagicPlan service for one trigger message and return the plan uid.

    Args:
        body: Raw message payload; validated into a ``MagicPlanTriggerRequest``
            via ``model_validate`` before being handed to the service.
        context: Invocation context. Not read by this function.

    Returns:
        The ``uid`` attribute of the ``Plan`` returned by ``MagicPlanService.run``.

    Raises:
        Whatever ``MagicPlanTriggerRequest.model_validate`` raises for an
        invalid ``body`` (presumably a pydantic validation error - confirm),
        plus anything propagated from the client/service calls.
    """
    config = get_settings()
    trigger = MagicPlanTriggerRequest.model_validate(body)

    # The MagicPlan API credentials come from application settings; the
    # target bucket name is fixed in code.
    service = MagicPlanService(
        MagicPlanClient(
            customer_id=config.MAGICPLAN_CUSTOMER_ID,
            api_key=config.MAGICPLAN_API_KEY,
        ),
        s3_bucket="retrofit-energy-assessments-dev",
    )

    saved_plan: Plan = service.run(trigger)
    plan_uid = saved_plan.uid
    logger.info("Saved MagicPlan plan uid=%s", plan_uid)
    return plan_uid
|
|
|
|
|
|
if __name__ == "__main__":
    # Local manual run: invoke the handler with one hand-written record.
    # The record's body is a JSON string carrying an address and a HubSpot
    # deal id. NOTE(review): the {"Records": [...]} envelope is presumably
    # unwrapped by the task_handler decorator - confirm.
    sample_record = {
        "body": '{"address": "2 Laburnum Way Bromley BR2 8BZ", "hubspot_deal_id": "local-test-deal"}',
        "messageId": "local-test",
    }
    handler({"Records": [sample_record]}, None)
|