solar + more logs

This commit is contained in:
Jun-te Kim 2026-06-23 18:07:10 +00:00
parent 485ab220d1
commit 51118ce121
3 changed files with 104 additions and 33 deletions

View file

@ -25,6 +25,7 @@ from __future__ import annotations
import dataclasses
import io
import json
import os
from collections.abc import Callable
from typing import Any, Optional, cast
@ -82,6 +83,7 @@ from repositories.property.property_overrides_postgres_reader import (
from repositories.scenario.scenario_postgres_repository import (
ScenarioPostgresRepository,
)
from repositories.solar.solar_postgres_repository import SolarPostgresRepository
from utilities.aws_lambda.task_handler import task_handler
from utilities.logger import setup_logger
@ -235,8 +237,9 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]:
try:
scenario = ScenarioPostgresRepository(read_session).get_many([scenario_id])[0]
products = catalogue_with_off_catalogue_overrides(read_session)
solar_reader = SolarPostgresRepository(read_session)
errors: list[int] = []
failures: list[dict[str, Any]] = []
for property_id in property_ids:
try:
@ -300,9 +303,18 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]:
landlord_overrides=overrides,
).effective_epc
solar_insights: Optional[dict[str, Any]] = (
None if no_solar else _solar_insights_for(solar_client, spatial)
)
# Read-before-fetch: the Google Solar call is paid, so skip it
# when this UPRN's insights are already persisted. Only a cache
# miss hits Google — re-runs cost nothing for solar.
solar_insights: Optional[dict[str, Any]]
solar_was_fetched = False
if no_solar:
solar_insights = None
else:
solar_insights = solar_reader.get(uprn)
if solar_insights is None:
solar_insights = _solar_insights_for(solar_client, spatial)
solar_was_fetched = solar_insights is not None
# All Measure Types are considered: the off-catalogue overlay
# (catalogue_with_off_catalogue_overrides) prices the measures the
@ -339,7 +351,8 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]:
if spatial is not None:
uow.spatial.save(uprn, spatial)
if (
solar_insights is not None
solar_was_fetched
and solar_insights is not None
and spatial is not None
and spatial.coordinates is not None
):
@ -364,10 +377,18 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]:
except Exception as error: # noqa: BLE001
logger.error(
f"property={property_id}: {type(error).__name__}: {error}",
f"property={property_id} uprn={uprns.get(property_id)}: "
f"{type(error).__name__}: {error}",
exc_info=True,
)
errors.append(property_id)
failures.append(
{
"property_id": property_id,
"uprn": uprns.get(property_id),
"error_type": type(error).__name__,
"error": str(error),
}
)
# Cohort certs the mapper could not consume were skipped (not aborted on)
# so prediction could proceed; surface them — with cert numbers — in the
@ -382,12 +403,29 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]:
f"{[s['certificate_number'] for s in skipped_certs]}"
)
if errors:
message = f"failed property_ids: {errors}"
# A property that errored AND a cohort cert the mapper could not consume
# are both surfaced as failures, so the subtask is marked failed and
# shows up for debugging. The whole batch has already run by this point —
# every property that could be modelled was written to DB above — so
# failing here flags the run without discarding the work done so far.
if failures or skipped_certs:
parts: list[str] = []
if failures:
failed_ids = [f["property_id"] for f in failures]
# Persisted verbatim into the subtask's outputs.error (via
# SubTask.fail): include each property's error type + message,
# not just the IDs, so failed runs are diagnosable without
# cross-referencing CloudWatch.
parts.append(
f"failed property_ids: {failed_ids}; "
f"details: {json.dumps(failures)}"
)
if skipped_certs:
message += f"; skipped_unmappable_cohort_certs: {skipped_certs}"
raise RuntimeError(message)
parts.append(
f"skipped_unmappable_cohort_certs: {json.dumps(skipped_certs)}"
)
raise RuntimeError("; ".join(parts))
return {"skipped_unmappable_cohort_certs": skipped_certs} if skipped_certs else None
return None
finally:
read_session.close()

View file

@ -11,6 +11,7 @@ from __future__ import annotations
import json
import sys
from pathlib import Path
from typing import Any, cast
_REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_REPO_ROOT))
@ -30,6 +31,7 @@ with engine.connect() as conn:
st.id AS subtask_id,
st.task_id,
st.inputs,
st.outputs,
st.updated_at
FROM sub_task st
JOIN tasks t ON t.id = st.task_id
@ -45,19 +47,32 @@ with engine.connect() as conn:
)
exit(0)
def _error_text(outputs_raw: object) -> str:
"""Pull the persisted failure reason out of the subtask outputs JSON."""
try:
outputs: Any = (
json.loads(outputs_raw)
if isinstance(outputs_raw, str)
else (outputs_raw or {})
)
except Exception:
return str(outputs_raw or "")
if isinstance(outputs, dict):
return str(cast("dict[str, Any]", outputs).get("error", ""))
return ""
# Collect all property_ids across all rows
all_property_ids: list[int] = []
parsed: list[tuple[str, str, list[int], str, str]] = []
for subtask_id, task_id, inputs_raw, updated_at in subtask_rows:
parsed: list[tuple[str, str, list[int], str, str, str]] = []
for subtask_id, task_id, inputs_raw, outputs_raw, updated_at in subtask_rows:
try:
inputs = (
inputs: Any = (
json.loads(inputs_raw)
if isinstance(inputs_raw, str)
else (inputs_raw or {})
)
property_ids: list[int] = [
int(p) for p in (inputs.get("property_ids") or [])
]
raw_ids = cast("list[Any]", cast("dict[str, Any]", inputs).get("property_ids") or [])
property_ids: list[int] = [int(p) for p in raw_ids]
except Exception:
property_ids = []
parsed.append(
@ -67,6 +82,7 @@ with engine.connect() as conn:
property_ids,
str(updated_at),
inputs_raw or "",
_error_text(outputs_raw),
)
)
all_property_ids.extend(property_ids)
@ -80,23 +96,29 @@ with engine.connect() as conn:
).fetchall()
uprn_map = {int(r[0]): int(r[1]) for r in uprn_rows}
def _cell(value: str) -> str:
"""Escape table-breaking characters so multi-line errors stay on one row."""
return value.replace("|", "\\|").replace("\n", "<br>")
lines: list[str] = [
"# Failed modelling_e2e Subtasks\n",
f"| Subtask ID | Task ID | Updated At | Property ID | UPRN | Inputs |",
f"|-----------|---------|------------|-------------|------|--------|",
f"| Subtask ID | Task ID | Updated At | Property ID | UPRN | Error | Inputs |",
f"|-----------|---------|------------|-------------|------|-------|--------|",
]
for subtask_id, task_id, property_ids, updated_at, inputs_raw in parsed:
inputs_cell = (inputs_raw or "").replace("|", "\\|")
for subtask_id, task_id, property_ids, updated_at, inputs_raw, error in parsed:
inputs_cell = _cell(inputs_raw or "")
error_cell = _cell(error or "")
if property_ids:
for pid in property_ids:
uprn = uprn_map.get(pid, "unknown")
lines.append(
f"| {subtask_id} | {task_id} | {updated_at} | {pid} | {uprn} | {inputs_cell} |"
f"| {subtask_id} | {task_id} | {updated_at} | {pid} | {uprn} | {error_cell} | {inputs_cell} |"
)
else:
lines.append(
f"| {subtask_id} | {task_id} | {updated_at} | — | — | {inputs_cell} |"
f"| {subtask_id} | {task_id} | {updated_at} | — | — | {error_cell} | {inputs_cell} |"
)
_OUTPUT.write_text("\n".join(lines) + "\n", encoding="utf-8")

View file

@ -20,9 +20,10 @@ PORTFOLIO_ID: int = 796
SCENARIO_ID: int = 1268
SQS_QUEUE_NAME: str = "modelling_e2e-queue-dev"
# Number of postcodes to process this run (postcodes where all properties are
# already completed are skipped and do not count toward this limit).
POSTCODES_LIMIT: int = 1000
# Max number of properties to process this run (cost cap). Postcodes are added
# whole, smallest-group-first, until adding the next would exceed this — so the
# actual count lands at or just under the limit.
PROPERTIES_LIMIT: int = 5000
# True → Lambda runs the full pipeline but skips all DB writes (safe for testing).
DRY_RUN: bool = False
@ -94,21 +95,31 @@ def _completed_property_ids() -> set[int]:
def main() -> None:
postcode_map = _load_postcode_map()
completed = _completed_property_ids()
logger.info(f"{len(completed)} property IDs already completed — skipping")
# Pending filter disabled — re-run every property regardless of whether it
# already has a completed modelling_e2e sub_task for this scenario.
batches: list[tuple[str, list[int]]] = []
for postcode, ids in postcode_map.items():
pending = [pid for pid in ids if pid not in completed]
if pending:
batches.append((postcode, pending))
if ids:
batches.append((postcode, ids))
to_process = batches[:POSTCODES_LIMIT]
to_process: list[tuple[str, list[int]]] = []
property_count = 0
for postcode, ids in batches:
if property_count + len(ids) > PROPERTIES_LIMIT:
continue
to_process.append((postcode, ids))
property_count += len(ids)
if not to_process:
logger.info("Nothing left to process.")
return
logger.info(
f"selected {len(to_process)} postcodes / {property_count} properties "
f"(limit {PROPERTIES_LIMIT})"
)
sqs: Any = cast(
Any, boto3.client("sqs", region_name="eu-west-2")
) # pyright: ignore[reportUnknownMemberType]