Model/backend/magic_plan/magic_plan_service.py
2026-05-08 14:14:42 +00:00

49 lines
1.5 KiB
Python

import gzip
from typing import Optional
from datatypes.magicplan.api.response import (
MagicPlanPlan,
PlanSummary,
PlansListResponse,
)
from datatypes.magicplan.domain.mapper import map_plan
from datatypes.magicplan.domain.models import Plan
from backend.app.db.connection import db_session
from backend.app.db.functions.magic_plan_functions import save_plan
from backend.magic_plan.address_matcher import find_matching_plan
from backend.magic_plan.magic_plan_client import MagicPlanClient
from backend.magic_plan.magic_plan_trigger_request import MagicPlanTriggerRequest
from utils.logger import setup_logger
from utils.s3 import save_data_to_s3
logger = setup_logger()
class MagicPlanService:
def __init__(self, client: MagicPlanClient, s3_bucket: str) -> None:
self._client = client
self._s3_bucket = s3_bucket
def run(self, request: MagicPlanTriggerRequest) -> Plan:
address = request.address
uprn = request.uprn
if uprn is not None:
logger.info("MagicPlanService.run uprn=%s", uprn)
plans_response: PlansListResponse = self._client.get_plans()
matched: Optional[PlanSummary] = find_matching_plan(
plans_response.plans, address
)
if matched is None:
raise ValueError(f"No MagicPlan found for address: {address!r}")
magic_plan: MagicPlanPlan = self._client.get_plan(matched.id)
plan: Plan = map_plan(magic_plan)
with db_session() as session:
save_plan(session, plan)
return plan