From 51118ce121385c3b1a88ad66b1884d060dfe7a49 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Tue, 23 Jun 2026 18:07:10 +0000 Subject: [PATCH] solar + more logs --- applications/modelling_e2e/handler.py | 62 +++++++++++++++++++++------ scripts/query_failed_modelling_e2e.py | 46 ++++++++++++++------ scripts/trigger_modelling_e2e_sqs.py | 29 +++++++++---- 3 files changed, 104 insertions(+), 33 deletions(-) diff --git a/applications/modelling_e2e/handler.py b/applications/modelling_e2e/handler.py index 3faf09f7..d1f454f4 100644 --- a/applications/modelling_e2e/handler.py +++ b/applications/modelling_e2e/handler.py @@ -25,6 +25,7 @@ from __future__ import annotations import dataclasses import io +import json import os from collections.abc import Callable from typing import Any, Optional, cast @@ -82,6 +83,7 @@ from repositories.property.property_overrides_postgres_reader import ( from repositories.scenario.scenario_postgres_repository import ( ScenarioPostgresRepository, ) +from repositories.solar.solar_postgres_repository import SolarPostgresRepository from utilities.aws_lambda.task_handler import task_handler from utilities.logger import setup_logger @@ -235,8 +237,9 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]: try: scenario = ScenarioPostgresRepository(read_session).get_many([scenario_id])[0] products = catalogue_with_off_catalogue_overrides(read_session) + solar_reader = SolarPostgresRepository(read_session) - errors: list[int] = [] + failures: list[dict[str, Any]] = [] for property_id in property_ids: try: @@ -300,9 +303,18 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]: landlord_overrides=overrides, ).effective_epc - solar_insights: Optional[dict[str, Any]] = ( - None if no_solar else _solar_insights_for(solar_client, spatial) - ) + # Read-before-fetch: the Google Solar call is paid, so skip it + # when this UPRN's insights are already persisted. Only a cache + # miss hits Google — re-runs cost nothing for solar. + solar_insights: Optional[dict[str, Any]] + solar_was_fetched = False + if no_solar: + solar_insights = None + else: + solar_insights = solar_reader.get(uprn) + if solar_insights is None: + solar_insights = _solar_insights_for(solar_client, spatial) + solar_was_fetched = solar_insights is not None # All Measure Types are considered: the off-catalogue overlay # (catalogue_with_off_catalogue_overrides) prices the measures the @@ -339,7 +351,8 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]: if spatial is not None: uow.spatial.save(uprn, spatial) if ( - solar_insights is not None + solar_was_fetched + and solar_insights is not None and spatial is not None and spatial.coordinates is not None ): @@ -364,10 +377,18 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]: except Exception as error: # noqa: BLE001 logger.error( - f"property={property_id}: {type(error).__name__}: {error}", + f"property={property_id} uprn={uprns.get(property_id)}: " + f"{type(error).__name__}: {error}", exc_info=True, ) - errors.append(property_id) + failures.append( + { + "property_id": property_id, + "uprn": uprns.get(property_id), + "error_type": type(error).__name__, + "error": str(error), + } + ) # Cohort certs the mapper could not consume were skipped (not aborted on) # so prediction could proceed; surface them — with cert numbers — in the @@ -382,12 +403,29 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]: f"{[s['certificate_number'] for s in skipped_certs]}" ) - if errors: - message = f"failed property_ids: {errors}" + # A property that errored AND a cohort cert the mapper could not consume + # are both surfaced as failures, so the subtask is marked failed and + # shows up for debugging. The whole batch has already run by this point — + # every property that could be modelled was written to DB above — so + # failing here flags the run without discarding the work done so far. + if failures or skipped_certs: + parts: list[str] = [] + if failures: + failed_ids = [f["property_id"] for f in failures] + # Persisted verbatim into the subtask's outputs.error (via + # SubTask.fail): include each property's error type + message, + # not just the IDs, so failed runs are diagnosable without + # cross-referencing CloudWatch. + parts.append( + f"failed property_ids: {failed_ids}; " + f"details: {json.dumps(failures)}" + ) if skipped_certs: - message += f"; skipped_unmappable_cohort_certs: {skipped_certs}" - raise RuntimeError(message) + parts.append( + f"skipped_unmappable_cohort_certs: {json.dumps(skipped_certs)}" + ) + raise RuntimeError("; ".join(parts)) - return {"skipped_unmappable_cohort_certs": skipped_certs} if skipped_certs else None + return None finally: read_session.close() diff --git a/scripts/query_failed_modelling_e2e.py b/scripts/query_failed_modelling_e2e.py index a4cca69f..db684023 100644 --- a/scripts/query_failed_modelling_e2e.py +++ b/scripts/query_failed_modelling_e2e.py @@ -11,6 +11,7 @@ from __future__ import annotations import json import sys from pathlib import Path +from typing import Any, cast _REPO_ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(_REPO_ROOT)) @@ -30,6 +31,7 @@ with engine.connect() as conn: st.id AS subtask_id, st.task_id, st.inputs, + st.outputs, st.updated_at FROM sub_task st JOIN tasks t ON t.id = st.task_id @@ -45,19 +47,32 @@ with engine.connect() as conn: ) exit(0) + def _error_text(outputs_raw: object) -> str: + """Pull the persisted failure reason out of the subtask outputs JSON.""" + try: + outputs: Any = ( + json.loads(outputs_raw) + if isinstance(outputs_raw, str) + else (outputs_raw or {}) + ) + except Exception: + return str(outputs_raw or "") + if isinstance(outputs, dict): + return str(cast("dict[str, Any]", outputs).get("error", "")) + return "" + # Collect all property_ids across all rows all_property_ids: list[int] = [] - parsed: list[tuple[str, str, list[int], str, str]] = [] - for subtask_id, task_id, inputs_raw, updated_at in subtask_rows: + parsed: list[tuple[str, str, list[int], str, str, str]] = [] + for subtask_id, task_id, inputs_raw, outputs_raw, updated_at in subtask_rows: try: - inputs = ( + inputs: Any = ( json.loads(inputs_raw) if isinstance(inputs_raw, str) else (inputs_raw or {}) ) - property_ids: list[int] = [ - int(p) for p in (inputs.get("property_ids") or []) - ] + raw_ids = cast("list[Any]", cast("dict[str, Any]", inputs).get("property_ids") or []) + property_ids: list[int] = [int(p) for p in raw_ids] except Exception: property_ids = [] parsed.append( @@ -67,6 +82,7 @@ with engine.connect() as conn: property_ids, str(updated_at), inputs_raw or "", + _error_text(outputs_raw), ) ) all_property_ids.extend(property_ids) @@ -80,23 +96,29 @@ with engine.connect() as conn: ).fetchall() uprn_map = {int(r[0]): int(r[1]) for r in uprn_rows} +def _cell(value: str) -> str: + """Escape table-breaking characters so multi-line errors stay on one row.""" + return value.replace("|", "\\|").replace("\n", "
") + + lines: list[str] = [ "# Failed modelling_e2e Subtasks\n", - f"| Subtask ID | Task ID | Updated At | Property ID | UPRN | Inputs |", - f"|-----------|---------|------------|-------------|------|--------|", + f"| Subtask ID | Task ID | Updated At | Property ID | UPRN | Error | Inputs |", + f"|-----------|---------|------------|-------------|------|-------|--------|", ] -for subtask_id, task_id, property_ids, updated_at, inputs_raw in parsed: - inputs_cell = (inputs_raw or "").replace("|", "\\|") +for subtask_id, task_id, property_ids, updated_at, inputs_raw, error in parsed: + inputs_cell = _cell(inputs_raw or "") + error_cell = _cell(error or "") if property_ids: for pid in property_ids: uprn = uprn_map.get(pid, "unknown") lines.append( - f"| {subtask_id} | {task_id} | {updated_at} | {pid} | {uprn} | {inputs_cell} |" + f"| {subtask_id} | {task_id} | {updated_at} | {pid} | {uprn} | {error_cell} | {inputs_cell} |" ) else: lines.append( - f"| {subtask_id} | {task_id} | {updated_at} | — | — | {inputs_cell} |" + f"| {subtask_id} | {task_id} | {updated_at} | — | — | {error_cell} | {inputs_cell} |" ) _OUTPUT.write_text("\n".join(lines) + "\n", encoding="utf-8") diff --git a/scripts/trigger_modelling_e2e_sqs.py b/scripts/trigger_modelling_e2e_sqs.py index f0eaed38..76105422 100644 --- a/scripts/trigger_modelling_e2e_sqs.py +++ b/scripts/trigger_modelling_e2e_sqs.py @@ -20,9 +20,10 @@ PORTFOLIO_ID: int = 796 SCENARIO_ID: int = 1268 SQS_QUEUE_NAME: str = "modelling_e2e-queue-dev" -# Number of postcodes to process this run (postcodes where all properties are -# already completed are skipped and do not count toward this limit). -POSTCODES_LIMIT: int = 1000 +# Max number of properties to process this run (cost cap). Postcodes are added +# whole, smallest-group-first, until adding the next would exceed this — so the +# actual count lands at or just under the limit. +PROPERTIES_LIMIT: int = 5000 # True → Lambda runs the full pipeline but skips all DB writes (safe for testing). DRY_RUN: bool = False @@ -94,21 +95,31 @@ def _completed_property_ids() -> set[int]: def main() -> None: postcode_map = _load_postcode_map() - completed = _completed_property_ids() - logger.info(f"{len(completed)} property IDs already completed — skipping") + # Pending filter disabled — re-run every property regardless of whether it + # already has a completed modelling_e2e sub_task for this scenario. batches: list[tuple[str, list[int]]] = [] for postcode, ids in postcode_map.items(): - pending = [pid for pid in ids if pid not in completed] - if pending: - batches.append((postcode, pending)) + if ids: + batches.append((postcode, ids)) - to_process = batches[:POSTCODES_LIMIT] + to_process: list[tuple[str, list[int]]] = [] + property_count = 0 + for postcode, ids in batches: + if property_count + len(ids) > PROPERTIES_LIMIT: + continue + to_process.append((postcode, ids)) + property_count += len(ids) if not to_process: logger.info("Nothing left to process.") return + logger.info( + f"selected {len(to_process)} postcodes / {property_count} properties " + f"(limit {PROPERTIES_LIMIT})" + ) + sqs: Any = cast( Any, boto3.client("sqs", region_name="eu-west-2") ) # pyright: ignore[reportUnknownMemberType]