Retrieve plan by uploaded_file_id 🟪

This commit is contained in:
Daniel Roth 2026-06-08 15:44:19 +00:00
parent f8fcf38886
commit 7497865fb2

View file

@ -1,6 +1,6 @@
from __future__ import annotations
from typing import Any, Optional, cast
from typing import Any, NamedTuple, Optional, cast
from sqlalchemy import delete, select
from sqlalchemy.dialects.postgresql import insert as pg_insert
@ -27,148 +27,153 @@ from infrastructure.postgres.magic_plan_tables import (
from repositories.magic_plan.magic_plan_repository import MagicPlanRepository
class _Rows(NamedTuple):
floors: list[MagicPlanFloorModel]
rooms: list[MagicPlanRoomModel]
windows: list[MagicPlanWindowModel]
doors: list[MagicPlanDoorModel]
win_vents: list[MagicPlanWindowVentilationModel]
door_vents: list[MagicPlanDoorVentilationModel]
def _build_windows(
rows: list[MagicPlanWindowModel],
vents: list[MagicPlanWindowVentilationModel],
) -> dict[int, list[Window]]:
vent_by_id = {wv.magic_plan_window_id: wv for wv in vents}
result: dict[int, list[Window]] = {}
for row in rows:
wv = vent_by_id.get(cast(int, row.id))
result.setdefault(row.magic_plan_room_id, []).append(
Window(
width_m=cast(float, row.width_m),
height_m=cast(float, row.height_m),
area_m2=cast(float, row.area_m2),
ventilation=WindowVentilation(
opening_type=wv.opening_type,
num_openings=wv.num_openings,
pct_openable=wv.pct_openable,
trickle_vent_area_mm2=wv.trickle_vent_area_mm2,
num_trickle_vents=wv.num_trickle_vents,
) if wv else None,
)
)
return result
def _build_doors(
rows: list[MagicPlanDoorModel],
vents: list[MagicPlanDoorVentilationModel],
) -> dict[int, list[Door]]:
vent_by_id = {dv.magic_plan_door_id: dv for dv in vents}
result: dict[int, list[Door]] = {}
for row in rows:
dv = vent_by_id.get(cast(int, row.id))
result.setdefault(row.magic_plan_room_id, []).append(
Door(
width_mm=cast(float, row.width_mm),
height_mm=cast(float, row.height_mm),
ventilation=DoorVentilation(undercut_mm=dv.undercut_mm) if dv else None,
)
)
return result
def _build_rooms(
rows: list[MagicPlanRoomModel],
windows_by_room: dict[int, list[Window]],
doors_by_room: dict[int, list[Door]],
) -> dict[int, list[Room]]:
result: dict[int, list[Room]] = {}
for row in rows:
room_id = cast(int, row.id)
result.setdefault(row.magic_plan_floor_id, []).append(
Room(
name=cast(str, row.name),
width_m=cast(float, row.width_m),
length_m=cast(float, row.length_m),
area_m2=cast(float, row.area_m2),
windows=windows_by_room.get(room_id, []),
doors=doors_by_room.get(room_id, []),
)
)
return result
class MagicPlanPostgresRepository(MagicPlanRepository):
def __init__(self, session: Session) -> None:
self._session = session
def get_plan_by_uploaded_file_id(self, uploaded_file_id: int) -> Optional[Plan]:
plan_row = self._session.execute( # pyright: ignore[reportDeprecated]
plan_row = self._fetch_one(
select(MagicPlanPlanModel).where(
col(MagicPlanPlanModel.uploaded_file_id) == uploaded_file_id
)
).scalars().one_or_none()
)
if plan_row is None:
return None
plan_id = cast(int, plan_row.id)
floor_rows = list(
self._session.execute( # pyright: ignore[reportDeprecated]
select(MagicPlanFloorModel).where(
col(MagicPlanFloorModel.magic_plan_plan_id) == plan_id
)
).scalars().all()
)
floor_ids = [cast(int, f.id) for f in floor_rows]
room_rows = list(
self._session.execute( # pyright: ignore[reportDeprecated]
select(MagicPlanRoomModel).where(
col(MagicPlanRoomModel.magic_plan_floor_id).in_(floor_ids)
)
).scalars().all()
)
room_ids = [cast(int, r.id) for r in room_rows]
window_rows = list(
self._session.execute( # pyright: ignore[reportDeprecated]
select(MagicPlanWindowModel).where(
col(MagicPlanWindowModel.magic_plan_room_id).in_(room_ids)
)
).scalars().all()
)
window_ids = [cast(int, w.id) for w in window_rows]
door_rows = list(
self._session.execute( # pyright: ignore[reportDeprecated]
select(MagicPlanDoorModel).where(
col(MagicPlanDoorModel.magic_plan_room_id).in_(room_ids)
)
).scalars().all()
)
door_ids = [cast(int, d.id) for d in door_rows]
window_vent_by_window_id = {
cast(int, wv.magic_plan_window_id): wv
for wv in self._session.execute( # pyright: ignore[reportDeprecated]
select(MagicPlanWindowVentilationModel).where(
col(MagicPlanWindowVentilationModel.magic_plan_window_id).in_(
window_ids
)
)
).scalars().all()
}
door_vent_by_door_id = {
cast(int, dv.magic_plan_door_id): dv
for dv in self._session.execute( # pyright: ignore[reportDeprecated]
select(MagicPlanDoorVentilationModel).where(
col(MagicPlanDoorVentilationModel.magic_plan_door_id).in_(door_ids)
)
).scalars().all()
}
windows_by_room_id: dict[int, list[Window]] = {}
for w_row, w_id in zip(window_rows, window_ids):
room_id = cast(int, w_row.magic_plan_room_id)
wv_row = window_vent_by_window_id.get(w_id)
ventilation = (
WindowVentilation(
opening_type=wv_row.opening_type,
num_openings=wv_row.num_openings,
pct_openable=wv_row.pct_openable,
trickle_vent_area_mm2=wv_row.trickle_vent_area_mm2,
num_trickle_vents=wv_row.num_trickle_vents,
)
if wv_row is not None
else None
)
windows_by_room_id.setdefault(room_id, []).append(
Window(
width_m=cast(float, w_row.width_m),
height_m=cast(float, w_row.height_m),
area_m2=cast(float, w_row.area_m2),
ventilation=ventilation,
)
)
doors_by_room_id: dict[int, list[Door]] = {}
for d_row, d_id in zip(door_rows, door_ids):
room_id = cast(int, d_row.magic_plan_room_id)
dv_row = door_vent_by_door_id.get(d_id)
ventilation = (
DoorVentilation(undercut_mm=dv_row.undercut_mm)
if dv_row is not None
else None
)
doors_by_room_id.setdefault(room_id, []).append(
Door(
width_mm=cast(float, d_row.width_mm),
height_mm=cast(float, d_row.height_mm),
ventilation=ventilation,
)
)
rooms_by_floor_id: dict[int, list[Room]] = {}
for r_row, r_id in zip(room_rows, room_ids):
floor_id = cast(int, r_row.magic_plan_floor_id)
rooms_by_floor_id.setdefault(floor_id, []).append(
Room(
name=cast(str, r_row.name),
width_m=cast(float, r_row.width_m),
length_m=cast(float, r_row.length_m),
area_m2=cast(float, r_row.area_m2),
windows=windows_by_room_id.get(r_id, []),
doors=doors_by_room_id.get(r_id, []),
)
)
floors = [
Floor(
level=f_row.level,
name=None,
rooms=rooms_by_floor_id.get(cast(int, f_row.id), []),
)
for f_row in floor_rows
]
rows = self._fetch_rows(cast(int, plan_row.id))
windows_by_room = _build_windows(rows.windows, rows.win_vents)
doors_by_room = _build_doors(rows.doors, rows.door_vents)
rooms_by_floor = _build_rooms(rows.rooms, windows_by_room, doors_by_room)
return Plan(
uid=cast(str, plan_row.magic_plan_uid),
name=plan_row.name,
address=plan_row.address,
postcode=plan_row.postcode,
floors=floors,
floors=[
Floor(level=f.level, name=None, rooms=rooms_by_floor.get(cast(int, f.id), []))
for f in rows.floors
],
)
def _fetch_rows(self, plan_id: int) -> _Rows:
floor_rows: list[MagicPlanFloorModel] = self._fetch_many(
select(MagicPlanFloorModel).where(
col(MagicPlanFloorModel.magic_plan_plan_id) == plan_id
)
)
floor_ids = [cast(int, f.id) for f in floor_rows]
room_rows: list[MagicPlanRoomModel] = self._fetch_many(
select(MagicPlanRoomModel).where(
col(MagicPlanRoomModel.magic_plan_floor_id).in_(floor_ids)
)
)
room_ids = [cast(int, r.id) for r in room_rows]
window_rows: list[MagicPlanWindowModel] = self._fetch_many(
select(MagicPlanWindowModel).where(
col(MagicPlanWindowModel.magic_plan_room_id).in_(room_ids)
)
)
door_rows: list[MagicPlanDoorModel] = self._fetch_many(
select(MagicPlanDoorModel).where(
col(MagicPlanDoorModel.magic_plan_room_id).in_(room_ids)
)
)
win_vents: list[MagicPlanWindowVentilationModel] = self._fetch_many(
select(MagicPlanWindowVentilationModel).where(
col(MagicPlanWindowVentilationModel.magic_plan_window_id).in_(
[cast(int, w.id) for w in window_rows]
)
)
)
door_vents: list[MagicPlanDoorVentilationModel] = self._fetch_many(
select(MagicPlanDoorVentilationModel).where(
col(MagicPlanDoorVentilationModel.magic_plan_door_id).in_(
[cast(int, d.id) for d in door_rows]
)
)
)
return _Rows(floor_rows, room_rows, window_rows, door_rows, win_vents, door_vents)
def _fetch_one(self, stmt: Any) -> Any:
return self._session.execute(stmt).scalars().one_or_none() # pyright: ignore[reportDeprecated]
def _fetch_many(self, stmt: Any) -> Any:
return list(self._session.execute(stmt).scalars().all()) # pyright: ignore[reportDeprecated]
def save(self, plan: Plan, uploaded_file_id: int) -> None:
plan_id = self._upsert_plan(plan, uploaded_file_id)
self._delete_children(plan_id)