mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-30 13:10:47 +00:00
The handler fired ~2+2N read round-trips and N+N write transactions per SQS batch, pinning RDS CPU under ~32 concurrent containers on pool_size=1. Reads: merge the duplicate property query and add overrides_for_many / SolarRepository.get_many so overrides, solar, and property rows each load in one query (2+2N -> 3). Writes: buffer each modelled property's persistence intent in memory (_PropertyWrite) during the loop, then flush the whole batch in one PostgresUnitOfWork with a single commit, and run the baseline orchestrator once for all written ids (N+N -> 2 transactions). Per-property modelling failures stay isolated in the loop; the batch write is all-or-nothing and retried via SQS (saves are idempotent upserts). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
52 lines
1.9 KiB
Python
52 lines
1.9 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any, Optional
|
|
|
|
from sqlmodel import Session, col, select
|
|
|
|
from infrastructure.postgres.solar_table import SolarRow
|
|
from repositories.solar.solar_repository import SolarRepository
|
|
|
|
|
|
class SolarPostgresRepository(SolarRepository):
    """``SolarRepository`` implementation backed by ``SolarRow`` records.

    Every read and write goes through the SQLModel ``Session`` passed to the
    constructor. This class only stages changes with ``session.add``; it never
    commits, so committing is left to whoever owns the session.
    """

    def __init__(self, session: Session) -> None:
        """Bind the repository to ``session`` for all subsequent queries."""
        self._session = session

    def _find_row(self, uprn: int) -> Optional[SolarRow]:
        """Return the first ``SolarRow`` whose ``uprn`` matches, or None."""
        return self._session.exec(
            select(SolarRow).where(SolarRow.uprn == uprn)
        ).first()

    def save(
        self, uprn: int, *, longitude: float, latitude: float, insights: dict[str, Any]
    ) -> None:
        """Create or overwrite the stored solar record for ``uprn``.

        If a row already exists for ``uprn`` its longitude, latitude and
        ``google_api_response`` are replaced; otherwise a new row is built.
        Either way the row is added to the session but not committed here.
        """
        row = self._find_row(uprn)
        if row is None:
            row = SolarRow(
                uprn=uprn,
                longitude=longitude,
                latitude=latitude,
                google_api_response=insights,
            )
        else:
            row.longitude = longitude
            row.latitude = latitude
            row.google_api_response = insights
        # Both the new and the updated row are staged the same way.
        self._session.add(row)

    def get(self, uprn: int) -> Optional[dict[str, Any]]:
        """Return the stored ``google_api_response`` for ``uprn``.

        Returns None when no row exists for ``uprn``.
        """
        row = self._find_row(uprn)
        return None if row is None else row.google_api_response

    def get_many(self, uprns: list[int]) -> dict[int, Optional[dict[str, Any]]]:
        """Stored insights for many UPRNs in one query — the batch form of
        ``get``.

        The returned dict has an entry for every requested UPRN; a UPRN with
        no stored row maps to None (exactly as ``get`` returns). An empty
        ``uprns`` returns an empty dict without querying the database.
        """
        if not uprns:
            # Nothing requested: skip the round-trip an empty ``IN ()`` would cost.
            return {}

        rows = self._session.exec(
            select(SolarRow).where(col(SolarRow.uprn).in_(uprns))
        ).all()
        stored: dict[int, Optional[dict[str, Any]]] = {
            row.uprn: row.google_api_response for row in rows
        }
        # Re-key by the requested UPRNs so missing rows surface as None.
        return {uprn: stored.get(uprn) for uprn in uprns}
|