made landlord overrides sqs

This commit is contained in:
Jun-te Kim 2026-05-29 10:41:46 +00:00
parent 429e138ea7
commit 99614820b9
6 changed files with 152 additions and 80 deletions

View file

@ -1,14 +1,12 @@
import logging
import os
from typing import Any
from uuid import UUID
import boto3
from applications.landlord_description_overrides.landlord_description_overrides_trigger_body import (
LandlordDescriptionOverridesTriggerBody,
)
from domain.addresses.unstandardised_address import AddressList
from domain.landlord_description_overrides.built_form_type import BuiltFormType
from domain.landlord_description_overrides.property_type import PropertyType
from domain.landlord_description_overrides.roof_type import RoofType
@ -33,36 +31,90 @@ from infrastructure.postgres.landlord_wall_type_override_postgres_repository imp
LandlordWallTypeOverridePostgresRepository,
)
from infrastructure.s3.csv_s3_client import CsvS3Client
from infrastructure.s3.s3_uri import parse_s3_uri
from orchestration.classifiable_column import ClassifiableColumn
from orchestration.landlord_description_overrides_orchestrator import (
LandlordDescriptionOverridesOrchestrator,
)
from orchestration.task_orchestrator import TaskOrchestrator
from repositories.unstandardised_address.unstandardised_address_list_csv_s3_repository import (
UnstandardisedAddressListCsvS3Repository,
)
from utilities.aws_lambda.subtask_handler import subtask_handler
logger = logging.getLogger(__name__)
def _build_columns(
    column_mapping: dict[str, str], chat_gpt: ChatGPT, session: Any
) -> list[ClassifiableColumn[Any]]:
    """Turn a ``{category -> source CSV header}`` mapping into classifiable columns.

    Each recognised category (``property_type``, ``built_form_type``,
    ``wall_type``, ``roof_type``) yields one ``ClassifiableColumn`` wired to
    its enum classifier and its Postgres override repository. Because every
    mapping entry is handled independently, the same CSV header can feed more
    than one category. Categories that are not recognised are logged at
    WARNING level and left out of the result.

    Args:
        column_mapping: category name -> header of the CSV column to read.
        chat_gpt: client handed to every ``ChatGptColumnClassifier``.
        session: database session handed to every override repository.

    Returns:
        The columns in the iteration order of ``column_mapping``.
    """

    def _property_type(header: str) -> ClassifiableColumn[Any]:
        classifier = ChatGptColumnClassifier(
            chat_gpt, PropertyType, PropertyType.UNKNOWN
        )
        return ClassifiableColumn(
            name="property_type",
            source_column=header,
            classifier=classifier,
            repo=LandlordPropertyTypeOverridePostgresRepository(session),
        )

    def _built_form_type(header: str) -> ClassifiableColumn[Any]:
        classifier = ChatGptColumnClassifier(
            chat_gpt, BuiltFormType, BuiltFormType.UNKNOWN
        )
        return ClassifiableColumn(
            name="built_form_type",
            source_column=header,
            classifier=classifier,
            repo=LandlordBuiltFormTypeOverridePostgresRepository(session),
        )

    def _wall_type(header: str) -> ClassifiableColumn[Any]:
        # The prompt hint is only computed when a wall_type column is
        # actually requested.
        classifier = ChatGptColumnClassifier(
            chat_gpt,
            WallType,
            WallType.UNKNOWN,
            extra_instructions=wall_type_construction_date_prompt_hint(),
        )
        return ClassifiableColumn(
            name="wall_type",
            source_column=header,
            classifier=classifier,
            repo=LandlordWallTypeOverridePostgresRepository(session),
        )

    def _roof_type(header: str) -> ClassifiableColumn[Any]:
        classifier = ChatGptColumnClassifier(chat_gpt, RoofType, RoofType.UNKNOWN)
        return ClassifiableColumn(
            name="roof_type",
            source_column=header,
            classifier=classifier,
            repo=LandlordRoofTypeOverridePostgresRepository(session),
        )

    builders = {
        "property_type": _property_type,
        "built_form_type": _built_form_type,
        "wall_type": _wall_type,
        "roof_type": _roof_type,
    }

    built: list[ClassifiableColumn[Any]] = []
    for category, header in column_mapping.items():
        if category in builders:
            built.append(builders[category](header))
        else:
            logger.warning("Unknown classifier category %r; skipping.", category)
    return built
@subtask_handler()
def handler(
body: dict[str, Any],
context: Any,
) -> dict[str, list[str]]:
# TODO: replace with ``LandlordDescriptionOverridesTriggerBody.model_validate(body)``
# once this lambda is wired into the parent task pipeline via the SQS
# subtask envelope. Until then the trigger fields are hard-coded so the
# local invoker can exercise the full path. See ADR-0003 §Out of scope.
trigger = LandlordDescriptionOverridesTriggerBody(
task_id=UUID("00000000-0000-0000-0000-000000000001"),
sub_task_id=UUID("00000000-0000-0000-0000-000000000002"),
s3_uri="s3://retrofit-data-dev/bulk_onboarding_inputs/hyde2 (1).csv",
portfolio_id=730,
)
body: dict[str, Any], context: Any, task_orchestrator: TaskOrchestrator
) -> dict[str, int]:
trigger = LandlordDescriptionOverridesTriggerBody.model_validate(body)
bucket = "retrofit-data-dev"
# The classifier reads the ORIGINAL upload (raw landlord headers), so the S3
# bucket comes from the trigger URI rather than a fixed env var.
bucket, _key = parse_s3_uri(trigger.s3_uri)
# boto3.client is overloaded per-service in the installed stubs; cast
# to Any so the strict-mode checker treats it as opaque.
# boto3.client is overloaded per-service in the installed stubs; cast to Any
# so the strict-mode checker treats it as opaque.
boto3_client: Any = (
boto3.client
) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
@ -73,74 +125,23 @@ def handler(
csv_client, bucket
)
# One transactional session per handler invocation: the context manager
# commits on clean exit and rolls back on exception, so the handler never
# invokes ``.commit()`` itself -- transaction semantics live in the
# infrastructure layer.
# Raw rows, not load_batch: the original upload carries the description
# columns but not the canonical address/postcode columns load_batch requires.
rows = csv_client.read_rows(trigger.s3_uri)
engine = make_engine(PostgresConfig.from_env(os.environ))
with transactional_session(engine) as session:
chat_gpt = ChatGPT()
# The "Property Type" CSV column is read by two classifiers: the
# landlord's free-text (e.g. "semi-detached house") encodes both the
# dwelling kind (PropertyType) and how it joins to neighbours
# (BuiltFormType). Each classification lands in its own table.
columns: list[ClassifiableColumn[Any]] = [
ClassifiableColumn(
name="property_type",
source_column="Property Type",
classifier=ChatGptColumnClassifier(
chat_gpt, PropertyType, PropertyType.UNKNOWN
),
repo=LandlordPropertyTypeOverridePostgresRepository(session),
),
ClassifiableColumn(
name="built_form_type",
source_column="Property Type",
classifier=ChatGptColumnClassifier(
chat_gpt, BuiltFormType, BuiltFormType.UNKNOWN
),
repo=LandlordBuiltFormTypeOverridePostgresRepository(session),
),
ClassifiableColumn(
name="wall_type",
source_column="Walls",
classifier=ChatGptColumnClassifier(
chat_gpt,
WallType,
WallType.UNKNOWN,
extra_instructions=wall_type_construction_date_prompt_hint(),
),
repo=LandlordWallTypeOverridePostgresRepository(session),
),
ClassifiableColumn(
name="roof_type",
source_column="Roofs",
classifier=ChatGptColumnClassifier(
chat_gpt, RoofType, RoofType.UNKNOWN
),
repo=LandlordRoofTypeOverridePostgresRepository(session),
),
]
columns = _build_columns(trigger.column_mapping, chat_gpt, session)
orchestrator = LandlordDescriptionOverridesOrchestrator(
unstandardised_address_repo=unstandardised_address_repo,
columns=columns,
)
addressList: AddressList = orchestrator.get_unstandardised_addresses(
input_s3_uri=trigger.s3_uri
classified = orchestrator.classify_and_persist_from_rows(
rows, portfolio_id=trigger.portfolio_id
)
# Cap the batch to the first 20 while the ChatGPT path is under test.
# Remove before wiring into the production subtask pipeline.
addressList = AddressList(addressList[:20])
classified = orchestrator.classify_and_persist(
addressList, portfolio_id=trigger.portfolio_id
)
for column, mapping in classified.items():
logger.info(
"Classified %d descriptions for column %r.", len(mapping), column
)
return {"hello": ["200"]}
counts = {name: len(mapping) for name, mapping in classified.items()}
for name, n in counts.items():
logger.info("Classified %d descriptions for column %r.", n, name)
return counts

View file

@ -13,3 +13,7 @@ class LandlordDescriptionOverridesTriggerBody(BaseModel):
# Python ``int`` is unbounded so the Pydantic side stays simple; the
# SQLModel row class pins the storage to ``BigInteger``.
portfolio_id: int
# category -> source CSV header (the classifier subset of the upload
# mapping). Defaulted so a malformed/empty message classifies nothing
# rather than failing validation.
column_mapping: dict[str, str] = {}

View file

@ -13,6 +13,7 @@ from backend.app.bulk_uploads.schema import (
CombinedResultsResponse,
CombinerTriggerRequest,
FlagsSummary,
LandlordOverridesTriggerRequest,
PostcodeSplitterTriggerRequest,
)
from backend.app.bulk_uploads.scoring import score_bucket
@ -92,6 +93,26 @@ async def trigger_combiner(req: CombinerTriggerRequest):
}
@router.post("/trigger-landlord-overrides", status_code=202)
async def trigger_landlord_overrides(req: LandlordOverridesTriggerRequest):
    """Queue a landlord-overrides run by publishing the request to SQS.

    The request model is serialised with ``model_dump_json`` and sent as the
    message body to the queue configured in
    ``settings.LANDLORD_OVERRIDES_SQS_URL``.

    Args:
        req: trigger payload; forwarded to the queue unchanged.

    Returns:
        A dict echoing ``task_id`` and ``sub_task_id`` plus the
        ``sqs_message_id`` reported by SQS (``None`` if the response carries
        no ``MessageId``).

    Raises:
        HTTPException: status 500 if creating the client or sending the
            message fails for any reason.
    """
    settings = get_settings()
    try:
        # NOTE(review): boto3 calls are synchronous, so this blocks the event
        # loop for the duration of the send -- confirm that is acceptable for
        # this endpoint's expected traffic.
        sqs = boto3.client("sqs", region_name=settings.AWS_DEFAULT_REGION)
        response = sqs.send_message(
            QueueUrl=settings.LANDLORD_OVERRIDES_SQS_URL,
            MessageBody=req.model_dump_json(),
        )
    except Exception as e:
        # Chain the cause explicitly so the original boto/network traceback is
        # preserved as ``__cause__`` instead of being reported as an error
        # raised "during handling" of another one.
        raise HTTPException(status_code=500, detail=f"SQS error: {e}") from e

    return {
        "task_id": req.task_id,
        "sub_task_id": req.sub_task_id,
        "sqs_message_id": response.get("MessageId"),
    }
@router.get("/{task_id}/combined-results", response_model=CombinedResultsResponse)
async def get_combined_results(
task_id: UUID,

View file

@ -14,6 +14,15 @@ class CombinerTriggerRequest(BaseModel):
sub_task_id: str
class LandlordOverridesTriggerRequest(BaseModel):
    """Request body for triggering a landlord-overrides classification run."""

    # Identifiers of the parent task and of this sub-task, carried as strings.
    task_id: str
    sub_task_id: str
    # S3 URI of the CSV to classify.
    s3_uri: str
    # Portfolio the resulting overrides are stored against.
    portfolio_id: int
    # category -> source CSV header (the classifier subset of the upload mapping)
    column_mapping: dict[str, str]
class FlagsSummary(BaseModel):
duplicates: int
missing: int

View file

@ -42,6 +42,7 @@ class Settings(BaseSettings):
MAGICPLAN_SQS_URL: str = "changeme"
POSTCODE_SPLITTER_SQS_URL: str = "changeme"
COMBINER_SQS_URL: str = "changeme"
LANDLORD_OVERRIDES_SQS_URL: str = "changeme"
# Third parties
EPC_AUTH_TOKEN: str = "changeme"

View file

@ -81,3 +81,39 @@ class LandlordDescriptionOverridesOrchestrator:
continue
column.repo.upsert_all(portfolio_id, mapping)
return classified
def classify_and_persist_from_rows(
    self, rows: list[dict[str, str]], portfolio_id: int
) -> dict[str, dict[str, Enum]]:
    """Classify the description cells of raw CSV rows and store the results.

    This path works directly on row dicts keyed by the upload's own headers,
    so no ``AddressList`` is built and no canonical address/postcode columns
    are needed -- only the description cells named by each column's
    ``source_column``. It is meant for the original landlord upload rather
    than the address-matching CSV.

    Args:
        rows: one dict per CSV row, header -> cell text.
        portfolio_id: portfolio the classified overrides are upserted for.

    Returns:
        ``{column name -> {description -> classified enum}}`` for every
        configured column, including columns whose mapping came back empty.
    """
    descriptions_by_header = self._descriptions_from_rows(rows)

    # Pass 1: classify every column before anything is written. A header that
    # is absent from the rows is classified against an empty set.
    results: dict[str, dict[str, Enum]] = {}
    for col in self._columns:
        seen = descriptions_by_header.get(col.source_column, set())
        results[col.name] = col.classifier.classify(seen)

    # Pass 2: persist only the columns that produced at least one mapping.
    for col in self._columns:
        labelled = results[col.name]
        if labelled:
            col.repo.upsert_all(portfolio_id, labelled)

    return results
@staticmethod
def _descriptions_from_rows(rows: list[dict[str, str]]) -> dict[str, set[str]]:
    """Collect the distinct normalised description variants under each header.

    Every cell is split on commas; each piece is stripped of surrounding
    whitespace and lower-cased, and empty pieces are dropped. A header whose
    cells are all empty (or ``None``) still gets an entry, holding an empty
    set.

    Keys and cells that are not strings are ignored instead of crashing.
    The case this guards against is a ragged row parsed by
    ``csv.DictReader``, which stores surplus cells as a *list* under the
    ``None`` key -- NOTE(review): confirm how ``read_rows`` parses the upload.

    Args:
        rows: one dict per CSV row, header -> cell text.

    Returns:
        ``{header -> set of normalised variants}``.
    """
    variants_by_header: dict[str, set[str]] = {}
    for row in rows:
        for header, cell in row.items():
            if not isinstance(header, str):
                # Not a real column (e.g. the ``None`` overflow key).
                continue
            variants = variants_by_header.setdefault(header, set())
            if not isinstance(cell, str):
                # ``None`` for a short row, or a non-text value: nothing to add.
                continue
            stripped = (piece.strip().lower() for piece in cell.split(","))
            variants.update(piece for piece in stripped if piece)
    return variants_by_header