Merge pull request #1326 from Hestia-Homes/feat/modelling-e2e-sqs-batching

Batch postcode groups into SQS messages for modelling_e2e
This commit is contained in:
Jun-te Kim 2026-06-25 11:23:53 +01:00 committed by GitHub
commit 1edf2be345
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -11,19 +11,33 @@ AWS creds come from the ambient ~/.aws profile.
from __future__ import annotations
import ast
import json
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, cast
from utilities.logger import setup_logger
# ---------------------------------------------------------------------------
# CONFIG — edit these before running
# ---------------------------------------------------------------------------
PORTFOLIO_ID: int = 796
SCENARIO_ID: int = 1268
PORTFOLIO_ID: int = 805
SCENARIO_ID: int = 1267
SQS_QUEUE_NAME: str = "modelling_e2e-queue-dev"
# Max number of properties to process this run (cost cap). Postcodes are added
# whole, smallest-group-first, until adding the next would exceed this — so the
# actual count lands at or just under the limit.
PROPERTIES_LIMIT: int = 5000
# Max number of properties to process this run (cost cap).
PROPERTIES_LIMIT: int = 32000
# Number of properties bundled into each SQS message / Lambda invocation.
BATCH_SIZE: int = 50
# Skip properties whose modelling_e2e sub_task completed at or after this time.
# Set to None to disable the filter and process all properties.
COMPLETED_SINCE: datetime | None = datetime(
2026, 6, 24, 12, 27, 54, 34000, tzinfo=timezone(timedelta(hours=1))
)
# True → Lambda runs the full pipeline but skips all DB writes (safe for testing).
DRY_RUN: bool = False
@ -32,12 +46,6 @@ DRY_RUN: bool = False
NO_SOLAR: bool = False
# ---------------------------------------------------------------------------
import ast
import json
import sys
from pathlib import Path
from typing import Any, cast
_REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_REPO_ROOT))
@ -66,29 +74,22 @@ def _load_postcode_map() -> dict[str, list[int]]:
return result
def _completed_property_ids() -> set[int]:
"""Return all property IDs with a completed modelling_e2e subtask for this
portfolio + scenario. Single DB round-trip."""
# NOTE(review): main() calls this with one argument
# (`_completed_property_ids(COMPLETED_SINCE)`), so the `since` signature below
# is the one in effect; the zero-argument signature above is the earlier form.
def _completed_property_ids(since: datetime) -> set[int]:
"""Return property IDs with a completed modelling_e2e sub_task on or after *since*."""
load_env(ENV_PATH)
engine = build_engine()
with engine.connect() as conn:
# NOTE(review): two query variants are interleaved in this hunk.
#   - The `:since` variant reads a scalar `property_id` key from st.inputs and
#     filters on `st.job_completed >= :since`; its only bind is `since`, so it
#     cannot also be applying the :portfolio_id / :scenario_id predicates.
#     Completions are therefore matched across every portfolio and scenario —
#     confirm that is intended given PORTFOLIO_ID / SCENARIO_ID are still set.
#   - main() enqueues messages keyed `property_ids` (a list). Presumably the
#     Lambda writes one sub_task per property with a scalar `property_id`;
#     verify, because otherwise the `? 'property_id'` guard matches no rows and
#     nothing is ever skipped.
rows = conn.execute(
text("""
SELECT DISTINCT elem.value::int AS property_id
SELECT DISTINCT ((st.inputs::jsonb)->>'property_id')::int AS property_id
FROM sub_task st
JOIN tasks t ON t.id = st.task_id
CROSS JOIN jsonb_array_elements_text(
(st.inputs::jsonb)->'property_ids'
) AS elem(value)
WHERE t.task_source = 'modelling_e2e'
AND st.status = 'complete'
AND ((st.inputs::jsonb)->>'portfolio_id')::int = :portfolio_id
AND ((st.inputs::jsonb)->>'scenario_id')::int = :scenario_id
AND st.job_completed >= :since
AND (st.inputs::jsonb) ? 'property_id'
"""),
{
"portfolio_id": PORTFOLIO_ID,
"scenario_id": SCENARIO_ID,
},
{"since": since},
).fetchall()
# Collapse the single-column result rows into a set of ints.
return {int(r[0]) for r in rows}
@ -96,27 +97,48 @@ def _completed_property_ids() -> set[int]:
def main() -> None:
postcode_map = _load_postcode_map()
# Pending filter disabled — re-run every property regardless of whether it
# already has a completed modelling_e2e sub_task for this scenario.
batches: list[tuple[str, list[int]]] = []
for postcode, ids in postcode_map.items():
if ids:
batches.append((postcode, ids))
# Start from "nothing completed"; the set is only populated when a cutoff is
# configured, so COMPLETED_SINCE = None leaves every property pending.
completed: set[int] = set()
if COMPLETED_SINCE is not None:
completed = _completed_property_ids(COMPLETED_SINCE)
logger.info(
f"skipping {len(completed)} properties already completed since {COMPLETED_SINCE}"
)
to_process: list[tuple[str, list[int]]] = []
# Filter to pending IDs, keeping postcode grouping intact.
pending: list[tuple[str, list[int]]] = [
(pc, [i for i in ids if i not in completed])
for pc, ids in postcode_map.items()
if any(i not in completed for i in ids)
]
# Apply PROPERTIES_LIMIT: skip whole postcodes that would exceed the cap.
selected: list[tuple[str, list[int]]] = []
property_count = 0
for postcode, ids in batches:
for postcode, ids in pending:
# `continue`, not `break`: a postcode that would overshoot the cap is skipped,
# but later postcodes that still fit under the remaining budget are taken.
if property_count + len(ids) > PROPERTIES_LIMIT:
continue
to_process.append((postcode, ids))
selected.append((postcode, ids))
property_count += len(ids)
if not to_process:
# Pack postcodes into batches of ~BATCH_SIZE, never splitting a postcode.
# A postcode larger than BATCH_SIZE becomes its own oversized message.
batches: list[list[int]] = []
current: list[int] = []
for _postcode, ids in selected:
# Flush `current` only when it is non-empty and the next postcode would push
# it past BATCH_SIZE; an empty `current` always accepts the postcode whole.
if current and len(current) + len(ids) > BATCH_SIZE:
batches.append(current)
current = list(ids)
else:
current.extend(ids)
if current:
batches.append(current)
if not batches:
logger.info("Nothing left to process.")
return
logger.info(
f"selected {len(to_process)} postcodes / {property_count} properties "
f"selected {property_count} properties across {len(batches)} batches of ~{BATCH_SIZE} "
f"(limit {PROPERTIES_LIMIT})"
)
@ -126,17 +148,17 @@ def main() -> None:
sqs_url: str = sqs.get_queue_url(QueueName=SQS_QUEUE_NAME)["QueueUrl"]
logger.info(
f"sending {len(to_process)} messages "
f"sending {len(batches)} messages "
f"(portfolio={PORTFOLIO_ID}, scenario={SCENARIO_ID}, "
f"dry_run={DRY_RUN}, no_solar={NO_SOLAR}) → {sqs_url}"
)
for postcode, ids in to_process:
# One send_message call per batch; each body carries the batch's property IDs
# alongside portfolio_id, scenario_id and no_solar.
for batch in batches:
sqs.send_message(
QueueUrl=sqs_url,
MessageBody=json.dumps(
{
"property_ids": ids,
"property_ids": batch,
"portfolio_id": PORTFOLIO_ID,
"scenario_id": SCENARIO_ID,
"no_solar": NO_SOLAR,
@ -144,9 +166,9 @@ def main() -> None:
}
),
)
logger.info(f" sent {postcode}: {ids}")
logger.info(f" sent batch: {batch}")
logger.info(f"\ndone — {len(to_process)} messages enqueued")
logger.info(f"\ndone — {len(batches)} messages enqueued")
# Run only when executed as a script, so that importing this module (from a
# test, a REPL or another tool) does not enqueue SQS messages as a side effect.
if __name__ == "__main__":
    main()