mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
193 lines
7.7 KiB
Python
193 lines
7.7 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any, cast
|
|
|
|
from sqlalchemy import delete, select
|
|
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
|
from sqlmodel import Session, col
|
|
|
|
from domain.magicplan.models import Floor, Plan
|
|
from infrastructure.postgres.magic_plan_tables import (
|
|
MagicPlanDoorModel,
|
|
MagicPlanDoorVentilationModel,
|
|
MagicPlanFloorModel,
|
|
MagicPlanPlanModel,
|
|
MagicPlanRoomModel,
|
|
MagicPlanWindowModel,
|
|
MagicPlanWindowVentilationModel,
|
|
)
|
|
from repositories.magic_plan.magic_plan_repository import MagicPlanRepository
|
|
|
|
|
|
class MagicPlanPostgresRepository(MagicPlanRepository):
|
|
def __init__(self, session: Session) -> None:
|
|
self._session = session
|
|
|
|
def save(self, plan: Plan, uploaded_file_id: int) -> None:
|
|
plan_id = self._upsert_plan(plan, uploaded_file_id)
|
|
self._delete_children(plan_id)
|
|
floor_ids = self._insert_floors(plan.floors, plan_id)
|
|
room_ids = self._insert_rooms(plan.floors, floor_ids)
|
|
window_ids, door_ids = self._insert_windows_and_doors(plan.floors, room_ids)
|
|
self._insert_ventilation(plan.floors, window_ids, door_ids)
|
|
|
|
def _upsert_plan(self, plan: Plan, uploaded_file_id: int) -> int:
|
|
row_data: dict[str, Any] = MagicPlanPlanModel.from_domain(
|
|
plan, uploaded_file_id
|
|
).model_dump(exclude={"id"})
|
|
stmt = (
|
|
pg_insert(MagicPlanPlanModel)
|
|
.values(**row_data)
|
|
.on_conflict_do_update(
|
|
index_elements=["magic_plan_uid"],
|
|
set_={k: v for k, v in row_data.items() if k != "magic_plan_uid"},
|
|
)
|
|
.returning(col(MagicPlanPlanModel.id))
|
|
)
|
|
return cast(
|
|
int, self._session.execute(stmt).scalar_one()
|
|
) # pyright: ignore[reportDeprecated]
|
|
|
|
def _delete_children(self, plan_id: int) -> None:
|
|
floor_subq = (
|
|
select(col(MagicPlanFloorModel.id))
|
|
.where(col(MagicPlanFloorModel.magic_plan_plan_id) == plan_id)
|
|
.scalar_subquery()
|
|
)
|
|
room_subq = (
|
|
select(col(MagicPlanRoomModel.id))
|
|
.where(col(MagicPlanRoomModel.magic_plan_floor_id).in_(floor_subq))
|
|
.scalar_subquery()
|
|
)
|
|
window_subq = (
|
|
select(col(MagicPlanWindowModel.id))
|
|
.where(col(MagicPlanWindowModel.magic_plan_room_id).in_(room_subq))
|
|
.scalar_subquery()
|
|
)
|
|
door_subq = (
|
|
select(col(MagicPlanDoorModel.id))
|
|
.where(col(MagicPlanDoorModel.magic_plan_room_id).in_(room_subq))
|
|
.scalar_subquery()
|
|
)
|
|
self._session.execute( # pyright: ignore[reportDeprecated]
|
|
delete(MagicPlanWindowVentilationModel).where(
|
|
col(MagicPlanWindowVentilationModel.magic_plan_window_id).in_(
|
|
window_subq
|
|
)
|
|
)
|
|
)
|
|
self._session.execute( # pyright: ignore[reportDeprecated]
|
|
delete(MagicPlanDoorVentilationModel).where(
|
|
col(MagicPlanDoorVentilationModel.magic_plan_door_id).in_(door_subq)
|
|
)
|
|
)
|
|
self._session.execute( # pyright: ignore[reportDeprecated]
|
|
delete(MagicPlanWindowModel).where(
|
|
col(MagicPlanWindowModel.magic_plan_room_id).in_(room_subq)
|
|
)
|
|
)
|
|
self._session.execute( # pyright: ignore[reportDeprecated]
|
|
delete(MagicPlanDoorModel).where(
|
|
col(MagicPlanDoorModel.magic_plan_room_id).in_(room_subq)
|
|
)
|
|
)
|
|
self._session.execute( # pyright: ignore[reportDeprecated]
|
|
delete(MagicPlanRoomModel).where(
|
|
col(MagicPlanRoomModel.magic_plan_floor_id).in_(floor_subq)
|
|
)
|
|
)
|
|
self._session.execute( # pyright: ignore[reportDeprecated]
|
|
delete(MagicPlanFloorModel).where(
|
|
col(MagicPlanFloorModel.magic_plan_plan_id) == plan_id
|
|
)
|
|
)
|
|
|
|
def _insert_floors(self, floors: list[Floor], plan_id: int) -> list[int]:
|
|
rows: list[dict[str, Any]] = [
|
|
MagicPlanFloorModel.from_domain(floor, plan_id).model_dump(exclude={"id"})
|
|
for floor in floors
|
|
]
|
|
result = self._session.execute( # pyright: ignore[reportDeprecated]
|
|
pg_insert(MagicPlanFloorModel)
|
|
.values(rows)
|
|
.returning(col(MagicPlanFloorModel.id))
|
|
)
|
|
return cast(list[int], list(result.scalars().all()))
|
|
|
|
def _insert_rooms(self, floors: list[Floor], floor_ids: list[int]) -> list[int]:
|
|
rows: list[dict[str, Any]] = [
|
|
MagicPlanRoomModel.from_domain(room, floor_id).model_dump(exclude={"id"})
|
|
for floor, floor_id in zip(floors, floor_ids)
|
|
for room in floor.rooms
|
|
]
|
|
result = self._session.execute( # pyright: ignore[reportDeprecated]
|
|
pg_insert(MagicPlanRoomModel)
|
|
.values(rows)
|
|
.returning(col(MagicPlanRoomModel.id))
|
|
)
|
|
return cast(list[int], list(result.scalars().all()))
|
|
|
|
def _insert_windows_and_doors(
|
|
self, floors: list[Floor], room_ids: list[int]
|
|
) -> tuple[list[int], list[int]]:
|
|
all_rooms = [room for floor in floors for room in floor.rooms]
|
|
window_rows: list[dict[str, Any]] = [
|
|
MagicPlanWindowModel.from_domain(window, room_id).model_dump(exclude={"id"})
|
|
for room, room_id in zip(all_rooms, room_ids)
|
|
for window in room.windows
|
|
]
|
|
door_rows: list[dict[str, Any]] = [
|
|
MagicPlanDoorModel.from_domain(door, room_id).model_dump(exclude={"id"})
|
|
for room, room_id in zip(all_rooms, room_ids)
|
|
for door in room.doors
|
|
]
|
|
window_ids: list[int] = []
|
|
door_ids: list[int] = []
|
|
if window_rows:
|
|
result = self._session.execute( # pyright: ignore[reportDeprecated]
|
|
pg_insert(MagicPlanWindowModel)
|
|
.values(window_rows)
|
|
.returning(col(MagicPlanWindowModel.id))
|
|
)
|
|
window_ids = cast(list[int], list(result.scalars().all()))
|
|
if door_rows:
|
|
result = self._session.execute( # pyright: ignore[reportDeprecated]
|
|
pg_insert(MagicPlanDoorModel)
|
|
.values(door_rows)
|
|
.returning(col(MagicPlanDoorModel.id))
|
|
)
|
|
door_ids = cast(list[int], list(result.scalars().all()))
|
|
return window_ids, door_ids
|
|
|
|
def _insert_ventilation(
|
|
self,
|
|
floors: list[Floor],
|
|
window_ids: list[int],
|
|
door_ids: list[int],
|
|
) -> None:
|
|
all_rooms = [room for floor in floors for room in floor.rooms]
|
|
all_windows = [w for room in all_rooms for w in room.windows]
|
|
all_doors = [d for room in all_rooms for d in room.doors]
|
|
|
|
window_vent_rows: list[dict[str, Any]] = [
|
|
MagicPlanWindowVentilationModel.from_domain(w.ventilation, wid).model_dump(
|
|
exclude={"id"}
|
|
)
|
|
for w, wid in zip(all_windows, window_ids)
|
|
if w.ventilation is not None
|
|
]
|
|
door_vent_rows: list[dict[str, Any]] = [
|
|
MagicPlanDoorVentilationModel.from_domain(d.ventilation, did).model_dump(
|
|
exclude={"id"}
|
|
)
|
|
for d, did in zip(all_doors, door_ids)
|
|
if d.ventilation is not None
|
|
]
|
|
if window_vent_rows:
|
|
self._session.execute( # pyright: ignore[reportDeprecated]
|
|
pg_insert(MagicPlanWindowVentilationModel).values(window_vent_rows)
|
|
)
|
|
if door_vent_rows:
|
|
self._session.execute( # pyright: ignore[reportDeprecated]
|
|
pg_insert(MagicPlanDoorVentilationModel).values(door_vent_rows)
|
|
)
|