Merge branch 'main' into feature/handle-new-magicplan-response-structure

This commit is contained in:
Daniel Roth 2026-06-05 10:16:14 +00:00
commit ebd6f1623f
40 changed files with 1276 additions and 42 deletions

View file

@ -284,6 +284,46 @@ jobs:
AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }}
AWS_REGION: ${{ secrets.DEV_AWS_REGION }}
# ============================================================
# Build Bulk Upload Finaliser image and Push
# ============================================================
bulkUploadFinaliser_image:
needs: [determine_stage, shared_terraform]
uses: ./.github/workflows/_build_image.yml
with:
ecr_repo: bulk_upload_finaliser-${{ needs.determine_stage.outputs.stage }}
dockerfile_path: applications/bulk_upload_finaliser/Dockerfile
build_context: .
build_args: |
DEV_DB_HOST=$DEV_DB_HOST
DEV_DB_PORT=$DEV_DB_PORT
DEV_DB_NAME=$DEV_DB_NAME
secrets:
AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }}
AWS_REGION: ${{ secrets.DEV_AWS_REGION }}
DEV_DB_HOST: ${{ secrets.DEV_DB_HOST }}
DEV_DB_PORT: ${{ secrets.DEV_DB_PORT }}
DEV_DB_NAME: ${{ secrets.DEV_DB_NAME }}
# ============================================================
# Deploy Bulk Upload Finaliser Lambda
# ============================================================
bulkUploadFinaliser_lambda:
needs: [bulkUploadFinaliser_image, determine_stage, shared_terraform]
uses: ./.github/workflows/_deploy_lambda.yml
with:
lambda_name: bulkUploadFinaliser
lambda_path: deployment/terraform/lambda/bulkUploadFinaliser
stage: ${{ needs.determine_stage.outputs.stage }}
ecr_repo: bulk_upload_finaliser-${{ needs.determine_stage.outputs.stage }}
image_digest: ${{ needs.bulkUploadFinaliser_image.outputs.image_digest }}
terraform_apply: ${{ needs.determine_stage.outputs.terraform_apply }}
secrets:
AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }}
AWS_REGION: ${{ secrets.DEV_AWS_REGION }}
# ============================================================
# Condition ETL image and Push
# ============================================================
@ -459,7 +499,7 @@ jobs:
# Deploy FastAPI Lambda
# ============================================================
fast_api_lambda:
needs: [determine_stage, ara_engine_lambda, categorisation_lambda, postcodeSplitter_lambda, bulk_address2uprn_combiner_lambda]
needs: [determine_stage, ara_engine_lambda, categorisation_lambda, postcodeSplitter_lambda, bulk_address2uprn_combiner_lambda, bulkUploadFinaliser_lambda]
uses: ./.github/workflows/_deploy_lambda.yml
with:
lambda_name: ara_fast_api

View file

@ -63,6 +63,16 @@ jobs:
build_context: .
service_name: bulk-address2uprn-combiner
# ============================================================
# Bulk Upload Finaliser
# ============================================================
bulk_upload_finaliser_smoke_test:
uses: ./.github/workflows/_smoke_test_lambda.yml
with:
dockerfile_path: applications/bulk_upload_finaliser/Dockerfile
build_context: .
service_name: bulk-upload-finaliser
# ============================================================
# Condition ETL
# ============================================================

View file

@ -0,0 +1,38 @@
FROM public.ecr.aws/lambda/python:3.11
# Postgres host/port/database are baked into the image at build time from the
# deploy workflow's --build-arg values (GitHub Actions DEV_DB_* secrets),
# mirroring the landlord_description_overrides Dockerfile. They map onto the
# POSTGRES_* names PostgresConfig.from_env reads. Username/password are NOT baked
# in -- Terraform injects those as Lambda env vars from Secrets Manager.
ARG DEV_DB_HOST
ARG DEV_DB_PORT
ARG DEV_DB_NAME
ENV POSTGRES_HOST=${DEV_DB_HOST}
ENV POSTGRES_PORT=${DEV_DB_PORT}
ENV POSTGRES_DATABASE=${DEV_DB_NAME}
WORKDIR /var/task
COPY applications/bulk_upload_finaliser/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# DDD-shaped packages only -- no pandas, no legacy backend/. The finaliser writes
# both `property` and the terminal `bulk_address_uploads` status through DDD repos
# on its own PostgresConfig session (ADR-0013). `datatypes/` comes in via the
# Property aggregate's import closure (domain/property/property.py -> site_notes ->
# datatypes/epc/...), enforced by tests/test_lambda_packaging.py.
COPY datatypes/ datatypes/
COPY domain/ domain/
COPY infrastructure/ infrastructure/
COPY orchestration/ orchestration/
COPY repositories/ repositories/
COPY utilities/ utilities/
COPY applications/ applications/
# Place the handler at the Lambda task root so the runtime resolves
# ``main.handler`` without an extra package prefix.
COPY applications/bulk_upload_finaliser/handler.py /var/task/main.py
CMD ["main.handler"]

View file

@ -0,0 +1,22 @@
from uuid import UUID
from pydantic import BaseModel, ConfigDict
class BulkUploadFinaliserTriggerBody(BaseModel):
"""Trigger body for the bulk_upload_finaliser Lambda (ADR-0013).
Dispatched by the Next.js Finalise action via
``POST /v1/bulk-uploads/trigger-finaliser``. ``s3_uri`` is the combiner output
(``combined_output_s3_uri``) the same address/UPRN CSV the old synchronous
``/finalize`` route read.
"""
model_config = ConfigDict(extra="allow")
task_id: UUID
sub_task_id: UUID
s3_uri: str
# bigint in the FE schema; Python int is unbounded so Pydantic stays simple.
portfolio_id: int
bulk_upload_id: UUID

View file

@ -0,0 +1,104 @@
"""bulk_upload_finaliser Lambda (ADR-0013).
Replaces the synchronous Next.js ``/finalize`` property insert. Thin wiring: parse
the trigger, read the combiner output CSV from S3, hand the rows to the
``BulkUploadFinaliserOrchestrator`` (which owns the resolution + persist), then
write the terminal BulkUpload status directly (ADR-0005 hands terminal ownership to
the backend). ``complete`` is written in the *same* transaction as the property
insert (atomic finalise); ``failed`` is written on a fresh session on error.
PostgresConfig-only, like the landlord classifier Lambda no legacy ``backend/``
connection so a single DB config (POSTGRES_*) drives the whole run.
"""
import logging
import os
from typing import Any
from uuid import UUID
import boto3
from sqlalchemy.engine import Engine
from applications.bulk_upload_finaliser.bulk_upload_finaliser_trigger_body import (
BulkUploadFinaliserTriggerBody,
)
from infrastructure.postgres.config import PostgresConfig
from infrastructure.postgres.engine import commit_scope, make_engine, make_session
from infrastructure.s3.csv_s3_client import CsvS3Client
from infrastructure.s3.s3_uri import parse_s3_uri
from orchestration.bulk_upload_finaliser_orchestrator import (
BulkUploadFinaliserOrchestrator,
)
from orchestration.task_orchestrator import TaskOrchestrator
from repositories.bulk_upload.bulk_upload_status_writer_postgres import (
BulkUploadStatusWriterPostgresRepository,
)
from repositories.property.property_postgres_repository import (
PropertyPostgresRepository,
)
from utilities.aws_lambda.subtask_handler import subtask_handler
logger = logging.getLogger(__name__)
def _run(engine: Engine, trigger: BulkUploadFinaliserTriggerBody) -> int:
bucket, _key = parse_s3_uri(trigger.s3_uri)
boto3_client: Any = boto3.client # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
boto_s3: Any = boto3_client("s3")
rows = CsvS3Client(boto_s3, bucket).read_rows(trigger.s3_uri)
session = make_session(engine)
try:
orchestrator = BulkUploadFinaliserOrchestrator(
# Write-only path: no EpcRepository needed for inserts.
property_repo=PropertyPostgresRepository(session),
status_writer=BulkUploadStatusWriterPostgresRepository(session),
)
# Atomic finalise: the orchestrator inserts properties and marks `complete`
# via its injected writers; the transaction here makes them land together —
# a failure in either rolls back both, leaving the row for the failure path.
with commit_scope(session):
inserted = orchestrator.finalise(
rows, trigger.portfolio_id, trigger.task_id
)
finally:
session.close()
logger.info(
"Finalised bulk upload %s: %d rows read, %d properties inserted.",
trigger.bulk_upload_id,
len(rows),
inserted,
)
return inserted
def _mark_failed(engine: Engine, task_id: UUID) -> None:
session = make_session(engine)
try:
with commit_scope(session):
BulkUploadStatusWriterPostgresRepository(session).set_status(
task_id, "failed"
)
finally:
session.close()
@subtask_handler()
def handler(
body: dict[str, Any], context: Any, task_orchestrator: TaskOrchestrator
) -> dict[str, int]:
trigger = BulkUploadFinaliserTriggerBody.model_validate(body)
engine = make_engine(PostgresConfig.from_env(os.environ))
try:
inserted = _run(engine, trigger)
except Exception:
# Hand the BulkUpload to the terminal `failed` state so the UI leaves
# `finalising`; the @subtask_handler also marks the SubTask FAILED on the
# re-raise below.
_mark_failed(engine, trigger.task_id)
raise
return {"inserted": inserted}

View file

@ -0,0 +1,4 @@
boto3
pydantic
sqlmodel
psycopg2-binary

View file

@ -12,6 +12,7 @@ from backend.app.bulk_uploads.schema import (
CombinedResultRow,
CombinedResultsResponse,
CombinerTriggerRequest,
FinaliserTriggerRequest,
FlagsSummary,
LandlordOverridesTriggerRequest,
PostcodeSplitterTriggerRequest,
@ -113,6 +114,26 @@ async def trigger_landlord_overrides(req: LandlordOverridesTriggerRequest):
}
@router.post("/trigger-finaliser", status_code=202)
async def trigger_finaliser(req: FinaliserTriggerRequest):
settings = get_settings()
try:
sqs = boto3.client("sqs", settings.AWS_DEFAULT_REGION)
response = sqs.send_message(
QueueUrl=settings.FINALISER_SQS_URL,
MessageBody=req.model_dump_json(),
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"SQS error: {e}")
return {
"task_id": req.task_id,
"sub_task_id": req.sub_task_id,
"sqs_message_id": response.get("MessageId"),
}
@router.get("/{task_id}/combined-results", response_model=CombinedResultsResponse)
async def get_combined_results(
task_id: UUID,

View file

@ -23,6 +23,14 @@ class LandlordOverridesTriggerRequest(BaseModel):
column_mapping: dict[str, str]
class FinaliserTriggerRequest(BaseModel):
task_id: str
sub_task_id: str
s3_uri: str # combiner output (combined_output_s3_uri)
portfolio_id: int
bulk_upload_id: str
class FlagsSummary(BaseModel):
duplicates: int
missing: int

View file

@ -43,6 +43,7 @@ class Settings(BaseSettings):
POSTCODE_SPLITTER_SQS_URL: str = "changeme"
COMBINER_SQS_URL: str = "changeme"
LANDLORD_OVERRIDES_SQS_URL: str = "changeme"
FINALISER_SQS_URL: str = "changeme"
# Third parties
EPC_AUTH_TOKEN: str = "changeme"

View file

@ -1,4 +1,5 @@
import urllib.parse
from typing import Any
from pydantic import ValidationError
import requests
import pandas as pd
@ -7,13 +8,13 @@ from utils.logger import setup_logger
logger = setup_logger()
def os_places_results_to_dataframe(data: dict) -> pd.DataFrame:
def os_places_results_to_dataframe(data: dict[str, Any]) -> pd.DataFrame:
"""
Flatten the OS Places API response results into a DataFrame.
Each result contains either a DPA or LPI record.
"""
results = data.get("results", [])
rows = []
results: list[dict[str, Any]] = data.get("results", [])
rows: list[dict[str, Any]] = []
for r in results:
if "DPA" in r:
rows.append(r["DPA"])
@ -22,7 +23,7 @@ def os_places_results_to_dataframe(data: dict) -> pd.DataFrame:
return pd.DataFrame(rows)
def lookup_os_places(postcode: str, api_key: str) -> dict:
def lookup_os_places(postcode: str, api_key: str) -> dict[str, Any]:
"""
Lookup a postcode using the OS Places API.
Returns the full API response data or an error dict.

View file

@ -0,0 +1,21 @@
# Runtime env for the Ordnance Survey handler when run locally via docker compose.
# Variable names must match backend/app/config.py Settings (DB_*, ORDNANCE_SURVEY_API_KEY)
# plus the AWS creds boto3 needs for S3 access.
ENVIRONMENT=local
# Database (OS Places postcode cache)
DB_HOST=
DB_PORT=5432
DB_NAME=
DB_USERNAME=
DB_PASSWORD=
# Ordnance Survey Places API
ORDNANCE_SURVEY_API_KEY=
# AWS — read input CSV from / write results to S3
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_DEFAULT_REGION=eu-west-2
S3_BUCKET_NAME=retrofit-data-dev

View file

@ -12,9 +12,11 @@ payload = {
{
"body": json.dumps(
{
"task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
"sub_task_id": "8673913b-1a88-42d7-8578-0449123d94b0",
"s3_uri": "s3://retrofit-data-dev/ara_raw_inputs/calico/missinguprn.csv",
"task_id": "ea1a20b2-a21f-464c-9999-25f684405b69",
"sub_task_id": "747c99df-a1ff-41a0-98c2-5265d7797f90",
"s3_uri": "s3://retrofit-data-dev/ara_raw_outputs/hyde/output1(Sheet2).csv",
"lexiscore_column": "address2uprn_lexiscore",
"lexiscore_threshold": 0.2,
}
)
}

View file

@ -0,0 +1,12 @@
#!/usr/bin/env bash
set -euo pipefail
cd "$(dirname "$0")"
if [ ! -f .env.local ]; then
cp .env.local.example .env.local
echo "Created .env.local from the template — fill it in, then re-run." >&2
exit 1
fi
docker compose build --no-cache
docker compose up --force-recreate

View file

@ -22,6 +22,7 @@ import uuid
import os
import pandas as pd
from tqdm import tqdm
logger: logging.Logger = setup_logger()
@ -152,8 +153,10 @@ def handler(body: dict[str, Any], context: Any, local: bool = False) -> None:
df["ordnance_survey_lexiscore"] = None
# Process each postcode group at a time
for postcode, group in grouped:
print(f"Processing postcode: {postcode} ({len(group)} rows)")
for postcode, group in tqdm(
grouped, total=grouped.ngroups, desc="OS postcodes", unit="postcode"
):
tqdm.write(f"Processing postcode: {postcode} ({len(group)} rows)")
valid_group = AddressMatch.is_valid_postcode(postcode)
if not valid_group:
logger.warning(f"Postcode {postcode} is invalid, skipping")

View file

@ -0,0 +1,2 @@
User Input,Postcode,Expected UPRN
"115 NORTHWALK, CROYDON, SURREY",CR0 9ES,
1 User Input Postcode Expected UPRN
2 115 NORTHWALK, CROYDON, SURREY CR0 9ES

View file

@ -0,0 +1,153 @@
# backend/ordnanceSurvey/tests/test_os_match.py
"""
Debug harness for Ordnance Survey address matching.
Mirrors backend/address2UPRN/tests/test_csv.py, but for the OS Places flow:
for each (User Input, Postcode) case it hits the live OS Places API, scores every
candidate address with AddressMatch.score (exactly as backend/ordnanceSurvey/main.py
does), and prints the full ranked breakdown so you can see *why* a match was or
wasn't found.
Run with -s to see the ranking:
pytest backend/ordnanceSurvey/tests/test_os_match.py -s
Requires ORDNANCE_SURVEY_API_KEY to be set (config Settings / env); skipped otherwise.
"""
import csv
import os
import time
from pathlib import Path
from typing import Any, Optional
import pytest
from backend.ordnanceSurvey.helpers import (
lookup_os_places,
os_places_results_to_dataframe,
)
from backend.utils.addressMatch import AddressMatch
FIXTURE_PATH = Path(__file__).parent / "test_data.csv"
# Be polite to the live OS Places API between cases.
OS_THROTTLE_SECONDS = 1.0
# Handler treats best_score <= 0 as "no match" (see ordnanceSurvey/main.py).
MATCH_THRESHOLD = 0.0
@pytest.fixture(autouse=True)
def _throttle_os_requests():
yield
time.sleep(OS_THROTTLE_SECONDS)
def _api_key() -> Optional[str]:
# Read straight from the environment so the debug harness doesn't depend on
# the full Settings model loading cleanly. Falls back to Settings if unset.
key = os.getenv("ORDNANCE_SURVEY_API_KEY")
if not key:
try:
from backend.app.config import get_settings
key = get_settings().ORDNANCE_SURVEY_API_KEY
except Exception:
key = None
if not key or key == "changeme":
return None
return key
def load_test_cases():
with open(FIXTURE_PATH, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
return [
pytest.param(
row["User Input"],
row["Postcode"],
(row.get("Expected UPRN") or "").strip(),
id=f'{row["User Input"]} [{row["Postcode"]}]',
)
for row in reader
]
def _scored_candidates(
user_input: str, postcode: str, api_key: str
) -> list[dict[str, Any]]:
"""
Fetch OS Places candidates for a postcode (bypassing the DB cache) and score
every candidate ADDRESS exactly as ordnanceSurvey/main.py does. Returned
ranked best-first.
"""
response: dict[str, Any] = lookup_os_places(postcode, api_key)
assert response.get("status") == 200, f"OS Places API failed: {response}"
assert "data" in response, f"No data in OS Places response: {response}"
candidates = os_places_results_to_dataframe(response["data"])
records: list[dict[str, Any]] = candidates.to_dict("records") # type: ignore[assignment]
scored: list[dict[str, Any]] = []
for rec in records:
address = str(rec.get("ADDRESS", ""))
scored.append(
{
"uprn": rec.get("UPRN", "?"),
"address": address,
"normalised": AddressMatch.normalise_address(address),
"score": AddressMatch.score(user_input, address),
}
)
scored.sort(key=lambda r: r["score"], reverse=True)
return scored
def _print_debug(
user_input: str, postcode: str, scored: list[dict[str, Any]]
) -> None:
print(f"\n{'=' * 80}")
print(f"User input : {user_input!r}")
print(f"Normalised : {AddressMatch.normalise_address(user_input)!r}")
print(f"Postcode : {postcode!r}")
print(f"Candidates : {len(scored)}")
print(f"{'-' * 80}")
if not scored:
print("(no OS Places candidates returned for this postcode)")
return
for row in scored[:15]:
print(f" score={row['score']:.4f} uprn={row['uprn']}")
print(f" ADDRESS : {row['address']}")
print(f" normalised : {row['normalised']}")
print(f"{'=' * 80}")
@pytest.mark.integration
@pytest.mark.parametrize("user_input,postcode,expected_uprn", load_test_cases())
def test_os_match_finds_candidate(
user_input: str,
postcode: str,
expected_uprn: str,
):
api_key = _api_key()
if api_key is None:
pytest.skip("ORDNANCE_SURVEY_API_KEY not set")
scored = _scored_candidates(user_input, postcode, api_key)
_print_debug(user_input, postcode, scored)
best = scored[0] if scored else None
best_score = float(best["score"]) if best is not None else 0.0
# The handler records a match only when best_score > 0. This assertion is
# the debug signal: when it fails, the printed ranking above shows why.
assert best is not None and best_score > MATCH_THRESHOLD, (
f"No OS match for {user_input!r} @ {postcode!r} "
f"(best_score={best_score:.4f}). See ranking above."
)
if expected_uprn:
assert str(best["uprn"]) == expected_uprn, (
f"Best match UPRN {best['uprn']!r} != expected {expected_uprn!r}"
)

View file

@ -0,0 +1,50 @@
from unittest.mock import MagicMock, patch
from backend.pashub_fetcher.token_getter import get_token_from_local_storage
def _configure_playwright_mock(mock_sync_playwright: MagicMock) -> None:
mock_page = MagicMock()
mock_page.url = "https://pashub.net/dashboard"
mock_page.evaluate.return_value = "fake-token"
mock_context = MagicMock()
mock_context.new_page.return_value = mock_page
mock_browser = MagicMock()
mock_browser.new_context.return_value = mock_context
mock_p = MagicMock()
mock_p.chromium.launch.return_value = mock_browser
mock_sync_playwright.return_value.__enter__.return_value = mock_p
@patch("backend.pashub_fetcher.token_getter.shutil.rmtree")
@patch("backend.pashub_fetcher.token_getter.glob.glob")
@patch("backend.pashub_fetcher.token_getter.sync_playwright")
def test_playwright_tmp_dirs_are_cleaned_up_after_browser_close(
mock_sync_playwright: MagicMock,
mock_glob: MagicMock,
mock_rmtree: MagicMock,
) -> None:
# Arrange
fake_artifacts = ["/tmp/playwright-artifacts-abc12"]
fake_profiles = ["/tmp/playwright_chromiumdev_profile-xyz99"]
def glob_side_effect(pattern: str) -> list[str]:
if "playwright-artifacts-*" in pattern:
return fake_artifacts
if "playwright_chromiumdev_profile-*" in pattern:
return fake_profiles
return []
mock_glob.side_effect = glob_side_effect
_configure_playwright_mock(mock_sync_playwright)
# Act
get_token_from_local_storage("user@example.com", "secret")
# Assert
mock_rmtree.assert_any_call("/tmp/playwright-artifacts-abc12", ignore_errors=True)
mock_rmtree.assert_any_call("/tmp/playwright_chromiumdev_profile-xyz99", ignore_errors=True)

View file

@ -1,4 +1,6 @@
import glob
import os
import shutil
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
@ -90,5 +92,12 @@ def get_token_from_local_storage(
context.close()
browser.close()
for pattern in (
"/tmp/playwright-artifacts-*",
"/tmp/playwright_chromiumdev_profile-*",
):
for path in glob.glob(pattern):
shutil.rmtree(path, ignore_errors=True)
if record_video and video_dir:
logger.info(f"Video(s) saved in: {video_dir}")

View file

@ -61,7 +61,7 @@ SHAREPOINT_SITE: str = "ECO"
def _build_requests(excel_path: str) -> list[PashubToAraTriggerRequest]:
wb = load_workbook(excel_path, data_only=True)
ws = wb.worksheets[1]
ws = wb.worksheets[0]
headers: dict[str, int] = {}
for col in range(1, ws.max_column + 1):
@ -107,6 +107,7 @@ def _build_requests(excel_path: str) -> list[PashubToAraTriggerRequest]:
deal_stage=deal_stage,
sharepoint_link=SHAREPOINT_PROPERTIES_FOLDER or None,
sharepoint_site=SHAREPOINT_SITE,
get_other_files=True,
)
)

View file

@ -0,0 +1,49 @@
data "terraform_remote_state" "shared" {
backend = "s3"
config = {
bucket = "assessment-model-terraform-state"
key = "env:/${var.stage}/terraform.tfstate"
region = "eu-west-2"
}
}
data "aws_secretsmanager_secret_version" "db_credentials" {
secret_id = "${var.stage}/assessment_model/db_credentials"
}
locals {
db_credentials = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)
}
module "lambda" {
source = "../../modules/lambda_with_sqs"
name = "bulk-upload-finaliser"
stage = var.stage
image_uri = local.image_uri
# The finaliser reads the combiner CSV and does one bulk INSERT IO-light, but
# a property list can be ~40,000 rows, so 300s leaves ample headroom under the
# queue visibility timeout. batch_size = 1 keeps one upload per invocation so a
# bad record can't redrive its siblings; maximum_concurrency caps DB write
# fan-out.
timeout = 300
batch_size = 1
maximum_concurrency = 2
environment = merge(
{
STAGE = var.stage
LOG_LEVEL = "info"
POSTGRES_USERNAME = local.db_credentials.db_assessment_model_username
POSTGRES_PASSWORD = local.db_credentials.db_assessment_model_password
},
)
}
# Attach S3 read policy so the handler can read the combiner output CSV.
resource "aws_iam_role_policy_attachment" "bulk_upload_finaliser_s3_read" {
role = module.lambda.role_name
policy_arn = data.terraform_remote_state.shared.outputs.bulk_upload_finaliser_s3_read_arn
}

View file

@ -0,0 +1,9 @@
output "bulk_upload_finaliser_queue_url" {
value = module.lambda.queue_url
description = "URL of the Bulk Upload Finaliser SQS queue (wire into the FastAPI FINALISER_SQS_URL)"
}
output "bulk_upload_finaliser_queue_arn" {
value = module.lambda.queue_arn
description = "ARN of the Bulk Upload Finaliser SQS queue"
}

View file

@ -0,0 +1,16 @@
terraform {
required_providers {
aws = {
source = "hashicorp/aws"
version = ">= 5.0"
}
}
backend "s3" {
bucket = "bulk-upload-finaliser-terraform-state"
key = "terraform.tfstate"
region = "eu-west-2"
}
required_version = ">= 1.2.0"
}

View file

@ -0,0 +1,27 @@
variable "lambda_name" {
type = string
description = "Logical name of the lambda (e.g. bulkUploadFinaliser)"
}
variable "stage" {
description = "Deployment stage (e.g. dev, prod)"
type = string
}
variable "ecr_repo_url" {
type = string
description = "ECR repository URL (no tag, no digest)"
}
variable "image_digest" {
type = string
description = "Image digest (sha256:...)"
}
locals {
image_uri = "${var.ecr_repo_url}@${var.image_digest}"
}
output "resolved_image_uri" {
value = local.image_uri
}

View file

@ -46,6 +46,15 @@ data "terraform_remote_state" "bulk_address2uprn_combiner" {
}
}
data "terraform_remote_state" "bulk_upload_finaliser" {
backend = "s3"
config = {
bucket = "bulk-upload-finaliser-terraform-state",
key = "env:/${var.stage}/terraform.tfstate"
region = "eu-west-2"
}
}
############################################
# Load Credentials
############################################
@ -105,6 +114,7 @@ module "fastapi" {
CATEGORISATION_SQS_URL = data.terraform_remote_state.categorisation.outputs.categorisation_queue_url
POSTCODE_SPLITTER_SQS_URL = data.terraform_remote_state.postcode_splitter.outputs.postcode_splitter_queue_url
COMBINER_SQS_URL = data.terraform_remote_state.bulk_address2uprn_combiner.outputs.bulk_address2uprn_combiner_queue_url
FINALISER_SQS_URL = data.terraform_remote_state.bulk_upload_finaliser.outputs.bulk_upload_finaliser_queue_url
}
}
@ -126,7 +136,8 @@ module "fastapi_sqs_policy" {
data.terraform_remote_state.engine.outputs.ara_engine_queue_arn,
data.terraform_remote_state.categorisation.outputs.categorisation_queue_arn,
data.terraform_remote_state.postcode_splitter.outputs.postcode_splitter_queue_arn,
data.terraform_remote_state.bulk_address2uprn_combiner.outputs.bulk_address2uprn_combiner_queue_arn
data.terraform_remote_state.bulk_address2uprn_combiner.outputs.bulk_address2uprn_combiner_queue_arn,
data.terraform_remote_state.bulk_upload_finaliser.outputs.bulk_upload_finaliser_queue_arn
]
conditions = null

View file

@ -22,6 +22,7 @@ module "lambda" {
stage = var.stage
image_uri = local.image_uri
timeout = var.timeout
# Optional: Set maximum_concurrency to limit concurrent SQS-triggered invocations (2-1000)
maximum_concurrency = var.maximum_concurrency

View file

@ -17,6 +17,12 @@ variable "image_digest" {
description = "Image digest (sha256:...)"
}
variable "timeout" {
type = number
default = 300
description = "Lambda timeout in seconds."
}
variable "maximum_concurrency" {
type = number
default = null

View file

@ -558,6 +558,36 @@ output "bulk_address2uprn_combiner_s3_arn" {
value = module.bulk_address2uprn_combiner_s3.policy_arn
}
################################################
# Bulk Upload Finaliser Lambda ECR (ADR-0013)
################################################
module "bulk_upload_finaliser_state_bucket" {
source = "../modules/tf_state_bucket"
bucket_name = "bulk-upload-finaliser-terraform-state"
}
module "bulk_upload_finaliser_registry" {
source = "../modules/container_registry"
name = "bulk_upload_finaliser"
stage = var.stage
}
# The finaliser only reads the combiner output (bulk_final_outputs) to insert
# property rows; it writes to Postgres, not S3.
module "bulk_upload_finaliser_s3_read" {
source = "../modules/s3_iam_policy"
policy_name = "BulkUploadFinaliserReadS3"
policy_description = "Allow bulk_upload_finaliser Lambda to read combiner output from retrofit-data bucket"
bucket_arns = ["arn:aws:s3:::retrofit-data-${var.stage}"]
actions = ["s3:GetObject", "s3:ListBucket"]
resource_paths = ["/bulk_final_outputs/*"]
}
output "bulk_upload_finaliser_s3_read_arn" {
value = module.bulk_upload_finaliser_s3_read.policy_arn
}
################################################
# Categorisation Lambda ECR
################################################

View file

@ -0,0 +1,125 @@
# ADR-0013: The `bulk_upload_finaliser` Lambda writes properties and the terminal status
**Status:** Accepted
**Date:** 2026-06-04
> Companion to the `assessment-model` (frontend) repo's
> [ADR-0005](https://github.com/Hestia-Homes/assessment-model/blob/main/docs/adr/0005-async-bulk-upload-finaliser.md),
> which owns the state-machine change and the Drizzle schema. This ADR owns the
> **backend write path**. It applies the direct-write pattern, transaction-boundary
> rule, and port/adapter/row-mirror file layout established by
> [ADR-0003](0003-python-writes-landlord-overrides-directly.md).
## Context
Finalising a BulkUpload inserts one `property` row per source row — up to ~40,000.
Today a synchronous Next.js route does it; frontend ADR-0005 moves it to a dispatched
Lambda for the same reasons ADR-0003 moved the classifier write into Python: the work
is internal (a Lambda computes the rows, the same Lambda persists them), the Lambda
already sits next to a Postgres connection, and routing 40k inserts back through an
HTTP hop buys nothing.
`PropertyRow` in this repo
([`infrastructure/postgres/property_table.py`](../../infrastructure/postgres/property_table.py))
is currently a **defensive read-only view** — its docstring states the backend never
inserts properties. Finalise changes that.
## Decision
1. **New application `applications/bulk_upload_finaliser/`** wraps a handler in
`@subtask_handler` (auto-injected `TaskOrchestrator`; `run_subtask` owns the
subtask start/complete/fail and the Task cascade), exactly like
`applications/landlord_description_overrides/handler.py`. The trigger body:
```python
class BulkUploadFinaliserTriggerBody(SubtaskTriggerBody): # task_id, sub_task_id
s3_uri: str # combiner output (combined_output_s3_uri)
portfolio_id: int
bulk_upload_id: UUID
```
Dispatched via a new `POST /v1/bulk-uploads/trigger-finaliser` FastAPI endpoint
(auth `validate_token`) that enqueues to a new SQS queue — mirroring
`trigger-postcode-splitter` / `trigger-landlord-overrides`.
2. **`PropertyRow` drops its "backend never inserts" invariant** and gains the
insertable columns finalise needs — the **exact nine** the frontend route writes
today: `portfolio_id`, `creation_status` (`'READY'`), `uprn`,
`landlord_property_id` (← `Internal Reference`), `address` (matched ?? user
input), `postcode`, `user_inputted_address`, `user_inputted_postcode`,
`lexiscore`. Drizzle remains the schema source of truth (ADR-0003); this is a
mirror change only.
3. **Insert policy lives in SQL, idempotent:**
```sql
INSERT INTO property (portfolio_id, creation_status, uprn, landlord_property_id,
address, postcode, user_inputted_address,
user_inputted_postcode, lexiscore)
VALUES …
ON CONFLICT (portfolio_id, uprn) WHERE uprn IS NOT NULL
DO NOTHING;
```
This reproduces the frontend's `onConflictDoNothing` exactly — existing properties
are not churned on a re-run.
4. **The Lambda writes the terminal BulkUpload status directly — DDD, on its own
session.** On success it sets `status='complete'`; on failure `status='failed'`.
Rather than the combiner's `backend/app/db/functions` (which open the legacy
`backend/` connection on the separate `DB_*` config), the finaliser stays
**PostgresConfig-only like the landlord classifier Lambda**: it writes status
through a DDD `BulkUploadStatusWriter` port (`repositories/bulk_upload/`) backed
by a minimal `BulkAddressUploadRow` mirror (`infrastructure/postgres/`), on the
*same* session. So a single DB config (`POSTGRES_*`) drives the run and the
image needs no `backend/`. The `complete` flip happens **in the same transaction
as the property insert** (atomic finalise — either both land or neither);
`failed` is written on a fresh session in the error path. Next.js no longer
writes `complete`; it owns only the `awaiting_review → finalising`
compare-and-swap at dispatch (frontend ADR-0005).
5. **DDD layering (ADR-0003 + the landlord precedent).** The
`orchestration/bulk_upload_finaliser_orchestrator.py` owns the whole Finalise
domain flow via a single `finalise(rows, portfolio_id, task_id)` that depends
**only on the two repository ports** — it resolves the combiner rows, inserts
the properties, and marks the upload `complete`, with no engine/session/DB of
its own. So it is unit-testable with two in-memory fakes
(`tests/orchestration/test_bulk_upload_finaliser_orchestrator.py`). The Lambda
`applications/bulk_upload_finaliser/handler.py` is the composition root: parse
trigger, read S3, build the engine/session and the concrete adapters, open the
transaction around `finalise`, and handle the failure path. New seams:
- Property insert: `insert_all` (+ the `PropertyIdentityInsert` DTO) is added to
the existing `PropertyRepository` port and its `PropertyPostgresRepository`
adapter — **one repository per aggregate**, reads and writes together, writing
the amended `infrastructure/postgres/property_table.py` mirror. `epc_repo` is
optional: the write path creates identity rows and never hydrates, so the
finaliser constructs the repo with a session alone.
- Status: port `repositories/bulk_upload/bulk_upload_status_writer.py`, adapter
`…_postgres.py`, writing the `infrastructure/postgres/bulk_address_upload_table.py`
mirror (a separate table → a separate repository).
- Transaction boundary stays in the `infrastructure/postgres/engine` helper
(`commit_scope`); the handler opens the context (once, around insert+complete),
never calls `.commit()`; orchestrator and repositories never commit.
6. **`property_overrides` is out of scope (v2).** The table exists (frontend
migration 0221) but this Lambda does not populate it in v1. When v2 lands it adds
a `PropertyOverrideRow` mirror + `infrastructure/property/property_overrides_postgres_repository.py`
using `onConflictDoUpdate` per frontend ADR-0005.
## Consequences
**Positive.**
- 40k inserts run in a Lambda with a single `sub_task` row to observe, not behind a
synchronous HTTP request.
- Reuses the `@subtask_handler` / SQS / direct-Postgres machinery — reviewers have
the classifier handler and the combiner status-writers as precedents.
**Negative.**
- **`PropertyRow` is no longer read-only.** The defensive invariant that made
accidental backend property writes impossible is gone; correctness now rests on the
finaliser being the only inserting caller.
- **Terminal-status ownership crosses the repo boundary.** The backend now writes a
BulkUpload status (`complete`/`failed`) that Next.js used to own — coordinated only
through the status column (frontend ADR-0005's "two writers, extended").
- The Drizzle/SQLModel lockstep risk from ADR-0003 now extends to `property`'s
insertable columns.

0
etl/hubspot/__init__.py Normal file
View file

View file

@ -0,0 +1,29 @@
from __future__ import annotations
from datetime import datetime, timezone
from typing import ClassVar, Optional
from uuid import UUID
from sqlmodel import Field, SQLModel
class BulkAddressUploadRow(SQLModel, table=True):
"""Minimal mirror of the FE-owned ``bulk_address_uploads`` table.
The schema source of truth is the Next.js Drizzle repo
(``src/app/db/schema/bulk_address_uploads.ts``); the backend's fuller mirror
lives at ``backend/app/db/models/bulk_address_uploads.py``. This DDD-side
mirror declares only the columns the finaliser writes the terminal status
so the finaliser can flip status on its own ``PostgresConfig`` session without
pulling in the legacy ``backend/`` connection (ADR-0013). Keep the column names
in step with the Drizzle source.
"""
__tablename__: ClassVar[str] = "bulk_address_uploads" # pyright: ignore[reportIncompatibleVariableOverride]
id: UUID = Field(primary_key=True)
task_id: Optional[UUID] = Field(default=None, index=True)
status: str
updated_at: datetime = Field(
default_factory=lambda: datetime.now(timezone.utc)
)

View file

@ -2,24 +2,49 @@ from __future__ import annotations
from typing import ClassVar, Optional
from sqlalchemy import Column
from sqlalchemy import Enum as SAEnum
from sqlmodel import Field, SQLModel
# Mirror of the FE-owned `creation_status` pgEnum (property.ts:
# propertyCreationStatusEnum = {LOADING, READY, ERROR}). A single SAEnum instance
# so test-schema create_all emits one CREATE TYPE; prod owns the type via Drizzle.
property_creation_status_sa_enum = SAEnum(
"LOADING", "READY", "ERROR", name="creation_status"
)
class PropertyRow(SQLModel, table=True):
"""Defensive view of the FE-owned ``property`` table.
"""Mirror of the FE-owned ``property`` table.
The schema and migrations for ``property`` are owned by the front-end
Next.js repo; this declares only the identity columns the modelling backend
reads/writes, so FE-owned migrations to other columns don't ripple into us.
The schema and migrations for ``property`` are owned by the front-end Next.js
repo (``src/app/db/schema/property.ts``); this declares the identity columns
the modelling backend reads, plus the subset the ``bulk_upload_finaliser``
Lambda **inserts** at Finalise (ADR-0013). It is no longer read-only the
finaliser is the one backend caller that inserts. Columns not declared here
are still owned by FE migrations and don't ripple into us.
"""
__tablename__: ClassVar[str] = "property" # pyright: ignore[reportIncompatibleVariableOverride]
# Non-Optional: this is a read-only defensive view of the FE-owned ``property``
# table — the backend never inserts rows, so every row read carries an id.
id: int = Field(primary_key=True)
# bigserial in the FE schema — DB-assigned on insert, so Optional/None on the
# way in and always populated on the way out.
id: Optional[int] = Field(default=None, primary_key=True)
portfolio_id: int
postcode: str
address: str
# Nullable in the FE schema. The finaliser writes `matched ?? user-inputted`,
# which is absent for fully-unmatched rows.
postcode: Optional[str] = Field(default=None)
address: Optional[str] = Field(default=None)
uprn: Optional[int] = Field(default=None)
landlord_property_id: Optional[str] = Field(default=None)
# Insertable columns the finaliser writes (ADR-0013). All nullable in the FE
# schema except `creation_status` (NOT NULL); the finaliser always sets it to
# 'READY', so a nullable mirror is safe — the real column enforces NOT NULL.
creation_status: Optional[str] = Field(
default=None,
sa_column=Column(property_creation_status_sa_enum, nullable=True),
)
user_inputted_address: Optional[str] = Field(default=None)
user_inputted_postcode: Optional[str] = Field(default=None)
lexiscore: Optional[float] = Field(default=None)

View file

@ -0,0 +1,146 @@
"""Finalises a BulkUpload into ``property`` rows (ADR-0013).
The domain logic of Finalise: turn the combiner output rows into property identity
rows (the same resolution the old Next.js ``/finalize`` route did) and persist them
through the injected writer. Like every orchestrator it never commits the caller
owns the transaction boundary (see the Lambda handler).
"""
from __future__ import annotations
import re
from typing import Any, Optional
from uuid import UUID
from repositories.bulk_upload.bulk_upload_status_writer import BulkUploadStatusWriter
from repositories.property.property_repository import (
PropertyIdentityInsert,
PropertyRepository,
)
# Combiner-output columns — identical to the old frontend /finalize route and the
# backend combined-results reader (router.py).
ADDRESS_COLS = ("Address 1", "Address 2", "Address 3")
POSTCODE_COL = "postcode"
INTERNAL_REF_COL = "Internal Reference"
UPRN_COL = "address2uprn_uprn"
MATCHED_ADDRESS_COL = "address2uprn_address"
LEXISCORE_COL = "address2uprn_lexiscore"
MISSING_SENTINEL = "invalid postcode"
UK_POSTCODE_RE = re.compile(r"[A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2}", re.IGNORECASE)
def _normalize(value: Any) -> str:
if value is None:
return ""
return str(value).strip()
def _is_missing(value: str) -> bool:
return value == "" or value.lower() == MISSING_SENTINEL
def _parse_uprn(raw: Any) -> Optional[int]:
val = _normalize(raw)
if _is_missing(val):
return None
try:
return int(val)
except ValueError:
return None
def _parse_lexiscore(raw: Any) -> Optional[float]:
val = _normalize(raw)
if _is_missing(val):
return None
try:
return float(val)
except ValueError:
return None
def _extract_postcode(matched: Optional[str], fallback: str) -> Optional[str]:
if matched:
m = UK_POSTCODE_RE.search(matched)
if m:
return m.group(0).upper()
return fallback or None
class BulkUploadFinaliserOrchestrator:
"""Owns the domain flow of Finalise, depending only on repository ports.
Both collaborators are ports (``PropertyRepository``,
``BulkUploadStatusWriter``); the concrete Postgres adapters are wired by the
Lambda handler (the composition root). So a unit test constructs this with two
fakes and exercises ``finalise`` end-to-end no engine, session, or DB. The
orchestrator never commits: the caller opens the transaction around
``finalise`` so the insert and the ``complete`` flip land atomically.
"""
def __init__(
self,
property_repo: PropertyRepository,
status_writer: BulkUploadStatusWriter,
) -> None:
self._property_repo = property_repo
self._status_writer = status_writer
def finalise(
self, rows: list[dict[str, str]], portfolio_id: int, task_id: UUID
) -> int:
"""Resolve the combiner rows, insert the properties, and mark the upload
``complete`` all via the injected repositories, no DB connection of its
own. Returns the number of properties inserted. Does not commit."""
inserts = self.to_property_rows(rows, portfolio_id)
inserted = self._property_repo.insert_all(inserts)
self._status_writer.set_status(task_id, "complete")
return inserted
def to_property_rows(
self, rows: list[dict[str, str]], portfolio_id: int
) -> list[PropertyIdentityInsert]:
"""Resolve combiner rows into property identity inserts.
Pure (no DB / IO) and independently testable. Reproduces the old
``/finalize`` route's resolution exactly: matched address falls back to the
user-inputted one; postcode is extracted from the matched address or falls
back to the user-inputted postcode.
"""
return [self._row_to_insert(raw, portfolio_id) for raw in rows]
@staticmethod
def _row_to_insert(
raw: dict[str, str], portfolio_id: int
) -> PropertyIdentityInsert:
user_inputted_address = (
", ".join(p for p in (_normalize(raw.get(c)) for c in ADDRESS_COLS) if p)
or None
)
user_inputted_postcode = _normalize(raw.get(POSTCODE_COL)) or None
uprn = _parse_uprn(raw.get(UPRN_COL))
matched_address_raw = _normalize(raw.get(MATCHED_ADDRESS_COL))
matched_address = (
None if _is_missing(matched_address_raw) else matched_address_raw
)
address = matched_address or user_inputted_address
postcode = _extract_postcode(matched_address, user_inputted_postcode or "")
internal_ref = _normalize(raw.get(INTERNAL_REF_COL)) or None
lexiscore = _parse_lexiscore(raw.get(LEXISCORE_COL))
return PropertyIdentityInsert(
portfolio_id=portfolio_id,
uprn=uprn,
landlord_property_id=internal_ref,
address=address,
postcode=postcode,
user_inputted_address=user_inputted_address,
user_inputted_postcode=user_inputted_postcode,
lexiscore=lexiscore,
creation_status="READY",
)

View file

@ -7,6 +7,7 @@ testpaths =
recommendations/tests
backend/tests
backend/address2UPRN/tests
backend/ordnanceSurvey/tests
backend/app/db/functions/tests
backend/categorisation/tests
backend/condition/tests
@ -25,7 +26,5 @@ testpaths =
etl/epc_clean/tests
etl/hubspot/tests
etl/spatial/tests
domain/sap10_ml/tests
tests/
markers =
integration: mark a test as an integration test

View file

@ -0,0 +1,24 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from uuid import UUID
class BulkUploadStatusWriter(ABC):
"""Port: writes the terminal BulkUpload status at Finalise (ADR-0013).
The finaliser owns the terminal write Next.js no longer writes ``complete``;
it only flips ``awaiting_review finalising`` at dispatch (ADR-0005). This is
the backend half of that "two writers" split, alongside the combiner's
``combining``/``awaiting_review`` writes.
"""
@abstractmethod
def set_status(self, task_id: UUID, status: str) -> None:
"""Set the status of the bulk_address_uploads row for ``task_id``.
Does not commit the caller owns the transaction boundary, so the status
flip can share the same transaction as the property insert (atomic
finalise) or run in its own session for the failure path.
"""
...

View file

@ -0,0 +1,33 @@
"""Postgres adapter for ``BulkUploadStatusWriter`` (ADR-0013).
Flips the ``bulk_address_uploads`` status on the caller's session — so the
finaliser can mark ``complete`` in the *same* transaction as the property insert
(atomic finalise), and mark ``failed`` in a fresh session on the error path.
"""
from __future__ import annotations
from datetime import datetime, timezone
from uuid import UUID
from sqlmodel import Session, select
from infrastructure.postgres.bulk_address_upload_table import BulkAddressUploadRow
from repositories.bulk_upload.bulk_upload_status_writer import BulkUploadStatusWriter
class BulkUploadStatusWriterPostgresRepository(BulkUploadStatusWriter):
def __init__(self, session: Session) -> None:
self._session = session
def set_status(self, task_id: UUID, status: str) -> None:
row = self._session.exec(
select(BulkAddressUploadRow).where(
BulkAddressUploadRow.task_id == task_id
)
).first()
if row is None:
raise ValueError(f"No bulk_address_uploads row for task_id {task_id}")
row.status = status
row.updated_at = datetime.now(timezone.utc)
self._session.add(row)

View file

@ -1,39 +1,65 @@
from __future__ import annotations
from typing import Any, Optional, cast
from sqlalchemy import Table
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlmodel import Session, col, select
from domain.property.properties import Properties
from domain.property.property import Property, PropertyIdentity
from infrastructure.postgres.property_table import PropertyRow
from repositories.epc.epc_repository import EpcRepository
from repositories.property.property_repository import PropertyRepository
from repositories.property.property_repository import (
PropertyIdentityInsert,
PropertyRepository,
)
class PropertyPostgresRepository(PropertyRepository):
"""Hydrates the Property aggregate from the FE-owned ``property`` row plus the
EPC slice (via an injected `EpcRepository`). Reads only from repos no
external IO so a hydrated Property is a pure function of repository state
(ADR-0003).
"""Postgres adapter for the ``property`` table — reads and writes (ADR-0003).
Reads hydrate the Property aggregate from the FE-owned row plus the EPC slice
(via an injected `EpcRepository`), so a hydrated Property is a pure function of
repository state. ``epc_repo`` is optional: the Finalise write path
(``insert_all``) creates new identity rows and never hydrates, so callers that
only insert construct this with a session alone.
"""
def __init__(self, session: Session, epc_repo: EpcRepository) -> None:
def __init__(
self, session: Session, epc_repo: Optional[EpcRepository] = None
) -> None:
self._session = session
self._epc_repo = epc_repo
# ``__table__`` is injected at runtime on table=True classes but the stubs
# don't expose it; pin to ``Table`` so the dialect insert is typed.
self._table: Table = cast(Table, getattr(PropertyRow, "__table__"))
def _epc(self) -> EpcRepository:
if self._epc_repo is None:
raise ValueError(
"PropertyPostgresRepository needs an EpcRepository to read; it was "
"constructed for the write-only Finalise path."
)
return self._epc_repo
def get(self, property_id: int) -> Property:
row = self._session.get(PropertyRow, property_id)
if row is None:
raise ValueError(f"property {property_id} not found")
identity = PropertyIdentity(
# `postcode`/`address` are nullable in the FE schema (the finaliser may
# insert a row with neither); coerce the degenerate null to "" so the
# identity type stays a plain str.
portfolio_id=row.portfolio_id,
postcode=row.postcode,
address=row.address,
postcode=row.postcode or "",
address=row.address or "",
uprn=row.uprn,
landlord_property_id=row.landlord_property_id,
)
return Property(
identity=identity,
epc=self._epc_repo.get_for_property(property_id),
epc=self._epc().get_for_property(property_id),
)
def get_many(self, property_ids: list[int]) -> Properties:
@ -43,7 +69,7 @@ class PropertyPostgresRepository(PropertyRepository):
select(PropertyRow).where(col(PropertyRow.id).in_(property_ids))
).all()
row_by_id = {row.id: row for row in rows}
epcs = self._epc_repo.get_for_properties(property_ids)
epcs = self._epc().get_for_properties(property_ids)
items: list[Property] = []
for property_id in property_ids:
row = row_by_id.get(property_id)
@ -53,8 +79,8 @@ class PropertyPostgresRepository(PropertyRepository):
Property(
identity=PropertyIdentity(
portfolio_id=row.portfolio_id,
postcode=row.postcode,
address=row.address,
postcode=row.postcode or "",
address=row.address or "",
uprn=row.uprn,
landlord_property_id=row.landlord_property_id,
),
@ -62,3 +88,36 @@ class PropertyPostgresRepository(PropertyRepository):
)
)
return Properties(items)
def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
if not rows:
return 0
values = [
{
"portfolio_id": r.portfolio_id,
"creation_status": r.creation_status,
"uprn": r.uprn,
"landlord_property_id": r.landlord_property_id,
"address": r.address,
"postcode": r.postcode,
"user_inputted_address": r.user_inputted_address,
"user_inputted_postcode": r.user_inputted_postcode,
"lexiscore": r.lexiscore,
}
for r in rows
]
stmt = pg_insert(self._table).values(values)
# Matches `uq_property_portfolio_uprn` (partial: WHERE uprn IS NOT NULL),
# reproducing today's Next.js onConflictDoNothing — a re-run leaves existing
# properties untouched (contrast property_overrides, which recalculates).
stmt = stmt.on_conflict_do_nothing(
index_elements=["portfolio_id", "uprn"],
index_where=self._table.c.uprn.isnot(None),
)
# SQLModel re-exports SQLAlchemy's Session.execute; one overload is marked
# deprecated in the stubs but the INSERT path is supported.
result = self._session.execute(stmt) # pyright: ignore[reportDeprecated]
return cast(int, result.rowcount)

View file

@ -1,17 +1,40 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
from domain.property.properties import Properties
from domain.property.property import Property
class PropertyRepository(ABC):
"""Loads and saves the Property aggregate.
@dataclass(frozen=True)
class PropertyIdentityInsert:
"""One row inserted into the FE-owned ``property`` table at Finalise (ADR-0013).
Composes the aggregate whole from the FE-owned ``property`` identity row plus
its source-data slices (EPC today; Site Notes / enrichments as later slices
land). Aggregates load whole never half a Property (ADR-0002).
Mirrors the exact column set today's Next.js ``/finalize`` writes: nine fields
plus ``creation_status='READY'``. ``address``/``postcode`` are the resolved
(matched ?? user-inputted) values and may be ``None``.
"""
portfolio_id: int
uprn: Optional[int]
landlord_property_id: Optional[str]
address: Optional[str]
postcode: Optional[str]
user_inputted_address: Optional[str]
user_inputted_postcode: Optional[str]
lexiscore: Optional[float]
creation_status: str = "READY"
class PropertyRepository(ABC):
"""Reads and writes the FE-owned ``property`` table.
Reads hydrate the Property aggregate whole never half a Property from the
identity row plus its source-data slices (EPC today; Site Notes / enrichments
as later slices land) (ADR-0002, ADR-0012). Writes bulk-insert identity rows at
Finalise (ADR-0013). One repository per aggregate.
"""
@abstractmethod
@ -23,3 +46,11 @@ class PropertyRepository(ABC):
rather than one round-trip per property (ADR-0012). Order follows the
input ids."""
...
@abstractmethod
def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
"""Bulk-insert identity rows, skipping any whose ``(portfolio_id, uprn)``
already exists (the FE partial unique index, ``WHERE uprn IS NOT NULL``).
Rows with no UPRN are always inserted. Returns the number actually
inserted; an empty list is a no-op returning 0 (ADR-0013)."""
...

View file

@ -15,7 +15,10 @@ from domain.property.properties import Properties
from domain.property.property import Property
from repositories.property_baseline.property_baseline_repository import PropertyBaselineRepository
from repositories.epc.epc_repository import EpcRepository
from repositories.property.property_repository import PropertyRepository
from repositories.property.property_repository import (
PropertyIdentityInsert,
PropertyRepository,
)
from repositories.solar.solar_repository import SolarRepository
from repositories.unit_of_work import UnitOfWork
@ -30,6 +33,10 @@ class FakePropertyRepo(PropertyRepository):
def get_many(self, property_ids: list[int]) -> Properties:
return Properties([self._by_id[property_id] for property_id in property_ids])
def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
self.inserted: list[PropertyIdentityInsert] = list(rows)
return len(rows)
class FakeEpcRepo(EpcRepository):
def __init__(self, by_property: Optional[dict[int, EpcPropertyData]] = None) -> None:

View file

@ -0,0 +1,105 @@
"""The finaliser orchestrator runs end-to-end against fake writers — no DB.
This is the payoff of injecting the repository ports (ADR-0013): the whole
Finalise domain flow (resolve combiner rows -> insert properties -> mark complete)
is unit-testable with two in-memory fakes, no engine/session/Postgres.
"""
from __future__ import annotations
from uuid import UUID, uuid4
from domain.property.properties import Properties
from domain.property.property import Property
from orchestration.bulk_upload_finaliser_orchestrator import (
BulkUploadFinaliserOrchestrator,
)
from repositories.bulk_upload.bulk_upload_status_writer import BulkUploadStatusWriter
from repositories.property.property_repository import (
PropertyIdentityInsert,
PropertyRepository,
)
class FakePropertyRepository(PropertyRepository):
"""Records inserts; the read half is unused on the Finalise path."""
def __init__(self) -> None:
self.inserted: list[PropertyIdentityInsert] = []
def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
self.inserted = list(rows)
return len(rows)
def get(self, property_id: int) -> Property: # pragma: no cover
raise NotImplementedError
def get_many(self, property_ids: list[int]) -> Properties: # pragma: no cover
raise NotImplementedError
class FakeStatusWriter(BulkUploadStatusWriter):
def __init__(self) -> None:
self.calls: list[tuple[UUID, str]] = []
def set_status(self, task_id: UUID, status: str) -> None:
self.calls.append((task_id, status))
def _orchestrator() -> tuple[
BulkUploadFinaliserOrchestrator, FakePropertyRepository, FakeStatusWriter
]:
prop = FakePropertyRepository()
status = FakeStatusWriter()
return BulkUploadFinaliserOrchestrator(prop, status), prop, status
def test_finalise_inserts_resolved_rows_and_marks_complete() -> None:
orchestrator, prop, status = _orchestrator()
task_id = uuid4()
rows = [
{
"Address 1": "1 Some Street",
"postcode": "A0 0AA",
"Internal Reference": "REF-1",
"address2uprn_uprn": "100023",
"address2uprn_address": "1 SOME STREET, TOWN, SW1A 1AA",
"address2uprn_lexiscore": "0.95",
},
]
inserted = orchestrator.finalise(rows, portfolio_id=7, task_id=task_id)
assert inserted == 1
(row,) = prop.inserted
assert row.portfolio_id == 7
assert row.uprn == 100023
assert row.landlord_property_id == "REF-1"
assert row.address == "1 SOME STREET, TOWN, SW1A 1AA" # matched wins
assert row.postcode == "SW1A 1AA" # extracted from the matched address
assert row.user_inputted_address == "1 Some Street"
assert row.lexiscore == 0.95
assert row.creation_status == "READY"
# The upload is marked complete via the injected status writer.
assert status.calls == [(task_id, "complete")]
def test_finalise_falls_back_to_user_input_when_unmatched() -> None:
orchestrator, prop, _status = _orchestrator()
rows = [
{
"Address 1": "2 Other Road",
"postcode": "B1 1BB",
"address2uprn_uprn": "invalid postcode",
"address2uprn_address": "invalid postcode",
"address2uprn_lexiscore": "",
},
]
orchestrator.finalise(rows, portfolio_id=9, task_id=uuid4())
(row,) = prop.inserted
assert row.uprn is None # missing sentinel -> None
assert row.address == "2 Other Road" # falls back to user input
assert row.postcode == "B1 1BB" # falls back to user-inputted postcode
assert row.lexiscore is None

View file

@ -37,7 +37,7 @@ class DomnaSharepointClient:
site_id=self.sharepoint_drive.value,
)
return sharepoint_client.list_folder_contents(path)
return sharepoint_client.list_folder_contents(path, page_size=500)
def does_folder_exists_at(self, file_name: str, file_path: str):
folders: Dict[str, Any] = self.get_folders_in_path(file_path)