from typing import Any, Optional, cast from sqlalchemy import delete, select from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlmodel import Session, col from datatypes.magicplan.domain.models import Door, Floor, Plan, Room, Window from backend.app.db.models.magic_plan import ( MagicPlanDoorModel, MagicPlanFloorModel, MagicPlanPlanModel, MagicPlanRoomModel, MagicPlanWindowModel, ) def get_plan_by_uploaded_file_id( session: Session, uploaded_file_id: int ) -> Optional[Plan]: plan_row = session.execute( select(MagicPlanPlanModel).where( col(MagicPlanPlanModel.uploaded_file_id) == uploaded_file_id ) ).scalar_one_or_none() if plan_row is None: return None floor_rows = list( session.execute( select(MagicPlanFloorModel) .where(col(MagicPlanFloorModel.magic_plan_plan_id) == plan_row.id) .order_by(col(MagicPlanFloorModel.level)) ).scalars() ) return Plan( uid=plan_row.magic_plan_uid if plan_row.magic_plan_uid is not None else "", name=plan_row.name, address=plan_row.address, postcode=plan_row.postcode, floors=[_to_floor(session, f) for f in floor_rows], ) def _to_floor(session: Session, row: MagicPlanFloorModel) -> Floor: room_rows = list( session.execute( select(MagicPlanRoomModel).where( col(MagicPlanRoomModel.magic_plan_floor_id) == row.id ) ).scalars() ) return Floor( level=row.level, name=None, rooms=[_to_room(session, r) for r in room_rows], ) def _to_room(session: Session, row: MagicPlanRoomModel) -> Room: window_rows = list( session.execute( select(MagicPlanWindowModel) .where(col(MagicPlanWindowModel.magic_plan_room_id) == row.id) .order_by( col(MagicPlanWindowModel.magic_plan_room_id), col(MagicPlanWindowModel.id), ) ).scalars() ) door_rows = list( session.execute( select(MagicPlanDoorModel).where( col(MagicPlanDoorModel.magic_plan_room_id) == row.id ) ).scalars() ) return Room( name=row.name if row.name is not None else "", width_m=row.width_m if row.width_m is not None else 0.0, length_m=row.length_m if row.length_m is not None else 0.0, area_m2=row.area_m2 if row.area_m2 is not None else 0.0, windows=[_to_window(w) for w in window_rows], doors=[_to_door(d) for d in door_rows], ) def _to_window(row: MagicPlanWindowModel) -> Window: return Window( width_m=row.width_m if row.width_m is not None else 0.0, height_m=row.height_m if row.height_m is not None else 0.0, area_m2=row.area_m2 if row.area_m2 is not None else 0.0, opening_type=row.opening_type if row.opening_type is not None else "", ) def _to_door(row: MagicPlanDoorModel) -> Door: return Door(width_mm=row.width_mm if row.width_mm is not None else 0.0) def save_plan(session: Session, plan: Plan, uploaded_file_id: int) -> None: plan_id: int = _upsert_plan(session, plan, uploaded_file_id) _delete_children(session, plan_id) floor_ids: list[int] = _insert_floors(session, plan.floors, plan_id) room_ids: list[int] = _insert_rooms(session, plan.floors, floor_ids) _insert_windows_and_doors(session, plan.floors, room_ids) def _upsert_plan(session: Session, plan: Plan, uploaded_file_id: int) -> int: stmt = ( pg_insert(MagicPlanPlanModel) .values( magic_plan_uid=plan.uid, name=plan.name, address=plan.address, postcode=plan.postcode, uploaded_file_id=uploaded_file_id, ) .on_conflict_do_update( index_elements=["magic_plan_uid"], set_={ "name": plan.name, "address": plan.address, "postcode": plan.postcode, "uploaded_file_id": uploaded_file_id, }, ) .returning(col(MagicPlanPlanModel.id)) ) row_id: int = session.execute(stmt).scalar_one() return row_id def _delete_children(session: Session, plan_id: int) -> None: floor_subq = ( select(col(MagicPlanFloorModel.id)) .where(col(MagicPlanFloorModel.magic_plan_plan_id) == plan_id) .scalar_subquery() ) room_subq = ( select(col(MagicPlanRoomModel.id)) .where(col(MagicPlanRoomModel.magic_plan_floor_id).in_(floor_subq)) .scalar_subquery() ) session.execute( delete(MagicPlanWindowModel).where( col(MagicPlanWindowModel.magic_plan_room_id).in_(room_subq) ) ) session.execute( delete(MagicPlanDoorModel).where( col(MagicPlanDoorModel.magic_plan_room_id).in_(room_subq) ) ) session.execute( delete(MagicPlanRoomModel).where( col(MagicPlanRoomModel.magic_plan_floor_id).in_(floor_subq) ) ) session.execute( delete(MagicPlanFloorModel).where( col(MagicPlanFloorModel.magic_plan_plan_id) == plan_id ) ) def _insert_floors(session: Session, floors: list[Floor], plan_id: int) -> list[int]: rows: list[dict[str, Any]] = [ {"magic_plan_plan_id": plan_id, "level": floor.level} for floor in floors ] result = session.execute( pg_insert(MagicPlanFloorModel) .values(rows) .returning(col(MagicPlanFloorModel.id)) ) return cast(list[int], list(result.scalars().all())) def _insert_rooms( session: Session, floors: list[Floor], floor_ids: list[int] ) -> list[int]: rows: list[dict[str, Any]] = [ { "magic_plan_floor_id": floor_id, "name": room.name, "width_m": room.width_m, "length_m": room.length_m, "area_m2": room.area_m2, } for floor, floor_id in zip(floors, floor_ids) for room in floor.rooms ] result = session.execute( pg_insert(MagicPlanRoomModel).values(rows).returning(col(MagicPlanRoomModel.id)) ) return cast(list[int], list(result.scalars().all())) def _insert_windows_and_doors( session: Session, floors: list[Floor], room_ids: list[int] ) -> None: all_rooms = [room for floor in floors for room in floor.rooms] window_rows: list[dict[str, Any]] = [ { "magic_plan_room_id": room_id, "width_m": window.width_m, "height_m": window.height_m, "area_m2": window.area_m2, "opening_type": window.opening_type, } for room, room_id in zip(all_rooms, room_ids) for window in room.windows ] door_rows: list[dict[str, Any]] = [ { "magic_plan_room_id": room_id, "width_mm": door.width_mm, } for room, room_id in zip(all_rooms, room_ids) for door in room.doors ] if window_rows: session.execute(pg_insert(MagicPlanWindowModel).values(window_rows)) if door_rows: session.execute(pg_insert(MagicPlanDoorModel).values(door_rows))