diff --git a/.claude/settings.json b/.claude/settings.json index 974af6a..0343c16 100644 --- a/.claude/settings.json +++ b/.claude/settings.json @@ -49,7 +49,11 @@ "Bash(GIT_LITERAL_PATHSPECS=1 npx eslint src/lib/bulkUpload/server.ts src/lib/bulkUpload/client.ts \"src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/classifications/route.ts\" \"src/app/portfolio/[slug]/\\(portfolio\\)/bulk-upload/[uploadId]/OnboardingProgress.tsx\")", "Bash(python3 -c \"from backend.app.config import Settings; print\\('COMBINER_SQS_URL =', Settings\\(\\).COMBINER_SQS_URL\\)\")", "Bash(find /workspaces -name \"*.py\" -path \"*/domain/*\" -o -name \"subtasks.py\" 2>/dev/null | head -20)", - "Read(//workspaces/**)" + "Read(//workspaces/**)", + "Bash(grep -E '\\\\.sql$')", + "Bash(cd /home/vscode/po-migration *)", + "Read(//home/vscode/po-migration/**)", + "Bash(python -m py_compile applications/bulk_upload_finaliser/handler.py orchestration/bulk_upload_finaliser_orchestrator.py)" ], "deny": [ "Bash(npx drizzle-kit generate)", @@ -61,7 +65,15 @@ "/workspaces/home/github/Model/orchestration", "/workspaces/home/github/Model/backend/address2UPRN/local_handler", "/workspaces/home/github/Model/deployment/terraform/shared", - "/tmp/mig-wt" + "/tmp/mig-wt", + "/workspaces/home/github/Model/docs/adr", + "/workspaces/home/github/Model/infrastructure/postgres", + "/workspaces/home/github/Model/repositories/property", + "/workspaces/home/github/Model/applications/bulk_upload_finaliser", + "/workspaces/home/github/Model/deployment/terraform/lambda/bulkUploadFinaliser", + "/workspaces/home/github/Model/deployment/terraform/lambda/fast-api", + "/workspaces/home/github/Model/backend/app/db/functions", + "/workspaces/home/github/Model/repositories/bulk_upload" ] } } diff --git a/CONTEXT.md b/CONTEXT.md index 039b7c2..4e9234e 100644 --- a/CONTEXT.md +++ b/CONTEXT.md @@ -38,9 +38,13 @@ The housing association supplying a Portfolio's BulkUploads. 
A Landlord knows fa _Avoid_: customer, client, owner, organisation (Organisation is a separate, broader entity) **Landlord override**: -A landlord-supplied fact about a property that takes precedence over EPC-derived defaults when computing an assessment. The end-to-end Landlord override journey has two layers — a **VocabularyMapping** layer (this glossary entry below) and a per-Property fact layer (not yet modelled). +A landlord-supplied fact about a property that takes precedence over EPC-derived defaults when computing an assessment. The end-to-end Landlord override journey has two layers — a **VocabularyMapping** layer (this glossary entry below) and a per-Property fact layer (the **Property override**, below). _Avoid_: customer data, manual override, landlord data +**Property override**: +The per-Property fact layer — one resolved fact per `(Property, Building part, component)`, where component is one of `wall_type`/`roof_type`/`property_type`/`built_form_type`. Holds a **snapshot** of the resolved enum value (a denormalised copy of the VocabularyMapping outcome at finalise time, so two Properties sharing a description can later diverge), plus the original spreadsheet text it resolved from. Materialised by the finaliser; see [ADR-0005](./docs/adr/0005-async-bulk-upload-finaliser.md). (Table created; population is follow-up work.) +_Avoid_: per-property mapping, property fact, override row + **VocabularyMapping**: The translation from a Landlord's free-text description in a BulkUpload column (e.g. `"cavity: filledcavity"`) to a canonical domain enum value (e.g. `WallType.CAVITY`). Produced by a `ColumnClassifier` (today an LLM, tomorrow possibly a lookup table or rules engine) in the Model service. Stored per-Portfolio, one row per `(category, description)`. A row carries provenance (`classifier` or `user`) so user overrides survive re-classification. 
_Avoid_: column mapping (that's a separate concept — see `ColumnMapping` above), classification, dictionary @@ -76,15 +80,18 @@ ready_for_processing → processing (Address matching triggered; Next.js writes) → combining (Combiner stage running; FastAPI writes directly) → awaiting_review (Combiner output in S3; FastAPI writes directly) - → complete (Finalise succeeded; Next.js writes) - → failed (FastAPI reports in-flight failure — schema only, not yet wired) + → finalising (Finalise dispatched; Next.js writes via compare-and-swap) + → complete (Finaliser succeeded; FastAPI/Lambda writes directly) + → failed (Finaliser failed; FastAPI/Lambda writes directly) ``` -`complete` and `failed` are terminal. +`complete` and `failed` are terminal. `finalising` is the in-flight state of the +async finaliser (mirrors `combining`); the UI renders it as "Uploading to ARA". See +[ADR-0005](./docs/adr/0005-async-bulk-upload-finaliser.md). Re-mapping (PATCHing `columnMapping`) is legal only in `ready_for_processing` and `mapping_complete`. Any later state rejects with 409. -**Two writers**: Next.js owns transitions out of `mapping_complete`, into `processing`, and the terminal Finalise outcomes. FastAPI owns `combining` and `awaiting_review` — writing them direct to the DB during the combiner run. The BulkUpload aggregate observes both. +**Two writers**: Next.js owns transitions out of `mapping_complete`, into `processing`, and the `awaiting_review → finalising` compare-and-swap at Finalise dispatch. FastAPI/Lambda owns `combining`, `awaiting_review`, and the terminal `finalising → complete`/`failed` — writing them direct to the DB during the combiner and finaliser runs. The BulkUpload aggregate observes both. See [ADR-0005](./docs/adr/0005-async-bulk-upload-finaliser.md). 
At `awaiting_review`, **Finalise is gated** (not a new status — a precondition on the action): when classifier columns were mapped the user must acknowledge the classification-verification step, and when the file is **Multi-entry** they must confirm the **Building-part ordering**. See [ADR-0004](./docs/adr/0004-multi-entry-building-part-ordering.md). diff --git a/docs/design/bulk-upload-finaliser.md b/docs/design/bulk-upload-finaliser.md index b56c7b7..3a41e28 100644 --- a/docs/design/bulk-upload-finaliser.md +++ b/docs/design/bulk-upload-finaliser.md @@ -1,7 +1,8 @@ # Design WIP: `bulk_upload_finaliser` + `property_overrides` -> **Status:** In progress (grilling session paused 2026-06-03). Not an ADR yet. -> Resume from **Open question Q6** below. When decisions stabilise this should +> **Status:** v1 fully resolved (grilling 2026-06-04). Ready to graduate to ADR(s). +> v2 (`property_overrides` population) deferred to its own session — see the +> "Input" application-flow item for its entry point. When decisions stabilise this should > graduate into a new ADR in `docs/adr/` (frontend) and likely a companion ADR > in the Model repo, plus a CONTEXT.md update (see "Docs to update"). @@ -24,9 +25,20 @@ Two linked pieces of work: finalise"). One row per `(property, building_part, component)` carrying the resolved enum value + provenance. -**v1 scope:** the finaliser only needs to write `property` (UPRN + address), -matching today's frontend `/finalize`. The `property_overrides` *table* is -designed now; populating it is follow-up work. (Confirm scope — see Q-scope.) +**Split into two pieces (decided 2026-06-04):** + +- **v1 — async finaliser writes `property`.** Move today's synchronous Next.js + `/finalize` property-insert into a dispatched Lambda (`bulk_upload_finaliser`), + because a property list can be ~40,000 rows. 
Reproduces the exact 9-column insert + + `onConflictDoNothing`, adds the `finalising` status + async state machine, and + shifts terminal-status ownership to the backend. **Fully designed — ADR-ready.** +- **v2 — populate `property_overrides`.** The per-Property fact layer. The *table* + already shipped (migration 0221, PR #306), but population is a **separate + follow-up** with its own open input-plumbing questions (see the "Input" item + under application-flow questions). Not designed here. + +This doc resolves **v1 in full**; v2 gets its own grilling session against real +classifier-CSV / combiner-output samples. ## Where this sits in the existing pipeline @@ -57,7 +69,9 @@ already has an override-aware "re-score" seam — `property_overrides` will feed | Backend access | Backend gets a **`PropertyOverrideRow` SQLModel** (mirror, like `landlord_wall_type_override_table.py`) + a **repository** (see `Model/infrastructure/postgres` + `Model/repositories` for examples). `PropertyRow` must drop its "backend never inserts" invariant and gain insertable columns. | | Next.js `/finalize` | **Delete it** — fully replaced by the Lambda. | | `property_overrides` shape | **Single polymorphic table**, not per-component tables. Accepts losing DB-level pgEnum typing on `value`. | -| `value` | `text` — a **denormalised snapshot copy** of the resolved enum value from `landlord_*_overrides` at materialise time (lets us see the value per-property even if vocabulary later changes). | +| `override_value` | `text` — a **denormalised snapshot copy** (own value per row) of the resolved enum from `landlord_*_overrides` at materialise time. Own-value (not an FK to the vocabulary) is what lets two properties sharing a description later **diverge**, and lets re-run recalculate one property's value without touching its siblings. | +| Snapshot, **not** FK to vocabulary | `property_overrides` does **not** foreign-key the originating `landlord_*_overrides` row. 
An FK forces every property sharing a description to share one value (forbids divergence); is structurally impossible as a real FK (4 polymorphic target tables, each with its own value enum); and would risk cascade-deleting per-property facts when re-classification prunes a vocabulary row. Lineage is preserved as a **natural key** — `(portfolio_id, override_component, original_spreadsheet_description)` re-finds the vocabulary row (its `UNIQUE` is `(portfolio_id, description)`) — so deliberate re-sync needs no surrogate FK. | +| Re-run = **recalculate** | The finaliser write to `property_overrides` is `onConflictDoUpdate` on `(property_id, override_component, building_part)`, refreshing `override_value` + `original_spreadsheet_description` + `updated_at` to the latest resolution. Contrast `property`, which stays `onConflictDoNothing` (identity rows, don't churn). When per-property `source='user'` edits exist, the update must guard `WHERE source='classifier'` to preserve hand-edits (mirrors the Model classifier upsert). | | `building_part` | **`smallint NOT NULL`**, explicit index: `0 = main building, 1 = extension 1, 2 = extension 2, …` (matches ADR-0004 `multiEntryOrdering.permutations` indexing). | | Whole-dwelling components | **No special case.** `property_type`/`built_form` are *per-part-capable* too (an extension — conservatory, summer house — can be a different built form / property type). Today's files only supply them once, so they'll usually be written at `building_part = 0` only, but the schema allows per-part with no future migration. 
| @@ -70,70 +84,136 @@ property_overrides id uuid pk (default random) -- match landlord_* tables property_id bigint NOT NULL FK → property.id (FE-owned table) portfolio_id bigint NOT NULL FK → portfolio.id - building_part smallint NOT NULL -- 0 = main, 1 = ext 1, 2 = ext 2, … - component NOT NULL -- Q6: pgEnum vs text; value set - value text NOT NULL -- snapshot copy of landlord_* resolved enum - source override_source NOT NULL -- 'classifier' | 'user' (reuse existing pgEnum) - description text? -- Q7: store raw landlord description for provenance? + building_part smallint NOT NULL -- 0 = main, 1 = ext 1, 2 = ext 2, … + override_component override_component NOT NULL -- column name == enum type name; pgEnum {wall_type, roof_type, property_type, built_form_type} (Q6 ✓) + override_value text NOT NULL -- snapshot copy of landlord_* resolved enum (free text; `override_component` carries the typing) + -- (no `source`) — dropped Q9: pure value snapshot; add back as nullable column if/when a per-property edit path needs provenance + original_spreadsheet_description text NOT NULL -- raw spreadsheet cell text this snapshot resolved from (Q7 ✓) created_at timestamptz NOT NULL default now() updated_at timestamptz NOT NULL default now() - -- UNIQUE (property_id, component, building_part) -- Q8: confirm + -- UNIQUE (property_id, override_component, building_part) -- Q8 ✓ (source NOT in key — mirrors ADR-0004 single-row flip; portfolio_id implied by property_id) + -- FK property_id → property.id ON DELETE CASCADE; portfolio_id → portfolio.id (Drizzle only; bare bigint in SQLModel mirror); portfolio_id kept (matches property_details_epc / property_targets) ``` ## Open questions (resume here) -- **Q6 — `component` discriminator (WE STOPPED HERE).** pgEnum vs text, and the - value set. 
*Recommendation:* pgEnum `property_component` (or - `override_component`) with the established category names - **`wall_type`, `roof_type`, `property_type`, `built_form_type`** (same keys as - `column_mapping` / `ClassifiableColumn.name`, so finaliser maps category → - component with no translation). pgEnum over text: small closed set, typos - caught at write time (matters more now `value` is free text); new component = - one-line `ALTER TYPE … ADD VALUE`. +- **Q6 — `component` discriminator. RESOLVED 2026-06-04.** pgEnum + **`override_component`** (column `override_component`) with values + **`wall_type`, `roof_type`, `property_type`, `built_form_type`**. Verified these + are the *exact* keys used both in the frontend + ([columnFields.ts:30-33](../../src/lib/bulkUpload/columnFields.ts#L30-L33)) and + the backend (`ClassifiableColumn.name` / handler `_build_columns()`), so the + finaliser maps category → component with **no translation**. pgEnum over text: + small closed set, typos caught at write time — and this is now the *only* + DB-level typing left on a row, since `override_value` is free text. New component + = one-line `ALTER TYPE … ADD VALUE` (Drizzle-owned). Enum named `override_*` + (not `property_*`) to sit with `override_source` and stay visually distinct from + the existing *value* enum `property_type`. -- **Q7 — store raw `description` per override row?** For provenance / re-resolution - / debugging vs redundancy (the description already lives in `landlord_*_overrides`). - *Lean:* store it — cheap, and it pins what text produced this snapshot. +- **Q7 — store raw description per override row? RESOLVED 2026-06-04: yes, as + `original_spreadsheet_description text NOT NULL`.** Names the *source artifact* + (the spreadsheet cell), not an actor — sidesteps the Landlord-vs-User conflation + the glossary warns against, and aligns with CONTEXT.md's "the source file". 
Stored + because `override_value` is a denormalised snapshot that deliberately won't + refresh on later vocabulary edits; pinning the original text makes each row + self-explaining and re-resolvable even after the source `landlord_*_overrides` row + changes. `NOT NULL` is safe **iff** every `property_overrides` row is materialised + from a `landlord_*_overrides` row (whose `description` is itself `NOT NULL`) — + confirmed by the Q9 resolution below. -- **Q8 — uniqueness + FKs.** Confirm `UNIQUE (property_id, component, building_part)`. - `property_id` FK → FE-owned `property.id`; `portfolio_id` as `bigint` (mirror - the `landlord_*` note: FK enforced by Drizzle migration, not the SQLModel). +- **Q8 — uniqueness + FKs. RESOLVED 2026-06-04.** + `UNIQUE (property_id, override_component, building_part)`. `building_part` is in + the key (part 0 and part 1 both carry e.g. a `wall_type` row). `source` is + **deliberately not** in the key — mirrors ADR-0004's single-row-flip (one row, + flip `source` in place; the two-row model was rejected). `portfolio_id` is not in + the key (implied by `property_id`) but **is kept as a column** for query ergonomics + and consistency with `property_details_epc` / `property_targets`, which both + denormalise it. FKs: `property_id → property.id ON DELETE CASCADE`; + `portfolio_id → portfolio.id ON DELETE CASCADE` in the Drizzle migration, but a + bare `bigint` (no FK) in the backend `PropertyOverrideRow` SQLModel mirror — + matching `landlord_wall_type_override_table.py`. -- **Q9 — `source` semantics.** Reuse the existing `override_source` pgEnum - (`classifier`/`user`). At materialise time the finaliser copies the source from - the `landlord_*_overrides` row it resolved from. Confirm there's no *per-property* - override concept yet (today overrides are edited at the **vocabulary/portfolio** - level per ADR-0004; property_overrides just snapshots the outcome). +- **Q9 — `source` semantics. 
RESOLVED 2026-06-04: drop `source` entirely.** + `property_overrides` is a pure **snapshot of resolved values**. Rationale: there + is no per-property override concept today (per ADR-0004 edits happen at the + **vocabulary/portfolio** level, flipped in place), so a copied `source` would + describe the *vocabulary mapping's* provenance, not this property's — a footgun a + reader/re-score rule could misread, and no consumer needs it in v1. When a genuine + per-property edit path lands (the real use for per-property provenance), `source` + returns as an **additive nullable-column migration** — no need to carry it now. + This also confirms the Q7 `NOT NULL` contingency: every row is still materialised + from a `landlord_*_overrides` row (`description NOT NULL`). -- **Q-scope — v1 scope.** Confirm v1 = `property` (UPRN + address) only; - `property_overrides` table created but **not populated** until follow-up. +- **Q-scope — v1 scope. RESOLVED 2026-06-04.** v1 = the finaliser reproduces + today's **exact 9-column `property` insert** (`portfolio_id`, + `creation_status='READY'`, `uprn`, `landlord_property_id` ← `Internal Reference`, + `address` = matched ?? user-inputted, `postcode`, `user_inputted_address`, + `user_inputted_postcode`, `lexiscore`) **+** `onConflictDoNothing` on + `(portfolio_id, uprn) where uprn is not null` — not a reduced "UPRN + address". + This sizes the "PropertyRow gains insertable columns" decision to all nine + columns plus `creation_status`. The `property_overrides` *table* shipped ahead + (migration 0221, PR #306) but is **not populated** in v1 — population is + follow-up work (and needs a different input source; see combiner-output note + below). ### Application-flow questions not yet reached -- **Trigger + orchestration.** How does Finalise dispatch the finaliser? 
Likely - the existing `TaskOrchestrator` / `subtask_handler` pattern (see ADR-0003, - `landlord_description_overrides` handler) — Next.js `/finalize` triggers a - subtask instead of inserting. Need the trigger body shape (cf. - `LandlordDescriptionOverridesTriggerBody`: `task_id`, `sub_task_id`, `s3_uri`, - `portfolio_id`, …). +- **Trigger + orchestration. RESOLVED 2026-06-04.** Mirror the + `start-address-matching` path. Next.js creates a `SubTask` (`service: + "finaliser"`) under the BulkUpload's existing Task, then POSTs a new FastAPI + endpoint `POST /v1/bulk-uploads/trigger-finaliser` (auth via `validate_token`), + which enqueues to a **new SQS queue**; a Lambda runs the finaliser wrapped in + `@subtask_handler` (auto-injected `TaskOrchestrator`; `run_subtask` owns the + subtask start/complete/fail + Task cascade). Trigger body + `FinaliserTriggerBody { task_id, sub_task_id, s3_uri (combined output), portfolio_id }` + (extends `SubtaskTriggerBody`). Slow work stays outside the txn; persistence in a + `commit_scope`. The synchronous Next.js `/finalize` route is deleted (locked). -- **State machine / who writes `complete`.** Today Next.js writes `complete` - synchronously after the insert. If Finalise becomes async, **FastAPI/Lambda - writes the terminal status** (mirroring how it already owns - `combining`/`awaiting_review`). This is a CONTEXT.md "Two writers" change. +- **State machine / who writes `complete`. RESOLVED 2026-06-04.** New status + **`finalising`** between `awaiting_review` and `complete` (mirrors `combining` + before `awaiting_review`). Lifecycle: + `awaiting_review → finalising → complete` (↘ `failed`). + - **`finalising`** written by **Next.js at dispatch** via a **compare-and-swap**: + `UPDATE … SET status='finalising' WHERE id=? AND status='awaiting_review'` — + 0 rows ⇒ already dispatched ⇒ 409. This is the **double-dispatch guard** (closes + the simultaneous-click race under `loadForFinalize`'s existing precondition). 
+ - **`complete` / `failed`** written by the **Lambda** directly to + `bulk_address_uploads` (new `set_finalized_status` / `set_failed_status`, + exactly like the combiner's `set_combining_status` / + `set_combined_output_s3_uri`). `markFinalized` + the Next.js `/finalize` route + are deleted. + - **CONTEXT.md "Two writers" change:** Next.js owns dispatch + the + `awaiting_review → finalising` CAS; the backend owns `finalising → complete` + and `→ failed` (in addition to `combining` / `awaiting_review`). + - **UI vs canonical:** persisted enum value is `finalising` (canonical; ties to + the **Finalise** action). The frontend renders it as **"Uploading to ARA"** — a + display-layer label only, **not** the enum name, so UX copy never needs a + migration. -- **Input — does the combiner output carry the raw description cells?** v1 only - needs address/UPRN columns (confirmed present: `address2uprn_uprn`, +- **Input — does the combiner output carry the raw description cells? RESOLVED + 2026-06-04: NO. This is a v2 problem (deferred).** v1 needs only address/UPRN + columns, all **confirmed present** in the combiner output (`address2uprn_uprn`, `address2uprn_address`, `address2uprn_lexiscore`, `Internal Reference`, - address/postcode). For `property_overrides` population (later) the finaliser - also needs the raw `Walls`/`Roofs`/`Property Type` cells **plus** - `multiEntryOrdering` to split per building part — confirm these survive into - the combiner output, or that the finaliser reads them from another source. + `Address 1/2/3`, `postcode`). The raw `Walls`/`Roofs`/`Property Type`/`Built Form` + cells are **not** in the combiner output — they survive only in (a) the + `{uploadId}-classifier.csv` on S3 (original headers) and (b) `landlord_*_overrides` + as *resolved* values keyed by description. 
So v2 population must assemble **four + inputs**, not one file: + - `property_id` (identity) ← combiner output `(portfolio_id, uprn)` — **but + no-UPRN rows have no such key**; + - raw cell text ← the classifier CSV (not the combiner output); + - cell → building-part split ← `multiEntryOrdering` on `bulk_address_uploads`; + - description → `override_value` ← `landlord_*_overrides` (normalized description). + - **Two open v2 hazards (entry point for the v2 session):** (1) the join key + between classifier CSV and combiner output — is there a stable per-row key + (`Internal Reference`?) and is row order preserved through postcode-split + + combine? (2) obtaining `property_id` for unmatched (no-UPRN) rows — v1's + `onConflictDoNothing` returns no ids, so v2 likely needs `RETURNING id` mapped + back to source rows. -- **Idempotency / re-run.** Today's insert uses - `onConflictDoNothing((portfolio_id, uprn) where uprn is not null)`. Define the - finaliser's re-run behaviour for both tables (esp. that snapshot `value`s won't - refresh on re-run after a vocabulary edit unless we deliberately re-materialise). +- **Idempotency / re-run. RESOLVED 2026-06-04 (per-table).** + - `property`: keep today's `onConflictDoNothing` on `(portfolio_id, uprn) where uprn is not null` — existing properties are not churned. + - `property_overrides`: `onConflictDoUpdate` on `(property_id, override_component, building_part)` — **recalculate** `override_value` + `original_spreadsheet_description` + `updated_at` to the latest resolution, so an existing property whose override changed is refreshed in place. Guard `WHERE source='classifier'` once a per-property user-edit path exists (until then every row is classifier-derived, so blind overwrite is correct). See the "Re-run = recalculate" and "Snapshot, not FK" locked decisions. 
## Key code references (from exploration) diff --git a/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/finalize/route.ts b/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/finalize/route.ts index b6fd7ec..f326c65 100644 --- a/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/finalize/route.ts +++ b/src/app/api/portfolio/[portfolioId]/bulk-uploads/[uploadId]/finalize/route.ts @@ -1,160 +1,46 @@ -import { db } from "@/app/db/db"; -import { property } from "@/app/db/schema/property"; -import { sql } from "drizzle-orm"; import { NextRequest, NextResponse } from "next/server"; -import { revalidatePath } from "next/cache"; import { getServerSession } from "next-auth"; import { AuthOptions } from "@/app/api/auth/[...nextauth]/authOptions"; -import { createRetrofitDataS3Client } from "@/app/utils/s3"; -import * as XLSX from "xlsx"; -import { loadForFinalize, markFinalized } from "@/lib/bulkUpload/server"; - -const ADDRESS_COLS = ["Address 1", "Address 2", "Address 3"] as const; -const POSTCODE_COL = "postcode"; -const INTERNAL_REF_COL = "Internal Reference"; -const UPRN_COL = "address2uprn_uprn"; -const MATCHED_ADDRESS_COL = "address2uprn_address"; -const LEXISCORE_COL = "address2uprn_lexiscore"; -const MISSING_SENTINEL = "invalid postcode"; -const UK_POSTCODE_RE = /[A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2}/i; - -function normalize(v: unknown): string { - if (v === null || v === undefined) return ""; - return String(v).trim(); -} - -function isMissing(v: string): boolean { - return v === "" || v.toLowerCase() === MISSING_SENTINEL; -} - -function parseUprn(raw: unknown): bigint | null { - const v = normalize(raw); - if (isMissing(v)) return null; - try { - return BigInt(v); - } catch { - return null; - } -} - -function parseLexiscore(raw: unknown): number | null { - const v = normalize(raw); - if (isMissing(v)) return null; - const n = Number(v); - return Number.isFinite(n) ? 
n : null; -} - -function extractPostcode(matched: string | null, fallback: string): string | null { - if (matched) { - const m = matched.match(UK_POSTCODE_RE); - if (m) return m[0].toUpperCase(); - } - return fallback || null; -} - -function parseS3Uri(uri: string): { bucket: string; key: string } | null { - if (!uri.startsWith("s3://")) return null; - const rest = uri.slice(5); - const slash = rest.indexOf("/"); - if (slash < 0) return null; - return { bucket: rest.slice(0, slash), key: rest.slice(slash + 1) }; -} +import { readSessionToken } from "@/lib/session"; +import { dispatchFinaliser } from "@/lib/bulkUpload/server"; +// Finalise is now asynchronous (ADR-0005). This route no longer inserts +// properties; it dispatches the bulk_upload_finaliser Lambda and flips the +// BulkUpload to `finalising` via a compare-and-swap (the double-dispatch guard). +// The Lambda reads the combiner output, inserts the property rows, and writes the +// terminal `complete`/`failed` status directly. The user sees "Uploading to ARA" +// while the row is `finalising`; the onboarding surface polls for the outcome. export async function POST( - _request: NextRequest, + request: NextRequest, { params }: { params: Promise<{ portfolioId: string; uploadId: string }> } ) { const session = await getServerSession(AuthOptions); if (!session) return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); const { uploadId } = await params; + const sessionToken = readSessionToken(request); - const guarded = await loadForFinalize(uploadId); - switch (guarded.kind) { + const result = await dispatchFinaliser({ uploadId, sessionToken }); + + switch (result.kind) { + case "ok": + // Accepted: the finaliser is running; the row is now `finalising`. + return NextResponse.json({ taskId: result.taskId }, { status: 202 }); case "not_found": return NextResponse.json({ error: "Not found" }, { status: 404 }); case "already_finalized": + // Idempotent: nothing to do. 
return new NextResponse(null, { status: 200 }); - case "wrong_state": - return NextResponse.json( - { error: `Upload not ready to finalize (state: ${guarded.current})` }, - { status: 409 } - ); case "not_yet_combined": return NextResponse.json({ error: "Combiner not finished" }, { status: 409 }); - } - const upload = guarded.upload; - - const parsed = parseS3Uri(upload.combinedOutputS3Uri!); - if (!parsed) { - return NextResponse.json({ error: "Invalid combined output S3 URI" }, { status: 500 }); - } - - const s3 = createRetrofitDataS3Client(); - - let rawRows: Record[]; - try { - const obj = await s3 - .getObject({ Bucket: parsed.bucket, Key: parsed.key }) - .promise(); - const buf = Buffer.from(obj.Body as Uint8Array); - const wb = XLSX.read(buf, { type: "buffer" }); - const sheet = wb.Sheets[wb.SheetNames[0]]; - rawRows = XLSX.utils.sheet_to_json>(sheet, { defval: "" }); - } catch (err) { - console.error("Failed to read combined CSV from S3:", err); - return NextResponse.json({ error: "Failed to read combined CSV" }, { status: 502 }); - } - - const portfolioIdBig = BigInt(upload.portfolioId); - - const values = rawRows.map((raw) => { - const userInputtedAddress = - ADDRESS_COLS.map((c) => normalize(raw[c])).filter(Boolean).join(", ") || null; - const userInputtedPostcode = normalize(raw[POSTCODE_COL]) || null; - - const uprn = parseUprn(raw[UPRN_COL]); - - const matchedAddressRaw = normalize(raw[MATCHED_ADDRESS_COL]); - const matchedAddress = isMissing(matchedAddressRaw) ? null : matchedAddressRaw; - - const address = matchedAddress ?? userInputtedAddress; - const postcode = extractPostcode(matchedAddress, userInputtedPostcode ?? 
""); - - const internalRef = normalize(raw[INTERNAL_REF_COL]) || null; - const lexiscore = parseLexiscore(raw[LEXISCORE_COL]); - - return { - portfolioId: portfolioIdBig, - creationStatus: "READY" as const, - uprn, - landlordPropertyId: internalRef, - address, - postcode, - userInputtedAddress, - userInputtedPostcode, - lexiscore, - }; - }); - - try { - if (values.length > 0) { - await db - .insert(property) - .values(values) - .onConflictDoNothing({ - target: [property.portfolioId, property.uprn], - where: sql`${property.uprn} IS NOT NULL`, - }); - } - - await markFinalized(uploadId); - - revalidatePath("/portfolio/[slug]", "layout"); - - return new NextResponse(null, { status: 200 }); - } catch (err) { - console.error("Failed to finalize bulk upload:", err); - return NextResponse.json({ error: "Failed to import properties" }, { status: 500 }); + case "missing_task": + return NextResponse.json({ error: "Upload has no task to finalise" }, { status: 409 }); + case "wrong_state": + return NextResponse.json( + { error: `Upload not ready to finalize (state: ${result.current})` }, + { status: 409 } + ); + case "trigger_failed": + return NextResponse.json({ error: result.message }, { status: result.status }); } } diff --git a/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/OnboardingProgress.tsx b/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/OnboardingProgress.tsx index 3e64cca..749e6b7 100644 --- a/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/OnboardingProgress.tsx +++ b/src/app/portfolio/[slug]/(portfolio)/bulk-upload/[uploadId]/OnboardingProgress.tsx @@ -28,6 +28,7 @@ import { RoofTypeValues, } from "@/app/db/schema/landlord_overrides"; import { CLASSIFIER_FIELDS } from "@/lib/bulkUpload/columnFields"; +import { statusLabel } from "@/lib/bulkUpload/types"; // Valid enum options per classifier category, for the editable dropdowns (#299). 
const CATEGORY_VALUES: Record = { @@ -100,6 +101,9 @@ export default function OnboardingProgress({ const taskFailed = TASK_FAILED_STATUSES.has(taskStatus); const isCombining = upload.status === "combining"; const isImporting = upload.status === "awaiting_review"; + // Async finaliser in flight (ADR-0005). Polling continues (non-terminal) until + // the backend writes complete/failed. Surfaced to the user as "Uploading to ARA". + const isFinalising = upload.status === "finalising"; const canRunCombiner = taskDone && !taskFailed && upload.status === "processing"; const isAwaitingReview = upload.status === "awaiting_review"; @@ -178,6 +182,12 @@ export default function OnboardingProgress({ Awaiting import )} + {isFinalising && ( + + + {statusLabel("finalising")}… + + )} {needsVerify && sample && ( diff --git a/src/lib/bulkUpload/server.ts b/src/lib/bulkUpload/server.ts index 09710eb..2de69ff 100644 --- a/src/lib/bulkUpload/server.ts +++ b/src/lib/bulkUpload/server.ts @@ -562,9 +562,105 @@ export async function loadForFinalize(uploadId: string): Promise { - await db +export type DispatchFinaliserOutcome = + | { kind: "ok"; taskId: string; subTaskId: string } + | { kind: "not_found" } + | { kind: "already_finalized" } + | { kind: "not_yet_combined" } + | { kind: "wrong_state"; current: string } + | { kind: "missing_task" } + | { kind: "trigger_failed"; status: number; message: string }; + +// Dispatch the async bulk_upload_finaliser (ADR-0005). Replaces the old +// synchronous property insert + markFinalized. Order matters: +// 1. loadForFinalize — rich guards (combined output present, awaiting_review). +// 2. CAS claim `awaiting_review → finalising` — the double-dispatch guard: +// of two simultaneous clicks exactly one updates a row; the loser gets 409. +// 3. create the finaliser subtask under the upload's existing Task + POST the +// trigger. On trigger failure, revert the status so the user can retry and +// mark the subtask failed. 
The backend writes the terminal complete/failed. +export async function dispatchFinaliser(args: { + uploadId: string; + sessionToken: string | undefined; +}): Promise { + const guarded = await loadForFinalize(args.uploadId); + switch (guarded.kind) { + case "not_found": + return { kind: "not_found" }; + case "already_finalized": + return { kind: "already_finalized" }; + case "not_yet_combined": + return { kind: "not_yet_combined" }; + case "wrong_state": + return { kind: "wrong_state", current: guarded.current }; + } + const upload = guarded.upload; + if (!upload.taskId) return { kind: "missing_task" }; + + // CAS: atomically claim the dispatch. Only the request that flips + // awaiting_review → finalising proceeds; a concurrent one updates 0 rows. + const claimed = await db .update(bulkAddressUploads) - .set({ status: "complete" }) - .where(eq(bulkAddressUploads.id, uploadId)); + .set({ status: "finalising" }) + .where( + and( + eq(bulkAddressUploads.id, args.uploadId), + eq(bulkAddressUploads.status, "awaiting_review"), + ), + ) + .returning(); + if (claimed.length === 0) { + const current = await loadById(args.uploadId); + if (current?.status === "complete") return { kind: "already_finalized" }; + return { kind: "wrong_state", current: current?.status ?? "unknown" }; + } + + const [subTask] = await db + .insert(subTasks) + .values({ + taskId: upload.taskId, + status: "waiting", + service: SUBTASK_SERVICE.finaliser, + inputs: JSON.stringify({ bulk_upload_id: args.uploadId }), + }) + .returning(); + + const payload = { + task_id: upload.taskId, + sub_task_id: subTask.id, + s3_uri: upload.combinedOutputS3Uri, + portfolio_id: Number(upload.portfolioId), + bulk_upload_id: args.uploadId, + }; + + const trigger = await triggerFastApiPipeline({ + endpoint: "/v1/bulk-uploads/trigger-finaliser", + payload, + sessionToken: args.sessionToken, + }); + + if (!trigger.ok) { + // Roll the claim back so the user can retry, and fail the subtask. 
+ await Promise.all([ + db + .update(bulkAddressUploads) + .set({ status: "awaiting_review" }) + .where(eq(bulkAddressUploads.id, args.uploadId)), + db + .update(subTasks) + .set({ + status: "failed", + outputs: JSON.stringify({ error: trigger.message }), + }) + .where(eq(subTasks.id, subTask.id)), + ]); + return { kind: "trigger_failed", status: trigger.status, message: trigger.message }; + } + + await db + .update(subTasks) + .set({ status: "in progress", inputs: JSON.stringify(payload) }) + .where(eq(subTasks.id, subTask.id)); + + return { kind: "ok", taskId: upload.taskId, subTaskId: subTask.id }; } diff --git a/src/lib/bulkUpload/types.ts b/src/lib/bulkUpload/types.ts index 38d8f12..9d6a604 100644 --- a/src/lib/bulkUpload/types.ts +++ b/src/lib/bulkUpload/types.ts @@ -6,6 +6,10 @@ export const BULK_UPLOAD_STATUSES = [ "processing", "combining", "awaiting_review", + // In-flight state of the async finaliser (ADR-0005); mirrors `combining`. The + // status column is free text, so no enum migration is needed. UI renders this + // as "Uploading to ARA" — see STATUS_LABELS. + "finalising", "complete", "failed", ] as const; @@ -19,8 +23,27 @@ export type BulkUpload = typeof bulkAddressUploads.$inferSelect; export const SUBTASK_SERVICE = { address: "address2uprn", classifier: "landlord_description_overrides", + finaliser: "bulk_upload_finaliser", } as const; +// User-facing label for a BulkUpload status. The persisted enum value stays +// canonical (`finalising`); the product surface for that state is "Uploading to +// ARA" (ADR-0005 — a display-layer label, never the enum name). +export const STATUS_LABELS: Record = { + ready_for_processing: "Ready for processing", + mapping_complete: "Mapping complete", + processing: "Processing", + combining: "Combining", + awaiting_review: "Awaiting review", + finalising: "Uploading to ARA", + complete: "Complete", + failed: "Failed", +}; + +export function statusLabel(status: string): string { + return STATUS_LABELS[status] ?? 
status; +} + export type TaskSummary = { id: string; taskSource: string;