diff --git a/applications/bulk_upload_finaliser/Dockerfile b/applications/bulk_upload_finaliser/Dockerfile new file mode 100644 index 00000000..6fa80c48 --- /dev/null +++ b/applications/bulk_upload_finaliser/Dockerfile @@ -0,0 +1,35 @@ +FROM public.ecr.aws/lambda/python:3.11 + +# Postgres host/port/database are baked into the image at build time from the +# deploy workflow's --build-arg values (GitHub Actions DEV_DB_* secrets), +# mirroring the landlord_description_overrides Dockerfile. They map onto the +# POSTGRES_* names PostgresConfig.from_env reads. Username/password are NOT baked +# in -- Terraform injects those as Lambda env vars from Secrets Manager. +ARG DEV_DB_HOST +ARG DEV_DB_PORT +ARG DEV_DB_NAME + +ENV POSTGRES_HOST=${DEV_DB_HOST} +ENV POSTGRES_PORT=${DEV_DB_PORT} +ENV POSTGRES_DATABASE=${DEV_DB_NAME} + +WORKDIR /var/task + +COPY applications/bulk_upload_finaliser/requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# DDD-shaped packages only -- no pandas, no legacy backend/. The finaliser writes +# both `property` and the terminal `bulk_address_uploads` status through DDD repos +# on its own PostgresConfig session (ADR-0013). +COPY domain/ domain/ +COPY infrastructure/ infrastructure/ +COPY orchestration/ orchestration/ +COPY repositories/ repositories/ +COPY utilities/ utilities/ +COPY applications/ applications/ + +# Place the handler at the Lambda task root so the runtime resolves +# ``main.handler`` without an extra package prefix. +COPY applications/bulk_upload_finaliser/handler.py /var/task/main.py + +CMD ["main.handler"] diff --git a/applications/bulk_upload_finaliser/bulk_upload_finaliser_trigger_body.py b/applications/bulk_upload_finaliser/bulk_upload_finaliser_trigger_body.py new file mode 100644 index 00000000..086ca291 --- /dev/null +++ b/applications/bulk_upload_finaliser/bulk_upload_finaliser_trigger_body.py @@ -0,0 +1,22 @@ +from uuid import UUID + +from pydantic import BaseModel, ConfigDict + + +class BulkUploadFinaliserTriggerBody(BaseModel): + """Trigger body for the bulk_upload_finaliser Lambda (ADR-0013). + + Dispatched by the Next.js Finalise action via + ``POST /v1/bulk-uploads/trigger-finaliser``. ``s3_uri`` is the combiner output + (``combined_output_s3_uri``) — the same address/UPRN CSV the old synchronous + ``/finalize`` route read. + """ + + model_config = ConfigDict(extra="allow") + + task_id: UUID + sub_task_id: UUID + s3_uri: str + # bigint in the FE schema; Python int is unbounded so Pydantic stays simple. + portfolio_id: int + bulk_upload_id: UUID diff --git a/applications/bulk_upload_finaliser/handler.py b/applications/bulk_upload_finaliser/handler.py new file mode 100644 index 00000000..72894fa0 --- /dev/null +++ b/applications/bulk_upload_finaliser/handler.py @@ -0,0 +1,103 @@ +"""bulk_upload_finaliser Lambda (ADR-0013). + +Replaces the synchronous Next.js ``/finalize`` property insert. Thin wiring: parse +the trigger, read the combiner output CSV from S3, hand the rows to the +``BulkUploadFinaliserOrchestrator`` (which owns the resolution + persist), then +write the terminal BulkUpload status directly (ADR-0005 hands terminal ownership to +the backend). ``complete`` is written in the *same* transaction as the property +insert (atomic finalise); ``failed`` is written on a fresh session on error. + +PostgresConfig-only, like the landlord classifier Lambda — no legacy ``backend/`` +connection — so a single DB config (POSTGRES_*) drives the whole run. +""" + +import logging +import os +from typing import Any +from uuid import UUID + +import boto3 +from sqlalchemy.engine import Engine + +from applications.bulk_upload_finaliser.bulk_upload_finaliser_trigger_body import ( + BulkUploadFinaliserTriggerBody, +) +from infrastructure.postgres.config import PostgresConfig +from infrastructure.postgres.engine import commit_scope, make_engine, make_session +from infrastructure.s3.csv_s3_client import CsvS3Client +from infrastructure.s3.s3_uri import parse_s3_uri +from orchestration.bulk_upload_finaliser_orchestrator import ( + BulkUploadFinaliserOrchestrator, +) +from orchestration.task_orchestrator import TaskOrchestrator +from repositories.bulk_upload.bulk_upload_status_writer_postgres import ( + BulkUploadStatusWriterPostgresRepository, +) +from repositories.property.property_identity_writer_postgres import ( + PropertyIdentityWriterPostgresRepository, +) +from utilities.aws_lambda.subtask_handler import subtask_handler + +logger = logging.getLogger(__name__) + + +def _run(engine: Engine, trigger: BulkUploadFinaliserTriggerBody) -> int: + bucket, _key = parse_s3_uri(trigger.s3_uri) + + boto3_client: Any = boto3.client # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + boto_s3: Any = boto3_client("s3") + rows = CsvS3Client(boto_s3, bucket).read_rows(trigger.s3_uri) + + session = make_session(engine) + try: + orchestrator = BulkUploadFinaliserOrchestrator( + property_writer=PropertyIdentityWriterPostgresRepository(session) + ) + status_writer = BulkUploadStatusWriterPostgresRepository(session) + # Resolution is pure, so run it before opening the transaction. + inserts = orchestrator.to_property_rows(rows, trigger.portfolio_id) + # Atomic finalise: insert properties and mark `complete` together — a + # failure in either rolls back both, leaving the row for the failure path. + with commit_scope(session): + inserted = orchestrator.persist(inserts) + status_writer.set_status(trigger.task_id, "complete") + finally: + session.close() + + logger.info( + "Finalised bulk upload %s: %d rows read, %d properties inserted.", + trigger.bulk_upload_id, + len(rows), + inserted, + ) + return inserted + + +def _mark_failed(engine: Engine, task_id: UUID) -> None: + session = make_session(engine) + try: + with commit_scope(session): + BulkUploadStatusWriterPostgresRepository(session).set_status( + task_id, "failed" + ) + finally: + session.close() + + +@subtask_handler() +def handler( + body: dict[str, Any], context: Any, task_orchestrator: TaskOrchestrator +) -> dict[str, int]: + trigger = BulkUploadFinaliserTriggerBody.model_validate(body) + engine = make_engine(PostgresConfig.from_env(os.environ)) + + try: + inserted = _run(engine, trigger) + except Exception: + # Hand the BulkUpload to the terminal `failed` state so the UI leaves + # `finalising`; the @subtask_handler also marks the SubTask FAILED on the + # re-raise below. + _mark_failed(engine, trigger.task_id) + raise + + return {"inserted": inserted} diff --git a/applications/bulk_upload_finaliser/requirements.txt b/applications/bulk_upload_finaliser/requirements.txt new file mode 100644 index 00000000..6a85a255 --- /dev/null +++ b/applications/bulk_upload_finaliser/requirements.txt @@ -0,0 +1,4 @@ +boto3 +pydantic +sqlmodel +psycopg2-binary diff --git a/backend/app/bulk_uploads/router.py b/backend/app/bulk_uploads/router.py index c050b18c..ea02fd80 100644 --- a/backend/app/bulk_uploads/router.py +++ b/backend/app/bulk_uploads/router.py @@ -12,6 +12,7 @@ from backend.app.bulk_uploads.schema import ( CombinedResultRow, CombinedResultsResponse, CombinerTriggerRequest, + FinaliserTriggerRequest, FlagsSummary, LandlordOverridesTriggerRequest, PostcodeSplitterTriggerRequest, @@ -113,6 +114,26 @@ async def trigger_landlord_overrides(req: LandlordOverridesTriggerRequest): } +@router.post("/trigger-finaliser", status_code=202) +async def trigger_finaliser(req: FinaliserTriggerRequest): + settings = get_settings() + + try: + sqs = boto3.client("sqs", settings.AWS_DEFAULT_REGION) + response = sqs.send_message( + QueueUrl=settings.FINALISER_SQS_URL, + MessageBody=req.model_dump_json(), + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"SQS error: {e}") + + return { + "task_id": req.task_id, + "sub_task_id": req.sub_task_id, + "sqs_message_id": response.get("MessageId"), + } + + @router.get("/{task_id}/combined-results", response_model=CombinedResultsResponse) async def get_combined_results( task_id: UUID, diff --git a/backend/app/bulk_uploads/schema.py b/backend/app/bulk_uploads/schema.py index af797cac..759f76ca 100644 --- a/backend/app/bulk_uploads/schema.py +++ b/backend/app/bulk_uploads/schema.py @@ -23,6 +23,14 @@ class LandlordOverridesTriggerRequest(BaseModel): column_mapping: dict[str, str] +class FinaliserTriggerRequest(BaseModel): + task_id: str + sub_task_id: str + s3_uri: str # combiner output (combined_output_s3_uri) + portfolio_id: int + bulk_upload_id: str + + class FlagsSummary(BaseModel): duplicates: int missing: int diff --git a/backend/app/config.py b/backend/app/config.py index f969518d..36fb36a8 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -43,6 +43,7 @@ class Settings(BaseSettings): POSTCODE_SPLITTER_SQS_URL: str = "changeme" COMBINER_SQS_URL: str = "changeme" LANDLORD_OVERRIDES_SQS_URL: str = "changeme" + FINALISER_SQS_URL: str = "changeme" # Third parties EPC_AUTH_TOKEN: str = "changeme" diff --git a/backend/ordnanceSurvey/helpers.py b/backend/ordnanceSurvey/helpers.py index c0d6583b..52fbe9c5 100644 --- a/backend/ordnanceSurvey/helpers.py +++ b/backend/ordnanceSurvey/helpers.py @@ -1,4 +1,5 @@ import urllib.parse +from typing import Any from pydantic import ValidationError import requests import pandas as pd @@ -7,13 +8,13 @@ from utils.logger import setup_logger logger = setup_logger() -def os_places_results_to_dataframe(data: dict) -> pd.DataFrame: +def os_places_results_to_dataframe(data: dict[str, Any]) -> pd.DataFrame: """ Flatten the OS Places API response results into a DataFrame. Each result contains either a DPA or LPI record. """ - results = data.get("results", []) - rows = [] + results: list[dict[str, Any]] = data.get("results", []) + rows: list[dict[str, Any]] = [] for r in results: if "DPA" in r: rows.append(r["DPA"]) @@ -22,7 +23,7 @@ def os_places_results_to_dataframe(data: dict) -> pd.DataFrame: return pd.DataFrame(rows) -def lookup_os_places(postcode: str, api_key: str) -> dict: +def lookup_os_places(postcode: str, api_key: str) -> dict[str, Any]: """ Lookup a postcode using the OS Places API. Returns the full API response data or an error dict. diff --git a/backend/ordnanceSurvey/local_handler/.env.local.example b/backend/ordnanceSurvey/local_handler/.env.local.example new file mode 100644 index 00000000..b1f0330e --- /dev/null +++ b/backend/ordnanceSurvey/local_handler/.env.local.example @@ -0,0 +1,21 @@ +# Runtime env for the Ordnance Survey handler when run locally via docker compose. +# Variable names must match backend/app/config.py Settings (DB_*, ORDNANCE_SURVEY_API_KEY) +# plus the AWS creds boto3 needs for S3 access. + +ENVIRONMENT=local + +# Database (OS Places postcode cache) +DB_HOST= +DB_PORT=5432 +DB_NAME= +DB_USERNAME= +DB_PASSWORD= + +# Ordnance Survey Places API +ORDNANCE_SURVEY_API_KEY= + +# AWS — read input CSV from / write results to S3 +AWS_ACCESS_KEY_ID= +AWS_SECRET_ACCESS_KEY= +AWS_DEFAULT_REGION=eu-west-2 +S3_BUCKET_NAME=retrofit-data-dev diff --git a/backend/ordnanceSurvey/local_handler/run_local.sh b/backend/ordnanceSurvey/local_handler/run_local.sh new file mode 100755 index 00000000..345b60ee --- /dev/null +++ b/backend/ordnanceSurvey/local_handler/run_local.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash +set -euo pipefail +cd "$(dirname "$0")" + +if [ ! -f .env.local ]; then + cp .env.local.example .env.local + echo "Created .env.local from the template — fill it in, then re-run." >&2 + exit 1 +fi + +docker compose build --no-cache +docker compose up --force-recreate diff --git a/backend/ordnanceSurvey/main.py b/backend/ordnanceSurvey/main.py index 6e82b468..03020307 100644 --- a/backend/ordnanceSurvey/main.py +++ b/backend/ordnanceSurvey/main.py @@ -22,6 +22,7 @@ import uuid import os import pandas as pd +from tqdm import tqdm logger: logging.Logger = setup_logger() @@ -152,8 +153,10 @@ def handler(body: dict[str, Any], context: Any, local: bool = False) -> None: df["ordnance_survey_lexiscore"] = None # Process each postcode group at a time - for postcode, group in grouped: - print(f"Processing postcode: {postcode} ({len(group)} rows)") + for postcode, group in tqdm( + grouped, total=grouped.ngroups, desc="OS postcodes", unit="postcode" + ): + tqdm.write(f"Processing postcode: {postcode} ({len(group)} rows)") valid_group = AddressMatch.is_valid_postcode(postcode) if not valid_group: logger.warning(f"Postcode {postcode} is invalid, skipping") diff --git a/backend/ordnanceSurvey/tests/test_os_match.py b/backend/ordnanceSurvey/tests/test_os_match.py new file mode 100644 index 00000000..b2d63689 --- /dev/null +++ b/backend/ordnanceSurvey/tests/test_os_match.py @@ -0,0 +1,153 @@ +# backend/ordnanceSurvey/tests/test_os_match.py +""" +Debug harness for Ordnance Survey address matching. + +Mirrors backend/address2UPRN/tests/test_csv.py, but for the OS Places flow: +for each (User Input, Postcode) case it hits the live OS Places API, scores every +candidate address with AddressMatch.score (exactly as backend/ordnanceSurvey/main.py +does), and prints the full ranked breakdown so you can see *why* a match was or +wasn't found. + +Run with -s to see the ranking: + + pytest backend/ordnanceSurvey/tests/test_os_match.py -s + +Requires ORDNANCE_SURVEY_API_KEY to be set (config Settings / env); skipped otherwise. +""" + +import csv +import os +import time +from pathlib import Path +from typing import Any, Optional + +import pytest + +from backend.ordnanceSurvey.helpers import ( + lookup_os_places, + os_places_results_to_dataframe, +) +from backend.utils.addressMatch import AddressMatch + +FIXTURE_PATH = Path(__file__).parent / "test_data.csv" + +# Be polite to the live OS Places API between cases. +OS_THROTTLE_SECONDS = 1.0 + +# Handler treats best_score <= 0 as "no match" (see ordnanceSurvey/main.py). +MATCH_THRESHOLD = 0.0 + + +@pytest.fixture(autouse=True) +def _throttle_os_requests(): + yield + time.sleep(OS_THROTTLE_SECONDS) + + +def _api_key() -> Optional[str]: + # Read straight from the environment so the debug harness doesn't depend on + # the full Settings model loading cleanly. Falls back to Settings if unset. + key = os.getenv("ORDNANCE_SURVEY_API_KEY") + if not key: + try: + from backend.app.config import get_settings + + key = get_settings().ORDNANCE_SURVEY_API_KEY + except Exception: + key = None + if not key or key == "changeme": + return None + return key + + +def load_test_cases(): + with open(FIXTURE_PATH, newline="", encoding="utf-8") as f: + reader = csv.DictReader(f) + return [ + pytest.param( + row["User Input"], + row["Postcode"], + (row.get("Expected UPRN") or "").strip(), + id=f'{row["User Input"]} [{row["Postcode"]}]', + ) + for row in reader + ] + + +def _scored_candidates( + user_input: str, postcode: str, api_key: str +) -> list[dict[str, Any]]: + """ + Fetch OS Places candidates for a postcode (bypassing the DB cache) and score + every candidate ADDRESS exactly as ordnanceSurvey/main.py does. Returned + ranked best-first. + """ + response: dict[str, Any] = lookup_os_places(postcode, api_key) + assert response.get("status") == 200, f"OS Places API failed: {response}" + assert "data" in response, f"No data in OS Places response: {response}" + + candidates = os_places_results_to_dataframe(response["data"]) + records: list[dict[str, Any]] = candidates.to_dict("records") # type: ignore[assignment] + + scored: list[dict[str, Any]] = [] + for rec in records: + address = str(rec.get("ADDRESS", "")) + scored.append( + { + "uprn": rec.get("UPRN", "?"), + "address": address, + "normalised": AddressMatch.normalise_address(address), + "score": AddressMatch.score(user_input, address), + } + ) + scored.sort(key=lambda r: r["score"], reverse=True) + return scored + + +def _print_debug( + user_input: str, postcode: str, scored: list[dict[str, Any]] +) -> None: + print(f"\n{'=' * 80}") + print(f"User input : {user_input!r}") + print(f"Normalised : {AddressMatch.normalise_address(user_input)!r}") + print(f"Postcode : {postcode!r}") + print(f"Candidates : {len(scored)}") + print(f"{'-' * 80}") + if not scored: + print("(no OS Places candidates returned for this postcode)") + return + for row in scored[:15]: + print(f" score={row['score']:.4f} uprn={row['uprn']}") + print(f" ADDRESS : {row['address']}") + print(f" normalised : {row['normalised']}") + print(f"{'=' * 80}") + + +@pytest.mark.integration +@pytest.mark.parametrize("user_input,postcode,expected_uprn", load_test_cases()) +def test_os_match_finds_candidate( + user_input: str, + postcode: str, + expected_uprn: str, +): + api_key = _api_key() + if api_key is None: + pytest.skip("ORDNANCE_SURVEY_API_KEY not set") + + scored = _scored_candidates(user_input, postcode, api_key) + _print_debug(user_input, postcode, scored) + + best = scored[0] if scored else None + best_score = float(best["score"]) if best is not None else 0.0 + + # The handler records a match only when best_score > 0. This assertion is + # the debug signal: when it fails, the printed ranking above shows why. + assert best is not None and best_score > MATCH_THRESHOLD, ( + f"No OS match for {user_input!r} @ {postcode!r} " + f"(best_score={best_score:.4f}). See ranking above." + ) + + if expected_uprn: + assert str(best["uprn"]) == expected_uprn, ( + f"Best match UPRN {best['uprn']!r} != expected {expected_uprn!r}" + ) diff --git a/deployment/terraform/lambda/bulkUploadFinaliser/main.tf b/deployment/terraform/lambda/bulkUploadFinaliser/main.tf new file mode 100644 index 00000000..0d6685d0 --- /dev/null +++ b/deployment/terraform/lambda/bulkUploadFinaliser/main.tf @@ -0,0 +1,49 @@ +data "terraform_remote_state" "shared" { + backend = "s3" + config = { + bucket = "assessment-model-terraform-state" + key = "env:/${var.stage}/terraform.tfstate" + region = "eu-west-2" + } +} + +data "aws_secretsmanager_secret_version" "db_credentials" { + secret_id = "${var.stage}/assessment_model/db_credentials" +} + +locals { + db_credentials = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string) +} + +module "lambda" { + source = "../../modules/lambda_with_sqs" + + name = "bulk-upload-finaliser" + stage = var.stage + + image_uri = local.image_uri + + # The finaliser reads the combiner CSV and does one bulk INSERT — IO-light, but + # a property list can be ~40,000 rows, so 300s leaves ample headroom under the + # queue visibility timeout. batch_size = 1 keeps one upload per invocation so a + # bad record can't redrive its siblings; maximum_concurrency caps DB write + # fan-out. + timeout = 300 + batch_size = 1 + maximum_concurrency = 2 + + environment = merge( + { + STAGE = var.stage + LOG_LEVEL = "info" + POSTGRES_USERNAME = local.db_credentials.db_assessment_model_username + POSTGRES_PASSWORD = local.db_credentials.db_assessment_model_password + }, + ) +} + +# Attach S3 read policy so the handler can read the combiner output CSV. +resource "aws_iam_role_policy_attachment" "bulk_upload_finaliser_s3_read" { + role = module.lambda.role_name + policy_arn = data.terraform_remote_state.shared.outputs.bulk_upload_finaliser_s3_read_arn +} diff --git a/deployment/terraform/lambda/bulkUploadFinaliser/outputs.tf b/deployment/terraform/lambda/bulkUploadFinaliser/outputs.tf new file mode 100644 index 00000000..505d902a --- /dev/null +++ b/deployment/terraform/lambda/bulkUploadFinaliser/outputs.tf @@ -0,0 +1,9 @@ +output "bulk_upload_finaliser_queue_url" { + value = module.lambda.queue_url + description = "URL of the Bulk Upload Finaliser SQS queue (wire into the FastAPI FINALISER_SQS_URL)" +} + +output "bulk_upload_finaliser_queue_arn" { + value = module.lambda.queue_arn + description = "ARN of the Bulk Upload Finaliser SQS queue" +} diff --git a/deployment/terraform/lambda/bulkUploadFinaliser/provider.tf b/deployment/terraform/lambda/bulkUploadFinaliser/provider.tf new file mode 100644 index 00000000..98a588f1 --- /dev/null +++ b/deployment/terraform/lambda/bulkUploadFinaliser/provider.tf @@ -0,0 +1,16 @@ +terraform { + required_providers { + aws = { + source = "hashicorp/aws" + version = ">= 5.0" + } + } + + backend "s3" { + bucket = "bulk-upload-finaliser-terraform-state" + key = "terraform.tfstate" + region = "eu-west-2" + } + + required_version = ">= 1.2.0" +} diff --git a/deployment/terraform/lambda/bulkUploadFinaliser/variables.tf b/deployment/terraform/lambda/bulkUploadFinaliser/variables.tf new file mode 100644 index 00000000..5d45b312 --- /dev/null +++ b/deployment/terraform/lambda/bulkUploadFinaliser/variables.tf @@ -0,0 +1,27 @@ +variable "lambda_name" { + type = string + description = "Logical name of the lambda (e.g. bulkUploadFinaliser)" +} + +variable "stage" { + description = "Deployment stage (e.g. dev, prod)" + type = string +} + +variable "ecr_repo_url" { + type = string + description = "ECR repository URL (no tag, no digest)" +} + +variable "image_digest" { + type = string + description = "Image digest (sha256:...)" +} + +locals { + image_uri = "${var.ecr_repo_url}@${var.image_digest}" +} + +output "resolved_image_uri" { + value = local.image_uri +} diff --git a/deployment/terraform/lambda/fast-api/main.tf b/deployment/terraform/lambda/fast-api/main.tf index 3a2b5a5f..dea9b7d9 100644 --- a/deployment/terraform/lambda/fast-api/main.tf +++ b/deployment/terraform/lambda/fast-api/main.tf @@ -46,6 +46,15 @@ data "terraform_remote_state" "bulk_address2uprn_combiner" { } } +data "terraform_remote_state" "bulk_upload_finaliser" { + backend = "s3" + config = { + bucket = "bulk-upload-finaliser-terraform-state", + key = "env:/${var.stage}/terraform.tfstate" + region = "eu-west-2" + } +} + ############################################ # Load Credentials ############################################ @@ -105,6 +114,7 @@ module "fastapi" { CATEGORISATION_SQS_URL = data.terraform_remote_state.categorisation.outputs.categorisation_queue_url POSTCODE_SPLITTER_SQS_URL = data.terraform_remote_state.postcode_splitter.outputs.postcode_splitter_queue_url COMBINER_SQS_URL = data.terraform_remote_state.bulk_address2uprn_combiner.outputs.bulk_address2uprn_combiner_queue_url + FINALISER_SQS_URL = data.terraform_remote_state.bulk_upload_finaliser.outputs.bulk_upload_finaliser_queue_url } } @@ -126,7 +136,8 @@ module "fastapi_sqs_policy" { data.terraform_remote_state.engine.outputs.ara_engine_queue_arn, data.terraform_remote_state.categorisation.outputs.categorisation_queue_arn, data.terraform_remote_state.postcode_splitter.outputs.postcode_splitter_queue_arn, - data.terraform_remote_state.bulk_address2uprn_combiner.outputs.bulk_address2uprn_combiner_queue_arn + data.terraform_remote_state.bulk_address2uprn_combiner.outputs.bulk_address2uprn_combiner_queue_arn, + data.terraform_remote_state.bulk_upload_finaliser.outputs.bulk_upload_finaliser_queue_arn ] conditions = null diff --git a/deployment/terraform/shared/main.tf b/deployment/terraform/shared/main.tf index 82a3820a..804082fc 100644 --- a/deployment/terraform/shared/main.tf +++ b/deployment/terraform/shared/main.tf @@ -558,6 +558,36 @@ output "bulk_address2uprn_combiner_s3_arn" { value = module.bulk_address2uprn_combiner_s3.policy_arn } +################################################ +# Bulk Upload Finaliser – Lambda ECR (ADR-0013) +################################################ +module "bulk_upload_finaliser_state_bucket" { + source = "../modules/tf_state_bucket" + bucket_name = "bulk-upload-finaliser-terraform-state" +} + +module "bulk_upload_finaliser_registry" { + source = "../modules/container_registry" + name = "bulk_upload_finaliser" + stage = var.stage +} + +# The finaliser only reads the combiner output (bulk_final_outputs) to insert +# property rows; it writes to Postgres, not S3. +module "bulk_upload_finaliser_s3_read" { + source = "../modules/s3_iam_policy" + + policy_name = "BulkUploadFinaliserReadS3" + policy_description = "Allow bulk_upload_finaliser Lambda to read combiner output from retrofit-data bucket" + bucket_arns = ["arn:aws:s3:::retrofit-data-${var.stage}"] + actions = ["s3:GetObject", "s3:ListBucket"] + resource_paths = ["/bulk_final_outputs/*"] +} + +output "bulk_upload_finaliser_s3_read_arn" { + value = module.bulk_upload_finaliser_s3_read.policy_arn +} + ################################################ # Categorisation – Lambda ECR ################################################ diff --git a/docs/adr/0013-bulk-upload-finaliser-writes-properties.md b/docs/adr/0013-bulk-upload-finaliser-writes-properties.md new file mode 100644 index 00000000..378351cf --- /dev/null +++ b/docs/adr/0013-bulk-upload-finaliser-writes-properties.md @@ -0,0 +1,120 @@ +# ADR-0013: The `bulk_upload_finaliser` Lambda writes properties and the terminal status + +**Status:** Accepted +**Date:** 2026-06-04 + +> Companion to the `assessment-model` (frontend) repo's +> [ADR-0005](https://github.com/Hestia-Homes/assessment-model/blob/main/docs/adr/0005-async-bulk-upload-finaliser.md), +> which owns the state-machine change and the Drizzle schema. This ADR owns the +> **backend write path**. It applies the direct-write pattern, transaction-boundary +> rule, and port/adapter/row-mirror file layout established by +> [ADR-0003](0003-python-writes-landlord-overrides-directly.md). + +## Context + +Finalising a BulkUpload inserts one `property` row per source row — up to ~40,000. +Today a synchronous Next.js route does it; frontend ADR-0005 moves it to a dispatched +Lambda for the same reasons ADR-0003 moved the classifier write into Python: the work +is internal (a Lambda computes the rows, the same Lambda persists them), the Lambda +already sits next to a Postgres connection, and routing 40k inserts back through an +HTTP hop buys nothing. + +`PropertyRow` in this repo +([`infrastructure/postgres/property_table.py`](../../infrastructure/postgres/property_table.py)) +is currently a **defensive read-only view** — its docstring states the backend never +inserts properties. Finalise changes that. + +## Decision + +1. **New application `applications/bulk_upload_finaliser/`** wraps a handler in + `@subtask_handler` (auto-injected `TaskOrchestrator`; `run_subtask` owns the + subtask start/complete/fail and the Task cascade), exactly like + `applications/landlord_description_overrides/handler.py`. The trigger body: + + ```python + class BulkUploadFinaliserTriggerBody(SubtaskTriggerBody): # task_id, sub_task_id + s3_uri: str # combiner output (combined_output_s3_uri) + portfolio_id: int + bulk_upload_id: UUID + ``` + + Dispatched via a new `POST /v1/bulk-uploads/trigger-finaliser` FastAPI endpoint + (auth `validate_token`) that enqueues to a new SQS queue — mirroring + `trigger-postcode-splitter` / `trigger-landlord-overrides`. + +2. **`PropertyRow` drops its "backend never inserts" invariant** and gains the + insertable columns finalise needs — the **exact nine** the frontend route writes + today: `portfolio_id`, `creation_status` (`'READY'`), `uprn`, + `landlord_property_id` (← `Internal Reference`), `address` (matched ?? user + input), `postcode`, `user_inputted_address`, `user_inputted_postcode`, + `lexiscore`. Drizzle remains the schema source of truth (ADR-0003); this is a + mirror change only. + +3. **Insert policy lives in SQL, idempotent:** + + ```sql + INSERT INTO property (portfolio_id, creation_status, uprn, landlord_property_id, + address, postcode, user_inputted_address, + user_inputted_postcode, lexiscore) + VALUES … + ON CONFLICT (portfolio_id, uprn) WHERE uprn IS NOT NULL + DO NOTHING; + ``` + + This reproduces the frontend's `onConflictDoNothing` exactly — existing properties + are not churned on a re-run. + +4. **The Lambda writes the terminal BulkUpload status directly — DDD, on its own + session.** On success it sets `status='complete'`; on failure `status='failed'`. + Rather than the combiner's `backend/app/db/functions` (which open the legacy + `backend/` connection on the separate `DB_*` config), the finaliser stays + **PostgresConfig-only like the landlord classifier Lambda**: it writes status + through a DDD `BulkUploadStatusWriter` port (`repositories/bulk_upload/`) backed + by a minimal `BulkAddressUploadRow` mirror (`infrastructure/postgres/`), on the + *same* session. So a single DB config (`POSTGRES_*`) drives the run and the + image needs no `backend/`. The `complete` flip happens **in the same transaction + as the property insert** (atomic finalise — either both land or neither); + `failed` is written on a fresh session in the error path. Next.js no longer + writes `complete`; it owns only the `awaiting_review → finalising` + compare-and-swap at dispatch (frontend ADR-0005). + +5. **DDD layering (ADR-0003 + the landlord precedent).** Domain logic — resolving + combiner rows into property identity rows — lives in + `orchestration/bulk_upload_finaliser_orchestrator.py`; the Lambda + `applications/bulk_upload_finaliser/handler.py` stays thin (parse trigger, wire + S3 + engine + repos, delegate, report status). New seams: + - Property insert: port `repositories/property/property_identity_writer.py` + (+ `PropertyIdentityInsert` DTO), adapter + `repositories/property/property_identity_writer_postgres.py`, writing the + amended `infrastructure/postgres/property_table.py` mirror. A dedicated *writer* + distinct from the aggregate-hydrating `PropertyRepository`, which needs an EPC + slice the finaliser doesn't. + - Status: port `repositories/bulk_upload/bulk_upload_status_writer.py`, adapter + `…_postgres.py`, writing the `infrastructure/postgres/bulk_address_upload_table.py` + mirror. + - Transaction boundary stays in the `infrastructure/postgres/engine` helper + (`commit_scope`); the handler opens the context (once, around insert+complete), + never calls `.commit()`; orchestrator and repositories never commit. + +6. **`property_overrides` is out of scope (v2).** The table exists (frontend + migration 0221) but this Lambda does not populate it in v1. When v2 lands it adds + a `PropertyOverrideRow` mirror + `infrastructure/property/property_overrides_postgres_repository.py` + using `onConflictDoUpdate` per frontend ADR-0005. + +## Consequences + +**Positive.** +- 40k inserts run in a Lambda with a single `sub_task` row to observe, not behind a + synchronous HTTP request. +- Reuses the `@subtask_handler` / SQS / direct-Postgres machinery — reviewers have + the classifier handler and the combiner status-writers as precedents. + +**Negative.** +- **`PropertyRow` is no longer read-only.** The defensive invariant that made + accidental backend property writes impossible is gone; correctness now rests on the + finaliser being the only inserting caller. +- **Terminal-status ownership crosses the repo boundary.** The backend now writes a + BulkUpload status (`complete`/`failed`) that Next.js used to own — coordinated only + through the status column (frontend ADR-0005's "two writers, extended"). +- The Drizzle/SQLModel lockstep risk from ADR-0003 now extends to `property`'s + insertable columns. diff --git a/infrastructure/postgres/bulk_address_upload_table.py b/infrastructure/postgres/bulk_address_upload_table.py new file mode 100644 index 00000000..e43d7e84 --- /dev/null +++ b/infrastructure/postgres/bulk_address_upload_table.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +from datetime import datetime, timezone +from typing import ClassVar, Optional +from uuid import UUID + +from sqlmodel import Field, SQLModel + + +class BulkAddressUploadRow(SQLModel, table=True): + """Minimal mirror of the FE-owned ``bulk_address_uploads`` table. + + The schema source of truth is the Next.js Drizzle repo + (``src/app/db/schema/bulk_address_uploads.ts``); the backend's fuller mirror + lives at ``backend/app/db/models/bulk_address_uploads.py``. This DDD-side + mirror declares only the columns the finaliser writes — the terminal status — + so the finaliser can flip status on its own ``PostgresConfig`` session without + pulling in the legacy ``backend/`` connection (ADR-0013). Keep the column names + in step with the Drizzle source. + """ + + __tablename__: ClassVar[str] = "bulk_address_uploads" # pyright: ignore[reportIncompatibleVariableOverride] + + id: UUID = Field(primary_key=True) + task_id: Optional[UUID] = Field(default=None, index=True) + status: str + updated_at: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc) + ) diff --git a/infrastructure/postgres/property_table.py b/infrastructure/postgres/property_table.py index 6bd2d644..c333cad4 100644 --- a/infrastructure/postgres/property_table.py +++ b/infrastructure/postgres/property_table.py @@ -2,24 +2,49 @@ from __future__ import annotations from typing import ClassVar, Optional +from sqlalchemy import Column +from sqlalchemy import Enum as SAEnum from sqlmodel import Field, SQLModel +# Mirror of the FE-owned `creation_status` pgEnum (property.ts: +# propertyCreationStatusEnum = {LOADING, READY, ERROR}). A single SAEnum instance +# so test-schema create_all emits one CREATE TYPE; prod owns the type via Drizzle. +property_creation_status_sa_enum = SAEnum( + "LOADING", "READY", "ERROR", name="creation_status" +) + class PropertyRow(SQLModel, table=True): - """Defensive view of the FE-owned ``property`` table. + """Mirror of the FE-owned ``property`` table. - The schema and migrations for ``property`` are owned by the front-end - Next.js repo; this declares only the identity columns the modelling backend - reads/writes, so FE-owned migrations to other columns don't ripple into us. + The schema and migrations for ``property`` are owned by the front-end Next.js + repo (``src/app/db/schema/property.ts``); this declares the identity columns + the modelling backend reads, plus the subset the ``bulk_upload_finaliser`` + Lambda **inserts** at Finalise (ADR-0013). It is no longer read-only — the + finaliser is the one backend caller that inserts. Columns not declared here + are still owned by FE migrations and don't ripple into us. """ __tablename__: ClassVar[str] = "property" # pyright: ignore[reportIncompatibleVariableOverride] - # Non-Optional: this is a read-only defensive view of the FE-owned ``property`` - # table — the backend never inserts rows, so every row read carries an id. - id: int = Field(primary_key=True) + # bigserial in the FE schema — DB-assigned on insert, so Optional/None on the + # way in and always populated on the way out. + id: Optional[int] = Field(default=None, primary_key=True) portfolio_id: int - postcode: str - address: str + # Nullable in the FE schema. The finaliser writes `matched ?? user-inputted`, + # which is absent for fully-unmatched rows. + postcode: Optional[str] = Field(default=None) + address: Optional[str] = Field(default=None) uprn: Optional[int] = Field(default=None) landlord_property_id: Optional[str] = Field(default=None) + + # Insertable columns the finaliser writes (ADR-0013). All nullable in the FE + # schema except `creation_status` (NOT NULL); the finaliser always sets it to + # 'READY', so a nullable mirror is safe — the real column enforces NOT NULL. + creation_status: Optional[str] = Field( + default=None, + sa_column=Column(property_creation_status_sa_enum, nullable=True), + ) + user_inputted_address: Optional[str] = Field(default=None) + user_inputted_postcode: Optional[str] = Field(default=None) + lexiscore: Optional[float] = Field(default=None) diff --git a/orchestration/bulk_upload_finaliser_orchestrator.py b/orchestration/bulk_upload_finaliser_orchestrator.py new file mode 100644 index 00000000..44d65178 --- /dev/null +++ b/orchestration/bulk_upload_finaliser_orchestrator.py @@ -0,0 +1,125 @@ +"""Finalises a BulkUpload into ``property`` rows (ADR-0013). + +The domain logic of Finalise: turn the combiner output rows into property identity +rows (the same resolution the old Next.js ``/finalize`` route did) and persist them +through the injected writer. Like every orchestrator it never commits — the caller +owns the transaction boundary (see the Lambda handler). +""" + +from __future__ import annotations + +import re +from typing import Any, Optional + +from repositories.property.property_identity_writer import ( + PropertyIdentityInsert, + PropertyIdentityWriter, +) + +# Combiner-output columns — identical to the old frontend /finalize route and the +# backend combined-results reader (router.py). +ADDRESS_COLS = ("Address 1", "Address 2", "Address 3") +POSTCODE_COL = "postcode" +INTERNAL_REF_COL = "Internal Reference" +UPRN_COL = "address2uprn_uprn" +MATCHED_ADDRESS_COL = "address2uprn_address" +LEXISCORE_COL = "address2uprn_lexiscore" +MISSING_SENTINEL = "invalid postcode" +UK_POSTCODE_RE = re.compile(r"[A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2}", re.IGNORECASE) + + +def _normalize(value: Any) -> str: + if value is None: + return "" + return str(value).strip() + + +def _is_missing(value: str) -> bool: + return value == "" or value.lower() == MISSING_SENTINEL + + +def _parse_uprn(raw: Any) -> Optional[int]: + val = _normalize(raw) + if _is_missing(val): + return None + try: + return int(val) + except ValueError: + return None + + +def _parse_lexiscore(raw: Any) -> Optional[float]: + val = _normalize(raw) + if _is_missing(val): + return None + try: + return float(val) + except ValueError: + return None + + +def _extract_postcode(matched: Optional[str], fallback: str) -> Optional[str]: + if matched: + m = UK_POSTCODE_RE.search(matched) + if m: + return m.group(0).upper() + return fallback or None + + +class BulkUploadFinaliserOrchestrator: + def __init__(self, property_writer: PropertyIdentityWriter) -> None: + self._property_writer = property_writer + + def to_property_rows( + self, rows: list[dict[str, str]], portfolio_id: int + ) -> list[PropertyIdentityInsert]: + """Resolve combiner rows into property identity inserts. + + Pure (no DB / IO), so the caller can run it before opening a transaction. + Reproduces the old ``/finalize`` route's resolution exactly: matched + address falls back to the user-inputted one; postcode is extracted from + the matched address or falls back to the user-inputted postcode. + """ + return [self._row_to_insert(raw, portfolio_id) for raw in rows] + + def persist(self, inserts: list[PropertyIdentityInsert]) -> int: + """Insert the resolved rows via the writer (idempotent — see the adapter). + + Does not commit; the caller opens the transaction around this call. + Returns the number of properties actually inserted. + """ + return self._property_writer.insert_all(inserts) + + @staticmethod + def _row_to_insert( + raw: dict[str, str], portfolio_id: int + ) -> PropertyIdentityInsert: + user_inputted_address = ( + ", ".join(p for p in (_normalize(raw.get(c)) for c in ADDRESS_COLS) if p) + or None + ) + user_inputted_postcode = _normalize(raw.get(POSTCODE_COL)) or None + + uprn = _parse_uprn(raw.get(UPRN_COL)) + + matched_address_raw = _normalize(raw.get(MATCHED_ADDRESS_COL)) + matched_address = ( + None if _is_missing(matched_address_raw) else matched_address_raw + ) + + address = matched_address or user_inputted_address + postcode = _extract_postcode(matched_address, user_inputted_postcode or "") + internal_ref = _normalize(raw.get(INTERNAL_REF_COL)) or None + lexiscore = _parse_lexiscore(raw.get(LEXISCORE_COL)) + + return PropertyIdentityInsert( + portfolio_id=portfolio_id, + uprn=uprn, + landlord_property_id=internal_ref, + address=address, + postcode=postcode, + user_inputted_address=user_inputted_address, + user_inputted_postcode=user_inputted_postcode, + lexiscore=lexiscore, + creation_status="READY", + ) diff --git a/pytest.ini b/pytest.ini index 47b40c17..1303dcb4 100644 --- a/pytest.ini +++ b/pytest.ini @@ -7,6 +7,7 @@ testpaths = recommendations/tests backend/tests backend/address2UPRN/tests + backend/ordnanceSurvey/tests backend/app/db/functions/tests backend/categorisation/tests backend/condition/tests @@ -25,7 +26,5 @@ testpaths = etl/epc_clean/tests etl/hubspot/tests etl/spatial/tests - domain/sap10_ml/tests - tests/ markers = integration: mark a test as an integration test diff --git a/repositories/bulk_upload/bulk_upload_status_writer.py b/repositories/bulk_upload/bulk_upload_status_writer.py new file mode 100644 index 00000000..70fdb1b2 --- /dev/null +++ b/repositories/bulk_upload/bulk_upload_status_writer.py @@ -0,0 +1,24 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from uuid import UUID + + +class BulkUploadStatusWriter(ABC): + """Port: writes the terminal BulkUpload status at Finalise (ADR-0013). + + The finaliser owns the terminal write — Next.js no longer writes ``complete``; + it only flips ``awaiting_review → finalising`` at dispatch (ADR-0005). This is + the backend half of that "two writers" split, alongside the combiner's + ``combining``/``awaiting_review`` writes. + """ + + @abstractmethod + def set_status(self, task_id: UUID, status: str) -> None: + """Set the status of the bulk_address_uploads row for ``task_id``. + + Does not commit — the caller owns the transaction boundary, so the status + flip can share the same transaction as the property insert (atomic + finalise) or run in its own session for the failure path. + """ + ... diff --git a/repositories/bulk_upload/bulk_upload_status_writer_postgres.py b/repositories/bulk_upload/bulk_upload_status_writer_postgres.py new file mode 100644 index 00000000..ba83c4d7 --- /dev/null +++ b/repositories/bulk_upload/bulk_upload_status_writer_postgres.py @@ -0,0 +1,33 @@ +"""Postgres adapter for ``BulkUploadStatusWriter`` (ADR-0013). + +Flips the ``bulk_address_uploads`` status on the caller's session — so the +finaliser can mark ``complete`` in the *same* transaction as the property insert +(atomic finalise), and mark ``failed`` in a fresh session on the error path. +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from uuid import UUID + +from sqlmodel import Session, select + +from infrastructure.postgres.bulk_address_upload_table import BulkAddressUploadRow +from repositories.bulk_upload.bulk_upload_status_writer import BulkUploadStatusWriter + + +class BulkUploadStatusWriterPostgresRepository(BulkUploadStatusWriter): + def __init__(self, session: Session) -> None: + self._session = session + + def set_status(self, task_id: UUID, status: str) -> None: + row = self._session.exec( + select(BulkAddressUploadRow).where( + BulkAddressUploadRow.task_id == task_id + ) + ).first() + if row is None: + raise ValueError(f"No bulk_address_uploads row for task_id {task_id}") + row.status = status + row.updated_at = datetime.now(timezone.utc) + self._session.add(row) diff --git a/repositories/property/property_identity_writer.py b/repositories/property/property_identity_writer.py new file mode 100644 index 00000000..a5db0129 --- /dev/null +++ b/repositories/property/property_identity_writer.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Optional + + +@dataclass(frozen=True) +class PropertyIdentityInsert: + """One row the finaliser inserts into the FE-owned ``property`` table. + + Mirrors the exact column set today's Next.js ``/finalize`` writes (ADR-0013): + nine fields plus ``creation_status='READY'``. ``address``/``postcode`` are the + resolved (matched ?? user-inputted) values and may be ``None``. + """ + + portfolio_id: int + uprn: Optional[int] + landlord_property_id: Optional[str] + address: Optional[str] + postcode: Optional[str] + user_inputted_address: Optional[str] + user_inputted_postcode: Optional[str] + lexiscore: Optional[float] + creation_status: str = "READY" + + +class PropertyIdentityWriter(ABC): + """Port: bulk-inserts ``property`` identity rows at Finalise (ADR-0013). + + Distinct from ``PropertyRepository`` (which hydrates the Property *aggregate* + for reads and needs an EPC slice). This writer only persists identity rows and + has no read/EPC dependency. Idempotent on re-run: existing properties are not + churned — see the adapter's conflict policy. + """ + + @abstractmethod + def insert_all(self, rows: list[PropertyIdentityInsert]) -> int: + """Insert all rows, skipping any whose ``(portfolio_id, uprn)`` already + exists (the FE partial unique index, ``WHERE uprn IS NOT NULL``). Rows with + no UPRN are always inserted. Returns the number of rows actually inserted. + An empty list is a no-op returning 0.""" + ... diff --git a/repositories/property/property_identity_writer_postgres.py b/repositories/property/property_identity_writer_postgres.py new file mode 100644 index 00000000..2be9aba5 --- /dev/null +++ b/repositories/property/property_identity_writer_postgres.py @@ -0,0 +1,63 @@ +"""Postgres adapter for ``PropertyIdentityWriter`` (ADR-0013). + +Bulk-inserts ``property`` identity rows through the ``PropertyRow`` SQLModel +mirror. The conflict policy lives in SQL and reproduces today's Next.js +``onConflictDoNothing`` exactly: skip a row whose ``(portfolio_id, uprn)`` already +exists under the FE partial unique index ``uq_property_portfolio_uprn`` +(``WHERE uprn IS NOT NULL``); rows with no UPRN are always inserted. This makes a +re-run leave existing properties untouched — contrast ``property_overrides``, +which recalculates (ADR-0005). +""" + +from __future__ import annotations + +from typing import Any, cast + +from sqlalchemy import Table +from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlmodel import Session + +from infrastructure.postgres.property_table import PropertyRow +from repositories.property.property_identity_writer import ( + PropertyIdentityInsert, + PropertyIdentityWriter, +) + + +class PropertyIdentityWriterPostgresRepository(PropertyIdentityWriter): + def __init__(self, session: Session) -> None: + self._session = session + # SQLModel injects ``__table__`` at runtime on table=True classes but the + # stubs don't expose it; pin to ``Table`` so the dialect insert is typed. + self._table: Table = cast(Table, getattr(PropertyRow, "__table__")) + + def insert_all(self, rows: list[PropertyIdentityInsert]) -> int: + if not rows: + return 0 + + values = [ + { + "portfolio_id": r.portfolio_id, + "creation_status": r.creation_status, + "uprn": r.uprn, + "landlord_property_id": r.landlord_property_id, + "address": r.address, + "postcode": r.postcode, + "user_inputted_address": r.user_inputted_address, + "user_inputted_postcode": r.user_inputted_postcode, + "lexiscore": r.lexiscore, + } + for r in rows + ] + + stmt = pg_insert(self._table).values(values) + # Matches `uq_property_portfolio_uprn` (partial: WHERE uprn IS NOT NULL). + stmt = stmt.on_conflict_do_nothing( + index_elements=["portfolio_id", "uprn"], + index_where=self._table.c.uprn.isnot(None), + ) + + # SQLModel re-exports SQLAlchemy's Session.execute; one overload is marked + # deprecated in the stubs but the INSERT path is supported. + result = self._session.execute(stmt) # pyright: ignore[reportDeprecated] + return cast(int, result.rowcount) diff --git a/repositories/property/property_postgres_repository.py b/repositories/property/property_postgres_repository.py index 55a32ed3..6226e1e9 100644 --- a/repositories/property/property_postgres_repository.py +++ b/repositories/property/property_postgres_repository.py @@ -25,9 +25,12 @@ class PropertyPostgresRepository(PropertyRepository): if row is None: raise ValueError(f"property {property_id} not found") identity = PropertyIdentity( + # `postcode`/`address` are nullable in the FE schema (the finaliser may + # insert a row with neither); coerce the degenerate null to "" so the + # identity type stays a plain str. portfolio_id=row.portfolio_id, - postcode=row.postcode, - address=row.address, + postcode=row.postcode or "", + address=row.address or "", uprn=row.uprn, landlord_property_id=row.landlord_property_id, ) @@ -53,8 +56,8 @@ class PropertyPostgresRepository(PropertyRepository): Property( identity=PropertyIdentity( portfolio_id=row.portfolio_id, - postcode=row.postcode, - address=row.address, + postcode=row.postcode or "", + address=row.address or "", uprn=row.uprn, landlord_property_id=row.landlord_property_id, ),