Model/repositories/solar/solar_postgres_repository.py
Jun-te Kim de5e9a2362 perf(modelling_e2e): bulk reads and batch writes to cut RDS load
The handler fired ~2+2N read round-trips and N+N write transactions per
SQS batch, pinning RDS CPU under ~32 concurrent containers on pool_size=1.

Reads: merge the duplicate property query and add overrides_for_many /
SolarRepository.get_many so overrides, solar, and property rows each load
in one query (2+2N -> 3).

Writes: buffer each modelled property's persistence intent in memory
(_PropertyWrite) during the loop, then flush the whole batch in one
PostgresUnitOfWork with a single commit, and run the baseline orchestrator
once for all written ids (N+N -> 2 transactions). Per-property modelling
failures stay isolated in the loop; the batch write is all-or-nothing and
retried via SQS (saves are idempotent upserts).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-24 18:43:42 +00:00

52 lines
1.9 KiB
Python

from __future__ import annotations
from typing import Any, Optional
from sqlmodel import Session, col, select
from infrastructure.postgres.solar_table import SolarRow
from repositories.solar.solar_repository import SolarRepository
class SolarPostgresRepository(SolarRepository):
    """``SolarRepository`` implementation that reads and writes ``SolarRow``.

    All access goes through the SQLModel ``Session`` supplied at
    construction. No method here calls ``commit``: ``save`` only stages
    changes on the session, so the caller owns the transaction boundary.
    """

    def __init__(self, session: Session) -> None:
        """Keep a reference to the session used for every read and write."""
        self._session = session

    def _find_row(self, uprn: int) -> Optional[SolarRow]:
        """Return the first ``SolarRow`` whose ``uprn`` matches, or None."""
        return self._session.exec(
            select(SolarRow).where(SolarRow.uprn == uprn)
        ).first()

    def save(
        self, uprn: int, *, longitude: float, latitude: float, insights: dict[str, Any]
    ) -> None:
        """Stage an insert-or-update of the solar row for ``uprn``.

        If no row exists for ``uprn`` a new ``SolarRow`` is added to the
        session; otherwise the existing row's ``longitude``, ``latitude``
        and ``google_api_response`` are overwritten with the given values.
        """
        existing = self._find_row(uprn)
        if existing is None:
            self._session.add(
                SolarRow(
                    uprn=uprn,
                    longitude=longitude,
                    latitude=latitude,
                    google_api_response=insights,
                )
            )
            return

        existing.longitude = longitude
        existing.latitude = latitude
        existing.google_api_response = insights
        self._session.add(existing)

    def get(self, uprn: int) -> Optional[dict[str, Any]]:
        """Return the stored ``google_api_response`` for ``uprn``.

        Returns None when there is no row for ``uprn``.
        """
        row = self._find_row(uprn)
        return row.google_api_response if row is not None else None

    def get_many(self, uprns: list[int]) -> dict[int, Optional[dict[str, Any]]]:
        """Stored insights for many UPRNs in one query — the batch form of
        ``get``. The returned dict has an entry for every requested UPRN; a UPRN
        with no stored row maps to None (exactly as ``get`` returns).

        An empty ``uprns`` list yields an empty dict without touching the
        database.
        """
        # Nothing requested: skip the round-trip an empty IN () would cost.
        if not uprns:
            return {}

        rows = self._session.exec(
            select(SolarRow).where(col(SolarRow.uprn).in_(uprns))
        ).all()
        stored: dict[int, Optional[dict[str, Any]]] = {
            row.uprn: row.google_api_response for row in rows
        }
        # Re-key by the request so UPRNs without a row are present as None.
        return {uprn: stored.get(uprn) for uprn in uprns}