refactored to allow multiple column types

This commit is contained in:
Jun-te Kim 2026-05-22 15:28:26 +00:00
parent 11a498ba4e
commit a747534f37
11 changed files with 420 additions and 153 deletions

View file

@ -1,4 +1,6 @@
import logging
from typing import Any
import boto3
from orchestration.sal_orchestrator import (
SALOrchestrator,
@ -8,6 +10,15 @@ from repositories.unstandardised_address.unstandardised_address_list_csv_s3_repo
UnstandardisedAddressListCsvS3Repository,
)
from domain.addresses.unstandardised_address import AddressList
from domain.sal.column_classifier import ColumnClassifier
from domain.sal.property_type import PropertyType
from domain.sal.wall_type import WallType
from infrastructure.chatgpt.chatgpt import ChatGPT
from infrastructure.chatgpt.chatgpt_column_classifier import (
ChatGptColumnClassifier,
)
logger = logging.getLogger(__name__)
def handler(
@ -28,32 +39,31 @@ def handler(
csv_client, bucket
)
# One ChatGPT-backed classifier per landlord-CSV column, keyed by column name.
chat_gpt = ChatGPT()
classifiers: dict[str, ColumnClassifier[Any]] = {
"Property Type": ChatGptColumnClassifier(
chat_gpt, PropertyType, PropertyType.UNKNOWN
),
"Walls": ChatGptColumnClassifier(chat_gpt, WallType, WallType.UNKNOWN),
}
sal = SALOrchestrator(
unstandardised_address_repo=unstandardised_address_repo,
classifiers=classifiers,
)
addressList: AddressList = sal.get_unstandardised_addresses(input_s3_uri=s3_uri)
column_mapping = {
# "Wall Description": "Walls",
"Property Type": "Property Type",
}
# Cap the batch to the first 20 while the ChatGPT path is under test.
addressList = AddressList(addressList[:20])
col_to_desc_map = sal.get_col_to_description_mappings(
list_of_unstandardised_address=addressList
)
classified = sal.classify_columns(addressList)
for column, mapping in classified.items():
logger.info(
"Classified %d descriptions for column %r.", len(mapping), column
)
"""
----
# TODO Property Type:
# 1) Make a small enum with all property types (5 enum)
# 2) Make an interface with ChatGPTAi to get wall field description and map it to enum
# 3) Stroe in landlord overrides
# TODO Wall Type:
# 1) Make a small enum with all property types (5 enum)
# 2) Make an interface with ChatGPTAi to get wall field description and map it to enum
# 3) Stroe in landlord overrides
---
"""
# TODO: persist `classified` to landlord overrides.
return {"hello": ["200"]}

View file

@ -0,0 +1,39 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from enum import Enum
from typing import Generic, TypeVar
E = TypeVar("E", bound=Enum)
class ClassificationError(Exception):
"""Raised when classifying a column's descriptions fails wholesale.
A whole-batch failure (the AI backend is unreachable, or returns a reply
that cannot be parsed) raises this. A single description that merely
cannot be resolved is not an error -- it maps to the enum's UNKNOWN member.
"""
class ColumnClassifier(ABC, Generic[E]):
"""Port: resolves free-text descriptions into a category enum ``E``.
One classifier handles one landlord-CSV column. Implementations decide
*how* the mapping is performed (an LLM, a lookup table, a rules engine);
``SALOrchestrator`` depends only on this interface.
"""
@abstractmethod
def classify(self, descriptions: set[str]) -> dict[str, E]:
"""Classify each description into a category enum member.
Every input description appears as a key in the result. A description
that cannot be resolved maps to the enum's UNKNOWN member.
Raises:
ClassificationError: If the classification call fails wholesale
(e.g. the backend is unreachable or returns an unparseable
response).
"""
...

View file

@ -14,12 +14,3 @@ class PropertyType(Enum):
MAISONETTE = "Maisonette"
PARK_HOME = "Park home"
UNKNOWN = "Unknown"
class PropertyTypeClassificationError(Exception):
"""Raised when property-type classification fails wholesale.
A whole-batch failure (the AI backend is unreachable, or returns a reply
that cannot be parsed) raises this. A single description that merely
cannot be resolved is not an error -- it maps to ``PropertyType.UNKNOWN``.
"""

View file

@ -1,27 +0,0 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from domain.sal.property_type import PropertyType
class PropertyTypeClassifier(ABC):
"""Port: resolves free-text descriptions into SAL ``PropertyType`` values.
Implementations decide *how* (an LLM, a lookup table, a rules engine);
``SALOrchestrator`` depends only on this interface.
"""
@abstractmethod
def classify(self, descriptions: set[str]) -> dict[str, PropertyType]:
"""Classify each description into a ``PropertyType``.
Every input description appears as a key in the result. A description
that cannot be resolved maps to ``PropertyType.UNKNOWN``.
Raises:
PropertyTypeClassificationError: If the classification call fails
wholesale (e.g. the backend is unreachable or returns an
unparseable response).
"""
...

15
domain/sal/wall_type.py Normal file
View file

@ -0,0 +1,15 @@
from enum import Enum
class WallType(Enum):
"""A landlord-supplied wall construction type, as resolved by the SAL context.
Mirrors the main RdSAP wall constructions. Like the SAL ``PropertyType``,
it carries an explicit ``UNKNOWN`` member for unresolvable CSV values.
"""
CAVITY = "Cavity"
SOLID_BRICK = "Solid Brick"
TIMBER_FRAME = "Timber frame"
SANDSTONE = "Sandstone"
UNKNOWN = "Unknown"

View file

@ -0,0 +1,85 @@
from __future__ import annotations
import json
from enum import Enum
from typing import Any, TypeVar
from domain.sal.column_classifier import ClassificationError, ColumnClassifier
from infrastructure.chatgpt.chatgpt import ChatGPT
from infrastructure.chatgpt.exceptions import ChatGPTClientError
E = TypeVar("E", bound=Enum)
class ChatGptColumnClassifier(ColumnClassifier[E]):
"""ColumnClassifier backed by ChatGPT, parametrised by a category enum.
The same classification path -- prompt, JSON parsing, UNKNOWN fallback --
serves any category enum; only ``category_enum`` and its ``unknown``
member differ between columns.
"""
def __init__(
self,
chat_gpt: ChatGPT,
category_enum: type[E],
unknown: E,
) -> None:
self._chat_gpt = chat_gpt
self._category_enum = category_enum
self._unknown = unknown
def classify(self, descriptions: set[str]) -> dict[str, E]:
if not descriptions:
return {}
try:
reply = self._chat_gpt.generate(
prompt=json.dumps(sorted(descriptions)),
system_prompt=self._system_prompt(),
)
except ChatGPTClientError as error:
raise ClassificationError(
f"ChatGPT classification failed for "
f"{self._category_enum.__name__}."
) from error
try:
raw: dict[str, Any] = json.loads(self._strip_code_fence(reply))
except json.JSONDecodeError as error:
raise ClassificationError(
f"ChatGPT returned a reply that is not valid JSON: {reply!r}"
) from error
return {
description: self._to_category(raw.get(description))
for description in descriptions
}
def _system_prompt(self) -> str:
categories = ", ".join(
member.value
for member in self._category_enum
if member is not self._unknown
)
return (
"Classify each free-text description into exactly one category. "
f"Categories: {categories}. "
"Reply with only a JSON object mapping each original description "
"to its category, and nothing else."
)
def _to_category(self, value: Any) -> E:
"""Map a reply value to a category member, defaulting to UNKNOWN."""
try:
return self._category_enum(value)
except ValueError:
return self._unknown
@staticmethod
def _strip_code_fence(reply: str) -> str:
"""Remove a surrounding markdown code fence, if ChatGPT added one."""
text = reply.strip()
if not text.startswith("```"):
return text
lines = text.splitlines()[1:]
if lines and lines[-1].strip() == "```":
lines = lines[:-1]
return "\n".join(lines)

View file

@ -1,46 +0,0 @@
from __future__ import annotations
import json
from typing import Any
from domain.sal.property_type import PropertyType
from domain.sal.property_type_classifier import PropertyTypeClassifier
from infrastructure.chatgpt.chatgpt import ChatGPT
class ChatGptPropertyTypeClassifier(PropertyTypeClassifier):
"""PropertyTypeClassifier backed by the ChatGPT client."""
_CATEGORIES = ", ".join(
member.value
for member in PropertyType
if member is not PropertyType.UNKNOWN
)
_SYSTEM_PROMPT = (
"Classify each UK property description into exactly one category. "
f"Categories: {_CATEGORIES}. "
"Reply with only a JSON object mapping each original description "
"to its category, and nothing else."
)
def __init__(self, chat_gpt: ChatGPT) -> None:
self._chat_gpt = chat_gpt
def classify(self, descriptions: set[str]) -> dict[str, PropertyType]:
reply = self._chat_gpt.generate(
prompt=json.dumps(sorted(descriptions)),
system_prompt=self._SYSTEM_PROMPT,
)
raw: dict[str, Any] = json.loads(reply)
return {
description: self._to_property_type(raw[description])
for description in descriptions
}
@staticmethod
def _to_property_type(value: Any) -> PropertyType:
"""Map a reply value to a PropertyType, defaulting to UNKNOWN."""
try:
return PropertyType(value)
except ValueError:
return PropertyType.UNKNOWN

View file

@ -1,12 +1,22 @@
from enum import Enum
from typing import Any
from domain.addresses.unstandardised_address import AddressList
from domain.sal.column_classifier import ColumnClassifier
from repositories.unstandardised_address.unstandardised_address_list_repository import (
UnstandardisedAddressListRepository,
)
from domain.addresses.unstandardised_address import AddressList
class SALOrchestrator:
def __init__(self, unstandardised_address_repo: UnstandardisedAddressListRepository) -> None:
def __init__(
self,
unstandardised_address_repo: UnstandardisedAddressListRepository,
classifiers: dict[str, ColumnClassifier[Any]],
) -> None:
self._unstandardised_address_repo = unstandardised_address_repo
# Keyed by landlord-CSV column name.
self._classifiers = classifiers
def get_unstandardised_addresses(
self,
@ -20,6 +30,27 @@ class SALOrchestrator:
mappings: dict[str, set[str]] = {}
for unstandardised_address in list_of_unstandardised_address:
for key, value in unstandardised_address.additional_info.items():
# Lower-case so case-only typos collapse to one variant.
mappings.setdefault(key, set()).add(value.lower())
bucket = mappings.setdefault(key, set())
# A comma-separated value is several descriptions in one cell;
# split it so each is its own entry. Lower-case so case-only
# typos collapse to one variant.
for variant in value.split(","):
variant = variant.strip().lower()
if variant:
bucket.add(variant)
return mappings
def classify_columns(
self, addresses: AddressList
) -> dict[str, dict[str, Enum]]:
"""Classify every registered column's descriptions.
Returns a mapping of column name to ``{description: category}``. A
registered column absent from the addresses contributes an empty
inner mapping.
"""
col_to_desc = self.get_col_to_description_mappings(addresses)
return {
column: classifier.classify(col_to_desc.get(column, set()))
for column, classifier in self._classifiers.items()
}

View file

@ -0,0 +1,135 @@
from __future__ import annotations
from typing import Optional
import pytest
from domain.sal.column_classifier import ClassificationError
from domain.sal.property_type import PropertyType
from domain.sal.wall_type import WallType
from infrastructure.chatgpt.chatgpt import ChatGPT
from infrastructure.chatgpt.chatgpt_column_classifier import (
ChatGptColumnClassifier,
)
from infrastructure.chatgpt.exceptions import ChatGPTClientError
class _FakeChatGPT(ChatGPT):
"""Hand-written ChatGPT stand-in: returns a canned reply, records prompts."""
def __init__(
self,
reply: str = "{}",
error: Optional[Exception] = None,
) -> None:
self.prompts: list[str] = []
self._reply = reply
self._error = error
def generate(self, prompt: str, system_prompt: Optional[str] = None) -> str:
self.prompts.append(prompt)
if self._error is not None:
raise self._error
return self._reply
def _property_type_classifier(
chat_gpt: ChatGPT,
) -> ChatGptColumnClassifier[PropertyType]:
return ChatGptColumnClassifier(chat_gpt, PropertyType, PropertyType.UNKNOWN)
def test_classifies_description_into_its_category() -> None:
# Arrange
chat_gpt = _FakeChatGPT(reply='{"semi-detached": "House"}')
classifier = _property_type_classifier(chat_gpt)
# Act
result = classifier.classify({"semi-detached"})
# Assert
assert result == {"semi-detached": PropertyType.HOUSE}
def test_classifies_when_reply_is_wrapped_in_a_markdown_fence() -> None:
# Arrange: ChatGPT wraps the JSON in a ```json ... ``` code fence.
chat_gpt = _FakeChatGPT(reply='```json\n{"semi-detached": "House"}\n```')
classifier = _property_type_classifier(chat_gpt)
# Act
result = classifier.classify({"semi-detached"})
# Assert
assert result == {"semi-detached": PropertyType.HOUSE}
def test_unrecognised_category_maps_to_unknown() -> None:
# Arrange
chat_gpt = _FakeChatGPT(reply='{"garden shed": "Shed"}')
classifier = _property_type_classifier(chat_gpt)
# Act
result = classifier.classify({"garden shed"})
# Assert
assert result == {"garden shed": PropertyType.UNKNOWN}
def test_description_omitted_from_reply_maps_to_unknown() -> None:
# Arrange: the reply classifies one description but not the other.
chat_gpt = _FakeChatGPT(reply='{"semi-detached": "House"}')
classifier = _property_type_classifier(chat_gpt)
# Act
result = classifier.classify({"semi-detached", "TBC"})
# Assert
assert result == {
"semi-detached": PropertyType.HOUSE,
"TBC": PropertyType.UNKNOWN,
}
def test_chatgpt_failure_raises_classification_error() -> None:
# Arrange
chat_gpt = _FakeChatGPT(error=ChatGPTClientError("backend unreachable"))
classifier = _property_type_classifier(chat_gpt)
# Act / Assert
with pytest.raises(ClassificationError):
classifier.classify({"semi-detached"})
def test_non_json_reply_raises_classification_error_with_the_raw_reply() -> None:
# Arrange
chat_gpt = _FakeChatGPT(reply="sorry, I can't do that")
classifier = _property_type_classifier(chat_gpt)
# Act / Assert: the error surfaces the offending reply for diagnosis.
with pytest.raises(ClassificationError, match="sorry, I can't do that"):
classifier.classify({"semi-detached"})
def test_empty_description_set_returns_empty_without_calling_chatgpt() -> None:
# Arrange
chat_gpt = _FakeChatGPT(reply='{"unused": "House"}')
classifier = _property_type_classifier(chat_gpt)
# Act
result = classifier.classify(set())
# Assert
assert result == {}
assert chat_gpt.prompts == []
def test_classifies_with_a_different_category_enum() -> None:
# Arrange: the same adapter classifies a WallType column.
chat_gpt = _FakeChatGPT(reply='{"solid brick wall": "Solid Brick"}')
classifier = ChatGptColumnClassifier(chat_gpt, WallType, WallType.UNKNOWN)
# Act
result = classifier.classify({"solid brick wall"})
# Assert
assert result == {"solid brick wall": WallType.SOLID_BRICK}

View file

@ -1,45 +0,0 @@
from __future__ import annotations
from typing import Optional
from domain.sal.property_type import PropertyType
from infrastructure.chatgpt.chatgpt import ChatGPT
from infrastructure.chatgpt.chatgpt_property_type_classifier import (
ChatGptPropertyTypeClassifier,
)
class _FakeChatGPT(ChatGPT):
"""Hand-written ChatGPT stand-in: returns a canned reply, records prompts."""
def __init__(self, reply: str = "{}") -> None:
self.prompts: list[str] = []
self._reply = reply
def generate(self, prompt: str, system_prompt: Optional[str] = None) -> str:
self.prompts.append(prompt)
return self._reply
def test_classifies_description_into_property_type() -> None:
# Arrange
chat_gpt = _FakeChatGPT(reply='{"semi-detached": "House"}')
classifier = ChatGptPropertyTypeClassifier(chat_gpt)
# Act
result = classifier.classify({"semi-detached"})
# Assert
assert result == {"semi-detached": PropertyType.HOUSE}
def test_unrecognised_category_maps_to_unknown() -> None:
# Arrange
chat_gpt = _FakeChatGPT(reply='{"garden shed": "Shed"}')
classifier = ChatGptPropertyTypeClassifier(chat_gpt)
# Act
result = classifier.classify({"garden shed"})
# Assert
assert result == {"garden shed": PropertyType.UNKNOWN}

View file

@ -1,7 +1,13 @@
from __future__ import annotations
from enum import Enum
from typing import Any, Optional
from domain.addresses.unstandardised_address import AddressList, UnstandardisedAddress
from domain.postcode import Postcode
from domain.sal.column_classifier import ColumnClassifier
from domain.sal.property_type import PropertyType
from domain.sal.wall_type import WallType
from orchestration.sal_orchestrator import (
SALOrchestrator,
)
@ -20,7 +26,21 @@ class _StubUnstandardisedAddressRepository(UnstandardisedAddressListRepository):
raise NotImplementedError()
def _make_unstandardised_address(landlord_additional_info: dict[str, str]) -> UnstandardisedAddress:
class _StubColumnClassifier(ColumnClassifier[Enum]):
"""Records the descriptions it received; returns a canned mapping."""
def __init__(self, result: dict[str, Enum]) -> None:
self.received: Optional[set[str]] = None
self._result = result
def classify(self, descriptions: set[str]) -> dict[str, Enum]:
self.received = descriptions
return self._result
def _make_unstandardised_address(
landlord_additional_info: dict[str, str],
) -> UnstandardisedAddress:
return UnstandardisedAddress(
address="1 High St",
postcode=Postcode("AA1 1AA"),
@ -28,8 +48,13 @@ def _make_unstandardised_address(landlord_additional_info: dict[str, str]) -> Un
)
def _orchestrator() -> SALOrchestrator:
return SALOrchestrator(unstandardised_address_repo=_StubUnstandardisedAddressRepository())
def _orchestrator(
classifiers: Optional[dict[str, ColumnClassifier[Any]]] = None,
) -> SALOrchestrator:
return SALOrchestrator(
unstandardised_address_repo=_StubUnstandardisedAddressRepository(),
classifiers=classifiers or {},
)
def test_collects_every_value_per_shared_key() -> None:
@ -86,6 +111,19 @@ def test_case_only_variants_collapse_to_one() -> None:
assert mappings == {"description": {"cosy"}}
def test_comma_separated_value_splits_into_individual_entries() -> None:
# arrange: a single cell packs several descriptions, comma-separated.
addresses = AddressList(
[_make_unstandardised_address({"description": "cosy, bright, COSY"})]
)
# act
mappings = _orchestrator().get_col_to_description_mappings(addresses)
# assert: each comma-separated part is its own trimmed, lower-cased entry.
assert mappings == {"description": {"cosy", "bright"}}
def test_empty_address_list_yields_empty_mapping() -> None:
# arrange / act
mappings = _orchestrator().get_col_to_description_mappings(AddressList([]))
@ -103,3 +141,44 @@ def test_single_address_yields_single_value_per_key() -> None:
# assert
assert mappings == {"description": {"cosy"}}
def test_classify_columns_classifies_each_registered_column() -> None:
# arrange: addresses carry two classifiable columns.
addresses = AddressList(
[
_make_unstandardised_address(
{"Property Type": "semi-detached", "Walls": "solid brick"}
),
]
)
property_types = _StubColumnClassifier(
result={"semi-detached": PropertyType.HOUSE}
)
wall_types = _StubColumnClassifier(result={"solid brick": WallType.SOLID_BRICK})
# act
result = _orchestrator(
{"Property Type": property_types, "Walls": wall_types}
).classify_columns(addresses)
# assert: each registered column was classified independently.
assert result == {
"Property Type": {"semi-detached": PropertyType.HOUSE},
"Walls": {"solid brick": WallType.SOLID_BRICK},
}
def test_classify_columns_yields_empty_mapping_for_an_absent_column() -> None:
# arrange: a classifier is registered for a column the addresses lack.
addresses = AddressList([_make_unstandardised_address({"Walls": "cavity"})])
property_types = _StubColumnClassifier(result={})
# act
result = _orchestrator(
{"Property Type": property_types}
).classify_columns(addresses)
# assert: the absent column classified an empty description set.
assert result == {"Property Type": {}}
assert property_types.received == set()