mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-30 13:10:47 +00:00
get_plan_by_uploaded_file_id 🟩
This commit is contained in:
parent
0ccb0e0bf9
commit
f8fcf38886
3 changed files with 209 additions and 2 deletions
|
|
@ -1,12 +1,20 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from typing import Any, cast
|
||||
from typing import Any, Optional, cast
|
||||
|
||||
from sqlalchemy import delete, select
|
||||
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||
from sqlmodel import Session, col
|
||||
|
||||
from domain.magicplan.models import Floor, Plan
|
||||
from domain.magicplan.models import (
|
||||
Door,
|
||||
DoorVentilation,
|
||||
Floor,
|
||||
Plan,
|
||||
Room,
|
||||
Window,
|
||||
WindowVentilation,
|
||||
)
|
||||
from infrastructure.postgres.magic_plan_tables import (
|
||||
MagicPlanDoorModel,
|
||||
MagicPlanDoorVentilationModel,
|
||||
|
|
@ -23,6 +31,144 @@ class MagicPlanPostgresRepository(MagicPlanRepository):
|
|||
def __init__(self, session: Session) -> None:
|
||||
self._session = session
|
||||
|
||||
def get_plan_by_uploaded_file_id(self, uploaded_file_id: int) -> Optional[Plan]:
|
||||
plan_row = self._session.execute( # pyright: ignore[reportDeprecated]
|
||||
select(MagicPlanPlanModel).where(
|
||||
col(MagicPlanPlanModel.uploaded_file_id) == uploaded_file_id
|
||||
)
|
||||
).scalars().one_or_none()
|
||||
if plan_row is None:
|
||||
return None
|
||||
|
||||
plan_id = cast(int, plan_row.id)
|
||||
|
||||
floor_rows = list(
|
||||
self._session.execute( # pyright: ignore[reportDeprecated]
|
||||
select(MagicPlanFloorModel).where(
|
||||
col(MagicPlanFloorModel.magic_plan_plan_id) == plan_id
|
||||
)
|
||||
).scalars().all()
|
||||
)
|
||||
floor_ids = [cast(int, f.id) for f in floor_rows]
|
||||
|
||||
room_rows = list(
|
||||
self._session.execute( # pyright: ignore[reportDeprecated]
|
||||
select(MagicPlanRoomModel).where(
|
||||
col(MagicPlanRoomModel.magic_plan_floor_id).in_(floor_ids)
|
||||
)
|
||||
).scalars().all()
|
||||
)
|
||||
room_ids = [cast(int, r.id) for r in room_rows]
|
||||
|
||||
window_rows = list(
|
||||
self._session.execute( # pyright: ignore[reportDeprecated]
|
||||
select(MagicPlanWindowModel).where(
|
||||
col(MagicPlanWindowModel.magic_plan_room_id).in_(room_ids)
|
||||
)
|
||||
).scalars().all()
|
||||
)
|
||||
window_ids = [cast(int, w.id) for w in window_rows]
|
||||
|
||||
door_rows = list(
|
||||
self._session.execute( # pyright: ignore[reportDeprecated]
|
||||
select(MagicPlanDoorModel).where(
|
||||
col(MagicPlanDoorModel.magic_plan_room_id).in_(room_ids)
|
||||
)
|
||||
).scalars().all()
|
||||
)
|
||||
door_ids = [cast(int, d.id) for d in door_rows]
|
||||
|
||||
window_vent_by_window_id = {
|
||||
cast(int, wv.magic_plan_window_id): wv
|
||||
for wv in self._session.execute( # pyright: ignore[reportDeprecated]
|
||||
select(MagicPlanWindowVentilationModel).where(
|
||||
col(MagicPlanWindowVentilationModel.magic_plan_window_id).in_(
|
||||
window_ids
|
||||
)
|
||||
)
|
||||
).scalars().all()
|
||||
}
|
||||
door_vent_by_door_id = {
|
||||
cast(int, dv.magic_plan_door_id): dv
|
||||
for dv in self._session.execute( # pyright: ignore[reportDeprecated]
|
||||
select(MagicPlanDoorVentilationModel).where(
|
||||
col(MagicPlanDoorVentilationModel.magic_plan_door_id).in_(door_ids)
|
||||
)
|
||||
).scalars().all()
|
||||
}
|
||||
|
||||
windows_by_room_id: dict[int, list[Window]] = {}
|
||||
for w_row, w_id in zip(window_rows, window_ids):
|
||||
room_id = cast(int, w_row.magic_plan_room_id)
|
||||
wv_row = window_vent_by_window_id.get(w_id)
|
||||
ventilation = (
|
||||
WindowVentilation(
|
||||
opening_type=wv_row.opening_type,
|
||||
num_openings=wv_row.num_openings,
|
||||
pct_openable=wv_row.pct_openable,
|
||||
trickle_vent_area_mm2=wv_row.trickle_vent_area_mm2,
|
||||
num_trickle_vents=wv_row.num_trickle_vents,
|
||||
)
|
||||
if wv_row is not None
|
||||
else None
|
||||
)
|
||||
windows_by_room_id.setdefault(room_id, []).append(
|
||||
Window(
|
||||
width_m=cast(float, w_row.width_m),
|
||||
height_m=cast(float, w_row.height_m),
|
||||
area_m2=cast(float, w_row.area_m2),
|
||||
ventilation=ventilation,
|
||||
)
|
||||
)
|
||||
|
||||
doors_by_room_id: dict[int, list[Door]] = {}
|
||||
for d_row, d_id in zip(door_rows, door_ids):
|
||||
room_id = cast(int, d_row.magic_plan_room_id)
|
||||
dv_row = door_vent_by_door_id.get(d_id)
|
||||
ventilation = (
|
||||
DoorVentilation(undercut_mm=dv_row.undercut_mm)
|
||||
if dv_row is not None
|
||||
else None
|
||||
)
|
||||
doors_by_room_id.setdefault(room_id, []).append(
|
||||
Door(
|
||||
width_mm=cast(float, d_row.width_mm),
|
||||
height_mm=cast(float, d_row.height_mm),
|
||||
ventilation=ventilation,
|
||||
)
|
||||
)
|
||||
|
||||
rooms_by_floor_id: dict[int, list[Room]] = {}
|
||||
for r_row, r_id in zip(room_rows, room_ids):
|
||||
floor_id = cast(int, r_row.magic_plan_floor_id)
|
||||
rooms_by_floor_id.setdefault(floor_id, []).append(
|
||||
Room(
|
||||
name=cast(str, r_row.name),
|
||||
width_m=cast(float, r_row.width_m),
|
||||
length_m=cast(float, r_row.length_m),
|
||||
area_m2=cast(float, r_row.area_m2),
|
||||
windows=windows_by_room_id.get(r_id, []),
|
||||
doors=doors_by_room_id.get(r_id, []),
|
||||
)
|
||||
)
|
||||
|
||||
floors = [
|
||||
Floor(
|
||||
level=f_row.level,
|
||||
name=None,
|
||||
rooms=rooms_by_floor_id.get(cast(int, f_row.id), []),
|
||||
)
|
||||
for f_row in floor_rows
|
||||
]
|
||||
|
||||
return Plan(
|
||||
uid=cast(str, plan_row.magic_plan_uid),
|
||||
name=plan_row.name,
|
||||
address=plan_row.address,
|
||||
postcode=plan_row.postcode,
|
||||
floors=floors,
|
||||
)
|
||||
|
||||
def save(self, plan: Plan, uploaded_file_id: int) -> None:
|
||||
plan_id = self._upsert_plan(plan, uploaded_file_id)
|
||||
self._delete_children(plan_id)
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Optional
|
||||
|
||||
from domain.magicplan.models import Plan
|
||||
|
||||
|
|
@ -16,3 +17,6 @@ class MagicPlanRepository(ABC):
|
|||
|
||||
@abstractmethod
|
||||
def save(self, plan: Plan, uploaded_file_id: int) -> None: ...
|
||||
|
||||
@abstractmethod
|
||||
def get_plan_by_uploaded_file_id(self, uploaded_file_id: int) -> Optional[Plan]: ...
|
||||
|
|
|
|||
|
|
@ -91,6 +91,63 @@ def test_save_writes_ventilation_rows(db_engine: Engine) -> None:
|
|||
assert len(session.exec(select(MagicPlanDoorVentilationModel)).all()) == 1
|
||||
|
||||
|
||||
def test_get_plan_by_uploaded_file_id_returns_plan(db_engine: Engine) -> None:
|
||||
# Arrange
|
||||
plan = _plan()
|
||||
|
||||
with Session(db_engine) as session:
|
||||
MagicPlanPostgresRepository(session).save(plan, uploaded_file_id=42)
|
||||
session.commit()
|
||||
|
||||
# Act
|
||||
with Session(db_engine) as session:
|
||||
result = MagicPlanPostgresRepository(session).get_plan_by_uploaded_file_id(42)
|
||||
|
||||
# Assert — full aggregate reconstructed; floor.name is not persisted (accepted data gap)
|
||||
assert result is not None
|
||||
assert result.uid == plan.uid
|
||||
assert result.name == plan.name
|
||||
assert result.address == plan.address
|
||||
assert result.postcode == plan.postcode
|
||||
assert len(result.floors) == 1
|
||||
result_floor = result.floors[0]
|
||||
assert result_floor.level == plan.floors[0].level
|
||||
assert result_floor.name is None # floor.name is not persisted per PRD
|
||||
assert len(result_floor.rooms) == 1
|
||||
result_room = result_floor.rooms[0]
|
||||
source_room = plan.floors[0].rooms[0]
|
||||
assert result_room.name == source_room.name
|
||||
assert result_room.width_m == source_room.width_m
|
||||
assert result_room.length_m == source_room.length_m
|
||||
assert result_room.area_m2 == source_room.area_m2
|
||||
assert len(result_room.windows) == 1
|
||||
result_window = result_room.windows[0]
|
||||
source_window = source_room.windows[0]
|
||||
assert result_window.width_m == source_window.width_m
|
||||
assert result_window.height_m == source_window.height_m
|
||||
assert result_window.area_m2 == source_window.area_m2
|
||||
assert result_window.ventilation is not None
|
||||
assert result_window.ventilation == source_window.ventilation
|
||||
assert len(result_room.doors) == 1
|
||||
result_door = result_room.doors[0]
|
||||
source_door = source_room.doors[0]
|
||||
assert result_door.width_mm == source_door.width_mm
|
||||
assert result_door.height_mm == source_door.height_mm
|
||||
assert result_door.ventilation is not None
|
||||
assert result_door.ventilation == source_door.ventilation
|
||||
|
||||
|
||||
def test_get_plan_by_uploaded_file_id_returns_none_when_not_found(db_engine: Engine) -> None:
|
||||
# Arrange — nothing saved for uploaded_file_id=999
|
||||
|
||||
# Act
|
||||
with Session(db_engine) as session:
|
||||
result = MagicPlanPostgresRepository(session).get_plan_by_uploaded_file_id(999)
|
||||
|
||||
# Assert
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_save_is_idempotent(db_engine: Engine) -> None:
|
||||
# Arrange
|
||||
plan = _plan()
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue