Record per-property failure detail in modelling_e2e subtask outputs

When a property failed, the handler recorded only its bare property_id and
raised RuntimeError("failed property_ids: [...]"). That string is what
SubTask.fail persists into the subtask outputs.error column, so a failed run
told you which property failed but never why — forcing a CloudWatch lookup.

The per-property catch now captures property_id, uprn, error_type, and the
error message, and the raised RuntimeError embeds those as JSON so the subtask
outputs column is parseable directly. query_failed_modelling_e2e.py reads that
outputs.error into a new Error column in its report.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Jun-te Kim 2026-06-23 17:52:19 +00:00
parent 6f0e526d3d
commit 290097b1c7
2 changed files with 59 additions and 18 deletions

View file

@ -25,6 +25,7 @@ from __future__ import annotations
import dataclasses
import io
import json
import os
from collections.abc import Callable
from typing import Any, Optional, cast
@ -289,7 +290,7 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]:
scenario = ScenarioPostgresRepository(read_session).get_many([scenario_id])[0]
products = catalogue_with_off_catalogue_overrides(read_session)
errors: list[int] = []
failures: list[dict[str, Any]] = []
for property_id in property_ids:
try:
@ -426,10 +427,18 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]:
except Exception as error: # noqa: BLE001
logger.error(
f"property={property_id}: {type(error).__name__}: {error}",
f"property={property_id} uprn={uprns.get(property_id)}: "
f"{type(error).__name__}: {error}",
exc_info=True,
)
errors.append(property_id)
failures.append(
{
"property_id": property_id,
"uprn": uprns.get(property_id),
"error_type": type(error).__name__,
"error": str(error),
}
)
# Cohort certs the mapper could not consume were skipped (not aborted on)
# so prediction could proceed; surface them — with cert numbers — in the
@ -444,10 +453,20 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]:
f"{[s['certificate_number'] for s in skipped_certs]}"
)
if errors:
message = f"failed property_ids: {errors}"
if failures:
failed_ids = [f["property_id"] for f in failures]
# Persisted verbatim into the subtask's outputs.error (via
# SubTask.fail), so include each property's error type + message —
# not just the IDs — to make failed runs diagnosable without
# cross-referencing CloudWatch.
message = (
f"failed property_ids: {failed_ids}; "
f"details: {json.dumps(failures)}"
)
if skipped_certs:
message += f"; skipped_unmappable_cohort_certs: {skipped_certs}"
message += (
f"; skipped_unmappable_cohort_certs: {json.dumps(skipped_certs)}"
)
raise RuntimeError(message)
return {"skipped_unmappable_cohort_certs": skipped_certs} if skipped_certs else None

View file

@ -11,6 +11,7 @@ from __future__ import annotations
import json
import sys
from pathlib import Path
from typing import Any, cast
_REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_REPO_ROOT))
@ -30,6 +31,7 @@ with engine.connect() as conn:
st.id AS subtask_id,
st.task_id,
st.inputs,
st.outputs,
st.updated_at
FROM sub_task st
JOIN tasks t ON t.id = st.task_id
@ -45,19 +47,32 @@ with engine.connect() as conn:
)
exit(0)
def _error_text(outputs_raw: object) -> str:
"""Pull the persisted failure reason out of the subtask outputs JSON."""
try:
outputs: Any = (
json.loads(outputs_raw)
if isinstance(outputs_raw, str)
else (outputs_raw or {})
)
except Exception:
return str(outputs_raw or "")
if isinstance(outputs, dict):
return str(cast("dict[str, Any]", outputs).get("error", ""))
return ""
# Collect all property_ids across all rows
all_property_ids: list[int] = []
parsed: list[tuple[str, str, list[int], str, str]] = []
for subtask_id, task_id, inputs_raw, updated_at in subtask_rows:
parsed: list[tuple[str, str, list[int], str, str, str]] = []
for subtask_id, task_id, inputs_raw, outputs_raw, updated_at in subtask_rows:
try:
inputs = (
inputs: Any = (
json.loads(inputs_raw)
if isinstance(inputs_raw, str)
else (inputs_raw or {})
)
property_ids: list[int] = [
int(p) for p in (inputs.get("property_ids") or [])
]
raw_ids = cast("list[Any]", cast("dict[str, Any]", inputs).get("property_ids") or [])
property_ids: list[int] = [int(p) for p in raw_ids]
except Exception:
property_ids = []
parsed.append(
@ -67,6 +82,7 @@ with engine.connect() as conn:
property_ids,
str(updated_at),
inputs_raw or "",
_error_text(outputs_raw),
)
)
all_property_ids.extend(property_ids)
@ -80,23 +96,29 @@ with engine.connect() as conn:
).fetchall()
uprn_map = {int(r[0]): int(r[1]) for r in uprn_rows}
def _cell(value: str) -> str:
"""Escape table-breaking characters so multi-line errors stay on one row."""
return value.replace("|", "\\|").replace("\n", "<br>")
lines: list[str] = [
"# Failed modelling_e2e Subtasks\n",
f"| Subtask ID | Task ID | Updated At | Property ID | UPRN | Inputs |",
f"|-----------|---------|------------|-------------|------|--------|",
f"| Subtask ID | Task ID | Updated At | Property ID | UPRN | Error | Inputs |",
f"|-----------|---------|------------|-------------|------|-------|--------|",
]
for subtask_id, task_id, property_ids, updated_at, inputs_raw in parsed:
inputs_cell = (inputs_raw or "").replace("|", "\\|")
for subtask_id, task_id, property_ids, updated_at, inputs_raw, error in parsed:
inputs_cell = _cell(inputs_raw or "")
error_cell = _cell(error or "")
if property_ids:
for pid in property_ids:
uprn = uprn_map.get(pid, "unknown")
lines.append(
f"| {subtask_id} | {task_id} | {updated_at} | {pid} | {uprn} | {inputs_cell} |"
f"| {subtask_id} | {task_id} | {updated_at} | {pid} | {uprn} | {error_cell} | {inputs_cell} |"
)
else:
lines.append(
f"| {subtask_id} | {task_id} | {updated_at} | — | — | {inputs_cell} |"
f"| {subtask_id} | {task_id} | {updated_at} | — | — | {error_cell} | {inputs_cell} |"
)
_OUTPUT.write_text("\n".join(lines) + "\n", encoding="utf-8")