bulk upload finaliser

This commit is contained in:
Jun-te Kim 2026-06-04 11:48:02 +00:00
parent 71e9ba4ad3
commit fc2664aeef
7 changed files with 318 additions and 204 deletions

View file

@ -49,7 +49,11 @@
"Bash(GIT_LITERAL_PATHSPECS=1 npx eslint src/lib/bulkUpload/server.ts src/lib/bulkUpload/client.ts \"src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/classifications/route.ts\" \"src/app/portfolio/[slug]/\\(portfolio\\)/bulk-upload/[uploadId]/OnboardingProgress.tsx\")",
"Bash(python3 -c \"from backend.app.config import Settings; print\\('COMBINER_SQS_URL =', Settings\\(\\).COMBINER_SQS_URL\\)\")",
"Bash(find /workspaces -name \"*.py\" -path \"*/domain/*\" -o -name \"subtasks.py\" 2>/dev/null | head -20)",
"Read(//workspaces/**)"
"Read(//workspaces/**)",
"Bash(grep -E '\\\\.sql$')",
"Bash(cd /home/vscode/po-migration *)",
"Read(//home/vscode/po-migration/**)",
"Bash(python -m py_compile applications/bulk_upload_finaliser/handler.py orchestration/bulk_upload_finaliser_orchestrator.py)"
],
"deny": [
"Bash(npx drizzle-kit generate)",
@ -61,7 +65,15 @@
"/workspaces/home/github/Model/orchestration",
"/workspaces/home/github/Model/backend/address2UPRN/local_handler",
"/workspaces/home/github/Model/deployment/terraform/shared",
"/tmp/mig-wt"
"/tmp/mig-wt",
"/workspaces/home/github/Model/docs/adr",
"/workspaces/home/github/Model/infrastructure/postgres",
"/workspaces/home/github/Model/repositories/property",
"/workspaces/home/github/Model/applications/bulk_upload_finaliser",
"/workspaces/home/github/Model/deployment/terraform/lambda/bulkUploadFinaliser",
"/workspaces/home/github/Model/deployment/terraform/lambda/fast-api",
"/workspaces/home/github/Model/backend/app/db/functions",
"/workspaces/home/github/Model/repositories/bulk_upload"
]
}
}

View file

@ -38,9 +38,13 @@ The housing association supplying a Portfolio's BulkUploads. A Landlord knows fa
_Avoid_: customer, client, owner, organisation (Organisation is a separate, broader entity)
**Landlord override**:
A landlord-supplied fact about a property that takes precedence over EPC-derived defaults when computing an assessment. The end-to-end Landlord override journey has two layers — a **VocabularyMapping** layer (this glossary entry below) and a per-Property fact layer (not yet modelled).
A landlord-supplied fact about a property that takes precedence over EPC-derived defaults when computing an assessment. The end-to-end Landlord override journey has two layers — a **VocabularyMapping** layer (this glossary entry below) and a per-Property fact layer (the **Property override**, below).
_Avoid_: customer data, manual override, landlord data
**Property override**:
The per-Property fact layer — one resolved fact per `(Property, Building part, component)`, where component is one of `wall_type`/`roof_type`/`property_type`/`built_form_type`. Holds a **snapshot** of the resolved enum value (a denormalised copy of the VocabularyMapping outcome at finalise time, so two Properties sharing a description can later diverge), plus the original spreadsheet text it resolved from. Materialised by the finaliser; see [ADR-0005](./docs/adr/0005-async-bulk-upload-finaliser.md). (Table created; population is follow-up work.)
_Avoid_: per-property mapping, property fact, override row
**VocabularyMapping**:
The translation from a Landlord's free-text description in a BulkUpload column (e.g. `"cavity: filledcavity"`) to a canonical domain enum value (e.g. `WallType.CAVITY`). Produced by a `ColumnClassifier` (today an LLM, tomorrow possibly a lookup table or rules engine) in the Model service. Stored per-Portfolio, one row per `(category, description)`. A row carries provenance (`classifier` or `user`) so user overrides survive re-classification.
_Avoid_: column mapping (that's a separate concept — see `ColumnMapping` above), classification, dictionary
@ -76,15 +80,18 @@ ready_for_processing
→ processing (Address matching triggered; Next.js writes)
→ combining (Combiner stage running; FastAPI writes directly)
→ awaiting_review (Combiner output in S3; FastAPI writes directly)
→ complete (Finalise succeeded; Next.js writes)
→ failed (FastAPI reports in-flight failure — schema only, not yet wired)
→ finalising (Finalise dispatched; Next.js writes via compare-and-swap)
→ complete (Finaliser succeeded; FastAPI/Lambda writes directly)
→ failed (Finaliser failed; FastAPI/Lambda writes directly)
```
`complete` and `failed` are terminal.
`complete` and `failed` are terminal. `finalising` is the in-flight state of the
async finaliser (mirrors `combining`); the UI renders it as "Uploading to ARA". See
[ADR-0005](./docs/adr/0005-async-bulk-upload-finaliser.md).
Re-mapping (PATCHing `columnMapping`) is legal only in `ready_for_processing` and `mapping_complete`. Any later state rejects with 409.
**Two writers**: Next.js owns transitions out of `mapping_complete`, into `processing`, and the terminal Finalise outcomes. FastAPI owns `combining` and `awaiting_review` — writing them direct to the DB during the combiner run. The BulkUpload aggregate observes both.
**Two writers**: Next.js owns transitions out of `mapping_complete`, into `processing`, and the `awaiting_review → finalising` compare-and-swap at Finalise dispatch. FastAPI/Lambda owns `combining`, `awaiting_review`, and the terminal `finalising → complete`/`failed` — writing them direct to the DB during the combiner and finaliser runs. The BulkUpload aggregate observes both. See [ADR-0005](./docs/adr/0005-async-bulk-upload-finaliser.md).
At `awaiting_review`, **Finalise is gated** (not a new status — a precondition on the action): when classifier columns were mapped the user must acknowledge the classification-verification step, and when the file is **Multi-entry** they must confirm the **Building-part ordering**. See [ADR-0004](./docs/adr/0004-multi-entry-building-part-ordering.md).

View file

@ -1,7 +1,8 @@
# Design WIP: `bulk_upload_finaliser` + `property_overrides`
> **Status:** In progress (grilling session paused 2026-06-03). Not an ADR yet.
> Resume from **Open question Q6** below. When decisions stabilise this should
> **Status:** v1 fully resolved (grilling 2026-06-04). Ready to graduate to ADR(s).
> v2 (`property_overrides` population) deferred to its own session — see the
> "Input" application-flow item for its entry point. When decisions stabilise this should
> graduate into a new ADR in `docs/adr/` (frontend) and likely a companion ADR
> in the Model repo, plus a CONTEXT.md update (see "Docs to update").
@ -24,9 +25,20 @@ Two linked pieces of work:
finalise"). One row per `(property, building_part, component)` carrying the
resolved enum value + provenance.
**v1 scope:** the finaliser only needs to write `property` (UPRN + address),
matching today's frontend `/finalize`. The `property_overrides` *table* is
designed now; populating it is follow-up work. (Confirm scope — see Q-scope.)
**Split into two pieces (decided 2026-06-04):**
- **v1 — async finaliser writes `property`.** Move today's synchronous Next.js
`/finalize` property-insert into a dispatched Lambda (`bulk_upload_finaliser`),
because a property list can be ~40,000 rows. Reproduces the exact 9-column insert
+ `onConflictDoNothing`, adds the `finalising` status + async state machine, and
shifts terminal-status ownership to the backend. **Fully designed — ADR-ready.**
- **v2 — populate `property_overrides`.** The per-Property fact layer. The *table*
already shipped (migration 0221, PR #306), but population is a **separate
follow-up** with its own open input-plumbing questions (see the "Input" item
under application-flow questions). Not designed here.
This doc resolves **v1 in full**; v2 gets its own grilling session against real
classifier-CSV / combiner-output samples.
## Where this sits in the existing pipeline
@ -57,7 +69,9 @@ already has an override-aware "re-score" seam — `property_overrides` will feed
| Backend access | Backend gets a **`PropertyOverrideRow` SQLModel** (mirror, like `landlord_wall_type_override_table.py`) + a **repository** (see `Model/infrastructure/postgres` + `Model/repositories` for examples). `PropertyRow` must drop its "backend never inserts" invariant and gain insertable columns. |
| Next.js `/finalize` | **Delete it** — fully replaced by the Lambda. |
| `property_overrides` shape | **Single polymorphic table**, not per-component tables. Accepts losing DB-level pgEnum typing on `value`. |
| `value` | `text` — a **denormalised snapshot copy** of the resolved enum value from `landlord_*_overrides` at materialise time (lets us see the value per-property even if vocabulary later changes). |
| `override_value` | `text` — a **denormalised snapshot copy** (own value per row) of the resolved enum from `landlord_*_overrides` at materialise time. Own-value (not an FK to the vocabulary) is what lets two properties sharing a description later **diverge**, and lets re-run recalculate one property's value without touching its siblings. |
| Snapshot, **not** FK to vocabulary | `property_overrides` does **not** foreign-key the originating `landlord_*_overrides` row. An FK forces every property sharing a description to share one value (forbids divergence); is structurally impossible as a real FK (4 polymorphic target tables, each with its own value enum); and would risk cascade-deleting per-property facts when re-classification prunes a vocabulary row. Lineage is preserved as a **natural key**`(portfolio_id, override_component, original_spreadsheet_description)` re-finds the vocabulary row (its `UNIQUE` is `(portfolio_id, description)`) — so deliberate re-sync needs no surrogate FK. |
| Re-run = **recalculate** | The finaliser write to `property_overrides` is `onConflictDoUpdate` on `(property_id, override_component, building_part)`, refreshing `override_value` + `original_spreadsheet_description` + `updated_at` to the latest resolution. Contrast `property`, which stays `onConflictDoNothing` (identity rows, don't churn). When per-property `source='user'` edits exist, the update must guard `WHERE source='classifier'` to preserve hand-edits (mirrors the Model classifier upsert). |
| `building_part` | **`smallint NOT NULL`**, explicit index: `0 = main building, 1 = extension 1, 2 = extension 2, …` (matches ADR-0004 `multiEntryOrdering.permutations` indexing). |
| Whole-dwelling components | **No special case.** `property_type`/`built_form` are *per-part-capable* too (an extension — conservatory, summer house — can be a different built form / property type). Today's files only supply them once, so they'll usually be written at `building_part = 0` only, but the schema allows per-part with no future migration. |
@ -70,70 +84,136 @@ property_overrides
id uuid pk (default random) -- match landlord_* tables
property_id bigint NOT NULL FK → property.id (FE-owned table)
portfolio_id bigint NOT NULL FK → portfolio.id
building_part smallint NOT NULL -- 0 = main, 1 = ext 1, 2 = ext 2, …
component <enum?> NOT NULL -- Q6: pgEnum vs text; value set
value text NOT NULL -- snapshot copy of landlord_* resolved enum
source override_source NOT NULL -- 'classifier' | 'user' (reuse existing pgEnum)
description text? -- Q7: store raw landlord description for provenance?
building_part smallint NOT NULL -- 0 = main, 1 = ext 1, 2 = ext 2, …
override_component override_component NOT NULL -- column name == enum type name; pgEnum {wall_type, roof_type, property_type, built_form_type} (Q6 ✓)
override_value text NOT NULL -- snapshot copy of landlord_* resolved enum (free text; `override_component` carries the typing)
-- (no `source`) — dropped Q9: pure value snapshot; add back as nullable column if/when a per-property edit path needs provenance
original_spreadsheet_description text NOT NULL -- raw spreadsheet cell text this snapshot resolved from (Q7 ✓)
created_at timestamptz NOT NULL default now()
updated_at timestamptz NOT NULL default now()
-- UNIQUE (property_id, component, building_part) -- Q8: confirm
-- UNIQUE (property_id, override_component, building_part) -- Q8 ✓ (source NOT in key — mirrors ADR-0004 single-row flip; portfolio_id implied by property_id)
-- FK property_id → property.id ON DELETE CASCADE; portfolio_id → portfolio.id (Drizzle only; bare bigint in SQLModel mirror); portfolio_id kept (matches property_details_epc / property_targets)
```
## Open questions (resume here)
- **Q6 — `component` discriminator (WE STOPPED HERE).** pgEnum vs text, and the
value set. *Recommendation:* pgEnum `property_component` (or
`override_component`) with the established category names
**`wall_type`, `roof_type`, `property_type`, `built_form_type`** (same keys as
`column_mapping` / `ClassifiableColumn.name`, so finaliser maps category →
component with no translation). pgEnum over text: small closed set, typos
caught at write time (matters more now `value` is free text); new component =
one-line `ALTER TYPE … ADD VALUE`.
- **Q6 — `component` discriminator. RESOLVED 2026-06-04.** pgEnum
**`override_component`** (column `component`) with values
**`wall_type`, `roof_type`, `property_type`, `built_form_type`**. Verified these
are the *exact* keys used both in the frontend
([columnFields.ts:30-33](../../src/lib/bulkUpload/columnFields.ts#L30-L33)) and
the backend (`ClassifiableColumn.name` / handler `_build_columns()`), so the
finaliser maps category → component with **no translation**. pgEnum over text:
small closed set, typos caught at write time — and this is now the *only*
DB-level typing left on a row, since `override_value` is free text. New component
= one-line `ALTER TYPE … ADD VALUE` (Drizzle-owned). Enum named `override_*`
(not `property_*`) to sit with `override_source` and stay visually distinct from
the existing *value* enum `property_type`.
- **Q7 — store raw `description` per override row?** For provenance / re-resolution
/ debugging vs redundancy (the description already lives in `landlord_*_overrides`).
*Lean:* store it — cheap, and it pins what text produced this snapshot.
- **Q7 — store raw description per override row? RESOLVED 2026-06-04: yes, as
`original_spreadsheet_description text NOT NULL`.** Names the *source artifact*
(the spreadsheet cell), not an actor — sidesteps the Landlord-vs-User conflation
the glossary warns against, and aligns with CONTEXT.md's "the source file". Stored
because `override_value` is a denormalised snapshot that deliberately won't
refresh on later vocabulary edits; pinning the original text makes each row
self-explaining and re-resolvable even after the source `landlord_*_overrides` row
changes. `NOT NULL` is safe **iff** every `property_overrides` row is materialised
from a `landlord_*_overrides` row (whose `description` is itself `NOT NULL`) —
confirm when settling Q9/source semantics.
- **Q8 — uniqueness + FKs.** Confirm `UNIQUE (property_id, component, building_part)`.
`property_id` FK → FE-owned `property.id`; `portfolio_id` as `bigint` (mirror
the `landlord_*` note: FK enforced by Drizzle migration, not the SQLModel).
- **Q8 — uniqueness + FKs. RESOLVED 2026-06-04.**
`UNIQUE (property_id, override_component, building_part)`. `building_part` is in
the key (part 0 and part 1 both carry e.g. a `wall_type` row). `source` is
**deliberately not** in the key — mirrors ADR-0004's single-row-flip (one row,
flip `source` in place; the two-row model was rejected). `portfolio_id` is not in
the key (implied by `property_id`) but **is kept as a column** for query ergonomics
and consistency with `property_details_epc` / `property_targets`, which both
denormalise it. FKs: `property_id → property.id ON DELETE CASCADE`;
`portfolio_id → portfolio.id ON DELETE CASCADE` in the Drizzle migration, but a
bare `bigint` (no FK) in the backend `PropertyOverrideRow` SQLModel mirror —
matching `landlord_wall_type_override_table.py`.
- **Q9 — `source` semantics.** Reuse the existing `override_source` pgEnum
(`classifier`/`user`). At materialise time the finaliser copies the source from
the `landlord_*_overrides` row it resolved from. Confirm there's no *per-property*
override concept yet (today overrides are edited at the **vocabulary/portfolio**
level per ADR-0004; property_overrides just snapshots the outcome).
- **Q9 — `source` semantics. RESOLVED 2026-06-04: drop `source` entirely.**
`property_overrides` is a pure **snapshot of resolved values**. Rationale: there
is no per-property override concept today (per ADR-0004 edits happen at the
**vocabulary/portfolio** level, flipped in place), so a copied `source` would
describe the *vocabulary mapping's* provenance, not this property's — a footgun a
reader/re-score rule could misread, and no consumer needs it in v1. When a genuine
per-property edit path lands (the real use for per-property provenance), `source`
returns as an **additive nullable-column migration** — no need to carry it now.
This also confirms the Q7 `NOT NULL` contingency: every row is still materialised
from a `landlord_*_overrides` row (`description NOT NULL`).
- **Q-scope — v1 scope.** Confirm v1 = `property` (UPRN + address) only;
`property_overrides` table created but **not populated** until follow-up.
- **Q-scope — v1 scope. RESOLVED 2026-06-04.** v1 = the finaliser reproduces
today's **exact 9-column `property` insert** (`portfolio_id`,
`creation_status='READY'`, `uprn`, `landlord_property_id``Internal Reference`,
`address` = matched ?? user-inputted, `postcode`, `user_inputted_address`,
`user_inputted_postcode`, `lexiscore`) **+** `onConflictDoNothing` on
`(portfolio_id, uprn) where uprn is not null` — not a reduced "UPRN + address".
This sizes the "PropertyRow gains insertable columns" decision to all nine
columns plus `creation_status`. The `property_overrides` *table* shipped ahead
(migration 0221, PR #306) but is **not populated** in v1 — population is
follow-up work (and needs a different input source; see combiner-output note
below).
### Application-flow questions not yet reached
- **Trigger + orchestration.** How does Finalise dispatch the finaliser? Likely
the existing `TaskOrchestrator` / `subtask_handler` pattern (see ADR-0003,
`landlord_description_overrides` handler) — Next.js `/finalize` triggers a
subtask instead of inserting. Need the trigger body shape (cf.
`LandlordDescriptionOverridesTriggerBody`: `task_id`, `sub_task_id`, `s3_uri`,
`portfolio_id`, …).
- **Trigger + orchestration. RESOLVED 2026-06-04.** Mirror the
`start-address-matching` path. Next.js creates a `SubTask` (`service:
"finaliser"`) under the BulkUpload's existing Task, then POSTs a new FastAPI
endpoint `POST /v1/bulk-uploads/trigger-finaliser` (auth via `validate_token`),
which enqueues to a **new SQS queue**; a Lambda runs the finaliser wrapped in
`@subtask_handler` (auto-injected `TaskOrchestrator`; `run_subtask` owns the
subtask start/complete/fail + Task cascade). Trigger body
`FinaliserTriggerBody { task_id, sub_task_id, s3_uri (combined output), portfolio_id }`
(extends `SubtaskTriggerBody`). Slow work stays outside the txn; persistence in a
`commit_scope`. The synchronous Next.js `/finalize` route is deleted (locked).
- **State machine / who writes `complete`.** Today Next.js writes `complete`
synchronously after the insert. If Finalise becomes async, **FastAPI/Lambda
writes the terminal status** (mirroring how it already owns
`combining`/`awaiting_review`). This is a CONTEXT.md "Two writers" change.
- **State machine / who writes `complete`. RESOLVED 2026-06-04.** New status
**`finalising`** between `awaiting_review` and `complete` (mirrors `combining`
before `awaiting_review`). Lifecycle:
`awaiting_review → finalising → complete` (↘ `failed`).
- **`finalising`** written by **Next.js at dispatch** via a **compare-and-swap**:
`UPDATE … SET status='finalising' WHERE id=? AND status='awaiting_review'`
0 rows ⇒ already dispatched ⇒ 409. This is the **double-dispatch guard** (closes
the simultaneous-click race under `loadForFinalize`'s existing precondition).
- **`complete` / `failed`** written by the **Lambda** directly to
`bulk_address_uploads` (new `set_finalized_status` / `set_failed_status`,
exactly like the combiner's `set_combining_status` /
`set_combined_output_s3_uri`). `markFinalized` + the Next.js `/finalize` route
are deleted.
- **CONTEXT.md "Two writers" change:** Next.js owns dispatch + the
`awaiting_review → finalising` CAS; the backend owns `finalising → complete`
and `→ failed` (in addition to `combining` / `awaiting_review`).
- **UI vs canonical:** persisted enum value is `finalising` (canonical; ties to
the **Finalise** action). The frontend renders it as **"Uploading to ARA"** — a
display-layer label only, **not** the enum name, so UX copy never needs a
migration.
- **Input — does the combiner output carry the raw description cells?** v1 only
needs address/UPRN columns (confirmed present: `address2uprn_uprn`,
- **Input — does the combiner output carry the raw description cells? RESOLVED
2026-06-04: NO. This is a v2 problem (deferred).** v1 needs only address/UPRN
columns, all **confirmed present** in the combiner output (`address2uprn_uprn`,
`address2uprn_address`, `address2uprn_lexiscore`, `Internal Reference`,
address/postcode). For `property_overrides` population (later) the finaliser
also needs the raw `Walls`/`Roofs`/`Property Type` cells **plus**
`multiEntryOrdering` to split per building part — confirm these survive into
the combiner output, or that the finaliser reads them from another source.
`Address 1/2/3`, `postcode`). The raw `Walls`/`Roofs`/`Property Type`/`Built Form`
cells are **not** in the combiner output — they survive only in (a) the
`{uploadId}-classifier.csv` on S3 (original headers) and (b) `landlord_*_overrides`
as *resolved* values keyed by description. So v2 population must assemble **four
inputs**, not one file:
- `property_id` (identity) ← combiner output `(portfolio_id, uprn)` — **but
no-UPRN rows have no such key**;
- raw cell text ← the classifier CSV (not the combiner output);
- cell → building-part split ← `multiEntryOrdering` on `bulk_address_uploads`;
- description → `override_value``landlord_*_overrides` (normalized description).
- **Two open v2 hazards (entry point for the v2 session):** (1) the join key
between classifier CSV and combiner output — is there a stable per-row key
(`Internal Reference`?) and is row order preserved through postcode-split +
combine? (2) obtaining `property_id` for unmatched (no-UPRN) rows — v1's
`onConflictDoNothing` returns no ids, so v2 likely needs `RETURNING id` mapped
back to source rows.
- **Idempotency / re-run.** Today's insert uses
`onConflictDoNothing((portfolio_id, uprn) where uprn is not null)`. Define the
finaliser's re-run behaviour for both tables (esp. that snapshot `value`s won't
refresh on re-run after a vocabulary edit unless we deliberately re-materialise).
- **Idempotency / re-run. RESOLVED 2026-06-04 (per-table).**
- `property`: keep today's `onConflictDoNothing` on `(portfolio_id, uprn) where uprn is not null` — existing properties are not churned.
- `property_overrides`: `onConflictDoUpdate` on `(property_id, override_component, building_part)`**recalculate** `override_value` + `original_spreadsheet_description` + `updated_at` to the latest resolution, so an existing property whose override changed is refreshed in place. Guard `WHERE source='classifier'` once a per-property user-edit path exists (until then every row is classifier-derived, so blind overwrite is correct). See the "Re-run = recalculate" and "Snapshot, not FK" locked decisions.
## Key code references (from exploration)

View file

@ -1,160 +1,46 @@
import { db } from "@/app/db/db";
import { property } from "@/app/db/schema/property";
import { sql } from "drizzle-orm";
import { NextRequest, NextResponse } from "next/server";
import { revalidatePath } from "next/cache";
import { getServerSession } from "next-auth";
import { AuthOptions } from "@/app/api/auth/[...nextauth]/authOptions";
import { createRetrofitDataS3Client } from "@/app/utils/s3";
import * as XLSX from "xlsx";
import { loadForFinalize, markFinalized } from "@/lib/bulkUpload/server";
const ADDRESS_COLS = ["Address 1", "Address 2", "Address 3"] as const;
const POSTCODE_COL = "postcode";
const INTERNAL_REF_COL = "Internal Reference";
const UPRN_COL = "address2uprn_uprn";
const MATCHED_ADDRESS_COL = "address2uprn_address";
const LEXISCORE_COL = "address2uprn_lexiscore";
const MISSING_SENTINEL = "invalid postcode";
const UK_POSTCODE_RE = /[A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2}/i;
function normalize(v: unknown): string {
if (v === null || v === undefined) return "";
return String(v).trim();
}
function isMissing(v: string): boolean {
return v === "" || v.toLowerCase() === MISSING_SENTINEL;
}
function parseUprn(raw: unknown): bigint | null {
const v = normalize(raw);
if (isMissing(v)) return null;
try {
return BigInt(v);
} catch {
return null;
}
}
function parseLexiscore(raw: unknown): number | null {
const v = normalize(raw);
if (isMissing(v)) return null;
const n = Number(v);
return Number.isFinite(n) ? n : null;
}
function extractPostcode(matched: string | null, fallback: string): string | null {
if (matched) {
const m = matched.match(UK_POSTCODE_RE);
if (m) return m[0].toUpperCase();
}
return fallback || null;
}
function parseS3Uri(uri: string): { bucket: string; key: string } | null {
if (!uri.startsWith("s3://")) return null;
const rest = uri.slice(5);
const slash = rest.indexOf("/");
if (slash < 0) return null;
return { bucket: rest.slice(0, slash), key: rest.slice(slash + 1) };
}
import { readSessionToken } from "@/lib/session";
import { dispatchFinaliser } from "@/lib/bulkUpload/server";
// Finalise is now asynchronous (ADR-0005). This route no longer inserts
// properties; it dispatches the bulk_upload_finaliser Lambda and flips the
// BulkUpload to `finalising` via a compare-and-swap (the double-dispatch guard).
// The Lambda reads the combiner output, inserts the property rows, and writes the
// terminal `complete`/`failed` status directly. The user sees "Uploading to ARA"
// while the row is `finalising`; the onboarding surface polls for the outcome.
export async function POST(
_request: NextRequest,
request: NextRequest,
{ params }: { params: Promise<{ portfolioId: string; uploadId: string }> }
) {
const session = await getServerSession(AuthOptions);
if (!session) return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
const { uploadId } = await params;
const sessionToken = readSessionToken(request);
const guarded = await loadForFinalize(uploadId);
switch (guarded.kind) {
const result = await dispatchFinaliser({ uploadId, sessionToken });
switch (result.kind) {
case "ok":
// Accepted: the finaliser is running; the row is now `finalising`.
return NextResponse.json({ taskId: result.taskId }, { status: 202 });
case "not_found":
return NextResponse.json({ error: "Not found" }, { status: 404 });
case "already_finalized":
// Idempotent: nothing to do.
return new NextResponse(null, { status: 200 });
case "wrong_state":
return NextResponse.json(
{ error: `Upload not ready to finalize (state: ${guarded.current})` },
{ status: 409 }
);
case "not_yet_combined":
return NextResponse.json({ error: "Combiner not finished" }, { status: 409 });
}
const upload = guarded.upload;
const parsed = parseS3Uri(upload.combinedOutputS3Uri!);
if (!parsed) {
return NextResponse.json({ error: "Invalid combined output S3 URI" }, { status: 500 });
}
const s3 = createRetrofitDataS3Client();
let rawRows: Record<string, unknown>[];
try {
const obj = await s3
.getObject({ Bucket: parsed.bucket, Key: parsed.key })
.promise();
const buf = Buffer.from(obj.Body as Uint8Array);
const wb = XLSX.read(buf, { type: "buffer" });
const sheet = wb.Sheets[wb.SheetNames[0]];
rawRows = XLSX.utils.sheet_to_json<Record<string, unknown>>(sheet, { defval: "" });
} catch (err) {
console.error("Failed to read combined CSV from S3:", err);
return NextResponse.json({ error: "Failed to read combined CSV" }, { status: 502 });
}
const portfolioIdBig = BigInt(upload.portfolioId);
const values = rawRows.map((raw) => {
const userInputtedAddress =
ADDRESS_COLS.map((c) => normalize(raw[c])).filter(Boolean).join(", ") || null;
const userInputtedPostcode = normalize(raw[POSTCODE_COL]) || null;
const uprn = parseUprn(raw[UPRN_COL]);
const matchedAddressRaw = normalize(raw[MATCHED_ADDRESS_COL]);
const matchedAddress = isMissing(matchedAddressRaw) ? null : matchedAddressRaw;
const address = matchedAddress ?? userInputtedAddress;
const postcode = extractPostcode(matchedAddress, userInputtedPostcode ?? "");
const internalRef = normalize(raw[INTERNAL_REF_COL]) || null;
const lexiscore = parseLexiscore(raw[LEXISCORE_COL]);
return {
portfolioId: portfolioIdBig,
creationStatus: "READY" as const,
uprn,
landlordPropertyId: internalRef,
address,
postcode,
userInputtedAddress,
userInputtedPostcode,
lexiscore,
};
});
try {
if (values.length > 0) {
await db
.insert(property)
.values(values)
.onConflictDoNothing({
target: [property.portfolioId, property.uprn],
where: sql`${property.uprn} IS NOT NULL`,
});
}
await markFinalized(uploadId);
revalidatePath("/portfolio/[slug]", "layout");
return new NextResponse(null, { status: 200 });
} catch (err) {
console.error("Failed to finalize bulk upload:", err);
return NextResponse.json({ error: "Failed to import properties" }, { status: 500 });
case "missing_task":
return NextResponse.json({ error: "Upload has no task to finalise" }, { status: 409 });
case "wrong_state":
return NextResponse.json(
{ error: `Upload not ready to finalize (state: ${result.current})` },
{ status: 409 }
);
case "trigger_failed":
return NextResponse.json({ error: result.message }, { status: result.status });
}
}

View file

@ -28,6 +28,7 @@ import {
RoofTypeValues,
} from "@/app/db/schema/landlord_overrides";
import { CLASSIFIER_FIELDS } from "@/lib/bulkUpload/columnFields";
import { statusLabel } from "@/lib/bulkUpload/types";
// Valid enum options per classifier category, for the editable dropdowns (#299).
const CATEGORY_VALUES: Record<string, readonly string[]> = {
@ -100,6 +101,9 @@ export default function OnboardingProgress({
const taskFailed = TASK_FAILED_STATUSES.has(taskStatus);
const isCombining = upload.status === "combining";
const isImporting = upload.status === "awaiting_review";
// Async finaliser in flight (ADR-0005). Polling continues (non-terminal) until
// the backend writes complete/failed. Surfaced to the user as "Uploading to ARA".
const isFinalising = upload.status === "finalising";
const canRunCombiner = taskDone && !taskFailed && upload.status === "processing";
const isAwaitingReview = upload.status === "awaiting_review";
@ -178,6 +182,12 @@ export default function OnboardingProgress({
Awaiting import
</span>
)}
{isFinalising && (
<span className="flex items-center gap-1 text-blue-500">
<span className="w-1.5 h-1.5 rounded-full bg-blue-400 animate-pulse" />
{statusLabel("finalising")}
</span>
)}
</div>
{needsVerify && sample && (

View file

@ -562,9 +562,105 @@ export async function loadForFinalize(uploadId: string): Promise<LoadForFinalize
return { kind: "ready", upload };
}
export async function markFinalized(uploadId: string): Promise<void> {
await db
export type DispatchFinaliserOutcome =
| { kind: "ok"; taskId: string; subTaskId: string }
| { kind: "not_found" }
| { kind: "already_finalized" }
| { kind: "not_yet_combined" }
| { kind: "wrong_state"; current: string }
| { kind: "missing_task" }
| { kind: "trigger_failed"; status: number; message: string };
// Dispatch the async bulk_upload_finaliser (ADR-0005). Replaces the old
// synchronous property insert + markFinalized. Order matters:
// 1. loadForFinalize — rich guards (combined output present, awaiting_review).
// 2. CAS claim `awaiting_review → finalising` — the double-dispatch guard:
// of two simultaneous clicks exactly one updates a row; the loser gets 409.
// 3. create the finaliser subtask under the upload's existing Task + POST the
// trigger. On trigger failure, revert the status so the user can retry and
// mark the subtask failed. The backend writes the terminal complete/failed.
export async function dispatchFinaliser(args: {
uploadId: string;
sessionToken: string | undefined;
}): Promise<DispatchFinaliserOutcome> {
const guarded = await loadForFinalize(args.uploadId);
switch (guarded.kind) {
case "not_found":
return { kind: "not_found" };
case "already_finalized":
return { kind: "already_finalized" };
case "not_yet_combined":
return { kind: "not_yet_combined" };
case "wrong_state":
return { kind: "wrong_state", current: guarded.current };
}
const upload = guarded.upload;
if (!upload.taskId) return { kind: "missing_task" };
// CAS: atomically claim the dispatch. Only the request that flips
// awaiting_review → finalising proceeds; a concurrent one updates 0 rows.
const claimed = await db
.update(bulkAddressUploads)
.set({ status: "complete" })
.where(eq(bulkAddressUploads.id, uploadId));
.set({ status: "finalising" })
.where(
and(
eq(bulkAddressUploads.id, args.uploadId),
eq(bulkAddressUploads.status, "awaiting_review"),
),
)
.returning();
if (claimed.length === 0) {
const current = await loadById(args.uploadId);
if (current?.status === "complete") return { kind: "already_finalized" };
return { kind: "wrong_state", current: current?.status ?? "unknown" };
}
const [subTask] = await db
.insert(subTasks)
.values({
taskId: upload.taskId,
status: "waiting",
service: SUBTASK_SERVICE.finaliser,
inputs: JSON.stringify({ bulk_upload_id: args.uploadId }),
})
.returning();
const payload = {
task_id: upload.taskId,
sub_task_id: subTask.id,
s3_uri: upload.combinedOutputS3Uri,
portfolio_id: Number(upload.portfolioId),
bulk_upload_id: args.uploadId,
};
const trigger = await triggerFastApiPipeline({
endpoint: "/v1/bulk-uploads/trigger-finaliser",
payload,
sessionToken: args.sessionToken,
});
if (!trigger.ok) {
// Roll the claim back so the user can retry, and fail the subtask.
await Promise.all([
db
.update(bulkAddressUploads)
.set({ status: "awaiting_review" })
.where(eq(bulkAddressUploads.id, args.uploadId)),
db
.update(subTasks)
.set({
status: "failed",
outputs: JSON.stringify({ error: trigger.message }),
})
.where(eq(subTasks.id, subTask.id)),
]);
return { kind: "trigger_failed", status: trigger.status, message: trigger.message };
}
await db
.update(subTasks)
.set({ status: "in progress", inputs: JSON.stringify(payload) })
.where(eq(subTasks.id, subTask.id));
return { kind: "ok", taskId: upload.taskId, subTaskId: subTask.id };
}

View file

@ -6,6 +6,10 @@ export const BULK_UPLOAD_STATUSES = [
"processing",
"combining",
"awaiting_review",
// In-flight state of the async finaliser (ADR-0005); mirrors `combining`. The
// status column is free text, so no enum migration is needed. UI renders this
// as "Uploading to ARA" — see STATUS_LABELS.
"finalising",
"complete",
"failed",
] as const;
@ -19,8 +23,27 @@ export type BulkUpload = typeof bulkAddressUploads.$inferSelect;
export const SUBTASK_SERVICE = {
address: "address2uprn",
classifier: "landlord_description_overrides",
finaliser: "bulk_upload_finaliser",
} as const;
// User-facing label for a BulkUpload status. The persisted enum value stays
// canonical (`finalising`); the product surface for that state is "Uploading to
// ARA" (ADR-0005 — a display-layer label, never the enum name).
export const STATUS_LABELS: Record<string, string> = {
ready_for_processing: "Ready for processing",
mapping_complete: "Mapping complete",
processing: "Processing",
combining: "Combining",
awaiting_review: "Awaiting review",
finalising: "Uploading to ARA",
complete: "Complete",
failed: "Failed",
};
export function statusLabel(status: string): string {
return STATUS_LABELS[status] ?? status;
}
export type TaskSummary = {
id: string;
taskSource: string;