From 25fb0a9ebb88d50dbbb91374f5a1c6594c353b5a Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Thu, 25 Jun 2026 10:21:17 +0000 Subject: [PATCH] Batch postcode groups into SQS messages for modelling_e2e Group pending property IDs by postcode and pack them into ~BATCH_SIZE messages, never splitting a postcode, so each SQS message drives one batched modelling_e2e Lambda invocation. Adds a completed-since skip filter and a properties cap. Co-Authored-By: Claude Opus 4.8 (1M context) --- scripts/trigger_modelling_e2e_sqs.py | 104 ++++++++++++++++----------- 1 file changed, 63 insertions(+), 41 deletions(-) diff --git a/scripts/trigger_modelling_e2e_sqs.py b/scripts/trigger_modelling_e2e_sqs.py index 76105422..e4b0dd4f 100644 --- a/scripts/trigger_modelling_e2e_sqs.py +++ b/scripts/trigger_modelling_e2e_sqs.py @@ -11,19 +11,33 @@ AWS creds come from the ambient ~/.aws profile. from __future__ import annotations +import ast +import json +import sys +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any, cast + from utilities.logger import setup_logger # --------------------------------------------------------------------------- # CONFIG — edit these before running # --------------------------------------------------------------------------- -PORTFOLIO_ID: int = 796 -SCENARIO_ID: int = 1268 +PORTFOLIO_ID: int = 805 +SCENARIO_ID: int = 1267 SQS_QUEUE_NAME: str = "modelling_e2e-queue-dev" -# Max number of properties to process this run (cost cap). Postcodes are added -# whole, smallest-group-first, until adding the next would exceed this — so the -# actual count lands at or just under the limit. -PROPERTIES_LIMIT: int = 5000 +# Max number of properties to process this run (cost cap). +PROPERTIES_LIMIT: int = 32000 + +# Number of properties bundled into each SQS message / Lambda invocation. +BATCH_SIZE: int = 50 + +# Skip properties whose modelling_e2e sub_task completed at or after this time. +# Set to None to disable the filter and process all properties. +COMPLETED_SINCE: datetime | None = datetime( + 2026, 6, 24, 12, 27, 54, 34000, tzinfo=timezone(timedelta(hours=1)) +) # True → Lambda runs the full pipeline but skips all DB writes (safe for testing). DRY_RUN: bool = False @@ -32,12 +46,6 @@ DRY_RUN: bool = False NO_SOLAR: bool = False # --------------------------------------------------------------------------- -import ast -import json -import sys -from pathlib import Path -from typing import Any, cast - _REPO_ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(_REPO_ROOT)) @@ -66,29 +74,22 @@ def _load_postcode_map() -> dict[str, list[int]]: return result -def _completed_property_ids() -> set[int]: - """Return all property IDs with a completed modelling_e2e subtask for this - portfolio + scenario. Single DB round-trip.""" +def _completed_property_ids(since: datetime) -> set[int]: + """Return property IDs with a completed modelling_e2e sub_task on or after *since*.""" load_env(ENV_PATH) engine = build_engine() with engine.connect() as conn: rows = conn.execute( text(""" - SELECT DISTINCT elem.value::int AS property_id + SELECT DISTINCT ((st.inputs::jsonb)->>'property_id')::int AS property_id FROM sub_task st JOIN tasks t ON t.id = st.task_id - CROSS JOIN jsonb_array_elements_text( - (st.inputs::jsonb)->'property_ids' - ) AS elem(value) WHERE t.task_source = 'modelling_e2e' AND st.status = 'complete' - AND ((st.inputs::jsonb)->>'portfolio_id')::int = :portfolio_id - AND ((st.inputs::jsonb)->>'scenario_id')::int = :scenario_id + AND st.job_completed >= :since + AND (st.inputs::jsonb) ? 'property_id' """), - { - "portfolio_id": PORTFOLIO_ID, - "scenario_id": SCENARIO_ID, - }, + {"since": since}, ).fetchall() return {int(r[0]) for r in rows} @@ -96,27 +97,48 @@ def _completed_property_ids() -> set[int]: def main() -> None: postcode_map = _load_postcode_map() - # Pending filter disabled — re-run every property regardless of whether it - # already has a completed modelling_e2e sub_task for this scenario. - batches: list[tuple[str, list[int]]] = [] - for postcode, ids in postcode_map.items(): - if ids: - batches.append((postcode, ids)) + completed: set[int] = set() + if COMPLETED_SINCE is not None: + completed = _completed_property_ids(COMPLETED_SINCE) + logger.info( + f"skipping {len(completed)} properties already completed since {COMPLETED_SINCE}" + ) - to_process: list[tuple[str, list[int]]] = [] + # Filter to pending IDs, keeping postcode grouping intact. + pending: list[tuple[str, list[int]]] = [ + (pc, [i for i in ids if i not in completed]) + for pc, ids in postcode_map.items() + if any(i not in completed for i in ids) + ] + + # Apply PROPERTIES_LIMIT: skip whole postcodes that would exceed the cap. + selected: list[tuple[str, list[int]]] = [] property_count = 0 - for postcode, ids in batches: + for postcode, ids in pending: if property_count + len(ids) > PROPERTIES_LIMIT: continue - to_process.append((postcode, ids)) + selected.append((postcode, ids)) property_count += len(ids) - if not to_process: + # Pack postcodes into batches of ~BATCH_SIZE, never splitting a postcode. + # A postcode larger than BATCH_SIZE becomes its own oversized message. + batches: list[list[int]] = [] + current: list[int] = [] + for _postcode, ids in selected: + if current and len(current) + len(ids) > BATCH_SIZE: + batches.append(current) + current = list(ids) + else: + current.extend(ids) + if current: + batches.append(current) + + if not batches: logger.info("Nothing left to process.") return logger.info( - f"selected {len(to_process)} postcodes / {property_count} properties " + f"selected {property_count} properties across {len(batches)} batches of ~{BATCH_SIZE} " f"(limit {PROPERTIES_LIMIT})" ) @@ -126,17 +148,17 @@ def main() -> None: sqs_url: str = sqs.get_queue_url(QueueName=SQS_QUEUE_NAME)["QueueUrl"] logger.info( - f"sending {len(to_process)} messages " + f"sending {len(batches)} messages " f"(portfolio={PORTFOLIO_ID}, scenario={SCENARIO_ID}, " f"dry_run={DRY_RUN}, no_solar={NO_SOLAR}) → {sqs_url}" ) - for postcode, ids in to_process: + for batch in batches: sqs.send_message( QueueUrl=sqs_url, MessageBody=json.dumps( { - "property_ids": ids, + "property_ids": batch, "portfolio_id": PORTFOLIO_ID, "scenario_id": SCENARIO_ID, "no_solar": NO_SOLAR, @@ -144,9 +166,9 @@ def main() -> None: } ), ) - logger.info(f" sent {postcode}: {ids}") + logger.info(f" sent batch: {batch}") - logger.info(f"\ndone — {len(to_process)} messages enqueued") + logger.info(f"\ndone — {len(batches)} messages enqueued") main()