diff --git a/repositories/magic_plan/magic_plan_postgres_repository.py b/repositories/magic_plan/magic_plan_postgres_repository.py index c2df041e..f8ff123c 100644 --- a/repositories/magic_plan/magic_plan_postgres_repository.py +++ b/repositories/magic_plan/magic_plan_postgres_repository.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any, Optional, cast +from typing import Any, NamedTuple, Optional, cast from sqlalchemy import delete, select from sqlalchemy.dialects.postgresql import insert as pg_insert @@ -27,148 +27,153 @@ from infrastructure.postgres.magic_plan_tables import ( from repositories.magic_plan.magic_plan_repository import MagicPlanRepository +class _Rows(NamedTuple): + floors: list[MagicPlanFloorModel] + rooms: list[MagicPlanRoomModel] + windows: list[MagicPlanWindowModel] + doors: list[MagicPlanDoorModel] + win_vents: list[MagicPlanWindowVentilationModel] + door_vents: list[MagicPlanDoorVentilationModel] + + +def _build_windows( + rows: list[MagicPlanWindowModel], + vents: list[MagicPlanWindowVentilationModel], +) -> dict[int, list[Window]]: + vent_by_id = {wv.magic_plan_window_id: wv for wv in vents} + result: dict[int, list[Window]] = {} + for row in rows: + wv = vent_by_id.get(cast(int, row.id)) + result.setdefault(row.magic_plan_room_id, []).append( + Window( + width_m=cast(float, row.width_m), + height_m=cast(float, row.height_m), + area_m2=cast(float, row.area_m2), + ventilation=WindowVentilation( + opening_type=wv.opening_type, + num_openings=wv.num_openings, + pct_openable=wv.pct_openable, + trickle_vent_area_mm2=wv.trickle_vent_area_mm2, + num_trickle_vents=wv.num_trickle_vents, + ) if wv else None, + ) + ) + return result + + +def _build_doors( + rows: list[MagicPlanDoorModel], + vents: list[MagicPlanDoorVentilationModel], +) -> dict[int, list[Door]]: + vent_by_id = {dv.magic_plan_door_id: dv for dv in vents} + result: dict[int, list[Door]] = {} + for row in rows: + dv = vent_by_id.get(cast(int, row.id)) + result.setdefault(row.magic_plan_room_id, []).append( + Door( + width_mm=cast(float, row.width_mm), + height_mm=cast(float, row.height_mm), + ventilation=DoorVentilation(undercut_mm=dv.undercut_mm) if dv else None, + ) + ) + return result + + +def _build_rooms( + rows: list[MagicPlanRoomModel], + windows_by_room: dict[int, list[Window]], + doors_by_room: dict[int, list[Door]], +) -> dict[int, list[Room]]: + result: dict[int, list[Room]] = {} + for row in rows: + room_id = cast(int, row.id) + result.setdefault(row.magic_plan_floor_id, []).append( + Room( + name=cast(str, row.name), + width_m=cast(float, row.width_m), + length_m=cast(float, row.length_m), + area_m2=cast(float, row.area_m2), + windows=windows_by_room.get(room_id, []), + doors=doors_by_room.get(room_id, []), + ) + ) + return result + + class MagicPlanPostgresRepository(MagicPlanRepository): def __init__(self, session: Session) -> None: self._session = session def get_plan_by_uploaded_file_id(self, uploaded_file_id: int) -> Optional[Plan]: - plan_row = self._session.execute( # pyright: ignore[reportDeprecated] + plan_row = self._fetch_one( select(MagicPlanPlanModel).where( col(MagicPlanPlanModel.uploaded_file_id) == uploaded_file_id ) - ).scalars().one_or_none() + ) if plan_row is None: return None - plan_id = cast(int, plan_row.id) - - floor_rows = list( - self._session.execute( # pyright: ignore[reportDeprecated] - select(MagicPlanFloorModel).where( - col(MagicPlanFloorModel.magic_plan_plan_id) == plan_id - ) - ).scalars().all() - ) - floor_ids = [cast(int, f.id) for f in floor_rows] - - room_rows = list( - self._session.execute( # pyright: ignore[reportDeprecated] - select(MagicPlanRoomModel).where( - col(MagicPlanRoomModel.magic_plan_floor_id).in_(floor_ids) - ) - ).scalars().all() - ) - room_ids = [cast(int, r.id) for r in room_rows] - - window_rows = list( - self._session.execute( # pyright: ignore[reportDeprecated] - select(MagicPlanWindowModel).where( - col(MagicPlanWindowModel.magic_plan_room_id).in_(room_ids) - ) - ).scalars().all() - ) - window_ids = [cast(int, w.id) for w in window_rows] - - door_rows = list( - self._session.execute( # pyright: ignore[reportDeprecated] - select(MagicPlanDoorModel).where( - col(MagicPlanDoorModel.magic_plan_room_id).in_(room_ids) - ) - ).scalars().all() - ) - door_ids = [cast(int, d.id) for d in door_rows] - - window_vent_by_window_id = { - cast(int, wv.magic_plan_window_id): wv - for wv in self._session.execute( # pyright: ignore[reportDeprecated] - select(MagicPlanWindowVentilationModel).where( - col(MagicPlanWindowVentilationModel.magic_plan_window_id).in_( - window_ids - ) - ) - ).scalars().all() - } - door_vent_by_door_id = { - cast(int, dv.magic_plan_door_id): dv - for dv in self._session.execute( # pyright: ignore[reportDeprecated] - select(MagicPlanDoorVentilationModel).where( - col(MagicPlanDoorVentilationModel.magic_plan_door_id).in_(door_ids) - ) - ).scalars().all() - } - - windows_by_room_id: dict[int, list[Window]] = {} - for w_row, w_id in zip(window_rows, window_ids): - room_id = cast(int, w_row.magic_plan_room_id) - wv_row = window_vent_by_window_id.get(w_id) - ventilation = ( - WindowVentilation( - opening_type=wv_row.opening_type, - num_openings=wv_row.num_openings, - pct_openable=wv_row.pct_openable, - trickle_vent_area_mm2=wv_row.trickle_vent_area_mm2, - num_trickle_vents=wv_row.num_trickle_vents, - ) - if wv_row is not None - else None - ) - windows_by_room_id.setdefault(room_id, []).append( - Window( - width_m=cast(float, w_row.width_m), - height_m=cast(float, w_row.height_m), - area_m2=cast(float, w_row.area_m2), - ventilation=ventilation, - ) - ) - - doors_by_room_id: dict[int, list[Door]] = {} - for d_row, d_id in zip(door_rows, door_ids): - room_id = cast(int, d_row.magic_plan_room_id) - dv_row = door_vent_by_door_id.get(d_id) - ventilation = ( - DoorVentilation(undercut_mm=dv_row.undercut_mm) - if dv_row is not None - else None - ) - doors_by_room_id.setdefault(room_id, []).append( - Door( - width_mm=cast(float, d_row.width_mm), - height_mm=cast(float, d_row.height_mm), - ventilation=ventilation, - ) - ) - - rooms_by_floor_id: dict[int, list[Room]] = {} - for r_row, r_id in zip(room_rows, room_ids): - floor_id = cast(int, r_row.magic_plan_floor_id) - rooms_by_floor_id.setdefault(floor_id, []).append( - Room( - name=cast(str, r_row.name), - width_m=cast(float, r_row.width_m), - length_m=cast(float, r_row.length_m), - area_m2=cast(float, r_row.area_m2), - windows=windows_by_room_id.get(r_id, []), - doors=doors_by_room_id.get(r_id, []), - ) - ) - - floors = [ - Floor( - level=f_row.level, - name=None, - rooms=rooms_by_floor_id.get(cast(int, f_row.id), []), - ) - for f_row in floor_rows - ] + rows = self._fetch_rows(cast(int, plan_row.id)) + windows_by_room = _build_windows(rows.windows, rows.win_vents) + doors_by_room = _build_doors(rows.doors, rows.door_vents) + rooms_by_floor = _build_rooms(rows.rooms, windows_by_room, doors_by_room) return Plan( uid=cast(str, plan_row.magic_plan_uid), name=plan_row.name, address=plan_row.address, postcode=plan_row.postcode, - floors=floors, + floors=[ + Floor(level=f.level, name=None, rooms=rooms_by_floor.get(cast(int, f.id), [])) + for f in rows.floors + ], ) + def _fetch_rows(self, plan_id: int) -> _Rows: + floor_rows: list[MagicPlanFloorModel] = self._fetch_many( + select(MagicPlanFloorModel).where( + col(MagicPlanFloorModel.magic_plan_plan_id) == plan_id + ) + ) + floor_ids = [cast(int, f.id) for f in floor_rows] + room_rows: list[MagicPlanRoomModel] = self._fetch_many( + select(MagicPlanRoomModel).where( + col(MagicPlanRoomModel.magic_plan_floor_id).in_(floor_ids) + ) + ) + room_ids = [cast(int, r.id) for r in room_rows] + window_rows: list[MagicPlanWindowModel] = self._fetch_many( + select(MagicPlanWindowModel).where( + col(MagicPlanWindowModel.magic_plan_room_id).in_(room_ids) + ) + ) + door_rows: list[MagicPlanDoorModel] = self._fetch_many( + select(MagicPlanDoorModel).where( + col(MagicPlanDoorModel.magic_plan_room_id).in_(room_ids) + ) + ) + win_vents: list[MagicPlanWindowVentilationModel] = self._fetch_many( + select(MagicPlanWindowVentilationModel).where( + col(MagicPlanWindowVentilationModel.magic_plan_window_id).in_( + [cast(int, w.id) for w in window_rows] + ) + ) + ) + door_vents: list[MagicPlanDoorVentilationModel] = self._fetch_many( + select(MagicPlanDoorVentilationModel).where( + col(MagicPlanDoorVentilationModel.magic_plan_door_id).in_( + [cast(int, d.id) for d in door_rows] + ) + ) + ) + return _Rows(floor_rows, room_rows, window_rows, door_rows, win_vents, door_vents) + + def _fetch_one(self, stmt: Any) -> Any: + return self._session.execute(stmt).scalars().one_or_none() # pyright: ignore[reportDeprecated] + + def _fetch_many(self, stmt: Any) -> Any: + return list(self._session.execute(stmt).scalars().all()) # pyright: ignore[reportDeprecated] + def save(self, plan: Plan, uploaded_file_id: int) -> None: plan_id = self._upsert_plan(plan, uploaded_file_id) self._delete_children(plan_id)