Merge pull request #1274 from Hestia-Homes/trigger-e2e-locally

Limit database connections using NullPool
This commit is contained in:
Daniel Roth 2026-06-23 17:42:22 +01:00 committed by GitHub
commit a76666926f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 4338 additions and 69 deletions

View file

@ -23,6 +23,7 @@ invocations (ADR-0012).
from __future__ import annotations
import dataclasses
import io
import os
from collections.abc import Callable
@ -93,7 +94,11 @@ logger = setup_logger()
def _get_engine() -> Engine:
global _engine
if _engine is None:
_engine = make_engine(PostgresConfig.from_env(dict(os.environ)))
config = PostgresConfig.from_env(dict(os.environ))
# Reduced pool for Lambda: 32 concurrent containers × 3 connections = 96 max,
# vs the default 3+5=8 which would reach 256+ and exhaust RDS max_connections.
# pool_size=2 covers the simultaneous read_session + UoW session per invocation.
_engine = make_engine(dataclasses.replace(config, pool_size=2, max_overflow=1))
return _engine

View file

@ -1,20 +1,25 @@
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any, Optional, Type
from sqlalchemy.engine import Engine
from sqlalchemy.pool import Pool
from sqlmodel import Session, create_engine
from infrastructure.postgres.config import PostgresConfig
def make_engine(config: PostgresConfig) -> Engine:
return create_engine(
config.url(),
pool_size=config.pool_size,
max_overflow=config.max_overflow,
pool_pre_ping=config.pool_pre_ping,
pool_recycle=config.pool_recycle,
)
def make_engine(
config: PostgresConfig, poolclass: Optional[Type[Pool]] = None
) -> Engine:
kwargs: dict[str, Any] = {"pool_pre_ping": config.pool_pre_ping}
if poolclass is None:
kwargs["pool_size"] = config.pool_size
kwargs["max_overflow"] = config.max_overflow
kwargs["pool_recycle"] = config.pool_recycle
else:
kwargs["poolclass"] = poolclass
return create_engine(config.url(), **kwargs)
def make_session(engine: Engine) -> Session:

View file

@ -0,0 +1,17 @@
# Failed modelling_e2e Subtasks
| Subtask ID | Task ID | Updated At | Property ID | UPRN | Inputs |
|-----------|---------|------------|-------------|------|--------|
| ce6a5844-cf31-495e-9e45-91a83aedb8e7 | a520b8a0-2b7a-4a99-a87d-bfe4bf785de6 | 2026-06-23 11:35:58.448106+00:00 | 719897 | 100021919718 | {"property_ids": [719897], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| 31e687bb-9b9d-4fcc-b640-f68f878cf49a | b7dde417-2e5d-42ee-b695-c9d7a9ca81c1 | 2026-06-23 11:35:51.384121+00:00 | 733315 | 100020401711 | {"property_ids": [733315], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| b337172c-9dc5-48a0-9eb8-8021893a0ef1 | 01266210-d44a-4715-bf21-eda88a67a5e7 | 2026-06-23 11:35:41.863424+00:00 | 723589 | 100020407755 | {"property_ids": [723589], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| d2af286a-8964-4040-8429-e289d215c635 | 5f408513-be4d-4ce0-96b0-ceb654563ca2 | 2026-06-23 11:35:39.939416+00:00 | 726592 | 100021918195 | {"property_ids": [726592], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| 1d13c3a2-b269-4ca4-a3f7-f19479081444 | cebcff9b-4a46-48f0-a648-f40b030951b2 | 2026-06-23 11:35:18.760333+00:00 | 711228 | 100020416477 | {"property_ids": [711228], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| f920f079-edd0-4e3e-893e-809e57a57292 | e6a75a2f-2165-4c6a-b929-f485db08b5a2 | 2026-06-23 11:35:11.908026+00:00 | 717435 | 22010468 | {"property_ids": [717435], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| 36ec4a16-be95-4fda-8279-d7d33ed5a556 | 693c3886-efc6-48c1-b99d-576b5736c7e9 | 2026-06-23 11:35:06.439654+00:00 | 710339 | 10013151061 | {"property_ids": [710339], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| 8960e551-10af-48ac-b4ad-821154a79a1c | 17791ee1-1ec6-49ea-a6fc-6f1e8d20914d | 2026-06-23 11:34:58.830828+00:00 | 721815 | 22086690 | {"property_ids": [721815], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| 453ca0df-3b0d-427d-abf5-8462194f770b | 4712998d-d130-4472-860f-5b1d2471b3e3 | 2026-06-23 11:34:47.106853+00:00 | 712401 | 100020394694 | {"property_ids": [712401], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| 1f5ec8fb-7202-44d6-8794-44515f9b4d82 | e50d8753-fd8b-4735-ac89-225428989ec5 | 2026-06-23 11:34:39.473828+00:00 | 723881 | 22005280 | {"property_ids": [723881], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| 6ddff51f-1c29-439c-9335-b5befee64836 | dbac1632-3648-46da-97bc-0ca572bc9c45 | 2026-06-23 11:34:35.708295+00:00 | 715891 | 22082258 | {"property_ids": [715891], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| 3255673d-7cba-49f8-ba88-106301dfa029 | 620f5571-d6c5-469a-afe3-8986b53dd041 | 2026-06-23 11:34:32.737237+00:00 | 716049 | 22104161 | {"property_ids": [716049], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |
| 9b1a8289-de92-4935-afd6-b93a89f400e6 | 35abffbe-7574-448a-b8cf-89586bf9057d | 2026-06-23 11:34:31.003170+00:00 | 730259 | 100061905741 | {"property_ids": [730259], "portfolio_id": 796, "scenario_id": 1268, "no_solar": false, "dry_run": false} |

View file

@ -0,0 +1,47 @@
"""Print a dict of postcode → property IDs for a portfolio, sorted by group size.
Edit PORTFOLIO_ID below, then hit Run.
"""
from __future__ import annotations
# ---------------------------------------------------------------------------
# CONFIG
# ---------------------------------------------------------------------------
PORTFOLIO_ID: int = 796
# ---------------------------------------------------------------------------
import sys
from collections import defaultdict
from pathlib import Path
_REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_REPO_ROOT))
from sqlalchemy import text # noqa: E402
from scripts.e2e_common import ENV_PATH, build_engine, load_env # noqa: E402
load_env(ENV_PATH)
engine = build_engine()
with engine.connect() as conn:
rows = conn.execute(
text("SELECT id, postcode FROM property WHERE portfolio_id = :pid ORDER BY postcode, id"),
{"pid": PORTFOLIO_ID},
).fetchall()
by_postcode: dict[str, list[int]] = defaultdict(list)
for pid, postcode in rows:
by_postcode[postcode or "UNKNOWN"].append(int(pid))
sorted_dict = dict(sorted(by_postcode.items(), key=lambda kv: len(kv[1])))
output_path = _REPO_ROOT / "scripts" / f"properties_by_postcode_{PORTFOLIO_ID}.txt"
lines = [f"{postcode!r}: {ids}" for postcode, ids in sorted_dict.items()]
lines.append(
f"\nTotal postcodes: {len(sorted_dict)}, total properties: {sum(len(v) for v in sorted_dict.values())}"
)
output_path.write_text("\n".join(lines), encoding="utf-8")
print(f"Saved to {output_path}")

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,103 @@
"""Query failed modelling_e2e subtasks and write a markdown report.
Joins sub_task tasks, pulls property_ids from the inputs JSON, then looks up
UPRNs from the property table.
Hit Run output written to scripts/failed_modelling_e2e.md
"""
from __future__ import annotations
import json
import sys
from pathlib import Path
_REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_REPO_ROOT))
from sqlalchemy import text # noqa: E402
from scripts.e2e_common import ENV_PATH, build_engine, load_env # noqa: E402
_OUTPUT = _REPO_ROOT / "scripts" / "failed_modelling_e2e.md"
load_env(ENV_PATH)
engine = build_engine()
with engine.connect() as conn:
subtask_rows = conn.execute(text("""
SELECT
st.id AS subtask_id,
st.task_id,
st.inputs,
st.updated_at
FROM sub_task st
JOIN tasks t ON t.id = st.task_id
WHERE t.task_source = 'modelling_e2e'
AND st.status = 'failed'
ORDER BY st.updated_at DESC
""")).fetchall()
if not subtask_rows:
print("No failed modelling_e2e subtasks found.")
_OUTPUT.write_text(
"# Failed modelling_e2e Subtasks\n\nNone found.\n", encoding="utf-8"
)
exit(0)
# Collect all property_ids across all rows
all_property_ids: list[int] = []
parsed: list[tuple[str, str, list[int], str, str]] = []
for subtask_id, task_id, inputs_raw, updated_at in subtask_rows:
try:
inputs = (
json.loads(inputs_raw)
if isinstance(inputs_raw, str)
else (inputs_raw or {})
)
property_ids: list[int] = [
int(p) for p in (inputs.get("property_ids") or [])
]
except Exception:
property_ids = []
parsed.append(
(
str(subtask_id),
str(task_id),
property_ids,
str(updated_at),
inputs_raw or "",
)
)
all_property_ids.extend(property_ids)
# Look up UPRNs
uprn_map: dict[int, int] = {}
if all_property_ids:
uprn_rows = conn.execute(
text("SELECT id, uprn FROM property WHERE id = ANY(:ids)"),
{"ids": all_property_ids},
).fetchall()
uprn_map = {int(r[0]): int(r[1]) for r in uprn_rows}
lines: list[str] = [
"# Failed modelling_e2e Subtasks\n",
f"| Subtask ID | Task ID | Updated At | Property ID | UPRN | Inputs |",
f"|-----------|---------|------------|-------------|------|--------|",
]
for subtask_id, task_id, property_ids, updated_at, inputs_raw in parsed:
inputs_cell = (inputs_raw or "").replace("|", "\\|")
if property_ids:
for pid in property_ids:
uprn = uprn_map.get(pid, "unknown")
lines.append(
f"| {subtask_id} | {task_id} | {updated_at} | {pid} | {uprn} | {inputs_cell} |"
)
else:
lines.append(
f"| {subtask_id} | {task_id} | {updated_at} | — | — | {inputs_cell} |"
)
_OUTPUT.write_text("\n".join(lines) + "\n", encoding="utf-8")
print(f"Written {len(parsed)} failed subtasks → {_OUTPUT}")

View file

@ -1,37 +1,41 @@
"""Enqueue one SQS message per property for the modelling_e2e Lambda.
"""Enqueue one SQS message per postcode group for the modelling_e2e Lambda.
Reads all property IDs for the given portfolio from the DB and sends a batch of
SQS messages, one per property. The Lambda then processes each message
independently, enabling concurrent modelling at scale.
Reads postcode property ID groups from the file produced by
list_properties_by_postcode.py, queries the DB for already-completed
property IDs, then sends one SQS message per postcode batch containing only
the properties that still need processing.
Edit the CONFIG block below, then run via VSCode Run button or Jupyter.
AWS creds come from the ambient ~/.aws profile; DB creds from backend/.env.
Edit the CONFIG block below, then hit Run.
AWS creds come from the ambient ~/.aws profile.
"""
from __future__ import annotations
# --------------------------------------------------------------------------
from utilities.logger import setup_logger
# ---------------------------------------------------------------------------
# CONFIG — edit these before running
# ---------------------------------------------------------------------------
PORTFOLIO_ID: int = 785
SCENARIO_ID: int = 1266
SQS_URL: str = "https://sqs.eu-west-2.amazonaws.com/ACCOUNT_ID/modelling-e2e-STAGE"
PORTFOLIO_ID: int = 796
SCENARIO_ID: int = 1268
SQS_QUEUE_NAME: str = "modelling_e2e-queue-dev"
# Set to a positive integer to enqueue only the first N properties (trial run).
LIMIT: int | None = 10
# Number of postcodes to process this run (postcodes where all properties are
# already completed are skipped and do not count toward this limit).
POSTCODES_LIMIT: int = 1000
# True → Lambda runs the full pipeline but skips all DB writes (safe for testing).
DRY_RUN: bool = True
DRY_RUN: bool = False
# True → Lambda skips the Google Solar fetch.
NO_SOLAR: bool = False
# ---------------------------------------------------------------------------
import ast
import json
import sys
from pathlib import Path
from typing import Any, cast
from uuid import uuid4
_REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_REPO_ROOT))
@ -41,65 +45,97 @@ from sqlalchemy import text # noqa: E402
from scripts.e2e_common import ENV_PATH, build_engine, load_env # noqa: E402
_BATCH_SIZE = 10
logger = setup_logger()
_POSTCODES_FILE = _REPO_ROOT / "scripts" / f"properties_by_postcode_{PORTFOLIO_ID}.txt"
def _property_ids(portfolio_id: int, limit: int | None, engine: object) -> list[int]:
from sqlalchemy.engine import Engine
def _load_postcode_map() -> dict[str, list[int]]:
if not _POSTCODES_FILE.exists():
raise FileNotFoundError(
f"{_POSTCODES_FILE} not found — run list_properties_by_postcode.py first"
)
result: dict[str, list[int]] = {}
for line in _POSTCODES_FILE.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line or line.startswith("Total"):
continue
postcode_repr, ids_repr = line.split(": ", 1)
result[ast.literal_eval(postcode_repr)] = ast.literal_eval(ids_repr)
return result
assert isinstance(engine, Engine)
query = "SELECT id FROM property WHERE portfolio_id = :pid ORDER BY id"
if limit is not None:
query += f" LIMIT {int(limit)}"
def _completed_property_ids() -> set[int]:
"""Return all property IDs with a completed modelling_e2e subtask for this
portfolio + scenario. Single DB round-trip."""
load_env(ENV_PATH)
engine = build_engine()
with engine.connect() as conn:
rows = conn.execute(text(query), {"pid": portfolio_id}).fetchall()
return [int(r[0]) for r in rows]
def _batches(items: list[int], size: int) -> list[list[int]]:
return [items[i : i + size] for i in range(0, len(items), size)]
rows = conn.execute(
text("""
SELECT DISTINCT elem.value::int AS property_id
FROM sub_task st
JOIN tasks t ON t.id = st.task_id
CROSS JOIN jsonb_array_elements_text(
(st.inputs::jsonb)->'property_ids'
) AS elem(value)
WHERE t.task_source = 'modelling_e2e'
AND st.status = 'complete'
AND ((st.inputs::jsonb)->>'portfolio_id')::int = :portfolio_id
AND ((st.inputs::jsonb)->>'scenario_id')::int = :scenario_id
"""),
{
"portfolio_id": PORTFOLIO_ID,
"scenario_id": SCENARIO_ID,
},
).fetchall()
return {int(r[0]) for r in rows}
def main() -> None:
load_env(ENV_PATH)
engine = build_engine()
postcode_map = _load_postcode_map()
completed = _completed_property_ids()
logger.info(f"{len(completed)} property IDs already completed — skipping")
ids = _property_ids(PORTFOLIO_ID, LIMIT, engine)
if not ids:
print(f"no properties found for portfolio {PORTFOLIO_ID}")
batches: list[tuple[str, list[int]]] = []
for postcode, ids in postcode_map.items():
pending = [pid for pid in ids if pid not in completed]
if pending:
batches.append((postcode, pending))
to_process = batches[:POSTCODES_LIMIT]
if not to_process:
logger.info("Nothing left to process.")
return
print(
f"enqueuing {len(ids)} properties "
sqs: Any = cast(
Any, boto3.client("sqs", region_name="eu-west-2")
) # pyright: ignore[reportUnknownMemberType]
sqs_url: str = sqs.get_queue_url(QueueName=SQS_QUEUE_NAME)["QueueUrl"]
logger.info(
f"sending {len(to_process)} messages "
f"(portfolio={PORTFOLIO_ID}, scenario={SCENARIO_ID}, "
f"no_solar={NO_SOLAR}, dry_run={DRY_RUN}) → {SQS_URL}"
f"dry_run={DRY_RUN}, no_solar={NO_SOLAR}) → {sqs_url}"
)
sqs: Any = cast(
Any, boto3.client("sqs")
) # pyright: ignore[reportUnknownMemberType]
sent = 0
for batch in _batches(ids, _BATCH_SIZE):
entries = [
{
"Id": str(uuid4()).replace("-", "")[:8] + str(i),
"MessageBody": json.dumps(
{
"property_id": [pid],
"portfolio_id": PORTFOLIO_ID,
"scenario_id": SCENARIO_ID,
"no_solar": NO_SOLAR,
"dry_run": DRY_RUN,
}
),
}
for i, pid in enumerate(batch)
]
sqs.send_message_batch(QueueUrl=SQS_URL, Entries=entries)
sent += len(batch)
print(f" sent {sent}/{len(ids)}", end="\r")
for postcode, ids in to_process:
sqs.send_message(
QueueUrl=sqs_url,
MessageBody=json.dumps(
{
"property_ids": ids,
"portfolio_id": PORTFOLIO_ID,
"scenario_id": SCENARIO_ID,
"no_solar": NO_SOLAR,
"dry_run": DRY_RUN,
}
),
)
logger.info(f" sent {postcode}: {ids}")
print(f"\ndone — {sent} messages enqueued")
logger.info(f"\ndone — {len(to_process)} messages enqueued")
main()

View file

@ -2,6 +2,7 @@ import os
from collections.abc import Generator
from contextlib import contextmanager
from sqlalchemy.pool import NullPool
from sqlmodel import Session
from infrastructure.postgres.config import PostgresConfig
@ -17,8 +18,11 @@ def default_orchestrator() -> Generator[TaskOrchestrator, None, None]:
Connection params come from os.environ via PostgresConfig.from_env. Each
handler invocation gets its own session, cleaned up on context exit.
NullPool is intentional: a new engine is created on every invocation, so
pooling would accumulate idle connections across warm Lambda containers.
"""
engine = make_engine(PostgresConfig.from_env(dict(os.environ)))
engine = make_engine(PostgresConfig.from_env(dict(os.environ)), poolclass=NullPool)
with Session(engine) as session:
yield TaskOrchestrator(
task_repo=TaskPostgresRepository(session=session),