landlord override orchestration

This commit is contained in:
Jun-te Kim 2026-05-26 15:27:45 +00:00
parent 96aeed4f2e
commit 8422041215
39 changed files with 1576 additions and 164 deletions

View file

@ -25,7 +25,7 @@ Invoke `/ubiquitous-language` in any session to extract new terms from the conve
| **Postcode** | A UK postal code used to group nearby addresses; the primary search key for finding EPC records. | "zip code", "postal code" |
| **Unstandardised Address** | A frozen dataclass (`domain.addresses.unstandardised_address.UnstandardisedAddress`) capturing a single address exactly as a customer supplied it, before any standardisation: a free-text `address` line (intentionally NOT normalised), a canonical `postcode` (a `Postcode` value object, sanitised on construction), an optional `org_reference` (the customer's own identifier for the property), and `additional_info` (the full source row — every column of the customer's upload, preserved verbatim). | "user address", "asset list", "raw address", "landlord address", "Hyde address" |
| **Address List** | A nominal `NewType` over `list[UnstandardisedAddress]` (`domain.addresses.unstandardised_address.AddressList`) — a batch of unstandardised addresses, such as one customer's bulk-onboarding upload or a postcode-grouped sub-batch produced for downstream processing. Being nominal, it is constructed explicitly: `AddressList([...])`. It is the raw *input* to ingestion; the standardised *output* is a **Standardised Asset List**. | "asset list", "Hyde address list", "user addresses" |
| **Standardised Asset List (SAL)** | A customer's property portfolio after ingestion has cleaned and standardised it — each property carrying a canonical field set (UPRN, standardised address, postcode, property type, built form, …). It is the standardised *output* of the pipeline whose raw *input* is an **Address List** of **Unstandardised Addresses**; generated by the `SALOrchestrator`. (Legacy implementation: `asset_list.AssetList` via `load_standardised_asset_list`.) | "address list" (that is the raw input), "asset register", "portfolio list" |
| **Standardised Asset List (SAL)** | A customer's property portfolio after ingestion has cleaned and standardised it — each property carrying a canonical field set (UPRN, standardised address, postcode, property type, built form, …). It is the standardised *output* of the pipeline whose raw *input* is an **Address List** of **Unstandardised Addresses**. (Legacy implementation: `asset_list.AssetList` via `load_standardised_asset_list`.) | "address list" (that is the raw input), "asset register", "portfolio list" |
| **Dwelling** | A single residential unit that can hold an EPC — a house, flat, or maisonette. | "property", "unit", "home" |
## Address Matching

View file

@ -1,69 +0,0 @@
import logging
from typing import Any
import boto3
from orchestration.sal_orchestrator import (
SALOrchestrator,
)
from infrastructure.s3.csv_s3_client import CsvS3Client
from repositories.unstandardised_address.unstandardised_address_list_csv_s3_repository import (
UnstandardisedAddressListCsvS3Repository,
)
from domain.addresses.unstandardised_address import AddressList
from domain.sal.column_classifier import ColumnClassifier
from domain.sal.property_type import PropertyType
from domain.sal.wall_type import WallType
from infrastructure.chatgpt.chatgpt import ChatGPT
from infrastructure.chatgpt.chatgpt_column_classifier import (
ChatGptColumnClassifier,
)
logger = logging.getLogger(__name__)
def handler(
    body: dict[str, Any],
    context: Any,
) -> dict[str, list[str]]:
    """Run the column classifiers over a capped sample of a fixed landlord CSV.

    The S3 location is hard-coded; ``body`` and ``context`` are accepted but
    never read. The address list is loaded through ``SALOrchestrator``,
    truncated to its first 20 entries and handed to ``classify_columns``; the
    number of classified descriptions is then logged per column. The return
    value is a fixed placeholder payload.
    """
    source_bucket = "retrofit-data-dev"
    source_uri = "s3://retrofit-data-dev/bulk_onboarding_inputs/hyde2 (1).csv"
    sample_size = 20

    # boto3.client is overloaded per-service in the installed stubs; cast
    # to Any so the strict-mode checker treats it as opaque.
    make_client: Any = boto3.client  # noqa
    s3_client: Any = make_client("s3")
    address_repo = UnstandardisedAddressListCsvS3Repository(
        CsvS3Client(s3_client, source_bucket), source_bucket
    )

    # One ChatGPT-backed classifier per landlord-CSV column, keyed by column name.
    llm = ChatGPT()
    column_classifiers: dict[str, ColumnClassifier[Any]] = {
        "Property Type": ChatGptColumnClassifier(
            llm, PropertyType, PropertyType.UNKNOWN
        ),
        "Walls": ChatGptColumnClassifier(llm, WallType, WallType.UNKNOWN),
    }
    orchestrator = SALOrchestrator(
        unstandardised_address_repo=address_repo,
        classifiers=column_classifiers,
    )

    all_addresses: AddressList = orchestrator.get_unstandardised_addresses(
        input_s3_uri=source_uri
    )
    # Cap the batch to the first 20 while the ChatGPT path is under test.
    sample = AddressList(all_addresses[:sample_size])

    for column, mapping in orchestrator.classify_columns(sample).items():
        logger.info(
            "Classified %d descriptions for column %r.", len(mapping), column
        )
    # TODO: persist `classified` to landlord overrides.
    return {"hello": ["200"]}

View file

@ -0,0 +1,128 @@
import logging
import os
from typing import Any
from uuid import UUID
import boto3
from applications.landlord_description_overrides.landlord_description_overrides_trigger_body import (
LandlordDescriptionOverridesTriggerBody,
)
from domain.addresses.unstandardised_address import AddressList
from domain.landlord_description_overrides.built_form_type import BuiltFormType
from domain.landlord_description_overrides.property_type import PropertyType
from domain.landlord_description_overrides.wall_type import WallType
from infrastructure.chatgpt.chatgpt import ChatGPT
from infrastructure.chatgpt.chatgpt_column_classifier import ChatGptColumnClassifier
from infrastructure.postgres.config import PostgresConfig
from infrastructure.postgres.engine import make_engine, transactional_session
from infrastructure.postgres.landlord_built_form_type_override_postgres_repository import (
LandlordBuiltFormTypeOverridePostgresRepository,
)
from infrastructure.postgres.landlord_property_type_override_postgres_repository import (
LandlordPropertyTypeOverridePostgresRepository,
)
from infrastructure.postgres.landlord_wall_type_override_postgres_repository import (
LandlordWallTypeOverridePostgresRepository,
)
from infrastructure.s3.csv_s3_client import CsvS3Client
from orchestration.classifiable_column import ClassifiableColumn
from orchestration.landlord_description_overrides_orchestrator import (
LandlordDescriptionOverridesOrchestrator,
)
from repositories.unstandardised_address.unstandardised_address_list_csv_s3_repository import (
UnstandardisedAddressListCsvS3Repository,
)
logger = logging.getLogger(__name__)
def handler(
    body: dict[str, Any],
    context: Any,
) -> dict[str, list[str]]:
    """Classify landlord CSV descriptions and persist them as overrides.

    The trigger fields are currently hard-coded (``body`` and ``context`` are
    accepted but not read). The handler loads the address list from S3, keeps
    the first 20 entries, and calls ``classify_and_persist`` for three
    classifiable columns inside a single transactional session. It logs the
    number of classified descriptions per column and returns a fixed
    placeholder payload.
    """
    # TODO: replace with ``LandlordDescriptionOverridesTriggerBody.model_validate(body)``
    # once this lambda is wired into the parent task pipeline via the SQS
    # subtask envelope. Until then the trigger fields are hard-coded so the
    # local invoker can exercise the full path. See ADR-0003 §Out of scope.
    trigger = LandlordDescriptionOverridesTriggerBody(
        task_id=UUID("00000000-0000-0000-0000-000000000001"),
        sub_task_id=UUID("00000000-0000-0000-0000-000000000002"),
        s3_uri="s3://retrofit-data-dev/bulk_onboarding_inputs/hyde2 (1).csv",
        portfolio_id=730,
    )
    source_bucket = "retrofit-data-dev"

    # boto3.client is overloaded per-service in the installed stubs; cast
    # to Any so the strict-mode checker treats it as opaque.
    make_client: Any = boto3.client  # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
    s3_client: Any = make_client("s3")
    address_repo = UnstandardisedAddressListCsvS3Repository(
        CsvS3Client(s3_client, source_bucket), source_bucket
    )

    # One transactional session per handler invocation: the context manager
    # commits on clean exit and rolls back on exception, so the handler never
    # invokes ``.commit()`` itself -- transaction semantics live in the
    # infrastructure layer.
    engine = make_engine(PostgresConfig.from_env(os.environ))
    with transactional_session(engine) as session:
        llm = ChatGPT()

        # The "Property Type" CSV column is read by two classifiers: the
        # landlord's free-text (e.g. "semi-detached house") encodes both the
        # dwelling kind (PropertyType) and how it joins to neighbours
        # (BuiltFormType). Each classification lands in its own table.
        property_type_column = ClassifiableColumn(
            name="property_type",
            source_column="Property Type",
            classifier=ChatGptColumnClassifier(
                llm, PropertyType, PropertyType.UNKNOWN
            ),
            repo=LandlordPropertyTypeOverridePostgresRepository(session),
        )
        built_form_column = ClassifiableColumn(
            name="built_form_type",
            source_column="Property Type",
            classifier=ChatGptColumnClassifier(
                llm, BuiltFormType, BuiltFormType.UNKNOWN
            ),
            repo=LandlordBuiltFormTypeOverridePostgresRepository(session),
        )
        wall_type_column = ClassifiableColumn(
            name="wall_type",
            source_column="Walls",
            classifier=ChatGptColumnClassifier(llm, WallType, WallType.UNKNOWN),
            repo=LandlordWallTypeOverridePostgresRepository(session),
        )
        columns: list[ClassifiableColumn[Any]] = [
            property_type_column,
            built_form_column,
            wall_type_column,
        ]

        orchestrator = LandlordDescriptionOverridesOrchestrator(
            unstandardised_address_repo=address_repo,
            columns=columns,
        )
        all_addresses: AddressList = orchestrator.get_unstandardised_addresses(
            input_s3_uri=trigger.s3_uri
        )
        # Cap the batch to the first 20 while the ChatGPT path is under test.
        # Remove before wiring into the production subtask pipeline.
        sample = AddressList(all_addresses[:20])

        classified = orchestrator.classify_and_persist(
            sample, portfolio_id=trigger.portfolio_id
        )
        for column, mapping in classified.items():
            logger.info(
                "Classified %d descriptions for column %r.", len(mapping), column
            )
    return {"hello": ["200"]}

View file

@ -0,0 +1,15 @@
from uuid import UUID
from pydantic import BaseModel, ConfigDict
class LandlordDescriptionOverridesTriggerBody(BaseModel):
    """Trigger payload for the landlord-description-overrides handler.

    Carries the task and sub-task identifiers, the S3 URI of the input to
    read, and the portfolio the resulting overrides are written against.
    """

    # ``extra="allow"``: keys not declared below are accepted rather than
    # rejected during validation.
    model_config = ConfigDict(extra="allow")

    task_id: UUID
    sub_task_id: UUID
    s3_uri: str
    # ``portfolio_id`` is ``bigint`` in the ``landlord_*_overrides`` schema --
    # Python ``int`` is unbounded so the Pydantic side stays simple; the
    # SQLModel row class pins the storage to ``BigInteger``.
    portfolio_id: int

View file

@ -0,0 +1,77 @@
# ADR-0003: Python writes landlord overrides directly to Postgres
**Status:** Accepted
**Date:** 2026-05-26
**Supersedes (in part):** [assessment-model/docs/adr/0002-landlord-override-vocabulary.md](https://github.com/.../assessment-model/blob/main/docs/adr/0002-landlord-override-vocabulary.md) — specifically the clause beginning *"Writes happen from Next.js …"*.
## Context
ADR-0002 (in the `assessment-model` TS repo) defined the `landlord_property_type_overrides` and `landlord_wall_type_overrides` tables and noted that the Model service would POST classification results to a Next.js route handler, with Next.js performing the upsert. Drizzle remained the schema source of truth.
That extra hop has not been built and is now judged unnecessary for the present scope:
- The classification result is internal — a Lambda computes it, the same Lambda persists it. No third party needs to participate in the write.
- Drizzle remains the schema's source of truth either way: the Python adapter mirrors the schema in a SQLModel row, but the migrations stay with Drizzle. Adding a Next.js route would not change which side owns schema definition.
- The Python lambda already lives next to a Postgres connection in the existing pipeline (`subtask`/`tasks` tables are written from Python today). Adding two more tables to that adapter surface is a small, well-understood change. Routing the same writes through Next.js would mean: lambda → JSON-over-HTTP → Next.js route → Drizzle → Postgres, instead of lambda → SQLAlchemy → Postgres. Three extra moving parts to ship, deploy, monitor, and authenticate for no behavioural gain.
## Decision
The Model service (specifically `applications/landlord_description_overrides/handler.py`) writes directly to `landlord_property_type_overrides`, `landlord_built_form_type_overrides` and `landlord_wall_type_overrides` via a SQLAlchemy-backed `LandlordOverrideRepository[E]` adapter. No Next.js route handler is required.
Transaction boundaries live in `infrastructure/postgres/engine.transactional_session` — a context manager that commits on clean exit and rolls back on exception. The application layer (`handler.py`) never calls `.commit()` or `.rollback()` itself; it only opens the context. Orchestration and repository code likewise never commits — keeping transaction semantics confined to one infrastructure helper.
The conflict policy lives in SQL and is identical for every adapter implementation:
```sql
INSERT INTO landlord_property_type_overrides (portfolio_id, description, value, source)
VALUES …
ON CONFLICT (portfolio_id, description)
DO UPDATE SET value = EXCLUDED.value,
source = EXCLUDED.source,
updated_at = now()
WHERE landlord_property_type_overrides.source = 'classifier';
```
The `WHERE existing.source = 'classifier'` guard is load-bearing: it lets the classifier refresh its own past output while leaving `source = 'user'` rows untouched. This is the contract ADR-0002's `source` column was added for.
`UNKNOWN` values are persisted, not skipped — consistent with ADR-0002 §5. A future user override can upgrade them.
## Consequences
**Positive.**
- One fewer service to deploy, monitor, and authenticate.
- The classifier and persistence live in the same process — failures surface against a single `sub_task` row, not split across two systems.
- The Postgres adapter mirrors the existing `subtask`/`tasks` repositories, so reviewers have a precedent to compare against.
**Negative.**
- The Python repo now holds two schemas — the schema-source-of-truth Drizzle definition lives in the TS repo, and the Python `SQLModel` row class shadows it. They must stay in lockstep. Mitigations: the TS schema header comment (`landlord_overrides.ts:12`) already names the Python source-of-truth file; a future ADR may add a CI check that diffs the two.
- The boundary that ADR-0002 anticipated for pgEnum validation (a Next.js route validating incoming values before insert) is gone. Pydantic + the Python `Enum` type catch invalid values on the producing side, and Postgres's pgEnum will reject anything that slips through.
## File layout
This ADR also fixes a placement convention for Postgres adapters going forward. The codebase currently has the ChatGPT classifier split cleanly along DDD lines — port in `domain/`, adapter in `infrastructure/chatgpt/` — but the `tasks` Postgres adapter does not follow the same shape: its concrete class lives in `repositories/tasks/`, not `infrastructure/postgres/`.
The convention going forward is:
- **Port (protocol / abstract base):** `repositories/<aggregate>/<thing>_repository.py`
- **Postgres adapter (concrete):** `infrastructure/postgres/<thing>_postgres_repository.py`
- **SQLModel row class:** `infrastructure/postgres/<thing>_table.py`
The new `LandlordOverrideRepository` family follows this convention.
**Existing outliers to relocate in a follow-up:**
- `repositories/tasks/task_postgres_repository.py` → `infrastructure/postgres/task_postgres_repository.py`
- `repositories/tasks/subtask_postgres_repository.py` → `infrastructure/postgres/subtask_postgres_repository.py`
Both moves are mechanical (import-path updates only). They are intentionally out of scope for the present PR.
## Out of scope (deferred to follow-up work)
- Relocating `task_postgres_repository.py` and `subtask_postgres_repository.py` into `infrastructure/postgres/` per the convention above.
- Extracting a shared upsert helper / base class once a fourth `landlord_*_overrides` column lands — until then the three adapters' 95%-identical bodies are kept side-by-side for direct comparison.
- Switching `applications/landlord_description_overrides/handler.py` to acquire its `Session` via a `@subtask_handler()`-style decorator instead of building its own engine.
- A cross-repo PR amending ADR-0002 to point at this ADR.
- A CI check (or codegen) that diffs the Drizzle pgEnum literals against the Python `Enum.value` strings.

View file

@ -0,0 +1,20 @@
from enum import Enum
class BuiltFormType(Enum):
    """A landlord-supplied built form, as resolved by the landlord-description-overrides context.

    Mirrors the EPC built-form values. ``NOT_RECORDED`` is the legitimate
    EPC value for properties whose built form the surveyor did not capture;
    ``UNKNOWN`` is the classifier fallback for landlord values that cannot be
    resolved at all.
    """

    DETACHED = "Detached"
    SEMI_DETACHED = "Semi-Detached"
    MID_TERRACE = "Mid-Terrace"
    END_TERRACE = "End-Terrace"
    ENCLOSED_MID_TERRACE = "Enclosed Mid-Terrace"
    ENCLOSED_END_TERRACE = "Enclosed End-Terrace"
    # Legitimate EPC value: the surveyor did not capture the built form.
    NOT_RECORDED = "Not Recorded"
    # Classifier fallback for landlord values that cannot be resolved at all.
    UNKNOWN = "Unknown"

View file

@ -21,7 +21,7 @@ class ColumnClassifier(ABC, Generic[E]):
One classifier handles one landlord-CSV column. Implementations decide
*how* the mapping is performed (an LLM, a lookup table, a rules engine);
``SALOrchestrator`` depends only on this interface.
``LandlordDescriptionOverridesOrchestrator`` depends only on this interface.
"""
@abstractmethod

View file

@ -2,7 +2,7 @@ from enum import Enum
class PropertyType(Enum):
"""A landlord-supplied property type, as resolved by the SAL context.
"""A landlord-supplied property type, as resolved by the landlord-description-overrides context.
Distinct from the EPC context's ``PropertyType``: a landlord CSV value
may be unresolvable, so this enum carries an explicit ``UNKNOWN`` member.

View file

@ -0,0 +1,70 @@
from enum import Enum
class RoofType(Enum):
    """A landlord-supplied roof description, as resolved by the landlord-description-overrides context.

    Each member is one full EPC roof-description string, combining shape
    (flat, pitched, roof room(s), thatched) with insulation state and, for
    pitched roofs, the loft-insulation depth in millimetres. Adjacency
    markers like ``(another dwelling above)`` represent a unit whose top
    boundary is another dwelling rather than a roof of its own; they are
    kept as members because they appear in the same EPC column.

    ``UNKNOWN`` covers values the classifier cannot resolve -- most
    commonly raw ``Average thermal transmittance`` U-value strings that
    carry no shape/insulation information.
    """

    # Flat roofs.
    FLAT_INSULATED = "Flat, insulated"
    FLAT_INSULATED_ASSUMED = "Flat, insulated (assumed)"
    FLAT_LIMITED_INSULATION = "Flat, limited insulation"
    FLAT_LIMITED_INSULATION_ASSUMED = "Flat, limited insulation (assumed)"
    FLAT_NO_INSULATION = "Flat, no insulation"
    FLAT_NO_INSULATION_ASSUMED = "Flat, no insulation (assumed)"

    # Pitched roofs described by insulation state.
    PITCHED_INSULATED = "Pitched, insulated"
    PITCHED_INSULATED_ASSUMED = "Pitched, insulated (assumed)"
    PITCHED_INSULATED_AT_RAFTERS = "Pitched, insulated at rafters"
    PITCHED_LIMITED_INSULATION = "Pitched, limited insulation"
    PITCHED_LIMITED_INSULATION_ASSUMED = "Pitched, limited insulation (assumed)"
    PITCHED_NO_INSULATION = "Pitched, no insulation"
    PITCHED_NO_INSULATION_ASSUMED = "Pitched, no insulation (assumed)"

    # Pitched roofs described by loft-insulation depth (millimetres).
    PITCHED_UNKNOWN_LOFT_INSULATION = "Pitched, Unknown loft insulation"
    PITCHED_LOFT_0MM = "Pitched, 0 mm loft insulation"
    PITCHED_LOFT_12MM = "Pitched, 12 mm loft insulation"
    PITCHED_LOFT_25MM = "Pitched, 25 mm loft insulation"
    PITCHED_LOFT_50MM = "Pitched, 50 mm loft insulation"
    PITCHED_LOFT_75MM = "Pitched, 75 mm loft insulation"
    PITCHED_LOFT_100MM = "Pitched, 100 mm loft insulation"
    PITCHED_LOFT_125MM = "Pitched, 125 mm loft insulation"
    PITCHED_LOFT_150MM = "Pitched, 150 mm loft insulation"
    PITCHED_LOFT_175MM = "Pitched, 175 mm loft insulation"
    PITCHED_LOFT_200MM = "Pitched, 200 mm loft insulation"
    PITCHED_LOFT_225MM = "Pitched, 225 mm loft insulation"
    PITCHED_LOFT_250MM = "Pitched, 250 mm loft insulation"
    PITCHED_LOFT_270MM = "Pitched, 270 mm loft insulation"
    PITCHED_LOFT_300MM = "Pitched, 300 mm loft insulation"
    PITCHED_LOFT_350MM = "Pitched, 350 mm loft insulation"
    PITCHED_LOFT_400MM = "Pitched, 400 mm loft insulation"
    PITCHED_LOFT_400_PLUS_MM = "Pitched, 400+ mm loft insulation"

    # Roof rooms.
    ROOF_ROOM_INSULATED = "Roof room(s), insulated"
    ROOF_ROOM_INSULATED_ASSUMED = "Roof room(s), insulated (assumed)"
    ROOF_ROOM_LIMITED_INSULATION = "Roof room(s), limited insulation"
    ROOF_ROOM_LIMITED_INSULATION_ASSUMED = "Roof room(s), limited insulation (assumed)"
    ROOF_ROOM_NO_INSULATION = "Roof room(s), no insulation"
    ROOF_ROOM_NO_INSULATION_ASSUMED = "Roof room(s), no insulation (assumed)"
    ROOF_ROOM_CEILING_INSULATED = "Roof room(s), ceiling insulated"
    ROOF_ROOM_THATCHED = "Roof room(s), thatched"
    ROOF_ROOM_THATCHED_WITH_ADDITIONAL_INSULATION = "Roof room(s), thatched with additional insulation"

    # Thatched roofs.
    THATCHED = "Thatched"
    THATCHED_WITH_ADDITIONAL_INSULATION = "Thatched, with additional insulation"

    # Adjacency markers: the top boundary is another dwelling or premises,
    # not a roof. Each distinct spelling is its own member.
    ADJACENT_ANOTHER_DWELLING_ABOVE = "(another dwelling above)"
    ADJACENT_SAME_DWELLING_ABOVE = "(same dwelling above)"
    ADJACENT_OTHER_PREMISES_ABOVE = "(other premises above)"
    ADJACENT_ANOTHER_PREMISES_ABOVE = "(another premises above)"
    # NOTE(review): un-parenthesised, title-cased spelling -- presumably a
    # variant seen in the source data; confirm it is intentional.
    ANOTHER_PREMISES_ABOVE = "Another Premises Above"

    # Classifier fallback for values that cannot be resolved.
    UNKNOWN = "Unknown"

View file

@ -0,0 +1,70 @@
from enum import Enum
class WallType(Enum):
    """A landlord-supplied wall description, as resolved by the landlord-description-overrides context.

    Each member is one full EPC wall-description string, combining material
    (cavity, solid brick, sandstone, ...) with construction/insulation state
    (as built, filled cavity, with internal insulation, ...). ``UNKNOWN`` covers
    values the classifier cannot resolve -- most commonly raw
    ``Average thermal transmittance`` U-value strings that carry no material
    information.
    """

    # Cavity walls.
    CAVITY_FILLED = "Cavity wall, filled cavity"
    CAVITY_AS_BUILT_INSULATED_ASSUMED = "Cavity wall, as built, insulated (assumed)"
    CAVITY_AS_BUILT_NO_INSULATION_ASSUMED = "Cavity wall, as built, no insulation (assumed)"
    CAVITY_AS_BUILT_PARTIAL_INSULATION_ASSUMED = "Cavity wall, as built, partial insulation (assumed)"
    CAVITY_WITH_INTERNAL_INSULATION = "Cavity wall, with internal insulation"
    CAVITY_WITH_EXTERNAL_INSULATION = "Cavity wall, with external insulation"
    CAVITY_FILLED_AND_INTERNAL_INSULATION = "Cavity wall, filled cavity and internal insulation"
    CAVITY_FILLED_AND_EXTERNAL_INSULATION = "Cavity wall, filled cavity and external insulation"

    # Solid brick.
    SOLID_BRICK_AS_BUILT_NO_INSULATION_ASSUMED = "Solid brick, as built, no insulation (assumed)"
    SOLID_BRICK_AS_BUILT_INSULATED_ASSUMED = "Solid brick, as built, insulated (assumed)"
    SOLID_BRICK_AS_BUILT_PARTIAL_INSULATION_ASSUMED = "Solid brick, as built, partial insulation (assumed)"
    SOLID_BRICK_WITH_INTERNAL_INSULATION = "Solid brick, with internal insulation"
    SOLID_BRICK_WITH_EXTERNAL_INSULATION = "Solid brick, with external insulation"

    # Timber frame.
    TIMBER_FRAME_AS_BUILT_NO_INSULATION_ASSUMED = "Timber frame, as built, no insulation (assumed)"
    TIMBER_FRAME_AS_BUILT_INSULATED_ASSUMED = "Timber frame, as built, insulated (assumed)"
    TIMBER_FRAME_AS_BUILT_PARTIAL_INSULATION_ASSUMED = "Timber frame, as built, partial insulation (assumed)"
    TIMBER_FRAME_WITH_ADDITIONAL_INSULATION = "Timber frame, with additional insulation"

    # Sandstone.
    SANDSTONE_AS_BUILT_NO_INSULATION_ASSUMED = "Sandstone, as built, no insulation (assumed)"
    SANDSTONE_AS_BUILT_INSULATED_ASSUMED = "Sandstone, as built, insulated (assumed)"
    SANDSTONE_AS_BUILT_PARTIAL_INSULATION_ASSUMED = "Sandstone, as built, partial insulation (assumed)"
    SANDSTONE_WITH_INTERNAL_INSULATION = "Sandstone, with internal insulation"
    SANDSTONE_WITH_EXTERNAL_INSULATION = "Sandstone, with external insulation"

    # Granite or whin.
    GRANITE_OR_WHIN_AS_BUILT_NO_INSULATION_ASSUMED = "Granite or whin, as built, no insulation (assumed)"
    GRANITE_OR_WHIN_AS_BUILT_INSULATED_ASSUMED = "Granite or whin, as built, insulated (assumed)"
    GRANITE_OR_WHIN_AS_BUILT_PARTIAL_INSULATION_ASSUMED = "Granite or whin, as built, partial insulation (assumed)"
    GRANITE_OR_WHIN_WITH_INTERNAL_INSULATION = "Granite or whin, with internal insulation"
    GRANITE_OR_WHIN_WITH_EXTERNAL_INSULATION = "Granite or whin, with external insulation"

    # System built.
    SYSTEM_BUILT_AS_BUILT_NO_INSULATION_ASSUMED = "System built, as built, no insulation (assumed)"
    SYSTEM_BUILT_AS_BUILT_INSULATED_ASSUMED = "System built, as built, insulated (assumed)"
    SYSTEM_BUILT_AS_BUILT_PARTIAL_INSULATION_ASSUMED = "System built, as built, partial insulation (assumed)"
    SYSTEM_BUILT_WITH_INTERNAL_INSULATION = "System built, with internal insulation"
    SYSTEM_BUILT_WITH_EXTERNAL_INSULATION = "System built, with external insulation"

    # Park home walls.
    PARK_HOME_AS_BUILT = "Park home wall, as built"
    PARK_HOME_WITH_INTERNAL_INSULATION = "Park home wall, with internal insulation"
    PARK_HOME_WITH_EXTERNAL_INSULATION = "Park home wall, with external insulation"

    # Cob.
    COB_AS_BUILT = "Cob, as built"
    COB_WITH_INTERNAL_INSULATION = "Cob, with internal insulation"
    COB_WITH_EXTERNAL_INSULATION = "Cob, with external insulation"

    # Curtain walls. The value strings differ in capitalisation ("Curtain
    # wall" vs "Curtain Wall"); they are kept exactly as written.
    CURTAIN_WALL = "Curtain wall"
    CURTAIN_WALL_AS_BUILT_NO_INSULATION_ASSUMED = "Curtain Wall, as built, no insulation (assumed)"
    CURTAIN_WALL_AS_BUILT_INSULATED_ASSUMED = "Curtain Wall, as built, insulated (assumed)"
    CURTAIN_WALL_FILLED = "Curtain Wall, filled cavity"
    CURTAIN_WALL_WITH_INTERNAL_INSULATION = "Curtain Wall, with internal insulation"

    # Basement walls.
    BASEMENT_WALL = "Basement wall"
    BASEMENT_WALL_AS_BUILT = "Basement wall, as built"

    # Classifier fallback for values that cannot be resolved.
    UNKNOWN = "Unknown"

View file

@ -1,15 +0,0 @@
from enum import Enum
class WallType(Enum):
    """A landlord-supplied wall construction type, as resolved by the SAL context.

    Mirrors the main RdSAP wall constructions. Like the SAL ``PropertyType``,
    it carries an explicit ``UNKNOWN`` member for unresolvable CSV values.
    """

    CAVITY = "Cavity"
    SOLID_BRICK = "Solid Brick"
    TIMBER_FRAME = "Timber frame"
    SANDSTONE = "Sandstone"
    # Explicit fallback for CSV values that cannot be resolved.
    UNKNOWN = "Unknown"

View file

@ -4,7 +4,10 @@ import json
from enum import Enum
from typing import Any, TypeVar
from domain.sal.column_classifier import ClassificationError, ColumnClassifier
from domain.landlord_description_overrides.column_classifier import (
ClassificationError,
ColumnClassifier,
)
from infrastructure.chatgpt.chatgpt import ChatGPT
from infrastructure.chatgpt.exceptions import ChatGPTClientError

View file

@ -1,3 +1,6 @@
from collections.abc import Iterator
from contextlib import contextmanager
from sqlalchemy.engine import Engine
from sqlmodel import Session, create_engine
@ -16,3 +19,24 @@ def make_engine(config: PostgresConfig) -> Engine:
def make_session(engine: Engine) -> Session:
    """Return a new ``Session`` bound to ``engine``.

    Nothing is committed, rolled back or closed here; the caller owns the
    returned session's lifecycle.
    """
    return Session(engine)
@contextmanager  # pyright: ignore[reportDeprecated]
def transactional_session(engine: Engine) -> Iterator[Session]:
    """Yield a session whose lifecycle owns the transaction.

    On clean exit the session commits; on any exception it rolls back and
    re-raises. Either way the session is closed. Callers in the application
    layer can do their work inside the ``with`` block without ever invoking
    ``.commit()`` / ``.rollback()`` themselves -- transaction semantics stay
    in the infrastructure layer.
    """
    session = Session(engine)
    try:
        yield session
        # Reached only when the ``with`` body finished without raising. The
        # commit sits inside the ``try`` so a failing commit is rolled back
        # by the handler below as well.
        session.commit()
    except Exception:
        # ``Exception`` only: ``BaseException`` subclasses such as
        # ``KeyboardInterrupt`` skip the explicit rollback but still reach
        # the ``finally`` clause.
        session.rollback()
        raise
    finally:
        session.close()

View file

@ -0,0 +1,82 @@
"""Postgres adapter for ``LandlordOverrideRepository[BuiltFormType]``.
Writes to ``landlord_built_form_type_overrides`` (Drizzle-managed; mirrored by
``LandlordBuiltFormTypeOverrideRow``). The conflict policy lives in the SQL --
see ADR-0003 §Decision. Shape mirrors
``LandlordPropertyTypeOverridePostgresRepository``; the duplication is
deliberate while there are only three columns -- if a fourth lands and the
duplication becomes painful, extract a shared upsert helper then.
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import cast
from sqlalchemy import Table
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlmodel import Session
from domain.landlord_description_overrides.built_form_type import BuiltFormType
from infrastructure.postgres.landlord_built_form_type_override_table import (
LandlordBuiltFormTypeOverrideRow,
)
from infrastructure.postgres.landlord_override_enums import OverrideSource
from repositories.landlord_overrides.landlord_override_repository import (
LandlordOverrideRepository,
)
class LandlordBuiltFormTypeOverridePostgresRepository(
    LandlordOverrideRepository[BuiltFormType]
):
    """Writes classifier-produced built-form overrides through a session.

    The repository only executes statements on the session it is given; it
    never commits or rolls back itself.
    """

    def __init__(self, session: Session) -> None:
        self._session = session

    def upsert_all(
        self,
        portfolio_id: int,
        descriptions_to_values: dict[str, BuiltFormType],
    ) -> None:
        """Insert or refresh one override row per landlord description.

        Every row is written with ``source = 'classifier'`` and a single
        shared UTC timestamp. On a ``(portfolio_id, description)`` conflict
        the existing row is updated only if it was itself written by the
        classifier. An empty mapping is a no-op.
        """
        if not descriptions_to_values:
            return

        stamped_at = datetime.now(timezone.utc)
        payload = [
            {
                "portfolio_id": portfolio_id,
                "description": landlord_text,
                "value": built_form.value,
                "source": OverrideSource.CLASSIFIER,
                "created_at": stamped_at,
                "updated_at": stamped_at,
            }
            for landlord_text, built_form in descriptions_to_values.items()
        ]

        # SQLModel's class-level ``__table__`` is injected at runtime on
        # ``table=True`` classes but isn't exposed by the stubs; pin it to
        # ``Table`` via ``getattr`` so the dialect insert helper below
        # carries through with strict types.
        overrides_table: Table = cast(
            Table, getattr(LandlordBuiltFormTypeOverrideRow, "__table__")
        )

        insert_stmt = pg_insert(overrides_table).values(payload)
        incoming = insert_stmt.excluded
        # The classifier may refresh its own past output, but must never
        # overwrite a user correction -- the ``WHERE existing.source =
        # 'classifier'`` guard enforces that. See ADR-0003 §Decision.
        upsert_stmt = insert_stmt.on_conflict_do_update(
            index_elements=["portfolio_id", "description"],
            set_={
                "value": incoming.value,
                "source": incoming.source,
                "updated_at": incoming.updated_at,
            },
            where=overrides_table.c.source == OverrideSource.CLASSIFIER,
        )

        # SQLModel re-exports SQLAlchemy's ``Session.execute``; one of the
        # overload signatures is marked deprecated in stubs, which fires
        # here even though our INSERT path is the supported one.
        self._session.execute(upsert_stmt)  # pyright: ignore[reportDeprecated]

View file

@ -0,0 +1,69 @@
"""SQLModel mirror of the ``landlord_built_form_type_overrides`` Drizzle table.
The schema source of truth lives in the ``assessment-model`` TS repo
(`src/app/db/schema/landlord_overrides.ts`). The migrations are owned there;
this row class only mirrors the columns so the Python lambda can read/write.
See ADR-0003. Shape mirrors ``LandlordPropertyTypeOverrideRow`` -- the only
differences are the table name, the ``built_form_type`` pgEnum on ``value``,
and the unique-constraint name.
"""
from datetime import datetime, timezone
from typing import ClassVar
from uuid import UUID, uuid4
from sqlalchemy import BigInteger, Column, UniqueConstraint
from sqlalchemy import Enum as SAEnum
from sqlmodel import Field, SQLModel
from domain.landlord_description_overrides.built_form_type import BuiltFormType
from infrastructure.postgres.landlord_override_enums import override_source_sa_enum
class LandlordBuiltFormTypeOverrideRow(SQLModel, table=True):
    """One row of the ``landlord_built_form_type_overrides`` table.

    Maps a landlord ``description`` within a ``portfolio_id`` to a
    ``BuiltFormType`` value; the pair ``(portfolio_id, description)`` is
    declared unique.
    """

    __tablename__: ClassVar[str] = "landlord_built_form_type_overrides"  # pyright: ignore[reportIncompatibleVariableOverride]
    __table_args__: ClassVar[tuple[UniqueConstraint, ...]] = (  # pyright: ignore[reportIncompatibleVariableOverride]
        UniqueConstraint(
            "portfolio_id",
            "description",
            name="landlord_built_form_type_overrides_portfolio_description_unique",
        ),
    )

    # Primary key; a fresh UUID4 is generated on the Python side by default.
    id: UUID = Field(default_factory=uuid4, primary_key=True)
    # bigint to match the Drizzle ``portfolio_id`` FK; SQLModel's default int
    # mapping is 32-bit Integer and would overflow once portfolio IDs exceed
    # 2^31. The FK to ``portfolio.id`` is enforced by the Drizzle migration,
    # not declared here -- the ``portfolio`` table is not modelled in Python.
    portfolio_id: int = Field(
        sa_column=Column(BigInteger, nullable=False, index=True),
    )
    description: str = Field(nullable=False)
    # ``values_callable`` makes the enum column use the members' ``.value``
    # strings (e.g. "Semi-Detached") rather than the member names.
    value: BuiltFormType = Field(
        sa_column=Column(
            SAEnum(
                BuiltFormType,
                name="built_form_type",
                values_callable=lambda cls: [m.value for m in cls],  # pyright: ignore[reportUnknownLambdaType, reportUnknownMemberType, reportUnknownVariableType]
            ),
            nullable=False,
        ),
    )
    # Shared SAEnum -- see ``landlord_override_enums`` for why this single
    # instance is reused by every ``landlord_*_overrides`` row class.
    source: str = Field(
        sa_column=Column(override_source_sa_enum, nullable=False),
    )
    # Both timestamps default to the current UTC time on the Python side.
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc),
        nullable=False,
    )
    updated_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc),
        nullable=False,
    )

View file

@ -0,0 +1,35 @@
"""Shared pgEnum definitions used by every ``landlord_*_overrides`` row class.
The ``override_source`` pgEnum is referenced by both
``landlord_property_type_overrides`` and ``landlord_wall_type_overrides``
(per the Drizzle schema -- see ``landlord_overrides.ts``). Defining it once
here and reusing the same SQLAlchemy ``Enum`` instance across both row
classes keeps SQLModel's metadata coherent: ``create_all`` emits exactly one
``CREATE TYPE override_source`` statement, not two parallel ones colliding
on the same pgEnum name.
"""
from __future__ import annotations
from sqlalchemy import Enum as SAEnum
class OverrideSource:
    """String constants mirroring the labels of the ``override_source`` pgEnum.

    Drizzle declares the enum as ``('classifier', 'user')`` in
    ``landlord_overrides.ts``. Plain string constants are used so callers
    can name a source without scattering magic strings; Postgres is what
    constrains the column, and the only Python-side producer (the
    classifier path) writes the literal ``OverrideSource.CLASSIFIER``.
    """

    # Row produced by the classifier path.
    CLASSIFIER = "classifier"
    # Row produced by a user.
    USER = "user"
# Single module-level SQLAlchemy ``Enum`` bound to the ``override_source``
# pgEnum name. Row classes import this object instead of building their own,
# so they all share one instance. Labels are listed in the same order as
# the Drizzle definition: ``('classifier', 'user')``.
override_source_sa_enum = SAEnum(
    OverrideSource.CLASSIFIER,
    OverrideSource.USER,
    name="override_source",
)

View file

@ -0,0 +1,82 @@
"""Postgres adapter for ``LandlordOverrideRepository[PropertyType]``.
Writes to ``landlord_property_type_overrides`` (Drizzle-managed; mirrored by
``LandlordPropertyTypeOverrideRow``). The conflict policy lives in the SQL --
see ADR-0003 §Decision.
Per the convention this ADR fixes, Postgres adapters live in
``infrastructure/postgres/``. The existing ``task_postgres_repository.py`` /
``subtask_postgres_repository.py`` are outliers still under ``repositories/``;
relocating them is tracked as a follow-up in ADR-0003 §"File layout".
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import cast
from sqlalchemy import Table
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlmodel import Session
from domain.landlord_description_overrides.property_type import PropertyType
from infrastructure.postgres.landlord_override_enums import OverrideSource
from infrastructure.postgres.landlord_property_type_override_table import (
LandlordPropertyTypeOverrideRow,
)
from repositories.landlord_overrides.landlord_override_repository import (
LandlordOverrideRepository,
)
class LandlordPropertyTypeOverridePostgresRepository(
    LandlordOverrideRepository[PropertyType]
):
    """Persists ``PropertyType`` overrides to ``landlord_property_type_overrides``."""

    def __init__(self, session: Session) -> None:
        """Keep the session; this class executes on it but never commits."""
        self._session = session

    def upsert_all(
        self,
        portfolio_id: int,
        descriptions_to_values: dict[str, PropertyType],
    ) -> None:
        """Bulk-upsert one classifier row per description for ``portfolio_id``.

        Issues a single ``INSERT ... ON CONFLICT (portfolio_id, description)
        DO UPDATE`` whose update only applies to existing rows with
        ``source = 'classifier'``. An empty mapping returns without touching
        the session.
        """
        if not descriptions_to_values:
            return

        # One timestamp for the whole batch, so every row written by this
        # call carries the same created_at / updated_at.
        stamped_at = datetime.now(timezone.utc)
        payload = []
        for description, category in descriptions_to_values.items():
            payload.append(
                {
                    "portfolio_id": portfolio_id,
                    "description": description,
                    "value": category.value,
                    "source": OverrideSource.CLASSIFIER,
                    "created_at": stamped_at,
                    "updated_at": stamped_at,
                }
            )

        # ``__table__`` exists at runtime on ``table=True`` SQLModel classes
        # but the stubs do not declare it, so fetch it with ``getattr`` and
        # cast to ``Table`` to keep the dialect insert strictly typed.
        overrides: Table = cast(
            Table, getattr(LandlordPropertyTypeOverrideRow, "__table__")
        )
        insert_stmt = pg_insert(overrides).values(payload)

        # A classifier re-run may replace the classifier's own earlier
        # output but must leave user corrections alone; the ``WHERE
        # existing.source = 'classifier'`` guard is what enforces it.
        # See ADR-0003 §Decision.
        refreshed_columns = {
            "value": insert_stmt.excluded.value,
            "source": insert_stmt.excluded.source,
            "updated_at": insert_stmt.excluded.updated_at,
        }
        upsert_stmt = insert_stmt.on_conflict_do_update(
            index_elements=["portfolio_id", "description"],
            set_=refreshed_columns,
            where=overrides.c.source == OverrideSource.CLASSIFIER,
        )

        # The stubs flag one ``Session.execute`` overload as deprecated and
        # the warning fires here, although this INSERT path is supported.
        self._session.execute(upsert_stmt)  # pyright: ignore[reportDeprecated]

View file

@ -0,0 +1,67 @@
"""SQLModel mirror of the ``landlord_property_type_overrides`` Drizzle table.
The schema source of truth lives in the ``assessment-model`` TS repo
(`src/app/db/schema/landlord_overrides.ts`). The migrations are owned there;
this row class only mirrors the columns so the Python lambda can read/write.
See ADR-0003.
"""
from datetime import datetime, timezone
from typing import ClassVar
from uuid import UUID, uuid4
from sqlalchemy import BigInteger, Column, UniqueConstraint
from sqlalchemy import Enum as SAEnum
from sqlmodel import Field, SQLModel
from domain.landlord_description_overrides.property_type import PropertyType
from infrastructure.postgres.landlord_override_enums import override_source_sa_enum
def _property_type_labels(enum_cls: type[PropertyType]) -> list[str]:
    """Return every member's ``.value``, used as the persisted enum labels."""
    return [member.value for member in enum_cls]


def _utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC ``datetime``."""
    return datetime.now(timezone.utc)


class LandlordPropertyTypeOverrideRow(SQLModel, table=True):
    """Row mapping for the ``landlord_property_type_overrides`` table.

    The unique constraint below allows at most one row per
    ``(portfolio_id, description)`` pair.
    """

    __tablename__: ClassVar[str] = "landlord_property_type_overrides"  # pyright: ignore[reportIncompatibleVariableOverride]
    __table_args__: ClassVar[tuple[UniqueConstraint, ...]] = (  # pyright: ignore[reportIncompatibleVariableOverride]
        UniqueConstraint(
            "portfolio_id",
            "description",
            name="landlord_property_type_overrides_portfolio_description_unique",
        ),
    )

    # Surrogate key, generated client-side.
    id: UUID = Field(primary_key=True, default_factory=uuid4)

    # Declared as BigInteger so it lines up with the bigint ``portfolio_id``
    # FK on the Drizzle side; SQLModel would otherwise map ``int`` to a
    # 32-bit Integer, which overflows past 2^31. The FK to ``portfolio.id``
    # itself is enforced by the Drizzle migration rather than declared here,
    # because the ``portfolio`` table has no Python model.
    portfolio_id: int = Field(
        sa_column=Column(BigInteger, index=True, nullable=False),
    )

    description: str = Field(nullable=False)

    # Stored in the ``property_type`` pgEnum using each member's ``.value``.
    value: PropertyType = Field(
        sa_column=Column(
            SAEnum(
                PropertyType,
                name="property_type",
                values_callable=_property_type_labels,
            ),
            nullable=False,
        ),
    )

    # The SAEnum instance is shared -- ``landlord_override_enums`` explains
    # why every ``landlord_*_overrides`` row class reuses this one object.
    source: str = Field(
        sa_column=Column(override_source_sa_enum, nullable=False),
    )

    created_at: datetime = Field(default_factory=_utc_now, nullable=False)
    updated_at: datetime = Field(default_factory=_utc_now, nullable=False)

View file

@ -0,0 +1,80 @@
"""Postgres adapter for ``LandlordOverrideRepository[WallType]``.
Writes to ``landlord_wall_type_overrides`` (Drizzle-managed; mirrored by
``LandlordWallTypeOverrideRow``). The conflict policy lives in the SQL --
see ADR-0003 §Decision. Shape mirrors
``LandlordPropertyTypeOverridePostgresRepository``; the duplication is
deliberate while there are only two columns -- if a third lands and the
duplication becomes painful, extract a shared upsert helper then.
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import cast
from sqlalchemy import Table
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlmodel import Session
from domain.landlord_description_overrides.wall_type import WallType
from infrastructure.postgres.landlord_override_enums import OverrideSource
from infrastructure.postgres.landlord_wall_type_override_table import (
LandlordWallTypeOverrideRow,
)
from repositories.landlord_overrides.landlord_override_repository import (
LandlordOverrideRepository,
)
class LandlordWallTypeOverridePostgresRepository(
    LandlordOverrideRepository[WallType]
):
    """Persists ``WallType`` overrides to ``landlord_wall_type_overrides``."""

    def __init__(self, session: Session) -> None:
        """Keep the session; this class executes on it but never commits."""
        self._session = session

    def upsert_all(
        self,
        portfolio_id: int,
        descriptions_to_values: dict[str, WallType],
    ) -> None:
        """Bulk-upsert one classifier row per description for ``portfolio_id``.

        Issues a single ``INSERT ... ON CONFLICT (portfolio_id, description)
        DO UPDATE`` whose update only applies to existing rows with
        ``source = 'classifier'``. An empty mapping returns without touching
        the session.
        """
        if not descriptions_to_values:
            return

        # One timestamp for the whole batch, so every row written by this
        # call carries the same created_at / updated_at.
        stamped_at = datetime.now(timezone.utc)
        payload = []
        for description, category in descriptions_to_values.items():
            payload.append(
                {
                    "portfolio_id": portfolio_id,
                    "description": description,
                    "value": category.value,
                    "source": OverrideSource.CLASSIFIER,
                    "created_at": stamped_at,
                    "updated_at": stamped_at,
                }
            )

        # ``__table__`` exists at runtime on ``table=True`` SQLModel classes
        # but the stubs do not declare it, so fetch it with ``getattr`` and
        # cast to ``Table`` to keep the dialect insert strictly typed.
        overrides: Table = cast(
            Table, getattr(LandlordWallTypeOverrideRow, "__table__")
        )
        insert_stmt = pg_insert(overrides).values(payload)

        # A classifier re-run may replace the classifier's own earlier
        # output but must leave user corrections alone; the ``WHERE
        # existing.source = 'classifier'`` guard is what enforces it.
        # See ADR-0003 §Decision.
        refreshed_columns = {
            "value": insert_stmt.excluded.value,
            "source": insert_stmt.excluded.source,
            "updated_at": insert_stmt.excluded.updated_at,
        }
        upsert_stmt = insert_stmt.on_conflict_do_update(
            index_elements=["portfolio_id", "description"],
            set_=refreshed_columns,
            where=overrides.c.source == OverrideSource.CLASSIFIER,
        )

        # The stubs flag one ``Session.execute`` overload as deprecated and
        # the warning fires here, although this INSERT path is supported.
        self._session.execute(upsert_stmt)  # pyright: ignore[reportDeprecated]

View file

@ -0,0 +1,69 @@
"""SQLModel mirror of the ``landlord_wall_type_overrides`` Drizzle table.
The schema source of truth lives in the ``assessment-model`` TS repo
(`src/app/db/schema/landlord_overrides.ts`). The migrations are owned there;
this row class only mirrors the columns so the Python lambda can read/write.
See ADR-0003. Shape mirrors ``LandlordPropertyTypeOverrideRow`` -- the only
differences are the table name, the ``wall_type`` pgEnum on ``value``, and
the unique-constraint name.
"""
from datetime import datetime, timezone
from typing import ClassVar
from uuid import UUID, uuid4
from sqlalchemy import BigInteger, Column, UniqueConstraint
from sqlalchemy import Enum as SAEnum
from sqlmodel import Field, SQLModel
from domain.landlord_description_overrides.wall_type import WallType
from infrastructure.postgres.landlord_override_enums import override_source_sa_enum
def _wall_type_labels(enum_cls: type[WallType]) -> list[str]:
    """Return every member's ``.value``, used as the persisted enum labels."""
    return [member.value for member in enum_cls]


def _utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC ``datetime``."""
    return datetime.now(timezone.utc)


class LandlordWallTypeOverrideRow(SQLModel, table=True):
    """Row mapping for the ``landlord_wall_type_overrides`` table.

    The unique constraint below allows at most one row per
    ``(portfolio_id, description)`` pair.
    """

    __tablename__: ClassVar[str] = "landlord_wall_type_overrides"  # pyright: ignore[reportIncompatibleVariableOverride]
    __table_args__: ClassVar[tuple[UniqueConstraint, ...]] = (  # pyright: ignore[reportIncompatibleVariableOverride]
        UniqueConstraint(
            "portfolio_id",
            "description",
            name="landlord_wall_type_overrides_portfolio_description_unique",
        ),
    )

    # Surrogate key, generated client-side.
    id: UUID = Field(primary_key=True, default_factory=uuid4)

    # Declared as BigInteger so it lines up with the bigint ``portfolio_id``
    # FK on the Drizzle side; SQLModel would otherwise map ``int`` to a
    # 32-bit Integer, which overflows past 2^31. The FK to ``portfolio.id``
    # itself is enforced by the Drizzle migration rather than declared here,
    # because the ``portfolio`` table has no Python model.
    portfolio_id: int = Field(
        sa_column=Column(BigInteger, index=True, nullable=False),
    )

    description: str = Field(nullable=False)

    # Stored in the ``wall_type`` pgEnum using each member's ``.value``.
    value: WallType = Field(
        sa_column=Column(
            SAEnum(
                WallType,
                name="wall_type",
                values_callable=_wall_type_labels,
            ),
            nullable=False,
        ),
    )

    # The SAEnum instance is shared -- ``landlord_override_enums`` explains
    # why every ``landlord_*_overrides`` row class reuses this one object.
    source: str = Field(
        sa_column=Column(override_source_sa_enum, nullable=False),
    )

    created_at: datetime = Field(default_factory=_utc_now, nullable=False)
    updated_at: datetime = Field(default_factory=_utc_now, nullable=False)

View file

@ -0,0 +1,37 @@
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from typing import Generic, TypeVar
from domain.landlord_description_overrides.column_classifier import ColumnClassifier
from repositories.landlord_overrides.landlord_override_repository import (
LandlordOverrideRepository,
)
E = TypeVar("E", bound=Enum)
@dataclass(frozen=True)
class ClassifiableColumn(Generic[E]):
    """A classifier bundled with the repository that stores what it produces.

    The orchestrator holds one ``ClassifiableColumn`` for every
    (source column, target enum) pair it handles. Keeping the classifier and
    its repository in one object makes "this enum lands in this table" a
    structural guarantee: a handler cannot route ``PropertyType`` results to
    a ``WallType`` repo just because two separate dicts happened to share a
    string key.

    Several ``ClassifiableColumn``s may read the same ``source_column`` --
    for instance the ``"Property Type"`` CSV column drives both the
    ``PropertyType`` and the ``BuiltFormType`` classifiers from one
    free-text description -- but every ``name`` must be unique.

    Attributes:
        name: Unique key under which the orchestrator reports this
            classification's results (also the key the handler logs).
        source_column: Landlord-CSV header the descriptions are read from.
        classifier: Classifies this column's descriptions into ``E``.
        repo: Repository that persists this column's classifications.
    """

    name: str
    source_column: str
    classifier: ColumnClassifier[E]
    repo: LandlordOverrideRepository[E]

View file

@ -0,0 +1,83 @@
from enum import Enum
from typing import Any
from domain.addresses.unstandardised_address import AddressList
from orchestration.classifiable_column import ClassifiableColumn
from repositories.unstandardised_address.unstandardised_address_list_repository import (
UnstandardisedAddressListRepository,
)
class LandlordDescriptionOverridesOrchestrator:
    """Classifies landlord free-text descriptions and persists the results.

    Reads a batch of unstandardised addresses, collects the distinct
    descriptions found under each source column, runs every registered
    ``ClassifiableColumn``'s classifier over its column's descriptions, and
    (in ``classify_and_persist``) hands each result to that column's repo.
    """

    def __init__(
        self,
        unstandardised_address_repo: UnstandardisedAddressListRepository,
        columns: list[ClassifiableColumn[Any]],
    ) -> None:
        """Store the collaborators and validate the column registry.

        Args:
            unstandardised_address_repo: Loads an ``AddressList`` from an
                S3 URI.
            columns: One entry per (source CSV column, target enum)
                classification. Two entries may share ``source_column`` --
                e.g. ``"Property Type"`` feeds both PropertyType and
                BuiltFormType classifiers -- so the registry is a list
                rather than a dict keyed by header.

        Raises:
            ValueError: If two entries share a ``name``. Results are keyed
                by ``name``, so a duplicate would overwrite the earlier
                entry's mapping and ``classify_and_persist`` would then
                write the surviving mapping through both entries' repos.
        """
        self._unstandardised_address_repo = unstandardised_address_repo

        seen_names: set[str] = set()
        duplicate_names: set[str] = set()
        for column in columns:
            if column.name in seen_names:
                duplicate_names.add(column.name)
            seen_names.add(column.name)
        if duplicate_names:
            raise ValueError(
                "ClassifiableColumn names must be unique; duplicated: "
                f"{sorted(duplicate_names)}"
            )

        self._columns = columns

    def get_unstandardised_addresses(
        self,
        input_s3_uri: str,
    ) -> AddressList:
        """Load the address batch stored at ``input_s3_uri``."""
        return self._unstandardised_address_repo.load_batch(input_s3_uri)

    def get_col_to_description_mappings(
        self, list_of_unstandardised_address: AddressList
    ) -> dict[str, set[str]]:
        """Collect the distinct descriptions seen under each column.

        Every key of every address's ``additional_info`` gets an entry, even
        when all of its cells are blank (the entry is then an empty set).

        Returns:
            Mapping of column header to the set of normalised descriptions.
        """
        mappings: dict[str, set[str]] = {}
        for unstandardised_address in list_of_unstandardised_address:
            for key, value in unstandardised_address.additional_info.items():
                bucket = mappings.setdefault(key, set())
                # A comma-separated value is several descriptions in one
                # cell; split it so each is its own entry. Lower-case so
                # case-only typos collapse to one variant.
                for variant in value.split(","):
                    variant = variant.strip().lower()
                    if variant:
                        bucket.add(variant)
        return mappings

    def classify_columns(
        self, addresses: AddressList
    ) -> dict[str, dict[str, Enum]]:
        """Classify every registered column's descriptions.

        Returns a mapping of ``ClassifiableColumn.name`` to
        ``{description: category}``. A registered column whose
        ``source_column`` is absent from the addresses has its classifier
        called with an empty set.
        """
        col_to_desc = self.get_col_to_description_mappings(addresses)
        return {
            column.name: column.classifier.classify(
                col_to_desc.get(column.source_column, set())
            )
            for column in self._columns
        }

    def classify_and_persist(
        self, addresses: AddressList, portfolio_id: int
    ) -> dict[str, dict[str, Enum]]:
        """Classify every registered column and persist the results.

        Each non-empty mapping is written via the column's repository under
        ``source = 'classifier'``. Empty mappings (a registered column whose
        ``source_column`` is absent from this batch) skip the DB round-trip.
        The orchestrator does not commit -- the caller owns the transaction
        boundary.

        Returns the same shape as ``classify_columns`` so callers can log
        per-column counts.
        """
        classified = self.classify_columns(addresses)
        for column in self._columns:
            mapping = classified[column.name]
            if mapping:
                column.repo.upsert_all(portfolio_id, mapping)
        return classified

View file

@ -1,56 +0,0 @@
from enum import Enum
from typing import Any
from domain.addresses.unstandardised_address import AddressList
from domain.sal.column_classifier import ColumnClassifier
from repositories.unstandardised_address.unstandardised_address_list_repository import (
UnstandardisedAddressListRepository,
)
class SALOrchestrator:
    """Loads unstandardised addresses and classifies their column descriptions."""

    def __init__(
        self,
        unstandardised_address_repo: UnstandardisedAddressListRepository,
        classifiers: dict[str, ColumnClassifier[Any]],
    ) -> None:
        """Store the address repository and the per-column classifiers.

        ``classifiers`` is keyed by landlord-CSV column name.
        """
        self._unstandardised_address_repo = unstandardised_address_repo
        self._classifiers = classifiers

    def get_unstandardised_addresses(
        self,
        input_s3_uri: str,
    ) -> AddressList:
        """Load the address batch stored at ``input_s3_uri``."""
        return self._unstandardised_address_repo.load_batch(input_s3_uri)

    def get_col_to_description_mappings(
        self, list_of_unstandardised_address: AddressList
    ) -> dict[str, set[str]]:
        """Collect the distinct descriptions seen under each column.

        Every key of every address's ``additional_info`` gets an entry, even
        when all of its cells are blank (the entry is then an empty set).
        """
        descriptions_by_column: dict[str, set[str]] = {}
        for address in list_of_unstandardised_address:
            for header, cell in address.additional_info.items():
                variants = descriptions_by_column.setdefault(header, set())
                # One cell may hold several comma-separated descriptions;
                # each piece becomes its own entry. Lower-casing makes
                # case-only typos collapse into a single variant.
                normalised = (piece.strip().lower() for piece in cell.split(","))
                variants.update(piece for piece in normalised if piece)
        return descriptions_by_column

    def classify_columns(
        self, addresses: AddressList
    ) -> dict[str, dict[str, Enum]]:
        """Classify the descriptions of every registered column.

        The result maps column name to ``{description: category}``. A
        registered column that the addresses do not contain has its
        classifier called with an empty set.
        """
        descriptions_by_column = self.get_col_to_description_mappings(addresses)
        results: dict[str, dict[str, Enum]] = {}
        for column, classifier in self._classifiers.items():
            descriptions = descriptions_by_column.get(column, set())
            results[column] = classifier.classify(descriptions)
        return results

57
playground.py Normal file
View file

@ -0,0 +1,57 @@
"""Read a file and return unique values from a chosen column."""
from pathlib import Path
import argparse
import sys
import pandas as pd
def read_file(path: str | Path) -> pd.DataFrame:
path = Path(path)
suffix = path.suffix.lower()
if suffix == ".csv":
return pd.read_csv(path)
if suffix == ".tsv":
return pd.read_csv(path, sep="\t")
if suffix in {".xlsx", ".xls"}:
return pd.read_excel(path)
if suffix == ".parquet":
return pd.read_parquet(path)
if suffix == ".json":
return pd.read_json(path)
raise ValueError(f"Unsupported file type: {suffix}")
def get_unique(path: str | Path, column: str, dropna: bool = True) -> list:
    """Return the distinct values of ``column`` in the file at ``path``.

    Missing values are dropped before de-duplicating unless ``dropna`` is
    false.

    Raises:
        KeyError: If ``column`` is not one of the file's columns.
    """
    frame = read_file(Path(path))
    if column in frame.columns:
        values = frame[column]
        if dropna:
            values = values.dropna()
        return values.unique().tolist()
    raise KeyError(f"Column {column!r} not found. Available: {list(frame.columns)}")
def main() -> int:
    """Print the unique values of one column, or list the file's columns.

    Command-line options:
        --path      File to read (any type ``read_file`` supports).
        --column    Column whose unique values are printed. Defaults to
                    ``walls_description``; pass a bare ``--column`` (no
                    value) to list the available columns instead.
        --keep-na   Keep missing values instead of dropping them.

    Unrecognised arguments are ignored.

    Returns:
        Process exit status: 0 on success, 1 if the requested column does
        not exist in the file.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--path", default="/workspaces/model/certificates-2026.csv")
    # ``nargs`` must be an int or one of argparse's markers; a column name
    # there makes ``add_argument`` raise. With ``nargs="?"`` an omitted flag
    # yields ``default`` and a bare ``--column`` yields ``const`` (None).
    parser.add_argument("--column", nargs="?", default="walls_description")
    parser.add_argument("--keep-na", action="store_true")
    args, _ = parser.parse_known_args()

    df = read_file(args.path)

    if not args.column:
        print("Available columns:")
        for c in df.columns:
            print(f" - {c}")
        return 0

    # Honour the requested column rather than a hard-coded one, and report
    # a missing column as a readable error instead of a pandas KeyError.
    column = args.column
    if column not in df.columns:
        print(
            f"Column {column!r} not found. Available: {list(df.columns)}",
            file=sys.stderr,
        )
        return 1

    series = df[column] if args.keep_na else df[column].dropna()
    for value in series.unique():
        print(value)
    return 0


if __name__ == "__main__":
    sys.exit(main())

View file

@ -0,0 +1,38 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from enum import Enum
from typing import Generic, TypeVar
# Category enum a repository persists (e.g. ``PropertyType``, ``WallType``).
E = TypeVar("E", bound=Enum)


class LandlordOverrideRepository(ABC, Generic[E]):
    """Port for storing a portfolio's landlord (description -> category) overrides.

    Each implementation writes to exactly one ``landlord_<category>_overrides``
    table, selected by the category enum ``E`` (e.g. ``PropertyType``,
    ``WallType``). The orchestrator programs against this interface only and
    never refers to a concrete table.

    Concrete adapters live in ``infrastructure/`` (see ADR-0003): for example
    ``infrastructure/postgres/landlord_property_type_override_postgres_repository.py``.
    """

    @abstractmethod
    def upsert_all(
        self,
        portfolio_id: int,
        descriptions_to_values: dict[str, E],
    ) -> None:
        """Upsert every ``(portfolio_id, description) -> value`` row as ``source='classifier'``.

        Conflict handling, per ADR-0003 §Decision:

        * existing row has ``source = 'classifier'`` -- it is updated
          (value, source, updated_at);
        * existing row has ``source = 'user'`` -- it is left untouched; the
          classifier never overwrites a user correction.

        Passing an empty ``descriptions_to_values`` is a no-op, so callers
        with nothing to write may skip the call altogether.
        """

View file

@ -4,9 +4,9 @@ from typing import Optional
import pytest
from domain.sal.column_classifier import ClassificationError
from domain.sal.property_type import PropertyType
from domain.sal.wall_type import WallType
from domain.landlord_description_overrides.column_classifier import ClassificationError
from domain.landlord_description_overrides.property_type import PropertyType
from domain.landlord_description_overrides.wall_type import WallType
from infrastructure.chatgpt.chatgpt import ChatGPT
from infrastructure.chatgpt.chatgpt_column_classifier import (
ChatGptColumnClassifier,

View file

@ -4,12 +4,17 @@ from enum import Enum
from typing import Any, Optional
from domain.addresses.unstandardised_address import AddressList, UnstandardisedAddress
from domain.landlord_description_overrides.built_form_type import BuiltFormType
from domain.landlord_description_overrides.column_classifier import ColumnClassifier
from domain.landlord_description_overrides.property_type import PropertyType
from domain.landlord_description_overrides.wall_type import WallType
from domain.postcode import Postcode
from domain.sal.column_classifier import ColumnClassifier
from domain.sal.property_type import PropertyType
from domain.sal.wall_type import WallType
from orchestration.sal_orchestrator import (
SALOrchestrator,
from orchestration.classifiable_column import ClassifiableColumn
from orchestration.landlord_description_overrides_orchestrator import (
LandlordDescriptionOverridesOrchestrator,
)
from repositories.landlord_overrides.landlord_override_repository import (
LandlordOverrideRepository,
)
from repositories.unstandardised_address.unstandardised_address_list_repository import (
UnstandardisedAddressListRepository,
@ -38,6 +43,18 @@ class _StubColumnClassifier(ColumnClassifier[Enum]):
return self._result
class _StubLandlordOverrideRepository(LandlordOverrideRepository[Enum]):
    """In-memory repo that logs each ``upsert_all`` call for routing assertions."""

    def __init__(self) -> None:
        # One ``(portfolio_id, mapping)`` tuple per call, in call order.
        self.calls: list[tuple[int, dict[str, Enum]]] = []

    def upsert_all(
        self, portfolio_id: int, descriptions_to_values: dict[str, Enum]
    ) -> None:
        # Record a copy so later mutation by the caller cannot alter it.
        snapshot = dict(descriptions_to_values)
        self.calls.append((portfolio_id, snapshot))
def _make_unstandardised_address(
landlord_additional_info: dict[str, str],
) -> UnstandardisedAddress:
@ -49,11 +66,25 @@ def _make_unstandardised_address(
def _orchestrator(
classifiers: Optional[dict[str, ColumnClassifier[Any]]] = None,
) -> SALOrchestrator:
return SALOrchestrator(
columns: Optional[list[ClassifiableColumn[Any]]] = None,
) -> LandlordDescriptionOverridesOrchestrator:
return LandlordDescriptionOverridesOrchestrator(
unstandardised_address_repo=_StubUnstandardisedAddressRepository(),
classifiers=classifiers or {},
columns=columns or [],
)
def _column(
    name: str,
    source_column: str,
    classifier: ColumnClassifier[Any],
    repo: Optional[LandlordOverrideRepository[Any]] = None,
) -> ClassifiableColumn[Any]:
    """Build a ``ClassifiableColumn``, using a fresh stub repo when none is given."""
    target_repo = repo if repo else _StubLandlordOverrideRepository()
    return ClassifiableColumn(
        name=name,
        source_column=source_column,
        classifier=classifier,
        repo=target_repo,
    )
@ -155,30 +186,140 @@ def test_classify_columns_classifies_each_registered_column() -> None:
property_types = _StubColumnClassifier(
result={"semi-detached": PropertyType.HOUSE}
)
wall_types = _StubColumnClassifier(result={"solid brick": WallType.SOLID_BRICK})
wall_types = _StubColumnClassifier(result={"solid brick": WallType.SOLID_BRICK_AS_BUILT_NO_INSULATION_ASSUMED})
# act
result = _orchestrator(
{"Property Type": property_types, "Walls": wall_types}
[
_column("property_type", "Property Type", property_types),
_column("wall_type", "Walls", wall_types),
]
).classify_columns(addresses)
# assert: each registered column was classified independently.
# assert: each registered column was classified independently, keyed by name.
assert result == {
"Property Type": {"semi-detached": PropertyType.HOUSE},
"Walls": {"solid brick": WallType.SOLID_BRICK},
"property_type": {"semi-detached": PropertyType.HOUSE},
"wall_type": {"solid brick": WallType.SOLID_BRICK_AS_BUILT_NO_INSULATION_ASSUMED},
}
def test_classify_columns_yields_empty_mapping_for_an_absent_column() -> None:
# arrange: a classifier is registered for a column the addresses lack.
# arrange: a classifier is registered for a source column the addresses lack.
addresses = AddressList([_make_unstandardised_address({"Walls": "cavity"})])
property_types = _StubColumnClassifier(result={})
# act
result = _orchestrator(
{"Property Type": property_types}
[_column("property_type", "Property Type", property_types)]
).classify_columns(addresses)
# assert: the absent column classified an empty description set.
assert result == {"Property Type": {}}
assert result == {"property_type": {}}
assert property_types.received == set()
def test_classify_columns_runs_two_classifiers_against_a_shared_source_column() -> None:
    # arrange: one landlord column, "Property Type", drives two classifiers:
    # PropertyType (the kind of dwelling) and BuiltFormType (how it attaches
    # to its neighbours). Each must receive the same description set and
    # report under its own column ``name``.
    description = "semi-detached house"
    addresses = AddressList(
        [_make_unstandardised_address({"Property Type": description})]
    )
    property_types = _StubColumnClassifier(result={description: PropertyType.HOUSE})
    built_form_types = _StubColumnClassifier(
        result={description: BuiltFormType.SEMI_DETACHED}
    )
    columns = [
        _column("property_type", "Property Type", property_types),
        _column("built_form_type", "Property Type", built_form_types),
    ]

    # act
    result = _orchestrator(columns).classify_columns(addresses)

    # assert: the classifiers were fed identical descriptions, and their
    # outputs sit under separate ``name`` keys with no collision.
    assert property_types.received == {"semi-detached house"}
    assert built_form_types.received == {"semi-detached house"}
    assert result == {
        "property_type": {"semi-detached house": PropertyType.HOUSE},
        "built_form_type": {"semi-detached house": BuiltFormType.SEMI_DETACHED},
    }
def test_classify_and_persist_writes_each_columns_mapping_to_its_own_repo() -> None:
    # arrange: two columns, each with its own repo. The orchestrator has to
    # deliver every column's classifications to that column's repo only.
    wall_category = WallType.SOLID_BRICK_AS_BUILT_NO_INSULATION_ASSUMED
    addresses = AddressList(
        [
            _make_unstandardised_address(
                {"Property Type": "semi-detached", "Walls": "solid brick"}
            ),
        ]
    )
    property_type_repo = _StubLandlordOverrideRepository()
    wall_type_repo = _StubLandlordOverrideRepository()
    property_type_column = _column(
        "property_type",
        "Property Type",
        _StubColumnClassifier({"semi-detached": PropertyType.HOUSE}),
        property_type_repo,
    )
    wall_type_column = _column(
        "wall_type",
        "Walls",
        _StubColumnClassifier({"solid brick": wall_category}),
        wall_type_repo,
    )
    columns: list[ClassifiableColumn[Any]] = [property_type_column, wall_type_column]

    # act
    result = _orchestrator(columns).classify_and_persist(addresses, portfolio_id=42)

    # assert: every repo got only its own column's mapping, tagged with the
    # supplied portfolio_id, and the return value matches classify_columns.
    assert property_type_repo.calls == [(42, {"semi-detached": PropertyType.HOUSE})]
    assert wall_type_repo.calls == [(42, {"solid brick": wall_category})]
    assert result == {
        "property_type": {"semi-detached": PropertyType.HOUSE},
        "wall_type": {"solid brick": wall_category},
    }
def test_classify_and_persist_skips_upsert_for_a_column_absent_from_the_batch() -> None:
    # arrange: ``Walls`` is a registered column, yet the address carries no
    # ``Walls`` entry. Classification still runs (and yields an empty
    # mapping), but ``upsert_all`` must NOT be invoked -- an empty bulk
    # insert is a noisy no-op.
    addresses = AddressList(
        [_make_unstandardised_address({"Property Type": "semi-detached"})]
    )
    property_type_repo = _StubLandlordOverrideRepository()
    wall_type_repo = _StubLandlordOverrideRepository()
    property_type_column = _column(
        "property_type",
        "Property Type",
        _StubColumnClassifier({"semi-detached": PropertyType.HOUSE}),
        property_type_repo,
    )
    wall_type_column = _column(
        "wall_type",
        "Walls",
        _StubColumnClassifier({}),
        wall_type_repo,
    )
    columns: list[ClassifiableColumn[Any]] = [property_type_column, wall_type_column]

    # act
    _orchestrator(columns).classify_and_persist(addresses, portfolio_id=7)

    # assert: the Property Type repo was written to; the Walls repo was not.
    assert property_type_repo.calls == [(7, {"semi-detached": PropertyType.HOUSE})]
    assert wall_type_repo.calls == []

View file

@ -0,0 +1,147 @@
"""Integration tests for the source-aware upsert policy.
The conflict policy lives entirely in SQL (``INSERT ... ON CONFLICT
... DO UPDATE ... WHERE existing.source = 'classifier'``). The only way to
verify it correctly distinguishes ``EXCLUDED.source`` from the qualified
``landlord_property_type_overrides.source`` is against a real Postgres --
the ``db_engine`` fixture in ``tests/conftest.py`` spins one up per test.
"""
from __future__ import annotations
from collections.abc import Iterator
import pytest
from sqlalchemy import Engine
from sqlmodel import Session, select
from domain.landlord_description_overrides.property_type import PropertyType
from infrastructure.postgres.landlord_override_enums import OverrideSource
from infrastructure.postgres.landlord_property_type_override_postgres_repository import (
LandlordPropertyTypeOverridePostgresRepository,
)
from infrastructure.postgres.landlord_property_type_override_table import (
LandlordPropertyTypeOverrideRow,
)
@pytest.fixture
def session(db_engine: Engine) -> Iterator[Session]:
    """Yield a SQLModel session on ``db_engine``; it is closed when the test ends."""
    with Session(db_engine) as db_session:
        yield db_session
def _select_row(
    session: Session, portfolio_id: int, description: str
) -> LandlordPropertyTypeOverrideRow:
    """Return the single row for ``(portfolio_id, description)``; fail otherwise."""
    query = select(LandlordPropertyTypeOverrideRow).where(
        LandlordPropertyTypeOverrideRow.portfolio_id == portfolio_id,
        LandlordPropertyTypeOverrideRow.description == description,
    )
    matches = session.exec(query).all()
    assert len(matches) == 1, f"expected exactly one row, got {len(matches)}"
    return matches[0]
def test_inserts_a_fresh_row_with_source_classifier(session: Session) -> None:
    # arrange
    repository = LandlordPropertyTypeOverridePostgresRepository(session)
    classified = {"cosy": PropertyType.HOUSE}

    # act
    repository.upsert_all(portfolio_id=1, descriptions_to_values=classified)
    session.commit()

    # assert
    stored = _select_row(session, portfolio_id=1, description="cosy")
    assert stored.value is PropertyType.HOUSE
    assert stored.source == OverrideSource.CLASSIFIER
def test_reupsert_overwrites_a_classifier_row(session: Session) -> None:
    # arrange: seed an out-of-date classifier row.
    repository = LandlordPropertyTypeOverridePostgresRepository(session)
    stale = {"cosy": PropertyType.FLAT}
    repository.upsert_all(portfolio_id=1, descriptions_to_values=stale)
    session.commit()

    # act: classify the same description again, into a different category.
    fresh = {"cosy": PropertyType.HOUSE}
    repository.upsert_all(portfolio_id=1, descriptions_to_values=fresh)
    session.commit()

    # assert: the later classification replaced the earlier one.
    stored = _select_row(session, portfolio_id=1, description="cosy")
    assert stored.value is PropertyType.HOUSE
    assert stored.source == OverrideSource.CLASSIFIER
def test_reupsert_does_not_overwrite_a_user_row(session: Session) -> None:
    # arrange: a user has corrected the row to ``BUNGALOW``. Because the
    # classifier path never writes ``source = 'user'``, the row is added
    # directly here, standing in for the override frontend.
    session.add(
        LandlordPropertyTypeOverrideRow(
            portfolio_id=1,
            description="cosy",
            value=PropertyType.BUNGALOW,
            source=OverrideSource.USER,
        )
    )
    session.commit()

    # act: a classifier re-run tries to label the same description as
    # ``HOUSE``. The source-aware conflict policy must silently skip it --
    # user edits beat classifier reruns.
    repository = LandlordPropertyTypeOverridePostgresRepository(session)
    reclassified = {"cosy": PropertyType.HOUSE}
    repository.upsert_all(portfolio_id=1, descriptions_to_values=reclassified)
    session.commit()

    # assert: the user's row is exactly as it was.
    stored = _select_row(session, portfolio_id=1, description="cosy")
    assert stored.value is PropertyType.BUNGALOW
    assert stored.source == OverrideSource.USER
def test_upsert_keeps_other_portfolios_descriptions_independent(
    session: Session,
) -> None:
    """The same description upserted for two portfolios yields two separate rows."""
    # arrange: the unique key is ``(portfolio_id, description)``, so the same
    # description for two different portfolios must coexist as two rows.
    value_by_portfolio = {1: PropertyType.HOUSE, 2: PropertyType.FLAT}
    repository = LandlordPropertyTypeOverridePostgresRepository(session)

    # act: one upsert per portfolio, committed together.
    for portfolio_id, value in value_by_portfolio.items():
        repository.upsert_all(
            portfolio_id=portfolio_id, descriptions_to_values={"cosy": value}
        )
    session.commit()

    # assert: both rows survive with their own values.
    for portfolio_id, expected in value_by_portfolio.items():
        assert _select_row(session, portfolio_id, "cosy").value is expected
def test_upsert_persists_unknown_so_a_user_can_resolve_it_later(
    session: Session,
) -> None:
    """An ``UNKNOWN`` classification is still written as a classifier row."""
    # arrange / act: a description the classifier couldn't resolve still
    # lands -- per ADR-0002 §5 / ADR-0003 §Decision, so a future user
    # override can upgrade it to a real value.
    unresolved = {"unparseable nonsense": PropertyType.UNKNOWN}
    repository = LandlordPropertyTypeOverridePostgresRepository(session)
    repository.upsert_all(portfolio_id=1, descriptions_to_values=unresolved)
    session.commit()

    # assert: the row exists with value=UNKNOWN, source=classifier.
    stored = _select_row(session, portfolio_id=1, description="unparseable nonsense")
    assert stored.value is PropertyType.UNKNOWN
    assert stored.source == OverrideSource.CLASSIFIER
def test_upsert_all_with_empty_mapping_is_a_no_op(session: Session) -> None:
    """Upserting an empty mapping leaves the override table without any rows."""
    # arrange / act
    nothing_to_write: dict = {}
    repository = LandlordPropertyTypeOverridePostgresRepository(session)
    repository.upsert_all(portfolio_id=1, descriptions_to_values=nothing_to_write)
    session.commit()

    # assert: nothing was inserted.
    everything = select(LandlordPropertyTypeOverrideRow)
    rows = session.exec(everything).all()
    assert rows == []

View file

@ -0,0 +1,158 @@
"""Integration tests for the source-aware upsert policy on the WallType table.
Mirror of ``test_landlord_property_type_override_postgres_repository.py`` --
the SQL is structurally identical, but the conflict policy lives in two
separate concrete adapters and so warrants two parallel test suites until
(if) the adapters are factored through a shared upsert helper.
"""
from __future__ import annotations
from collections.abc import Iterator
import pytest
from sqlalchemy import Engine
from sqlmodel import Session, select
from domain.landlord_description_overrides.wall_type import WallType
from infrastructure.postgres.landlord_override_enums import OverrideSource
from infrastructure.postgres.landlord_wall_type_override_postgres_repository import (
LandlordWallTypeOverridePostgresRepository,
)
from infrastructure.postgres.landlord_wall_type_override_table import (
LandlordWallTypeOverrideRow,
)
@pytest.fixture
def session(db_engine: Engine) -> Iterator[Session]:
    """Yield a ``Session`` bound to ``db_engine``, closed once the test finishes."""
    with Session(db_engine) as db_session:
        yield db_session
def _select_row(
    session: Session, portfolio_id: int, description: str
) -> LandlordWallTypeOverrideRow:
    """Return the one override row stored for ``(portfolio_id, description)``.

    Fails the calling test through ``assert`` unless exactly one row matches.
    """
    statement = select(LandlordWallTypeOverrideRow).where(
        LandlordWallTypeOverrideRow.portfolio_id == portfolio_id,
        LandlordWallTypeOverrideRow.description == description,
    )
    rows = session.exec(statement).all()
    assert len(rows) == 1, f"expected exactly one row, got {len(rows)}"
    return rows[0]
def test_inserts_a_fresh_row_with_source_classifier(session: Session) -> None:
    """A first-time upsert stores the value and tags the row as classifier-sourced."""
    # arrange
    classified = {"cavity insulated": WallType.CAVITY}
    repository = LandlordWallTypeOverridePostgresRepository(session)

    # act
    repository.upsert_all(portfolio_id=1, descriptions_to_values=classified)
    session.commit()

    # assert
    stored = _select_row(session, portfolio_id=1, description="cavity insulated")
    assert stored.value is WallType.CAVITY
    assert stored.source == OverrideSource.CLASSIFIER
def test_reupsert_overwrites_a_classifier_row(session: Session) -> None:
    """A second upsert replaces the value of a row the classifier wrote earlier."""
    # arrange: a stale classifier row exists.
    repository = LandlordWallTypeOverridePostgresRepository(session)
    stale = {"old red brick": WallType.CAVITY}
    repository.upsert_all(portfolio_id=1, descriptions_to_values=stale)
    session.commit()

    # act: re-classify with a different category.
    reclassified = {"old red brick": WallType.SOLID_BRICK}
    repository.upsert_all(portfolio_id=1, descriptions_to_values=reclassified)
    session.commit()

    # assert: the new classification wins.
    stored = _select_row(session, portfolio_id=1, description="old red brick")
    assert stored.value is WallType.SOLID_BRICK
    assert stored.source == OverrideSource.CLASSIFIER
def test_reupsert_does_not_overwrite_a_user_row(session: Session) -> None:
    """A classifier upsert leaves a pre-existing user-sourced row untouched."""
    # arrange: a user has corrected the row to ``SANDSTONE``. The classifier
    # path never produces ``source = 'user'``, so the row is written straight
    # through the session to mimic the override frontend.
    session.add(
        LandlordWallTypeOverrideRow(
            portfolio_id=1,
            description="old red brick",
            value=WallType.SANDSTONE,
            source=OverrideSource.USER,
        )
    )
    session.commit()

    # act: the classifier re-runs and tries to classify the same description
    # as ``SOLID_BRICK``. Under the source-aware conflict policy, this must
    # be silently skipped -- user edits beat classifier reruns.
    classifier_output = {"old red brick": WallType.SOLID_BRICK}
    repository = LandlordWallTypeOverridePostgresRepository(session)
    repository.upsert_all(portfolio_id=1, descriptions_to_values=classifier_output)
    session.commit()

    # assert: the user row is unchanged.
    stored = _select_row(session, portfolio_id=1, description="old red brick")
    assert stored.value is WallType.SANDSTONE
    assert stored.source == OverrideSource.USER
def test_upsert_keeps_other_portfolios_descriptions_independent(
    session: Session,
) -> None:
    """The same description upserted for two portfolios yields two separate rows."""
    # arrange / act: the unique key is ``(portfolio_id, description)``, so the
    # same description for two different portfolios must coexist as two rows.
    value_by_portfolio = {1: WallType.CAVITY, 2: WallType.SOLID_BRICK}
    repository = LandlordWallTypeOverridePostgresRepository(session)
    for portfolio_id, value in value_by_portfolio.items():
        repository.upsert_all(
            portfolio_id=portfolio_id,
            descriptions_to_values={"old red brick": value},
        )
    session.commit()

    # assert: both rows survive with their own values.
    for portfolio_id, expected in value_by_portfolio.items():
        assert _select_row(session, portfolio_id, "old red brick").value is expected
def test_upsert_persists_unknown_so_a_user_can_resolve_it_later(
    session: Session,
) -> None:
    """An ``UNKNOWN`` classification is still written as a classifier row."""
    # arrange / act: a description the classifier couldn't resolve still
    # lands -- per ADR-0002 §5 / ADR-0003 §Decision, so a future user
    # override can upgrade it to a real value.
    unresolved = {"unparseable wall description": WallType.UNKNOWN}
    repository = LandlordWallTypeOverridePostgresRepository(session)
    repository.upsert_all(portfolio_id=1, descriptions_to_values=unresolved)
    session.commit()

    # assert: the row exists with value=UNKNOWN, source=classifier.
    stored = _select_row(
        session, portfolio_id=1, description="unparseable wall description"
    )
    assert stored.value is WallType.UNKNOWN
    assert stored.source == OverrideSource.CLASSIFIER
def test_upsert_all_with_empty_mapping_is_a_no_op(session: Session) -> None:
    """Upserting an empty mapping leaves the override table without any rows."""
    # arrange / act
    nothing_to_write: dict = {}
    repository = LandlordWallTypeOverridePostgresRepository(session)
    repository.upsert_all(portfolio_id=1, descriptions_to_values=nothing_to_write)
    session.commit()

    # assert: nothing was inserted.
    everything = select(LandlordWallTypeOverrideRow)
    rows = session.exec(everything).all()
    assert rows == []