mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
210 lines
6.8 KiB
Python
210 lines
6.8 KiB
Python
import json
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import pytest
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.orm import Session
|
|
from sqlmodel import SQLModel
|
|
|
|
from datatypes.magicplan.api.response import MagicPlanPlan
|
|
from datatypes.magicplan.domain.mapper import map_plan
|
|
from datatypes.magicplan.domain.models import Door, Floor, Plan, Room, Window
|
|
|
|
from backend.app.db.functions.magic_plan_functions import (
|
|
get_plan_by_uploaded_file_id,
|
|
save_plan,
|
|
)
|
|
from backend.app.db.models.magic_plan import (
|
|
MagicPlanDoorModel,
|
|
MagicPlanFloorModel,
|
|
MagicPlanPlanModel,
|
|
MagicPlanRoomModel,
|
|
MagicPlanWindowModel,
|
|
)
|
|
|
|
# Directory containing the MagicPlan JSON fixtures: the "magic_plan" folder
# under the fifth ancestor of this file (parents[0] is this file's directory).
FIXTURE_DIR = Path(__file__).parents[4] / "magic_plan"
|
|
|
|
|
|
@pytest.fixture(scope="module")
def domain_plan() -> Plan:
    """Load the example MagicPlan API response and map it to a domain ``Plan``.

    The fixture is module-scoped, so the JSON file is read and mapped once and
    the same object is handed to every test in this module that requests it.

    Returns:
        The ``Plan`` produced by ``map_plan`` from the ``"data"`` entry of the
        example response file.
    """
    fixture_path = FIXTURE_DIR / "magicplan_api_plan_response_example.json"
    # Decode explicitly as UTF-8 so parsing does not depend on the platform's
    # default locale encoding.
    payload = json.loads(fixture_path.read_text(encoding="utf-8"))
    return map_plan(MagicPlanPlan.model_validate(payload["data"]))
|
|
|
|
|
|
def _count(session: Session, model: type[SQLModel]) -> int:
    """Return the number of rows stored for ``model``.

    Args:
        session: Session used to run the count query.
        model: Mapped model class whose rows are counted.

    Returns:
        The single scalar value returned by the ``COUNT`` query.
    """
    count_stmt = select(func.count()).select_from(model)
    outcome = session.execute(count_stmt)
    return outcome.scalar_one()
|
|
|
|
|
|
def test_plan_row_present_after_save(db_session: Session, domain_plan: Plan) -> None:
    """Check that saving the fixture plan leaves exactly one plan row."""
    # Act: persist the plan, passing 1 as the third argument.
    save_plan(db_session, domain_plan, 1)

    # Assert: the plan table holds a single row.
    plan_rows = _count(db_session, MagicPlanPlanModel)
    assert plan_rows == 1
|
|
|
|
|
|
def test_floor_count_matches_domain(db_session: Session, domain_plan: Plan) -> None:
    """Check that the stored floor rows match the domain plan's floor count."""
    # Arrange: take the expected total from the domain object before saving.
    floor_total = len(domain_plan.floors)

    # Act
    save_plan(db_session, domain_plan, 1)

    # Assert
    stored_floors = _count(db_session, MagicPlanFloorModel)
    assert stored_floors == floor_total
|
|
|
|
|
|
def test_room_count_matches_domain(db_session: Session, domain_plan: Plan) -> None:
    """Check that the stored room rows match the rooms across all domain floors."""
    # Arrange: flatten the rooms of every floor before saving.
    all_rooms = [room for floor in domain_plan.floors for room in floor.rooms]
    room_total = len(all_rooms)

    # Act
    save_plan(db_session, domain_plan, 1)

    # Assert
    stored_rooms = _count(db_session, MagicPlanRoomModel)
    assert stored_rooms == room_total
|
|
|
|
|
|
def test_window_count_matches_domain(db_session: Session, domain_plan: Plan) -> None:
    """Check that the stored window rows match the windows of every domain room."""
    # Arrange: add up the windows room by room, floor by floor, before saving.
    window_total = 0
    for floor in domain_plan.floors:
        for room in floor.rooms:
            window_total += len(room.windows)

    # Act
    save_plan(db_session, domain_plan, 1)

    # Assert
    stored_windows = _count(db_session, MagicPlanWindowModel)
    assert stored_windows == window_total
|
|
|
|
|
|
def test_door_count_matches_domain(db_session: Session, domain_plan: Plan) -> None:
    """Check that the stored door rows match the doors of every domain room."""
    # Arrange: add up the doors room by room, floor by floor, before saving.
    door_total = 0
    for floor in domain_plan.floors:
        for room in floor.rooms:
            door_total += len(room.doors)

    # Act
    save_plan(db_session, domain_plan, 1)

    # Assert
    stored_doors = _count(db_session, MagicPlanDoorModel)
    assert stored_doors == door_total
|
|
|
|
|
|
def test_save_plan_idempotent(db_session: Session, domain_plan: Plan) -> None:
    """Check that saving the same plan twice yields the row counts of one save."""
    # Act: two identical saves through the same session.
    for _ in range(2):
        save_plan(db_session, domain_plan, 1)

    # Assert: every table holds what a single save would produce. The expected
    # totals are read from the domain plan only after both saves have run.
    assert _count(db_session, MagicPlanPlanModel) == 1

    floors = domain_plan.floors
    assert _count(db_session, MagicPlanFloorModel) == len(floors)

    room_total = sum(len(floor.rooms) for floor in domain_plan.floors)
    assert _count(db_session, MagicPlanRoomModel) == room_total

    window_total = sum(
        len(room.windows) for floor in domain_plan.floors for room in floor.rooms
    )
    assert _count(db_session, MagicPlanWindowModel) == window_total

    door_total = sum(
        len(room.doors) for floor in domain_plan.floors for room in floor.rooms
    )
    assert _count(db_session, MagicPlanDoorModel) == door_total
|
|
|
|
|
|
def test_get_plan_returns_none_when_not_found(db_session: Session) -> None:
    """Check that looking up id 999, which this test never saves, returns ``None``."""
    # Act: nothing is saved in this test before the lookup.
    lookup: Optional[Plan] = get_plan_by_uploaded_file_id(db_session, 999)

    # Assert
    assert lookup is None
|
|
|
|
|
|
def test_get_plan_returns_full_structure(db_session: Session) -> None:
    """Round-trip a hand-built plan and check each level of the loaded result.

    The plan has one floor with two rooms: a living room with one window and
    one door, and a kitchen with one window and no doors. It is saved with 42
    as the third argument to ``save_plan`` and read back with the same id.
    """
    # Arrange: build the rooms first, then nest them into a floor and a plan.
    living_room = Room(
        name="Living Room",
        width_m=4.0,
        length_m=5.0,
        area_m2=20.0,
        windows=[
            Window(width_m=1.2, height_m=1.0, area_m2=1.2, opening_type="side_hung"),
        ],
        doors=[Door(width_mm=800.0)],
    )
    kitchen_room = Room(
        name="Kitchen",
        width_m=3.0,
        length_m=4.0,
        area_m2=12.0,
        windows=[
            Window(width_m=0.9, height_m=1.0, area_m2=0.9, opening_type="top_hung"),
        ],
        doors=[],
    )
    ground_floor = Floor(level=0, name=None, rooms=[living_room, kitchen_room])
    plan = Plan(
        uid="test-uid-1",
        name="Test Plan",
        address="1 Test St",
        postcode="TE1 1ST",
        floors=[ground_floor],
    )
    save_plan(db_session, plan, 42)

    # Act
    loaded: Optional[Plan] = get_plan_by_uploaded_file_id(db_session, 42)

    # Assert: plan-level fields.
    assert loaded is not None
    assert loaded.uid == "test-uid-1"
    assert loaded.name == "Test Plan"
    assert loaded.address == "1 Test St"
    assert loaded.postcode == "TE1 1ST"

    # Assert: the single floor and its two rooms.
    assert len(loaded.floors) == 1
    loaded_floor = loaded.floors[0]
    assert loaded_floor.level == 0
    assert len(loaded_floor.rooms) == 2

    # Assert: first room, including its window and door.
    first_room = loaded_floor.rooms[0]
    assert first_room.name == "Living Room"
    assert first_room.width_m == 4.0
    assert first_room.length_m == 5.0
    assert first_room.area_m2 == 20.0
    assert len(first_room.windows) == 1
    first_window = first_room.windows[0]
    assert first_window.width_m == 1.2
    assert first_window.opening_type == "side_hung"
    assert len(first_room.doors) == 1
    assert first_room.doors[0].width_mm == 800.0

    # Assert: second room has one window and no doors.
    second_room = loaded_floor.rooms[1]
    assert second_room.name == "Kitchen"
    assert len(second_room.windows) == 1
    assert len(second_room.doors) == 0
|
|
|
|
|
|
def test_get_plan_floors_ordered_by_level(db_session: Session) -> None:
    """Check that a loaded plan lists its floors in ascending level order."""
    # Arrange: floors are supplied out of level order (2, 0, 1) so the
    # assertion only passes if the loaded result is sorted by level.
    unordered_floors = [
        Floor(
            level=level,
            name=None,
            rooms=[Room(name=room_name, width_m=1.0, length_m=1.0, area_m2=1.0)],
        )
        for level, room_name in (
            (2, "Top Room"),
            (0, "Ground Room"),
            (1, "First Room"),
        )
    ]
    plan = Plan(uid="test-uid-ordering", name=None, floors=unordered_floors)
    save_plan(db_session, plan, 43)

    # Act
    loaded: Optional[Plan] = get_plan_by_uploaded_file_id(db_session, 43)

    # Assert
    assert loaded is not None
    loaded_levels = [floor.level for floor in loaded.floors]
    assert loaded_levels == [0, 1, 2]
|
|
|
|
|
|
def test_uploaded_file_id_stored_after_save(db_session: Session, domain_plan: Plan) -> None:
    """Check that the id passed to ``save_plan`` is stored on the plan row."""
    # Act
    save_plan(db_session, domain_plan, 1)

    # Assert: fetch the only plan row and inspect its uploaded_file_id.
    plan_query = select(MagicPlanPlanModel)
    stored_plan = db_session.execute(plan_query).scalar_one()
    assert stored_plan.uploaded_file_id == 1
|
|
|
|
|
|
def test_save_plan_updates_uploaded_file_id_on_reingest(
    db_session: Session,
    domain_plan: Plan,
) -> None:
    """Check that re-saving the same plan with a new id overwrites the stored id."""
    # Arrange: first save with id 1.
    save_plan(db_session, domain_plan, 1)

    # Act: save the same plan again with id 2.
    save_plan(db_session, domain_plan, 2)

    # Assert: there is still exactly one plan row, and it carries the new id.
    plan_query = select(MagicPlanPlanModel)
    stored_plan = db_session.execute(plan_query).scalar_one()
    assert stored_plan.uploaded_file_id == 2
|