From 99614820b98ed35c9a3c4e2e4d7c3d1e1b6216b3 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Fri, 29 May 2026 10:41:46 +0000 Subject: [PATCH] made landlord overrides sqs --- .../landlord_description_overrides/handler.py | 161 +++++++++--------- ...lord_description_overrides_trigger_body.py | 4 + backend/app/bulk_uploads/router.py | 21 +++ backend/app/bulk_uploads/schema.py | 9 + backend/app/config.py | 1 + ...lord_description_overrides_orchestrator.py | 36 ++++ 6 files changed, 152 insertions(+), 80 deletions(-) diff --git a/applications/landlord_description_overrides/handler.py b/applications/landlord_description_overrides/handler.py index 7b7b60af..801d1f12 100644 --- a/applications/landlord_description_overrides/handler.py +++ b/applications/landlord_description_overrides/handler.py @@ -1,14 +1,12 @@ import logging import os from typing import Any -from uuid import UUID import boto3 from applications.landlord_description_overrides.landlord_description_overrides_trigger_body import ( LandlordDescriptionOverridesTriggerBody, ) -from domain.addresses.unstandardised_address import AddressList from domain.landlord_description_overrides.built_form_type import BuiltFormType from domain.landlord_description_overrides.property_type import PropertyType from domain.landlord_description_overrides.roof_type import RoofType @@ -33,36 +31,90 @@ from infrastructure.postgres.landlord_wall_type_override_postgres_repository imp LandlordWallTypeOverridePostgresRepository, ) from infrastructure.s3.csv_s3_client import CsvS3Client +from infrastructure.s3.s3_uri import parse_s3_uri from orchestration.classifiable_column import ClassifiableColumn from orchestration.landlord_description_overrides_orchestrator import ( LandlordDescriptionOverridesOrchestrator, ) +from orchestration.task_orchestrator import TaskOrchestrator from repositories.unstandardised_address.unstandardised_address_list_csv_s3_repository import ( UnstandardisedAddressListCsvS3Repository, ) +from utilities.aws_lambda.subtask_handler import subtask_handler logger = logging.getLogger(__name__) +def _build_columns( + column_mapping: dict[str, str], chat_gpt: ChatGPT, session: Any +) -> list[ClassifiableColumn[Any]]: + """One ClassifiableColumn per mapped category. + + ``column_mapping`` is ``{category -> source CSV header}``. One header may + feed several categories -- e.g. ``"Property Type"`` -> property_type and + built_form_type -- which falls out naturally because each is a separate + entry. Unknown categories are skipped. + """ + factories = { + "property_type": lambda src: ClassifiableColumn( + name="property_type", + source_column=src, + classifier=ChatGptColumnClassifier( + chat_gpt, PropertyType, PropertyType.UNKNOWN + ), + repo=LandlordPropertyTypeOverridePostgresRepository(session), + ), + "built_form_type": lambda src: ClassifiableColumn( + name="built_form_type", + source_column=src, + classifier=ChatGptColumnClassifier( + chat_gpt, BuiltFormType, BuiltFormType.UNKNOWN + ), + repo=LandlordBuiltFormTypeOverridePostgresRepository(session), + ), + "wall_type": lambda src: ClassifiableColumn( + name="wall_type", + source_column=src, + classifier=ChatGptColumnClassifier( + chat_gpt, + WallType, + WallType.UNKNOWN, + extra_instructions=wall_type_construction_date_prompt_hint(), + ), + repo=LandlordWallTypeOverridePostgresRepository(session), + ), + "roof_type": lambda src: ClassifiableColumn( + name="roof_type", + source_column=src, + classifier=ChatGptColumnClassifier( + chat_gpt, RoofType, RoofType.UNKNOWN + ), + repo=LandlordRoofTypeOverridePostgresRepository(session), + ), + } + + columns: list[ClassifiableColumn[Any]] = [] + for category, source_column in column_mapping.items(): + factory = factories.get(category) + if factory is None: + logger.warning("Unknown classifier category %r; skipping.", category) + continue + columns.append(factory(source_column)) + return columns + + +@subtask_handler() def handler( - body: dict[str, Any], - context: Any, -) -> dict[str, list[str]]: - # TODO: replace with ``LandlordDescriptionOverridesTriggerBody.model_validate(body)`` - # once this lambda is wired into the parent task pipeline via the SQS - # subtask envelope. Until then the trigger fields are hard-coded so the - # local invoker can exercise the full path. See ADR-0003 §Out of scope. - trigger = LandlordDescriptionOverridesTriggerBody( - task_id=UUID("00000000-0000-0000-0000-000000000001"), - sub_task_id=UUID("00000000-0000-0000-0000-000000000002"), - s3_uri="s3://retrofit-data-dev/bulk_onboarding_inputs/hyde2 (1).csv", - portfolio_id=730, - ) + body: dict[str, Any], context: Any, task_orchestrator: TaskOrchestrator +) -> dict[str, int]: + trigger = LandlordDescriptionOverridesTriggerBody.model_validate(body) - bucket = "retrofit-data-dev" + # The classifier reads the ORIGINAL upload (raw landlord headers), so the S3 + # bucket comes from the trigger URI rather than a fixed env var. + bucket, _key = parse_s3_uri(trigger.s3_uri) - # boto3.client is overloaded per-service in the installed stubs; cast - # to Any so the strict-mode checker treats it as opaque. + # boto3.client is overloaded per-service in the installed stubs; cast to Any + # so the strict-mode checker treats it as opaque. boto3_client: Any = ( boto3.client ) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] @@ -73,74 +125,23 @@ def handler( csv_client, bucket ) - # One transactional session per handler invocation: the context manager - # commits on clean exit and rolls back on exception, so the handler never - # invokes ``.commit()`` itself -- transaction semantics live in the - # infrastructure layer. + # Raw rows, not load_batch: the original upload carries the description + # columns but not the canonical address/postcode columns load_batch requires. + rows = csv_client.read_rows(trigger.s3_uri) + engine = make_engine(PostgresConfig.from_env(os.environ)) with transactional_session(engine) as session: chat_gpt = ChatGPT() - # The "Property Type" CSV column is read by two classifiers: the - # landlord's free-text (e.g. "semi-detached house") encodes both the - # dwelling kind (PropertyType) and how it joins to neighbours - # (BuiltFormType). Each classification lands in its own table. - columns: list[ClassifiableColumn[Any]] = [ - ClassifiableColumn( - name="property_type", - source_column="Property Type", - classifier=ChatGptColumnClassifier( - chat_gpt, PropertyType, PropertyType.UNKNOWN - ), - repo=LandlordPropertyTypeOverridePostgresRepository(session), - ), - ClassifiableColumn( - name="built_form_type", - source_column="Property Type", - classifier=ChatGptColumnClassifier( - chat_gpt, BuiltFormType, BuiltFormType.UNKNOWN - ), - repo=LandlordBuiltFormTypeOverridePostgresRepository(session), - ), - ClassifiableColumn( - name="wall_type", - source_column="Walls", - classifier=ChatGptColumnClassifier( - chat_gpt, - WallType, - WallType.UNKNOWN, - extra_instructions=wall_type_construction_date_prompt_hint(), - ), - repo=LandlordWallTypeOverridePostgresRepository(session), - ), - ClassifiableColumn( - name="roof_type", - source_column="Roofs", - classifier=ChatGptColumnClassifier( - chat_gpt, RoofType, RoofType.UNKNOWN - ), - repo=LandlordRoofTypeOverridePostgresRepository(session), - ), - ] - + columns = _build_columns(trigger.column_mapping, chat_gpt, session) orchestrator = LandlordDescriptionOverridesOrchestrator( unstandardised_address_repo=unstandardised_address_repo, columns=columns, ) - - addressList: AddressList = orchestrator.get_unstandardised_addresses( - input_s3_uri=trigger.s3_uri + classified = orchestrator.classify_and_persist_from_rows( + rows, portfolio_id=trigger.portfolio_id ) - # Cap the batch to the first 20 while the ChatGPT path is under test. - # Remove before wiring into the production subtask pipeline. - addressList = AddressList(addressList[:20]) - - classified = orchestrator.classify_and_persist( - addressList, portfolio_id=trigger.portfolio_id - ) - for column, mapping in classified.items(): - logger.info( - "Classified %d descriptions for column %r.", len(mapping), column - ) - - return {"hello": ["200"]} + counts = {name: len(mapping) for name, mapping in classified.items()} + for name, n in counts.items(): + logger.info("Classified %d descriptions for column %r.", n, name) + return counts diff --git a/applications/landlord_description_overrides/landlord_description_overrides_trigger_body.py b/applications/landlord_description_overrides/landlord_description_overrides_trigger_body.py index 9f78215e..0ca80ec3 100644 --- a/applications/landlord_description_overrides/landlord_description_overrides_trigger_body.py +++ b/applications/landlord_description_overrides/landlord_description_overrides_trigger_body.py @@ -13,3 +13,7 @@ class LandlordDescriptionOverridesTriggerBody(BaseModel): # Python ``int`` is unbounded so the Pydantic side stays simple; the # SQLModel row class pins the storage to ``BigInteger``. portfolio_id: int + # category -> source CSV header (the classifier subset of the upload + # mapping). Defaulted so a malformed/empty message classifies nothing + # rather than failing validation. + column_mapping: dict[str, str] = {} diff --git a/backend/app/bulk_uploads/router.py b/backend/app/bulk_uploads/router.py index 9928b456..c050b18c 100644 --- a/backend/app/bulk_uploads/router.py +++ b/backend/app/bulk_uploads/router.py @@ -13,6 +13,7 @@ from backend.app.bulk_uploads.schema import ( CombinedResultsResponse, CombinerTriggerRequest, FlagsSummary, + LandlordOverridesTriggerRequest, PostcodeSplitterTriggerRequest, ) from backend.app.bulk_uploads.scoring import score_bucket @@ -92,6 +93,26 @@ async def trigger_combiner(req: CombinerTriggerRequest): } +@router.post("/trigger-landlord-overrides", status_code=202) +async def trigger_landlord_overrides(req: LandlordOverridesTriggerRequest): + settings = get_settings() + + try: + sqs = boto3.client("sqs", settings.AWS_DEFAULT_REGION) + response = sqs.send_message( + QueueUrl=settings.LANDLORD_OVERRIDES_SQS_URL, + MessageBody=req.model_dump_json(), + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"SQS error: {e}") + + return { + "task_id": req.task_id, + "sub_task_id": req.sub_task_id, + "sqs_message_id": response.get("MessageId"), + } + + @router.get("/{task_id}/combined-results", response_model=CombinedResultsResponse) async def get_combined_results( task_id: UUID, diff --git a/backend/app/bulk_uploads/schema.py b/backend/app/bulk_uploads/schema.py index ca3b39ea..af797cac 100644 --- a/backend/app/bulk_uploads/schema.py +++ b/backend/app/bulk_uploads/schema.py @@ -14,6 +14,15 @@ class CombinerTriggerRequest(BaseModel): sub_task_id: str +class LandlordOverridesTriggerRequest(BaseModel): + task_id: str + sub_task_id: str + s3_uri: str + portfolio_id: int + # category -> source CSV header (the classifier subset of the upload mapping) + column_mapping: dict[str, str] + + class FlagsSummary(BaseModel): duplicates: int missing: int diff --git a/backend/app/config.py b/backend/app/config.py index fcfb6d5b..f969518d 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -42,6 +42,7 @@ class Settings(BaseSettings): MAGICPLAN_SQS_URL: str = "changeme" POSTCODE_SPLITTER_SQS_URL: str = "changeme" COMBINER_SQS_URL: str = "changeme" + LANDLORD_OVERRIDES_SQS_URL: str = "changeme" # Third parties EPC_AUTH_TOKEN: str = "changeme" diff --git a/orchestration/landlord_description_overrides_orchestrator.py b/orchestration/landlord_description_overrides_orchestrator.py index 389d1afb..6203b8d5 100644 --- a/orchestration/landlord_description_overrides_orchestrator.py +++ b/orchestration/landlord_description_overrides_orchestrator.py @@ -81,3 +81,39 @@ class LandlordDescriptionOverridesOrchestrator: continue column.repo.upsert_all(portfolio_id, mapping) return classified + + def classify_and_persist_from_rows( + self, rows: list[dict[str, str]], portfolio_id: int + ) -> dict[str, dict[str, Enum]]: + """Classify + persist straight from raw CSV rows. + + Unlike ``classify_and_persist``, this does not build an ``AddressList``, + so it has no canonical address/postcode requirement -- the classifier + only needs the raw description cells. Used when reading the original + landlord upload (raw headers) rather than the address-matching CSV. + """ + col_to_desc = self._descriptions_from_rows(rows) + classified = { + column.name: column.classifier.classify( + col_to_desc.get(column.source_column, set()) + ) + for column in self._columns + } + for column in self._columns: + mapping = classified[column.name] + if not mapping: + continue + column.repo.upsert_all(portfolio_id, mapping) + return classified + + @staticmethod + def _descriptions_from_rows(rows: list[dict[str, str]]) -> dict[str, set[str]]: + mappings: dict[str, set[str]] = {} + for row in rows: + for key, value in row.items(): + bucket = mappings.setdefault(key, set()) + for variant in (value or "").split(","): + variant = variant.strip().lower() + if variant: + bucket.add(variant) + return mappings