diff --git a/applications/landlord_description_overrides/handler.py b/applications/landlord_description_overrides/handler.py index 801d1f12..901a8297 100644 --- a/applications/landlord_description_overrides/handler.py +++ b/applications/landlord_description_overrides/handler.py @@ -17,7 +17,7 @@ from domain.landlord_description_overrides.wall_type_construction_dates import ( from infrastructure.chatgpt.chatgpt import ChatGPT from infrastructure.chatgpt.chatgpt_column_classifier import ChatGptColumnClassifier from infrastructure.postgres.config import PostgresConfig -from infrastructure.postgres.engine import make_engine, transactional_session +from infrastructure.postgres.engine import commit_scope, make_engine, make_session from infrastructure.postgres.landlord_built_form_type_override_postgres_repository import ( LandlordBuiltFormTypeOverridePostgresRepository, ) @@ -130,16 +130,26 @@ def handler( rows = csv_client.read_rows(trigger.s3_uri) engine = make_engine(PostgresConfig.from_env(os.environ)) - with transactional_session(engine) as session: + # The session is built up front (SQLModel sessions are lazy, so no + # connection is checked out yet) and owned by this handler. Classification + # runs first and calls ChatGPT, which is slow; we deliberately keep no + # transaction open across it. Only the persistence below -- inside + # ``commit_scope`` -- holds a connection. + session = make_session(engine) + try: chat_gpt = ChatGPT() columns = _build_columns(trigger.column_mapping, chat_gpt, session) orchestrator = LandlordDescriptionOverridesOrchestrator( unstandardised_address_repo=unstandardised_address_repo, columns=columns, ) - classified = orchestrator.classify_and_persist_from_rows( - rows, portfolio_id=trigger.portfolio_id - ) + + classified = orchestrator.classify_from_rows(rows) + + with commit_scope(session): + orchestrator.persist(classified, portfolio_id=trigger.portfolio_id) + finally: + session.close() counts = {name: len(mapping) for name, mapping in classified.items()} for name, n in counts.items(): diff --git a/applications/landlord_description_overrides/requirements.txt b/applications/landlord_description_overrides/requirements.txt index b2917847..590c4667 100644 --- a/applications/landlord_description_overrides/requirements.txt +++ b/applications/landlord_description_overrides/requirements.txt @@ -2,4 +2,4 @@ boto3 pydantic sqlmodel psycopg2-binary -openai +openai==1.93.0 diff --git a/infrastructure/postgres/engine.py b/infrastructure/postgres/engine.py index ea2b35ad..2558532e 100644 --- a/infrastructure/postgres/engine.py +++ b/infrastructure/postgres/engine.py @@ -40,3 +40,21 @@ def transactional_session(engine: Engine) -> Iterator[Session]: raise finally: session.close() + + +@contextmanager # pyright: ignore[reportDeprecated] +def commit_scope(session: Session) -> Iterator[Session]: + """Commit a caller-owned session on clean exit; roll back on error. + + Like ``transactional_session`` but for a session the caller already holds + and will close itself. Use it to keep slow, non-DB work *outside* the + transaction: build the session, run the slow work, then enter + ``commit_scope`` only for the persistence -- so a connection is checked out + (SQLModel sessions are lazy) for the shortest possible window. + """ + try: + yield session + session.commit() + except Exception: + session.rollback() + raise diff --git a/orchestration/landlord_description_overrides_orchestrator.py b/orchestration/landlord_description_overrides_orchestrator.py index 6203b8d5..e43992cf 100644 --- a/orchestration/landlord_description_overrides_orchestrator.py +++ b/orchestration/landlord_description_overrides_orchestrator.py @@ -60,50 +60,73 @@ class LandlordDescriptionOverridesOrchestrator: for column in self._columns } + def persist( + self, classified: dict[str, dict[str, Enum]], portfolio_id: int + ) -> None: + """Persist already-classified results via each column's repository. + + ``classified`` is keyed by ``ClassifiableColumn.name`` -- the shape + ``classify_columns`` and ``classify_from_rows`` return. Each non-empty + mapping is written through the column's own repo under + ``source = 'classifier'``; an empty mapping (a registered column absent + from this batch) skips the DB round-trip. + + The orchestrator does not commit -- the caller owns the transaction + boundary, and is expected to open it only around this call so the + slow classification never holds a connection. + """ + for column in self._columns: + mapping = classified.get(column.name) + if not mapping: + continue + column.repo.upsert_all(portfolio_id, mapping) + def classify_and_persist( self, addresses: AddressList, portfolio_id: int ) -> dict[str, dict[str, Enum]]: """Classify every registered column and persist the results. - Each non-empty mapping is written via the column's repository under - ``source = 'classifier'``. Empty mappings (a registered column whose - ``source_column`` is absent from this batch) skip the DB round-trip. - The orchestrator does not commit -- the caller owns the transaction - boundary. - Returns the same shape as ``classify_columns`` so callers can log per-column counts. """ classified = self.classify_columns(addresses) - for column in self._columns: - mapping = classified[column.name] - if not mapping: - continue - column.repo.upsert_all(portfolio_id, mapping) + self.persist(classified, portfolio_id) return classified - def classify_and_persist_from_rows( - self, rows: list[dict[str, str]], portfolio_id: int + def classify_from_rows( + self, rows: list[dict[str, str]] ) -> dict[str, dict[str, Enum]]: - """Classify + persist straight from raw CSV rows. + """Classify raw CSV rows without touching the database. - Unlike ``classify_and_persist``, this does not build an ``AddressList``, - so it has no canonical address/postcode requirement -- the classifier - only needs the raw description cells. Used when reading the original + The classification half of ``classify_and_persist_from_rows``, split + out so a caller can run the slow ChatGPT work *before* opening a + transaction and then write the finished results with ``persist`` inside + one short-lived connection. + + Unlike the ``AddressList`` path this builds no ``AddressList``, so it + has no canonical address/postcode requirement -- the classifier only + needs the raw description cells. Used when reading the original landlord upload (raw headers) rather than the address-matching CSV. """ col_to_desc = self._descriptions_from_rows(rows) - classified = { + return { column.name: column.classifier.classify( col_to_desc.get(column.source_column, set()) ) for column in self._columns } - for column in self._columns: - mapping = classified[column.name] - if not mapping: - continue - column.repo.upsert_all(portfolio_id, mapping) + + def classify_and_persist_from_rows( + self, rows: list[dict[str, str]], portfolio_id: int + ) -> dict[str, dict[str, Enum]]: + """Classify + persist straight from raw CSV rows in one call. + + A convenience composition of ``classify_from_rows`` + ``persist``. + Prefer calling the two separately when classification is slow, so the + transaction opens only around ``persist`` (see the Lambda handler). + """ + classified = self.classify_from_rows(rows) + self.persist(classified, portfolio_id) return classified @staticmethod diff --git a/test.requirements.txt b/test.requirements.txt index 26125034..c5b71977 100644 --- a/test.requirements.txt +++ b/test.requirements.txt @@ -10,4 +10,5 @@ fuzzywuzzy pymupdf playwright==1.58.0 msal -moto[s3,sqs] \ No newline at end of file +moto[s3,sqs] +openai==1.93.0 \ No newline at end of file diff --git a/tests/orchestration/test_landlord_description_overrides_orchestrator.py b/tests/orchestration/test_landlord_description_overrides_orchestrator.py index eee4a310..d05b5911 100644 --- a/tests/orchestration/test_landlord_description_overrides_orchestrator.py +++ b/tests/orchestration/test_landlord_description_overrides_orchestrator.py @@ -323,3 +323,147 @@ def test_classify_and_persist_skips_upsert_for_a_column_absent_from_the_batch() # assert: Property Type wrote; Walls did not. assert property_type_repo.calls == [(7, {"semi-detached": PropertyType.HOUSE})] assert wall_type_repo.calls == [] + + +def test_classify_from_rows_classifies_each_column_without_persisting() -> None: + # arrange: raw CSV rows (not an AddressList) carry two classifiable columns. + rows = [{"Property Type": "semi-detached", "Walls": "solid brick"}] + property_types = _StubColumnClassifier({"semi-detached": PropertyType.HOUSE}) + wall_types = _StubColumnClassifier( + {"solid brick": WallType.SOLID_BRICK_AS_BUILT_NO_INSULATION_ASSUMED} + ) + property_type_repo = _StubLandlordOverrideRepository() + wall_type_repo = _StubLandlordOverrideRepository() + + # act + result = _orchestrator( + [ + _column("property_type", "Property Type", property_types, property_type_repo), + _column("wall_type", "Walls", wall_types, wall_type_repo), + ] + ).classify_from_rows(rows) + + # assert: each classifier ran against its column's descriptions, keyed by + # name -- and NOT a single repo was touched (classification is DB-free, so + # the slow ChatGPT work can run before any transaction opens). + assert result == { + "property_type": {"semi-detached": PropertyType.HOUSE}, + "wall_type": {"solid brick": WallType.SOLID_BRICK_AS_BUILT_NO_INSULATION_ASSUMED}, + } + assert property_type_repo.calls == [] + assert wall_type_repo.calls == [] + + +def test_classify_from_rows_splits_and_normalises_descriptions() -> None: + # arrange: one cell packs several descriptions with inconsistent casing, + # spread across rows. The rows path must fold them exactly like the + # AddressList path: comma-split, trimmed, lower-cased, de-duped. + rows = [ + {"Walls": "Solid Brick, cavity"}, + {"Walls": "SOLID BRICK"}, + ] + wall_types = _StubColumnClassifier({}) + + # act + _orchestrator( + [_column("wall_type", "Walls", wall_types)] + ).classify_from_rows(rows) + + # assert: the classifier saw one normalised entry per distinct variant. + assert wall_types.received == {"solid brick", "cavity"} + + +def test_classify_from_rows_yields_empty_mapping_for_an_absent_column() -> None: + # arrange: a column is registered for a header the rows lack. + rows = [{"Walls": "cavity"}] + property_types = _StubColumnClassifier({}) + + # act + result = _orchestrator( + [_column("property_type", "Property Type", property_types)] + ).classify_from_rows(rows) + + # assert: the absent column classified an empty description set. + assert result == {"property_type": {}} + assert property_types.received == set() + + +def test_persist_routes_each_columns_mapping_to_its_own_repo() -> None: + # arrange: a finished ``classified`` mapping (as classify_* would return) + # and two columns with distinct repos. + property_type_repo = _StubLandlordOverrideRepository() + wall_type_repo = _StubLandlordOverrideRepository() + columns: list[ClassifiableColumn[Any]] = [ + _column("property_type", "Property Type", _StubColumnClassifier({}), property_type_repo), + _column("wall_type", "Walls", _StubColumnClassifier({}), wall_type_repo), + ] + classified: dict[str, dict[str, Enum]] = { + "property_type": {"semi-detached": PropertyType.HOUSE}, + "wall_type": {"solid brick": WallType.SOLID_BRICK_AS_BUILT_NO_INSULATION_ASSUMED}, + } + + # act + _orchestrator(columns).persist(classified, portfolio_id=42) + + # assert: each repo received exactly its own column's mapping. + assert property_type_repo.calls == [(42, {"semi-detached": PropertyType.HOUSE})] + assert wall_type_repo.calls == [ + (42, {"solid brick": WallType.SOLID_BRICK_AS_BUILT_NO_INSULATION_ASSUMED}) + ] + + +def test_persist_skips_empty_and_missing_mappings() -> None: + # arrange: ``property_type`` has an empty mapping; ``wall_type`` is absent + # from ``classified`` entirely. Neither should hit the DB -- and the + # missing key must not raise (``persist`` reads with ``.get``). + property_type_repo = _StubLandlordOverrideRepository() + wall_type_repo = _StubLandlordOverrideRepository() + columns: list[ClassifiableColumn[Any]] = [ + _column("property_type", "Property Type", _StubColumnClassifier({}), property_type_repo), + _column("wall_type", "Walls", _StubColumnClassifier({}), wall_type_repo), + ] + classified: dict[str, dict[str, Enum]] = {"property_type": {}} + + # act + _orchestrator(columns).persist(classified, portfolio_id=7) + + # assert: no upserts at all. + assert property_type_repo.calls == [] + assert wall_type_repo.calls == [] + + +def test_classify_and_persist_from_rows_composes_classify_then_persist() -> None: + # arrange: the one-shot rows path must classify AND route to repos, so the + # convenience composition stays equivalent to calling the two in sequence. + rows = [{"Property Type": "semi-detached", "Walls": "solid brick"}] + property_type_repo = _StubLandlordOverrideRepository() + wall_type_repo = _StubLandlordOverrideRepository() + columns: list[ClassifiableColumn[Any]] = [ + _column( + "property_type", + "Property Type", + _StubColumnClassifier({"semi-detached": PropertyType.HOUSE}), + property_type_repo, + ), + _column( + "wall_type", + "Walls", + _StubColumnClassifier( + {"solid brick": WallType.SOLID_BRICK_AS_BUILT_NO_INSULATION_ASSUMED} + ), + wall_type_repo, + ), + ] + + # act + result = _orchestrator(columns).classify_and_persist_from_rows(rows, portfolio_id=99) + + # assert: same return shape as classify_from_rows, and each repo wrote once. + assert result == { + "property_type": {"semi-detached": PropertyType.HOUSE}, + "wall_type": {"solid brick": WallType.SOLID_BRICK_AS_BUILT_NO_INSULATION_ASSUMED}, + } + assert property_type_repo.calls == [(99, {"semi-detached": PropertyType.HOUSE})] + assert wall_type_repo.calls == [ + (99, {"solid brick": WallType.SOLID_BRICK_AS_BUILT_NO_INSULATION_ASSUMED}) + ]