mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-30 13:10:47 +00:00
Merge pull request #1332 from Hestia-Homes/fix/solar-429-concurrency
fix(solar): throttle per-container Solar calls to hold the fleet under 600 QPM
This commit is contained in:
commit
d77232c333
5 changed files with 164 additions and 2 deletions
|
|
@ -284,6 +284,27 @@ def _spatial_for(
|
|||
return None
|
||||
|
||||
|
||||
# The 32-wide fallback gap between this container's Solar calls: 0.8 (safety
|
||||
# headroom) × 600 QPM ÷ 60 ÷ 32 containers ≈ one call every 4s. Used when the
|
||||
# env var is unset so the Lambda self-protects even if terraform wiring is missed.
|
||||
_DEFAULT_SOLAR_MIN_REQUEST_INTERVAL_SECONDS: float = 4.0
|
||||
|
||||
|
||||
def _solar_min_request_interval_seconds() -> float:
|
||||
"""Per-container minimum gap (seconds) between Google Solar API calls, read
|
||||
from ``SOLAR_MIN_REQUEST_INTERVAL_SECONDS``. Terraform derives the value from
|
||||
the queue's ``maximum_concurrency`` (0.8 × 10 QPS ÷ N) so the up-to-32-wide
|
||||
fleet stays under the hard 600 QPM Solar ceiling. Falls back to the 32-wide
|
||||
default when unset or unparseable."""
|
||||
raw = os.environ.get("SOLAR_MIN_REQUEST_INTERVAL_SECONDS")
|
||||
if raw is None:
|
||||
return _DEFAULT_SOLAR_MIN_REQUEST_INTERVAL_SECONDS
|
||||
try:
|
||||
return float(raw)
|
||||
except ValueError:
|
||||
return _DEFAULT_SOLAR_MIN_REQUEST_INTERVAL_SECONDS
|
||||
|
||||
|
||||
def _solar_insights_for(
|
||||
solar_client: GoogleSolarApiClient, spatial: Optional[SpatialReference]
|
||||
) -> Optional[dict[str, Any]]:
|
||||
|
|
@ -402,7 +423,10 @@ def handler(body: dict[str, Any], context: Any, orchestrator: TaskOrchestrator,
|
|||
engine = _get_engine()
|
||||
epc_client = EpcClientService(os.environ["OPEN_EPC_API_TOKEN"])
|
||||
geospatial = GeospatialS3Repository(_s3_parquet_reader())
|
||||
solar_client = GoogleSolarApiClient(os.environ["GOOGLE_SOLAR_API_KEY"])
|
||||
solar_client = GoogleSolarApiClient(
|
||||
os.environ["GOOGLE_SOLAR_API_KEY"],
|
||||
min_request_interval_seconds=_solar_min_request_interval_seconds(),
|
||||
)
|
||||
|
||||
with engine.connect() as conn:
|
||||
property_rows = conn.execute(
|
||||
|
|
|
|||
|
|
@ -13,6 +13,17 @@ data "aws_secretsmanager_secret_version" "db_credentials" {
|
|||
|
||||
locals {
|
||||
db_credentials = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)
|
||||
|
||||
# Per-container minimum gap (seconds) between Google Solar API calls. The Solar
|
||||
# API caps the whole GCP project at a hard 600 QPM shared across Building
|
||||
# Insights + Data Layers. Each of the up-to-`maximum_concurrency` containers
|
||||
# loops its ~50-property batch sequentially, so pacing each one to
|
||||
# 0.8 (safety headroom) × 600 QPM ÷ 60 ÷ N keeps the fleet sum under the
|
||||
# ceiling — eliminating the sustained 429 storm that per-process backoff alone
|
||||
# can't ride out. Derived from `maximum_concurrency` so N has one source of
|
||||
# truth. At N=32 → 4.0s. Consumed by the handler's throttle (see
|
||||
# infrastructure/solar/google_solar_api_client.py).
|
||||
solar_min_request_interval_seconds = var.maximum_concurrency / (0.8 * 600 / 60)
|
||||
}
|
||||
|
||||
module "lambda" {
|
||||
|
|
@ -42,6 +53,8 @@ module "lambda" {
|
|||
OPEN_EPC_API_TOKEN = var.open_epc_api_token
|
||||
GOOGLE_SOLAR_API_KEY = var.google_solar_api_key
|
||||
DATA_BUCKET = "retrofit-data-${var.stage}"
|
||||
|
||||
SOLAR_MIN_REQUEST_INTERVAL_SECONDS = tostring(local.solar_min_request_interval_seconds)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import time
|
||||
from typing import Any, Literal, Optional
|
||||
|
||||
import requests
|
||||
|
|
@ -20,8 +21,41 @@ class GoogleSolarApiClient:
|
|||
MAX_BACKOFF_SECONDS: float = 60.0
|
||||
ENTITY_NOT_FOUND_ERROR: str = "Requested entity was not found."
|
||||
|
||||
def __init__(self, api_key: str) -> None:
|
||||
def __init__(
|
||||
self, api_key: str, min_request_interval_seconds: float = 0.0
|
||||
) -> None:
|
||||
# Per-instance min-interval throttle. The Solar API caps the whole GCP
|
||||
# project at a hard 600 QPM shared across Building Insights + Data Layers
|
||||
# (https://developers.google.com/maps/documentation/solar/usage-and-billing).
|
||||
# The modelling_e2e fleet runs up to `maximum_concurrency` (32) containers,
|
||||
# each looping its ~50-property batch sequentially — so without pacing the
|
||||
# fleet offers multiples of the quota and a steady fraction of calls 429
|
||||
# for a sustained window that #1327's jittered backoff can't ride out.
|
||||
# Each container holding >= quota/N seconds between its OWN consecutive
|
||||
# calls keeps the fleet sum under the ceiling. Default 0.0 = no throttle,
|
||||
# so the interactive backend path and tests are unchanged; the scaled
|
||||
# caller (the Lambda) opts in via env.
|
||||
self._api_key = api_key
|
||||
self._min_request_interval_seconds = min_request_interval_seconds
|
||||
self._last_request_monotonic: Optional[float] = None
|
||||
|
||||
def _await_rate_limit(self) -> None:
|
||||
"""Block until at least ``min_request_interval_seconds`` has elapsed since
|
||||
this instance's previous call, so the container never exceeds its share of
|
||||
the fleet's 600 QPM Solar budget. A no-op when the interval is 0 (the
|
||||
default / interactive path). The call's own latency counts toward the gap
|
||||
(we sleep only the *remainder*), so we don't double-pay the request time."""
|
||||
if self._min_request_interval_seconds <= 0:
|
||||
return
|
||||
now = time.monotonic()
|
||||
if self._last_request_monotonic is not None:
|
||||
wait = self._min_request_interval_seconds - (
|
||||
now - self._last_request_monotonic
|
||||
)
|
||||
if wait > 0:
|
||||
time.sleep(wait)
|
||||
now = time.monotonic()
|
||||
self._last_request_monotonic = now
|
||||
|
||||
@staticmethod
|
||||
def _parse_retry_after(response: requests.Response) -> Optional[float]:
|
||||
|
|
@ -39,6 +73,7 @@ class GoogleSolarApiClient:
|
|||
latitude: float,
|
||||
required_quality: Literal["HIGH", "MEDIUM", "LOW"] = "MEDIUM",
|
||||
) -> dict[str, Any]:
|
||||
self._await_rate_limit()
|
||||
insights_url = f"{self.base_url}/buildingInsights:findClosest"
|
||||
params: dict[str, str] = {
|
||||
"location.latitude": f"{latitude:.5f}",
|
||||
|
|
|
|||
|
|
@ -123,6 +123,43 @@ def _baseline_orchestrator() -> Iterator[MagicMock]:
|
|||
yield orchestrator
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Solar throttle: per-container call gap resolved from env (quota/N safety)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_solar_min_request_interval_falls_back_to_32_wide_default() -> None:
|
||||
# Arrange — env var absent: the Lambda must still self-protect at the 32-wide
|
||||
# gap so a missed terraform wiring can't reopen the 600 QPM over-quota storm.
|
||||
from applications.modelling_e2e.handler import (
|
||||
_DEFAULT_SOLAR_MIN_REQUEST_INTERVAL_SECONDS,
|
||||
_solar_min_request_interval_seconds,
|
||||
)
|
||||
|
||||
with patch.dict("os.environ", {}, clear=True):
|
||||
# Act
|
||||
result = _solar_min_request_interval_seconds()
|
||||
|
||||
# Assert
|
||||
assert result == _DEFAULT_SOLAR_MIN_REQUEST_INTERVAL_SECONDS
|
||||
|
||||
|
||||
def test_solar_min_request_interval_reads_env_when_set() -> None:
|
||||
# Arrange — terraform injects the computed gap (e.g. a smaller N → smaller gap).
|
||||
from applications.modelling_e2e.handler import (
|
||||
_solar_min_request_interval_seconds,
|
||||
)
|
||||
|
||||
with patch.dict(
|
||||
"os.environ", {"SOLAR_MIN_REQUEST_INTERVAL_SECONDS": "2.0"}, clear=True
|
||||
):
|
||||
# Act
|
||||
result = _solar_min_request_interval_seconds()
|
||||
|
||||
# Assert
|
||||
assert result == 2.0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Trigger body validation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
|
|||
|
|
@ -95,6 +95,59 @@ def test_get_building_insights_does_not_retry_a_400() -> None:
|
|||
assert mock_get.call_count == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Throttle: per-instance min-interval keeps the fleet under the 600 QPM cap
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_throttle_spaces_consecutive_calls_by_min_interval() -> None:
|
||||
# Arrange — a client paced at a 4s minimum gap between its OWN Solar calls
|
||||
# (quota/N at the 32-wide fleet: 0.8 * 600 QPM / 32 ≈ one call / 4s). The
|
||||
# first call fires immediately; the second must wait out the remainder of the
|
||||
# interval before hitting the API, so the fleet stays under the hard 600 QPM
|
||||
# ceiling instead of sustaining the over-quota burst that 429'd portfolio 796.
|
||||
client = GoogleSolarApiClient(
|
||||
api_key="test-key", min_request_interval_seconds=4.0
|
||||
)
|
||||
ok = MagicMock()
|
||||
ok.json.return_value = {"solarPotential": {}}
|
||||
ok.raise_for_status.return_value = None
|
||||
|
||||
with patch("requests.get", return_value=ok):
|
||||
with patch(
|
||||
"infrastructure.solar.google_solar_api_client.time.sleep"
|
||||
) as sleep:
|
||||
# Act — two back-to-back calls with no real work between them.
|
||||
client.get_building_insights(longitude=-0.1278, latitude=51.5074)
|
||||
client.get_building_insights(longitude=-0.1278, latitude=51.5074)
|
||||
|
||||
# Assert — first call doesn't wait; second waits ~the full interval (no time
|
||||
# elapsed between them, so the remaining gap is essentially the whole 4s).
|
||||
assert sleep.call_count == 1
|
||||
(waited,), _ = sleep.call_args
|
||||
assert 3.5 <= waited <= 4.0
|
||||
|
||||
|
||||
def test_throttle_off_by_default_does_not_sleep() -> None:
|
||||
# Arrange — no interval configured (the interactive backend path): the client
|
||||
# must not pace, preserving today's behaviour.
|
||||
client = GoogleSolarApiClient(api_key="test-key")
|
||||
ok = MagicMock()
|
||||
ok.json.return_value = {"solarPotential": {}}
|
||||
ok.raise_for_status.return_value = None
|
||||
|
||||
with patch("requests.get", return_value=ok):
|
||||
with patch(
|
||||
"infrastructure.solar.google_solar_api_client.time.sleep"
|
||||
) as sleep:
|
||||
# Act
|
||||
client.get_building_insights(longitude=-0.1278, latitude=51.5074)
|
||||
client.get_building_insights(longitude=-0.1278, latitude=51.5074)
|
||||
|
||||
# Assert
|
||||
sleep.assert_not_called()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Slice 1: Successful response returns parsed JSON
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue