tests wrong environment

This commit is contained in:
Jun-te Kim 2026-05-29 16:17:06 +00:00
parent 47dfe34ec0
commit 3e30b4af40
6 changed files with 226 additions and 30 deletions

View file

@ -17,7 +17,7 @@ from domain.landlord_description_overrides.wall_type_construction_dates import (
from infrastructure.chatgpt.chatgpt import ChatGPT
from infrastructure.chatgpt.chatgpt_column_classifier import ChatGptColumnClassifier
from infrastructure.postgres.config import PostgresConfig
from infrastructure.postgres.engine import make_engine, transactional_session
from infrastructure.postgres.engine import commit_scope, make_engine, make_session
from infrastructure.postgres.landlord_built_form_type_override_postgres_repository import (
LandlordBuiltFormTypeOverridePostgresRepository,
)
@ -130,16 +130,26 @@ def handler(
rows = csv_client.read_rows(trigger.s3_uri)
engine = make_engine(PostgresConfig.from_env(os.environ))
with transactional_session(engine) as session:
# The session is built up front (SQLModel sessions are lazy, so no
# connection is checked out yet) and owned by this handler. Classification
# runs first and calls ChatGPT, which is slow; we deliberately keep no
# transaction open across it. Only the persistence below -- inside
# ``commit_scope`` -- holds a connection.
session = make_session(engine)
try:
chat_gpt = ChatGPT()
columns = _build_columns(trigger.column_mapping, chat_gpt, session)
orchestrator = LandlordDescriptionOverridesOrchestrator(
unstandardised_address_repo=unstandardised_address_repo,
columns=columns,
)
classified = orchestrator.classify_and_persist_from_rows(
rows, portfolio_id=trigger.portfolio_id
)
classified = orchestrator.classify_from_rows(rows)
with commit_scope(session):
orchestrator.persist(classified, portfolio_id=trigger.portfolio_id)
finally:
session.close()
counts = {name: len(mapping) for name, mapping in classified.items()}
for name, n in counts.items():

View file

@ -2,4 +2,4 @@ boto3
pydantic
sqlmodel
psycopg2-binary
openai
openai==1.93.0

View file

@ -40,3 +40,21 @@ def transactional_session(engine: Engine) -> Iterator[Session]:
raise
finally:
session.close()
@contextmanager  # pyright: ignore[reportDeprecated]
def commit_scope(session: Session) -> Iterator[Session]:
    """Wrap a caller-owned session in a commit-or-rollback boundary.

    The session is yielded unchanged. If the ``with`` body finishes cleanly
    the session is committed; if the body -- or the commit itself -- raises an
    ``Exception``, the session is rolled back and the error is re-raised.
    The session is never closed here: the caller created it and remains
    responsible for closing it.

    The intended use is keeping slow, non-database work *outside* the
    transaction: build the session, run the slow work, and only then enter
    this scope around the persistence calls.
    """
    try:
        yield session
        # Deliberately still inside the ``try``: a failing commit must be
        # rolled back exactly like a failing body.
        session.commit()
    except Exception:
        session.rollback()
        raise

View file

@ -60,50 +60,73 @@ class LandlordDescriptionOverridesOrchestrator:
for column in self._columns
}
def persist(
    self, classified: dict[str, dict[str, Enum]], portfolio_id: int
) -> None:
    """Write finished classification results through each column's repo.

    ``classified`` maps ``ClassifiableColumn.name`` to that column's
    description-to-enum mapping -- the shape returned by
    ``classify_columns`` and ``classify_from_rows``. For every registered
    column with a non-empty mapping, the mapping is handed to that column's
    own repository via ``upsert_all``. A column whose mapping is empty, or
    whose name is missing from ``classified`` altogether, is skipped so no
    repository call is made for it.

    Nothing is committed here: the caller owns the transaction boundary
    and is expected to open it only around this call, so that slow
    classification work never runs inside it.
    """
    for registered in self._columns:
        column_mapping = classified.get(registered.name)
        # ``.get`` yields ``None`` for an unknown column; both ``None`` and
        # an empty mapping are falsy and therefore skipped.
        if column_mapping:
            registered.repo.upsert_all(portfolio_id, column_mapping)
def classify_and_persist(
    self, addresses: AddressList, portfolio_id: int
) -> dict[str, dict[str, Enum]]:
    """Classify every registered column, persist the results, return them.

    Classification is delegated to ``classify_columns`` and the write to
    ``persist``, which skips columns whose mapping is empty. Persistence
    happens exactly once, through ``persist``; this method must not also
    loop over the columns itself, or every mapping would be upserted twice.

    The orchestrator does not commit -- the caller owns the transaction
    boundary.

    Returns the mapping produced by ``classify_columns`` (keyed by column
    name) so callers can log per-column counts.
    """
    classified = self.classify_columns(addresses)
    self.persist(classified, portfolio_id)
    return classified
def classify_from_rows(
    self, rows: list[dict[str, str]]
) -> dict[str, dict[str, Enum]]:
    """Classify raw CSV rows without touching the database.

    The classification half of ``classify_and_persist_from_rows``, split
    out so a caller can run the slow ChatGPT work *before* opening a
    transaction and then write the finished results with ``persist`` inside
    one short-lived connection.

    Unlike the ``AddressList`` path this builds no ``AddressList``, so it
    has no canonical address/postcode requirement -- the classifier only
    needs the raw description cells. Used when reading the original
    landlord upload (raw headers) rather than the address-matching CSV.

    Returns a mapping keyed by column name. A registered column whose
    ``source_column`` is absent from ``rows`` is classified against an
    empty set of descriptions.
    """
    col_to_desc = self._descriptions_from_rows(rows)
    return {
        column.name: column.classifier.classify(
            # An absent header means "no descriptions", not an error.
            col_to_desc.get(column.source_column, set())
        )
        for column in self._columns
    }
def classify_and_persist_from_rows(
    self, rows: list[dict[str, str]], portfolio_id: int
) -> dict[str, dict[str, Enum]]:
    """Classify raw CSV rows and persist the outcome in a single call.

    This is ``classify_from_rows`` followed by ``persist``, bundled for
    convenience. When classification is slow, call the two steps separately
    instead, so that a transaction only needs to be open around ``persist``.

    Returns the mapping produced by ``classify_from_rows``.
    """
    results = self.classify_from_rows(rows)
    self.persist(results, portfolio_id)
    return results
@staticmethod

View file

@ -10,4 +10,5 @@ fuzzywuzzy
pymupdf
playwright==1.58.0
msal
moto[s3,sqs]
moto[s3,sqs]
openai==1.93.0

View file

@ -323,3 +323,147 @@ def test_classify_and_persist_skips_upsert_for_a_column_absent_from_the_batch()
# assert: Property Type wrote; Walls did not.
assert property_type_repo.calls == [(7, {"semi-detached": PropertyType.HOUSE})]
assert wall_type_repo.calls == []
def test_classify_from_rows_classifies_each_column_without_persisting() -> None:
    """``classify_from_rows`` classifies every column and writes to no repo."""
    # arrange: two classifiable columns, each with its own stub classifier
    # and repository, fed from raw CSV rows rather than an AddressList.
    solid_brick = WallType.SOLID_BRICK_AS_BUILT_NO_INSULATION_ASSUMED
    property_repo = _StubLandlordOverrideRepository()
    wall_repo = _StubLandlordOverrideRepository()
    columns: list[ClassifiableColumn[Any]] = [
        _column(
            "property_type",
            "Property Type",
            _StubColumnClassifier({"semi-detached": PropertyType.HOUSE}),
            property_repo,
        ),
        _column(
            "wall_type",
            "Walls",
            _StubColumnClassifier({"solid brick": solid_brick}),
            wall_repo,
        ),
    ]
    raw_rows = [{"Property Type": "semi-detached", "Walls": "solid brick"}]

    # act
    classified = _orchestrator(columns).classify_from_rows(raw_rows)

    # assert: results are keyed by column name ...
    assert classified == {
        "property_type": {"semi-detached": PropertyType.HOUSE},
        "wall_type": {"solid brick": solid_brick},
    }
    # ... and neither repository saw a write, so classification can run
    # before any transaction is opened.
    assert property_repo.calls == []
    assert wall_repo.calls == []
def test_classify_from_rows_splits_and_normalises_descriptions() -> None:
    """Packed, mixed-case cells reach the classifier as distinct normalised values."""
    # arrange: the same description appears with different casing across
    # rows, and one cell packs two comma-separated descriptions.
    wall_classifier = _StubColumnClassifier({})
    orchestrator = _orchestrator([_column("wall_type", "Walls", wall_classifier)])
    raw_rows = [
        {"Walls": "Solid Brick, cavity"},
        {"Walls": "SOLID BRICK"},
    ]

    # act
    orchestrator.classify_from_rows(raw_rows)

    # assert: exactly one lower-cased entry per distinct description.
    assert wall_classifier.received == {"solid brick", "cavity"}
def test_classify_from_rows_yields_empty_mapping_for_an_absent_column() -> None:
    """A column whose header is missing from the rows classifies nothing."""
    # arrange: the only registered column reads "Property Type", but the
    # rows carry just a "Walls" header.
    property_classifier = _StubColumnClassifier({})
    orchestrator = _orchestrator(
        [_column("property_type", "Property Type", property_classifier)]
    )

    # act
    classified = orchestrator.classify_from_rows([{"Walls": "cavity"}])

    # assert: the classifier was handed an empty description set and the
    # column still appears in the result, with an empty mapping.
    assert property_classifier.received == set()
    assert classified == {"property_type": {}}
def test_persist_routes_each_columns_mapping_to_its_own_repo() -> None:
    """``persist`` hands each column's mapping to that column's repository."""
    # arrange: two columns with distinct repos, plus an already-finished
    # ``classified`` mapping of the shape the classify_* methods return.
    solid_brick = WallType.SOLID_BRICK_AS_BUILT_NO_INSULATION_ASSUMED
    property_repo = _StubLandlordOverrideRepository()
    wall_repo = _StubLandlordOverrideRepository()
    orchestrator = _orchestrator(
        [
            _column(
                "property_type", "Property Type", _StubColumnClassifier({}), property_repo
            ),
            _column("wall_type", "Walls", _StubColumnClassifier({}), wall_repo),
        ]
    )
    finished: dict[str, dict[str, Enum]] = {
        "property_type": {"semi-detached": PropertyType.HOUSE},
        "wall_type": {"solid brick": solid_brick},
    }

    # act
    orchestrator.persist(finished, portfolio_id=42)

    # assert: one write per repo, each carrying only its own column's mapping.
    assert property_repo.calls == [(42, {"semi-detached": PropertyType.HOUSE})]
    assert wall_repo.calls == [(42, {"solid brick": solid_brick})]
def test_persist_skips_empty_and_missing_mappings() -> None:
    """``persist`` writes nothing for an empty mapping or an unknown column."""
    # arrange: ``property_type`` maps to an empty dict, while ``wall_type``
    # has no entry in ``classified`` at all. The missing key must be
    # tolerated rather than raise.
    property_repo = _StubLandlordOverrideRepository()
    wall_repo = _StubLandlordOverrideRepository()
    orchestrator = _orchestrator(
        [
            _column(
                "property_type", "Property Type", _StubColumnClassifier({}), property_repo
            ),
            _column("wall_type", "Walls", _StubColumnClassifier({}), wall_repo),
        ]
    )
    finished: dict[str, dict[str, Enum]] = {"property_type": {}}

    # act
    orchestrator.persist(finished, portfolio_id=7)

    # assert: neither repository received an upsert.
    assert wall_repo.calls == []
    assert property_repo.calls == []
def test_classify_and_persist_from_rows_composes_classify_then_persist() -> None:
    """The one-shot rows path both classifies and writes to each repo."""
    # arrange: two columns with real stub answers, so the convenience method
    # can be checked against what classify-then-persist would produce.
    solid_brick = WallType.SOLID_BRICK_AS_BUILT_NO_INSULATION_ASSUMED
    property_repo = _StubLandlordOverrideRepository()
    wall_repo = _StubLandlordOverrideRepository()
    orchestrator = _orchestrator(
        [
            _column(
                "property_type",
                "Property Type",
                _StubColumnClassifier({"semi-detached": PropertyType.HOUSE}),
                property_repo,
            ),
            _column(
                "wall_type",
                "Walls",
                _StubColumnClassifier({"solid brick": solid_brick}),
                wall_repo,
            ),
        ]
    )
    raw_rows = [{"Property Type": "semi-detached", "Walls": "solid brick"}]

    # act
    classified = orchestrator.classify_and_persist_from_rows(raw_rows, portfolio_id=99)

    # assert: the return value has the classify_from_rows shape ...
    assert classified == {
        "property_type": {"semi-detached": PropertyType.HOUSE},
        "wall_type": {"solid brick": solid_brick},
    }
    # ... and each repository was written to exactly once.
    assert property_repo.calls == [(99, {"semi-detached": PropertyType.HOUSE})]
    assert wall_repo.calls == [(99, {"solid brick": solid_brick})]