refactor magicplan in ddd structure

This commit is contained in:
Daniel Roth 2026-06-03 17:20:20 +00:00
parent 010a576a4a
commit 174ef26075
18 changed files with 335 additions and 363 deletions

View file

@ -1,143 +0,0 @@
from typing import Any, cast
from sqlalchemy import delete, select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlmodel import Session, col
from datatypes.magicplan.domain.models import Floor, Plan
from backend.app.db.models.magic_plan import (
MagicPlanDoorModel,
MagicPlanFloorModel,
MagicPlanPlanModel,
MagicPlanRoomModel,
MagicPlanWindowModel,
)
def save_plan(session: Session, plan: Plan, uploaded_file_id: int) -> None:
plan_id: int = _upsert_plan(session, plan, uploaded_file_id)
_delete_children(session, plan_id)
floor_ids: list[int] = _insert_floors(session, plan.floors, plan_id)
room_ids: list[int] = _insert_rooms(session, plan.floors, floor_ids)
_insert_windows_and_doors(session, plan.floors, room_ids)
def _upsert_plan(session: Session, plan: Plan, uploaded_file_id: int) -> int:
stmt = (
pg_insert(MagicPlanPlanModel)
.values(
magic_plan_uid=plan.uid,
name=plan.name,
address=plan.address,
postcode=plan.postcode,
uploaded_file_id=uploaded_file_id,
)
.on_conflict_do_update(
index_elements=["magic_plan_uid"],
set_={
"name": plan.name,
"address": plan.address,
"postcode": plan.postcode,
"uploaded_file_id": uploaded_file_id,
},
)
.returning(col(MagicPlanPlanModel.id))
)
row_id: int = session.execute(stmt).scalar_one()
return row_id
def _delete_children(session: Session, plan_id: int) -> None:
floor_subq = (
select(col(MagicPlanFloorModel.id))
.where(col(MagicPlanFloorModel.magic_plan_plan_id) == plan_id)
.scalar_subquery()
)
room_subq = (
select(col(MagicPlanRoomModel.id))
.where(col(MagicPlanRoomModel.magic_plan_floor_id).in_(floor_subq))
.scalar_subquery()
)
session.execute(
delete(MagicPlanWindowModel).where(
col(MagicPlanWindowModel.magic_plan_room_id).in_(room_subq)
)
)
session.execute(
delete(MagicPlanDoorModel).where(
col(MagicPlanDoorModel.magic_plan_room_id).in_(room_subq)
)
)
session.execute(
delete(MagicPlanRoomModel).where(
col(MagicPlanRoomModel.magic_plan_floor_id).in_(floor_subq)
)
)
session.execute(
delete(MagicPlanFloorModel).where(
col(MagicPlanFloorModel.magic_plan_plan_id) == plan_id
)
)
def _insert_floors(session: Session, floors: list[Floor], plan_id: int) -> list[int]:
rows: list[dict[str, Any]] = [
{"magic_plan_plan_id": plan_id, "level": floor.level} for floor in floors
]
result = session.execute(
pg_insert(MagicPlanFloorModel)
.values(rows)
.returning(col(MagicPlanFloorModel.id))
)
return cast(list[int], list(result.scalars().all()))
def _insert_rooms(
session: Session, floors: list[Floor], floor_ids: list[int]
) -> list[int]:
rows: list[dict[str, Any]] = [
{
"magic_plan_floor_id": floor_id,
"name": room.name,
"width_m": room.width_m,
"length_m": room.length_m,
"area_m2": room.area_m2,
}
for floor, floor_id in zip(floors, floor_ids)
for room in floor.rooms
]
result = session.execute(
pg_insert(MagicPlanRoomModel).values(rows).returning(col(MagicPlanRoomModel.id))
)
return cast(list[int], list(result.scalars().all()))
def _insert_windows_and_doors(
session: Session, floors: list[Floor], room_ids: list[int]
) -> None:
all_rooms = [room for floor in floors for room in floor.rooms]
window_rows: list[dict[str, Any]] = [
{
"magic_plan_room_id": room_id,
"width_m": window.width_m,
"height_m": window.height_m,
"area_m2": window.area_m2,
"opening_type": window.opening_type,
}
for room, room_id in zip(all_rooms, room_ids)
for window in room.windows
]
door_rows: list[dict[str, Any]] = [
{
"magic_plan_room_id": room_id,
"width_mm": door.width_mm,
}
for room, room_id in zip(all_rooms, room_ids)
for door in room.doors
]
if window_rows:
session.execute(pg_insert(MagicPlanWindowModel).values(window_rows))
if door_rows:
session.execute(pg_insert(MagicPlanDoorModel).values(door_rows))

View file

@ -1,41 +0,0 @@
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlmodel import SQLModel
import backend.app.db.models.magic_plan # noqa: F401 — registers MagicPlan models with SQLModel.metadata
# TODO: promote to backend/app/db/conftest.py once a second DB-touching test directory appears under this tree
@pytest.fixture(scope="function")
def engine(postgresql):
connection_string = (
f"postgresql+psycopg://"
f"{postgresql.info.user}:"
f"{postgresql.info.password}@"
f"{postgresql.info.host}:"
f"{postgresql.info.port}/"
f"{postgresql.info.dbname}"
)
engine = create_engine(connection_string)
SQLModel.metadata.create_all(engine)
yield engine
SQLModel.metadata.drop_all(engine)
engine.dispose()
@pytest.fixture(scope="function")
def db_session(engine):
connection = engine.connect()
transaction = connection.begin()
session = sessionmaker(bind=connection)()
yield session
session.close()
transaction.rollback()
connection.close()

View file

@ -1,115 +0,0 @@
import json
from pathlib import Path
import pytest
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from sqlmodel import SQLModel
from datatypes.magicplan.api.response import MagicPlanPlan
from datatypes.magicplan.domain.mapper import map_plan
from datatypes.magicplan.domain.models import Plan
from backend.app.db.functions.magic_plan_functions import save_plan
from backend.app.db.models.magic_plan import (
MagicPlanDoorModel,
MagicPlanFloorModel,
MagicPlanPlanModel,
MagicPlanRoomModel,
MagicPlanWindowModel,
)
FIXTURE_DIR = Path(__file__).parents[4] / "magic_plan"
@pytest.fixture(scope="module")
def domain_plan() -> Plan:
data = json.loads(
(FIXTURE_DIR / "magicplan_api_plan_response_example.json").read_text()
)
return map_plan(MagicPlanPlan.model_validate(data["data"]))
def _count(session: Session, model: type[SQLModel]) -> int:
return session.execute(select(func.count()).select_from(model)).scalar_one()
def test_plan_row_present_after_save(db_session: Session, domain_plan: Plan) -> None:
# Act
save_plan(db_session, domain_plan, 1)
# Assert
assert _count(db_session, MagicPlanPlanModel) == 1
def test_floor_count_matches_domain(db_session: Session, domain_plan: Plan) -> None:
# Arrange
expected = len(domain_plan.floors)
# Act
save_plan(db_session, domain_plan, 1)
# Assert
assert _count(db_session, MagicPlanFloorModel) == expected
def test_room_count_matches_domain(db_session: Session, domain_plan: Plan) -> None:
# Arrange
expected = sum(len(f.rooms) for f in domain_plan.floors)
# Act
save_plan(db_session, domain_plan, 1)
# Assert
assert _count(db_session, MagicPlanRoomModel) == expected
def test_window_count_matches_domain(db_session: Session, domain_plan: Plan) -> None:
# Arrange
expected = sum(len(r.windows) for f in domain_plan.floors for r in f.rooms)
# Act
save_plan(db_session, domain_plan, 1)
# Assert
assert _count(db_session, MagicPlanWindowModel) == expected
def test_door_count_matches_domain(db_session: Session, domain_plan: Plan) -> None:
# Arrange
expected = sum(len(r.doors) for f in domain_plan.floors for r in f.rooms)
# Act
save_plan(db_session, domain_plan, 1)
# Assert
assert _count(db_session, MagicPlanDoorModel) == expected
def test_save_plan_idempotent(db_session: Session, domain_plan: Plan) -> None:
# Act — call twice within the same session
save_plan(db_session, domain_plan, 1)
save_plan(db_session, domain_plan, 1)
# Assert — same row counts as a single call
assert _count(db_session, MagicPlanPlanModel) == 1
assert _count(db_session, MagicPlanFloorModel) == len(domain_plan.floors)
assert _count(db_session, MagicPlanRoomModel) == sum(
len(f.rooms) for f in domain_plan.floors
)
assert _count(db_session, MagicPlanWindowModel) == sum(
len(r.windows) for f in domain_plan.floors for r in f.rooms
)
assert _count(db_session, MagicPlanDoorModel) == sum(
len(r.doors) for f in domain_plan.floors for r in f.rooms
)
def test_uploaded_file_id_stored_after_save(db_session: Session, domain_plan: Plan) -> None:
# Act
save_plan(db_session, domain_plan, 1)
# Assert
row = db_session.execute(select(MagicPlanPlanModel)).scalar_one()
assert row.uploaded_file_id == 1
def test_save_plan_updates_uploaded_file_id_on_reingest(
db_session: Session, domain_plan: Plan
) -> None:
# Arrange
save_plan(db_session, domain_plan, 1)
# Act
save_plan(db_session, domain_plan, 2)
# Assert
row = db_session.execute(select(MagicPlanPlanModel)).scalar_one()
assert row.uploaded_file_id == 2

View file

@ -1,53 +0,0 @@
from typing import Optional
from sqlmodel import Field, SQLModel
class MagicPlanPlanModel(SQLModel, table=True):
__tablename__ = "magic_plan_plan"
id: Optional[int] = Field(default=None, primary_key=True)
magic_plan_uid: Optional[str] = Field(default=None, unique=True, index=True)
name: Optional[str] = None
address: Optional[str] = None
postcode: Optional[str] = None
uploaded_file_id: Optional[int] = Field(default=None)
class MagicPlanFloorModel(SQLModel, table=True):
__tablename__ = "magic_plan_floor"
id: Optional[int] = Field(default=None, primary_key=True)
magic_plan_plan_id: int = Field(foreign_key="magic_plan_plan.id")
level: Optional[int] = None
class MagicPlanRoomModel(SQLModel, table=True):
__tablename__ = "magic_plan_room"
id: Optional[int] = Field(default=None, primary_key=True)
magic_plan_floor_id: int = Field(foreign_key="magic_plan_floor.id")
name: Optional[str] = None
width_m: Optional[float] = None
length_m: Optional[float] = None
area_m2: Optional[float] = None
class MagicPlanWindowModel(SQLModel, table=True):
__tablename__ = "magic_plan_window"
id: Optional[int] = Field(default=None, primary_key=True)
magic_plan_room_id: int = Field(foreign_key="magic_plan_room.id")
width_m: Optional[float] = None
height_m: Optional[float] = None
area_m2: Optional[float] = None
opening_type: Optional[str] = None
class MagicPlanDoorModel(SQLModel, table=True):
__tablename__ = "magic_plan_door"
id: Optional[int] = Field(default=None, primary_key=True)
magic_plan_room_id: int = Field(foreign_key="magic_plan_room.id")
width_mm: Optional[float] = None
type: Optional[str] = None

View file

@ -1,8 +1,8 @@
from typing import Any
from backend.app.config import get_settings
from backend.magic_plan.magic_plan_client import MagicPlanClient
from backend.magic_plan.magic_plan_service import MagicPlanService
from infrastructure.magic_plan.magic_plan_client import MagicPlanClient
from orchestration.magic_plan_orchestrator import MagicPlanService
from backend.magic_plan.magic_plan_trigger_request import MagicPlanTriggerRequest
from datatypes.magicplan.domain.models import Plan
from backend.app.db.models.tasks import SourceEnum

View file

@ -8,5 +8,8 @@ RUN pip install --no-cache-dir -r requirements.txt
COPY utils/ utils/
COPY backend/ backend/
COPY datatypes/ datatypes/
COPY orchestration/ orchestration/
COPY repositories/ repositories/
COPY infrastructure/ infrastructure/
CMD ["backend.magic_plan.handler.handler"]

View file

@ -0,0 +1,95 @@
from __future__ import annotations
from typing import ClassVar, Optional
from sqlmodel import Field, SQLModel
from datatypes.magicplan.domain.models import Door, Floor, Plan, Room, Window
class MagicPlanPlanModel(SQLModel, table=True):
__tablename__: ClassVar[str] = "magic_plan_plan" # pyright: ignore[reportIncompatibleVariableOverride]
id: Optional[int] = Field(default=None, primary_key=True)
magic_plan_uid: Optional[str] = Field(default=None, unique=True, index=True)
name: Optional[str] = None
address: Optional[str] = None
postcode: Optional[str] = None
uploaded_file_id: Optional[int] = Field(default=None)
@classmethod
def from_domain(cls, plan: Plan, uploaded_file_id: int) -> "MagicPlanPlanModel":
return cls(
magic_plan_uid=plan.uid,
name=plan.name,
address=plan.address,
postcode=plan.postcode,
uploaded_file_id=uploaded_file_id,
)
class MagicPlanFloorModel(SQLModel, table=True):
__tablename__: ClassVar[str] = "magic_plan_floor" # pyright: ignore[reportIncompatibleVariableOverride]
id: Optional[int] = Field(default=None, primary_key=True)
magic_plan_plan_id: int = Field(foreign_key="magic_plan_plan.id")
level: Optional[int] = None
@classmethod
def from_domain(cls, floor: Floor, plan_id: int) -> "MagicPlanFloorModel":
return cls(magic_plan_plan_id=plan_id, level=floor.level)
class MagicPlanRoomModel(SQLModel, table=True):
__tablename__: ClassVar[str] = "magic_plan_room" # pyright: ignore[reportIncompatibleVariableOverride]
id: Optional[int] = Field(default=None, primary_key=True)
magic_plan_floor_id: int = Field(foreign_key="magic_plan_floor.id")
name: Optional[str] = None
width_m: Optional[float] = None
length_m: Optional[float] = None
area_m2: Optional[float] = None
@classmethod
def from_domain(cls, room: Room, floor_id: int) -> "MagicPlanRoomModel":
return cls(
magic_plan_floor_id=floor_id,
name=room.name,
width_m=room.width_m,
length_m=room.length_m,
area_m2=room.area_m2,
)
class MagicPlanWindowModel(SQLModel, table=True):
__tablename__: ClassVar[str] = "magic_plan_window" # pyright: ignore[reportIncompatibleVariableOverride]
id: Optional[int] = Field(default=None, primary_key=True)
magic_plan_room_id: int = Field(foreign_key="magic_plan_room.id")
width_m: Optional[float] = None
height_m: Optional[float] = None
area_m2: Optional[float] = None
opening_type: Optional[str] = None
@classmethod
def from_domain(cls, window: Window, room_id: int) -> "MagicPlanWindowModel":
return cls(
magic_plan_room_id=room_id,
width_m=window.width_m,
height_m=window.height_m,
area_m2=window.area_m2,
opening_type=window.opening_type,
)
class MagicPlanDoorModel(SQLModel, table=True):
__tablename__: ClassVar[str] = "magic_plan_door" # pyright: ignore[reportIncompatibleVariableOverride]
id: Optional[int] = Field(default=None, primary_key=True)
magic_plan_room_id: int = Field(foreign_key="magic_plan_room.id")
width_mm: Optional[float] = None
type: Optional[str] = None
@classmethod
def from_domain(cls, door: Door, room_id: int) -> "MagicPlanDoorModel":
return cls(magic_plan_room_id=room_id, width_mm=door.width_mm)

View file

@ -8,14 +8,16 @@ from datatypes.magicplan.domain.mapper import map_plan
from datatypes.magicplan.domain.models import Plan
from backend.app.db.connection import db_session
from backend.app.db.functions.magic_plan_functions import save_plan
from backend.app.db.models.uploaded_file import (
FileSourceEnum,
FileTypeEnum,
UploadedFile,
)
from backend.magic_plan.address_matcher import find_matching_plan
from backend.magic_plan.magic_plan_client import MagicPlanClient
from repositories.magic_plan.magic_plan_postgres_repository import (
MagicPlanPostgresRepository,
)
from infrastructure.magic_plan.magic_plan_client import MagicPlanClient
from backend.magic_plan.magic_plan_trigger_request import MagicPlanTriggerRequest
from utils.logger import setup_logger
from utils.s3 import save_data_to_s3
@ -57,7 +59,7 @@ class MagicPlanService:
with db_session() as session:
session.add(uploaded_file)
session.flush()
save_plan(session, plan, cast(int, uploaded_file.id))
MagicPlanPostgresRepository(session).save(plan, cast(int, uploaded_file.id))
return plan

View file

@ -0,0 +1,122 @@
from __future__ import annotations
from typing import Any, cast
from sqlalchemy import delete, select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlmodel import Session, col
from datatypes.magicplan.domain.models import Floor, Plan
from infrastructure.postgres.magic_plan_tables import (
MagicPlanDoorModel,
MagicPlanFloorModel,
MagicPlanPlanModel,
MagicPlanRoomModel,
MagicPlanWindowModel,
)
from repositories.magic_plan.magic_plan_repository import MagicPlanRepository
class MagicPlanPostgresRepository(MagicPlanRepository):
def __init__(self, session: Session) -> None:
self._session = session
def save(self, plan: Plan, uploaded_file_id: int) -> None:
plan_id = self._upsert_plan(plan, uploaded_file_id)
self._delete_children(plan_id)
floor_ids = self._insert_floors(plan.floors, plan_id)
room_ids = self._insert_rooms(plan.floors, floor_ids)
self._insert_windows_and_doors(plan.floors, room_ids)
def _upsert_plan(self, plan: Plan, uploaded_file_id: int) -> int:
row_data: dict[str, Any] = MagicPlanPlanModel.from_domain(
plan, uploaded_file_id
).model_dump(exclude={"id"})
stmt = (
pg_insert(MagicPlanPlanModel)
.values(**row_data)
.on_conflict_do_update(
index_elements=["magic_plan_uid"],
set_={k: v for k, v in row_data.items() if k != "magic_plan_uid"},
)
.returning(col(MagicPlanPlanModel.id))
)
return cast(int, self._session.execute(stmt).scalar_one()) # pyright: ignore[reportDeprecated]
def _delete_children(self, plan_id: int) -> None:
floor_subq = (
select(col(MagicPlanFloorModel.id))
.where(col(MagicPlanFloorModel.magic_plan_plan_id) == plan_id)
.scalar_subquery()
)
room_subq = (
select(col(MagicPlanRoomModel.id))
.where(col(MagicPlanRoomModel.magic_plan_floor_id).in_(floor_subq))
.scalar_subquery()
)
self._session.execute( # pyright: ignore[reportDeprecated]
delete(MagicPlanWindowModel).where(
col(MagicPlanWindowModel.magic_plan_room_id).in_(room_subq)
)
)
self._session.execute( # pyright: ignore[reportDeprecated]
delete(MagicPlanDoorModel).where(
col(MagicPlanDoorModel.magic_plan_room_id).in_(room_subq)
)
)
self._session.execute( # pyright: ignore[reportDeprecated]
delete(MagicPlanRoomModel).where(
col(MagicPlanRoomModel.magic_plan_floor_id).in_(floor_subq)
)
)
self._session.execute( # pyright: ignore[reportDeprecated]
delete(MagicPlanFloorModel).where(
col(MagicPlanFloorModel.magic_plan_plan_id) == plan_id
)
)
def _insert_floors(self, floors: list[Floor], plan_id: int) -> list[int]:
rows: list[dict[str, Any]] = [
MagicPlanFloorModel.from_domain(floor, plan_id).model_dump(exclude={"id"})
for floor in floors
]
result = self._session.execute( # pyright: ignore[reportDeprecated]
pg_insert(MagicPlanFloorModel)
.values(rows)
.returning(col(MagicPlanFloorModel.id))
)
return cast(list[int], list(result.scalars().all()))
def _insert_rooms(self, floors: list[Floor], floor_ids: list[int]) -> list[int]:
rows: list[dict[str, Any]] = [
MagicPlanRoomModel.from_domain(room, floor_id).model_dump(exclude={"id"})
for floor, floor_id in zip(floors, floor_ids)
for room in floor.rooms
]
result = self._session.execute( # pyright: ignore[reportDeprecated]
pg_insert(MagicPlanRoomModel)
.values(rows)
.returning(col(MagicPlanRoomModel.id))
)
return cast(list[int], list(result.scalars().all()))
def _insert_windows_and_doors(
self, floors: list[Floor], room_ids: list[int]
) -> None:
all_rooms = [room for floor in floors for room in floor.rooms]
window_rows: list[dict[str, Any]] = [
MagicPlanWindowModel.from_domain(window, room_id).model_dump(
exclude={"id"}
)
for room, room_id in zip(all_rooms, room_ids)
for window in room.windows
]
door_rows: list[dict[str, Any]] = [
MagicPlanDoorModel.from_domain(door, room_id).model_dump(exclude={"id"})
for room, room_id in zip(all_rooms, room_ids)
for door in room.doors
]
if window_rows:
self._session.execute(pg_insert(MagicPlanWindowModel).values(window_rows)) # pyright: ignore[reportDeprecated]
if door_rows:
self._session.execute(pg_insert(MagicPlanDoorModel).values(door_rows)) # pyright: ignore[reportDeprecated]

View file

@ -0,0 +1,18 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from datatypes.magicplan.domain.models import Plan
class MagicPlanRepository(ABC):
"""Persists a MagicPlan aggregate.
Implementations are expected to write child rows using bulk inserts
(one round-trip per table) the codebase targets Postgres exclusively.
Upsert semantics on magic_plan_uid; child rows are deleted and re-inserted
on each save (idempotent re-run behaviour per ADR-0012).
"""
@abstractmethod
def save(self, plan: Plan, uploaded_file_id: int) -> None: ...

View file

@ -6,7 +6,7 @@ from unittest.mock import MagicMock, patch
import pytest
import requests
from backend.magic_plan.magic_plan_client import MagicPlanClient
from infrastructure.magic_plan.magic_plan_client import MagicPlanClient
from datatypes.magicplan.api.response import MagicPlanPlan, PlanSummary
FIXTURE_DIR = Path(__file__).parents[2] / "magic_plan"
@ -129,11 +129,17 @@ def test_get_plans_multi_page_fetches_all_pages(
page2_plan = {**page1_plan, "id": "page-2-plan-id"}
page1_response = MagicMock()
page1_response.json.return_value = {
"data": {"paging": {"page": 1, "next_page": True, "count": 2}, "plans": [page1_plan]}
"data": {
"paging": {"page": 1, "next_page": True, "count": 2},
"plans": [page1_plan],
}
}
page2_response = MagicMock()
page2_response.json.return_value = {
"data": {"paging": {"page": 2, "next_page": False, "count": 2}, "plans": [page2_plan]}
"data": {
"paging": {"page": 2, "next_page": False, "count": 2},
"plans": [page2_plan],
}
}
mock_session.get.side_effect = [page1_response, page2_response]
# Act

View file

@ -13,8 +13,8 @@ from backend.app.db.models.uploaded_file import (
FileTypeEnum,
UploadedFile,
)
from backend.magic_plan.magic_plan_client import MagicPlanClient
from backend.magic_plan.magic_plan_service import MagicPlanService
from infrastructure.magic_plan.magic_plan_client import MagicPlanClient
from orchestration.magic_plan_orchestrator import MagicPlanService
from backend.magic_plan.magic_plan_trigger_request import MagicPlanTriggerRequest
FIXTURE_DIR = Path(__file__).parents[2] / "magic_plan"
@ -267,7 +267,10 @@ def test_run_creates_uploaded_file_record(
assert uploaded_file.file_source == FileSourceEnum.MAGIC_PLAN.value
assert uploaded_file.file_type == FileTypeEnum.MAGIC_PLAN_JSON.value
assert uploaded_file.s3_file_bucket == S3_BUCKET
assert uploaded_file.s3_file_key == f"documents/uprn/100023336956/magic_plan_{plan_summary.id}.json.gz"
assert (
uploaded_file.s3_file_key
== f"documents/uprn/100023336956/magic_plan_{plan_summary.id}.json.gz"
)
assert uploaded_file.s3_upload_timestamp is not None
assert uploaded_file.uprn == 100023336956
assert uploaded_file.hubspot_deal_id == "deal-789"

View file

@ -0,0 +1,75 @@
from __future__ import annotations
from sqlalchemy import Engine
from sqlmodel import Session, select
from datatypes.magicplan.domain.models import Door, Floor, Plan, Room, Window
from infrastructure.postgres.magic_plan_tables import (
MagicPlanDoorModel,
MagicPlanFloorModel,
MagicPlanPlanModel,
MagicPlanRoomModel,
MagicPlanWindowModel,
)
from repositories.magic_plan.magic_plan_postgres_repository import (
MagicPlanPostgresRepository,
)
def _plan() -> Plan:
window = Window(width_m=1.2, height_m=1.5, area_m2=1.8, opening_type="casement")
door = Door(width_mm=762.0)
room = Room(
name="Living Room",
width_m=4.0,
length_m=5.0,
area_m2=20.0,
windows=[window],
doors=[door],
)
floor = Floor(level=0, name="Ground Floor", rooms=[room])
return Plan(
uid="test-uid-abc123",
name="Test Plan",
address="1 Test Street",
postcode="TE1 1ST",
floors=[floor],
)
def test_save_writes_all_rows(db_engine: Engine) -> None:
# Arrange
plan = _plan()
# Act
with Session(db_engine) as session:
MagicPlanPostgresRepository(session).save(plan, uploaded_file_id=1)
session.commit()
# Assert — one row written to every table in the aggregate
with Session(db_engine) as session:
assert len(session.exec(select(MagicPlanPlanModel)).all()) == 1
assert len(session.exec(select(MagicPlanFloorModel)).all()) == 1
assert len(session.exec(select(MagicPlanRoomModel)).all()) == 1
assert len(session.exec(select(MagicPlanWindowModel)).all()) == 1
assert len(session.exec(select(MagicPlanDoorModel)).all()) == 1
def test_save_is_idempotent(db_engine: Engine) -> None:
# Arrange
plan = _plan()
# Act — save twice with the same plan uid; second must overwrite, not append
with Session(db_engine) as session:
repo = MagicPlanPostgresRepository(session)
repo.save(plan, uploaded_file_id=1)
repo.save(plan, uploaded_file_id=1)
session.commit()
# Assert — row counts do not double
with Session(db_engine) as session:
assert len(session.exec(select(MagicPlanPlanModel)).all()) == 1
assert len(session.exec(select(MagicPlanFloorModel)).all()) == 1
assert len(session.exec(select(MagicPlanRoomModel)).all()) == 1
assert len(session.exec(select(MagicPlanWindowModel)).all()) == 1
assert len(session.exec(select(MagicPlanDoorModel)).all()) == 1