NOTE(review): this patch was recovered from a whitespace-collapsed copy. Line
breaks and blank context lines were rebuilt from the hunk line counts; leading
indentation was re-derived from the Python block structure and may not match
the target tree byte-for-byte. If a hunk is rejected, retry with
`git apply --ignore-whitespace`. The "<br>" literal in `_cell` was restored
(it had been rendered as a literal line break, leaving an unterminated string).

diff --git a/applications/modelling_e2e/handler.py b/applications/modelling_e2e/handler.py
index 9a8c8e73..0f2c4fe6 100644
--- a/applications/modelling_e2e/handler.py
+++ b/applications/modelling_e2e/handler.py
@@ -25,6 +25,7 @@ from __future__ import annotations
 
 import dataclasses
 import io
+import json
 import os
 from collections.abc import Callable
 from typing import Any, Optional, cast
@@ -289,7 +290,7 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]:
     scenario = ScenarioPostgresRepository(read_session).get_many([scenario_id])[0]
     products = catalogue_with_off_catalogue_overrides(read_session)
 
-    errors: list[int] = []
+    failures: list[dict[str, Any]] = []
 
     for property_id in property_ids:
         try:
@@ -426,10 +427,18 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]:
 
         except Exception as error:  # noqa: BLE001
             logger.error(
-                f"property={property_id}: {type(error).__name__}: {error}",
+                f"property={property_id} uprn={uprns.get(property_id)}: "
+                f"{type(error).__name__}: {error}",
                 exc_info=True,
             )
-            errors.append(property_id)
+            failures.append(
+                {
+                    "property_id": property_id,
+                    "uprn": uprns.get(property_id),
+                    "error_type": type(error).__name__,
+                    "error": str(error),
+                }
+            )
 
     # Cohort certs the mapper could not consume were skipped (not aborted on)
     # so prediction could proceed; surface them — with cert numbers — in the
@@ -444,10 +453,20 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]:
             f"{[s['certificate_number'] for s in skipped_certs]}"
         )
 
-    if errors:
-        message = f"failed property_ids: {errors}"
+    if failures:
+        failed_ids = [f["property_id"] for f in failures]
+        # Persisted verbatim into the subtask's outputs.error (via
+        # SubTask.fail), so include each property's error type + message —
+        # not just the IDs — to make failed runs diagnosable without
+        # cross-referencing CloudWatch.
+        message = (
+            f"failed property_ids: {failed_ids}; "
+            f"details: {json.dumps(failures)}"
+        )
         if skipped_certs:
-            message += f"; skipped_unmappable_cohort_certs: {skipped_certs}"
+            message += (
+                f"; skipped_unmappable_cohort_certs: {json.dumps(skipped_certs)}"
+            )
         raise RuntimeError(message)
 
     return {"skipped_unmappable_cohort_certs": skipped_certs} if skipped_certs else None
diff --git a/scripts/query_failed_modelling_e2e.py b/scripts/query_failed_modelling_e2e.py
index a4cca69f..db684023 100644
--- a/scripts/query_failed_modelling_e2e.py
+++ b/scripts/query_failed_modelling_e2e.py
@@ -11,6 +11,7 @@ from __future__ import annotations
 import json
 import sys
 from pathlib import Path
+from typing import Any, cast
 
 _REPO_ROOT = Path(__file__).resolve().parents[1]
 sys.path.insert(0, str(_REPO_ROOT))
@@ -30,6 +31,7 @@ with engine.connect() as conn:
                 st.id AS subtask_id,
                 st.task_id,
                 st.inputs,
+                st.outputs,
                 st.updated_at
             FROM sub_task st
             JOIN tasks t ON t.id = st.task_id
@@ -45,19 +47,32 @@ with engine.connect() as conn:
         )
         exit(0)
 
+    def _error_text(outputs_raw: object) -> str:
+        """Pull the persisted failure reason out of the subtask outputs JSON."""
+        try:
+            outputs: Any = (
+                json.loads(outputs_raw)
+                if isinstance(outputs_raw, str)
+                else (outputs_raw or {})
+            )
+        except Exception:
+            return str(outputs_raw or "")
+        if isinstance(outputs, dict):
+            return str(cast("dict[str, Any]", outputs).get("error", ""))
+        return ""
+
     # Collect all property_ids across all rows
     all_property_ids: list[int] = []
-    parsed: list[tuple[str, str, list[int], str, str]] = []
-    for subtask_id, task_id, inputs_raw, updated_at in subtask_rows:
+    parsed: list[tuple[str, str, list[int], str, str, str]] = []
+    for subtask_id, task_id, inputs_raw, outputs_raw, updated_at in subtask_rows:
         try:
-            inputs = (
+            inputs: Any = (
                 json.loads(inputs_raw)
                 if isinstance(inputs_raw, str)
                 else (inputs_raw or {})
             )
-            property_ids: list[int] = [
-                int(p) for p in (inputs.get("property_ids") or [])
-            ]
+            raw_ids = cast("list[Any]", cast("dict[str, Any]", inputs).get("property_ids") or [])
+            property_ids: list[int] = [int(p) for p in raw_ids]
         except Exception:
             property_ids = []
         parsed.append(
@@ -67,6 +82,7 @@ with engine.connect() as conn:
                 property_ids,
                 str(updated_at),
                 inputs_raw or "",
+                _error_text(outputs_raw),
             )
         )
         all_property_ids.extend(property_ids)
@@ -80,23 +96,29 @@ with engine.connect() as conn:
     ).fetchall()
     uprn_map = {int(r[0]): int(r[1]) for r in uprn_rows}
 
+def _cell(value: str) -> str:
+    """Escape table-breaking characters so multi-line errors stay on one row."""
+    return value.replace("|", "\\|").replace("\n", "<br>")
+
+
 lines: list[str] = [
     "# Failed modelling_e2e Subtasks\n",
-    f"| Subtask ID | Task ID | Updated At | Property ID | UPRN | Inputs |",
-    f"|-----------|---------|------------|-------------|------|--------|",
+    f"| Subtask ID | Task ID | Updated At | Property ID | UPRN | Error | Inputs |",
+    f"|-----------|---------|------------|-------------|------|-------|--------|",
 ]
 
-for subtask_id, task_id, property_ids, updated_at, inputs_raw in parsed:
-    inputs_cell = (inputs_raw or "").replace("|", "\\|")
+for subtask_id, task_id, property_ids, updated_at, inputs_raw, error in parsed:
+    inputs_cell = _cell(inputs_raw or "")
+    error_cell = _cell(error or "")
     if property_ids:
         for pid in property_ids:
             uprn = uprn_map.get(pid, "unknown")
             lines.append(
-                f"| {subtask_id} | {task_id} | {updated_at} | {pid} | {uprn} | {inputs_cell} |"
+                f"| {subtask_id} | {task_id} | {updated_at} | {pid} | {uprn} | {error_cell} | {inputs_cell} |"
             )
     else:
         lines.append(
-            f"| {subtask_id} | {task_id} | {updated_at} | — | — | {inputs_cell} |"
+            f"| {subtask_id} | {task_id} | {updated_at} | — | — | {error_cell} | {inputs_cell} |"
         )
 
 _OUTPUT.write_text("\n".join(lines) + "\n", encoding="utf-8")