mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
228 lines
7.1 KiB
Python
228 lines
7.1 KiB
Python
from typing import Any, Optional, cast
|
|
|
|
from sqlalchemy import delete, select
|
|
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
|
from sqlmodel import Session, col
|
|
|
|
from datatypes.magicplan.domain.models import Door, Floor, Plan, Room, Window
|
|
from backend.app.db.models.magic_plan import (
|
|
MagicPlanDoorModel,
|
|
MagicPlanFloorModel,
|
|
MagicPlanPlanModel,
|
|
MagicPlanRoomModel,
|
|
MagicPlanWindowModel,
|
|
)
|
|
|
|
|
|
def get_plan_by_uploaded_file_id(
|
|
session: Session, uploaded_file_id: int
|
|
) -> Optional[Plan]:
|
|
plan_row = session.execute(
|
|
select(MagicPlanPlanModel).where(
|
|
col(MagicPlanPlanModel.uploaded_file_id) == uploaded_file_id
|
|
)
|
|
).scalar_one_or_none()
|
|
|
|
if plan_row is None:
|
|
return None
|
|
|
|
floor_rows = list(
|
|
session.execute(
|
|
select(MagicPlanFloorModel)
|
|
.where(col(MagicPlanFloorModel.magic_plan_plan_id) == plan_row.id)
|
|
.order_by(col(MagicPlanFloorModel.level))
|
|
).scalars()
|
|
)
|
|
|
|
return Plan(
|
|
uid=plan_row.magic_plan_uid if plan_row.magic_plan_uid is not None else "",
|
|
name=plan_row.name,
|
|
address=plan_row.address,
|
|
postcode=plan_row.postcode,
|
|
floors=[_to_floor(session, f) for f in floor_rows],
|
|
)
|
|
|
|
|
|
def _to_floor(session: Session, row: MagicPlanFloorModel) -> Floor:
|
|
room_rows = list(
|
|
session.execute(
|
|
select(MagicPlanRoomModel).where(
|
|
col(MagicPlanRoomModel.magic_plan_floor_id) == row.id
|
|
)
|
|
).scalars()
|
|
)
|
|
return Floor(
|
|
level=row.level,
|
|
name=None,
|
|
rooms=[_to_room(session, r) for r in room_rows],
|
|
)
|
|
|
|
|
|
def _to_room(session: Session, row: MagicPlanRoomModel) -> Room:
|
|
window_rows = list(
|
|
session.execute(
|
|
select(MagicPlanWindowModel)
|
|
.where(col(MagicPlanWindowModel.magic_plan_room_id) == row.id)
|
|
.order_by(
|
|
col(MagicPlanWindowModel.magic_plan_room_id),
|
|
col(MagicPlanWindowModel.id),
|
|
)
|
|
).scalars()
|
|
)
|
|
door_rows = list(
|
|
session.execute(
|
|
select(MagicPlanDoorModel).where(
|
|
col(MagicPlanDoorModel.magic_plan_room_id) == row.id
|
|
)
|
|
).scalars()
|
|
)
|
|
return Room(
|
|
name=row.name if row.name is not None else "",
|
|
width_m=row.width_m if row.width_m is not None else 0.0,
|
|
length_m=row.length_m if row.length_m is not None else 0.0,
|
|
area_m2=row.area_m2 if row.area_m2 is not None else 0.0,
|
|
windows=[_to_window(w) for w in window_rows],
|
|
doors=[_to_door(d) for d in door_rows],
|
|
)
|
|
|
|
|
|
def _to_window(row: MagicPlanWindowModel) -> Window:
|
|
return Window(
|
|
width_m=row.width_m if row.width_m is not None else 0.0,
|
|
height_m=row.height_m if row.height_m is not None else 0.0,
|
|
area_m2=row.area_m2 if row.area_m2 is not None else 0.0,
|
|
opening_type=row.opening_type if row.opening_type is not None else "",
|
|
)
|
|
|
|
|
|
def _to_door(row: MagicPlanDoorModel) -> Door:
|
|
return Door(width_mm=row.width_mm if row.width_mm is not None else 0.0)
|
|
|
|
|
|
def save_plan(session: Session, plan: Plan, uploaded_file_id: int) -> None:
|
|
plan_id: int = _upsert_plan(session, plan, uploaded_file_id)
|
|
_delete_children(session, plan_id)
|
|
floor_ids: list[int] = _insert_floors(session, plan.floors, plan_id)
|
|
room_ids: list[int] = _insert_rooms(session, plan.floors, floor_ids)
|
|
_insert_windows_and_doors(session, plan.floors, room_ids)
|
|
|
|
|
|
def _upsert_plan(session: Session, plan: Plan, uploaded_file_id: int) -> int:
|
|
stmt = (
|
|
pg_insert(MagicPlanPlanModel)
|
|
.values(
|
|
magic_plan_uid=plan.uid,
|
|
name=plan.name,
|
|
address=plan.address,
|
|
postcode=plan.postcode,
|
|
uploaded_file_id=uploaded_file_id,
|
|
)
|
|
.on_conflict_do_update(
|
|
index_elements=["magic_plan_uid"],
|
|
set_={
|
|
"name": plan.name,
|
|
"address": plan.address,
|
|
"postcode": plan.postcode,
|
|
"uploaded_file_id": uploaded_file_id,
|
|
},
|
|
)
|
|
.returning(col(MagicPlanPlanModel.id))
|
|
)
|
|
row_id: int = session.execute(stmt).scalar_one()
|
|
return row_id
|
|
|
|
|
|
def _delete_children(session: Session, plan_id: int) -> None:
|
|
floor_subq = (
|
|
select(col(MagicPlanFloorModel.id))
|
|
.where(col(MagicPlanFloorModel.magic_plan_plan_id) == plan_id)
|
|
.scalar_subquery()
|
|
)
|
|
room_subq = (
|
|
select(col(MagicPlanRoomModel.id))
|
|
.where(col(MagicPlanRoomModel.magic_plan_floor_id).in_(floor_subq))
|
|
.scalar_subquery()
|
|
)
|
|
session.execute(
|
|
delete(MagicPlanWindowModel).where(
|
|
col(MagicPlanWindowModel.magic_plan_room_id).in_(room_subq)
|
|
)
|
|
)
|
|
session.execute(
|
|
delete(MagicPlanDoorModel).where(
|
|
col(MagicPlanDoorModel.magic_plan_room_id).in_(room_subq)
|
|
)
|
|
)
|
|
session.execute(
|
|
delete(MagicPlanRoomModel).where(
|
|
col(MagicPlanRoomModel.magic_plan_floor_id).in_(floor_subq)
|
|
)
|
|
)
|
|
session.execute(
|
|
delete(MagicPlanFloorModel).where(
|
|
col(MagicPlanFloorModel.magic_plan_plan_id) == plan_id
|
|
)
|
|
)
|
|
|
|
|
|
def _insert_floors(session: Session, floors: list[Floor], plan_id: int) -> list[int]:
|
|
rows: list[dict[str, Any]] = [
|
|
{"magic_plan_plan_id": plan_id, "level": floor.level} for floor in floors
|
|
]
|
|
result = session.execute(
|
|
pg_insert(MagicPlanFloorModel)
|
|
.values(rows)
|
|
.returning(col(MagicPlanFloorModel.id))
|
|
)
|
|
return cast(list[int], list(result.scalars().all()))
|
|
|
|
|
|
def _insert_rooms(
|
|
session: Session, floors: list[Floor], floor_ids: list[int]
|
|
) -> list[int]:
|
|
rows: list[dict[str, Any]] = [
|
|
{
|
|
"magic_plan_floor_id": floor_id,
|
|
"name": room.name,
|
|
"width_m": room.width_m,
|
|
"length_m": room.length_m,
|
|
"area_m2": room.area_m2,
|
|
}
|
|
for floor, floor_id in zip(floors, floor_ids)
|
|
for room in floor.rooms
|
|
]
|
|
result = session.execute(
|
|
pg_insert(MagicPlanRoomModel).values(rows).returning(col(MagicPlanRoomModel.id))
|
|
)
|
|
return cast(list[int], list(result.scalars().all()))
|
|
|
|
|
|
def _insert_windows_and_doors(
|
|
session: Session, floors: list[Floor], room_ids: list[int]
|
|
) -> None:
|
|
all_rooms = [room for floor in floors for room in floor.rooms]
|
|
|
|
window_rows: list[dict[str, Any]] = [
|
|
{
|
|
"magic_plan_room_id": room_id,
|
|
"width_m": window.width_m,
|
|
"height_m": window.height_m,
|
|
"area_m2": window.area_m2,
|
|
"opening_type": window.opening_type,
|
|
}
|
|
for room, room_id in zip(all_rooms, room_ids)
|
|
for window in room.windows
|
|
]
|
|
door_rows: list[dict[str, Any]] = [
|
|
{
|
|
"magic_plan_room_id": room_id,
|
|
"width_mm": door.width_mm,
|
|
}
|
|
for room, room_id in zip(all_rooms, room_ids)
|
|
for door in room.doors
|
|
]
|
|
|
|
if window_rows:
|
|
session.execute(pg_insert(MagicPlanWindowModel).values(window_rows))
|
|
if door_rows:
|
|
session.execute(pg_insert(MagicPlanDoorModel).values(door_rows))
|