diff --git a/applications/modelling_e2e/handler.py b/applications/modelling_e2e/handler.py
index cf1d1564..5b3db070 100644
--- a/applications/modelling_e2e/handler.py
+++ b/applications/modelling_e2e/handler.py
@@ -284,6 +284,34 @@ def _spatial_for(
     return None
 
 
+# The 32-wide fallback gap between this container's Solar calls: 0.8 (safety
+# headroom) × 600 QPM ÷ 60 ÷ 32 containers ≈ one call every 4s. Used when the
+# env var is unset so the Lambda self-protects even if terraform wiring is missed.
+_DEFAULT_SOLAR_MIN_REQUEST_INTERVAL_SECONDS: float = 4.0
+
+
+def _solar_min_request_interval_seconds() -> float:
+    """Per-container minimum gap (seconds) between Google Solar API calls, read
+    from ``SOLAR_MIN_REQUEST_INTERVAL_SECONDS``. Terraform derives the value from
+    the queue's ``maximum_concurrency`` (N ÷ (0.8 × 10 QPS)) so the up-to-32-wide
+    fleet stays under the hard 600 QPM Solar ceiling. Falls back to the 32-wide
+    default when the variable is unset, unparseable, negative, or non-finite:
+    ``float()`` accepts "nan"/"inf", and a NaN or negative gap would silently
+    switch the throttle off while an infinite one could never elapse."""
+    raw = os.environ.get("SOLAR_MIN_REQUEST_INTERVAL_SECONDS")
+    if raw is None:
+        return _DEFAULT_SOLAR_MIN_REQUEST_INTERVAL_SECONDS
+    try:
+        interval = float(raw)
+    except ValueError:
+        return _DEFAULT_SOLAR_MIN_REQUEST_INTERVAL_SECONDS
+    # The chained comparison is False for NaN, so one test rejects NaN, negatives
+    # and infinities; 0.0 stays valid as an explicit "no throttle" setting.
+    if not 0.0 <= interval < float("inf"):
+        return _DEFAULT_SOLAR_MIN_REQUEST_INTERVAL_SECONDS
+    return interval
+
+
 def _solar_insights_for(
     solar_client: GoogleSolarApiClient, spatial: Optional[SpatialReference]
 ) -> Optional[dict[str, Any]]:
@@ -402,7 +430,10 @@ def handler(body: dict[str, Any], context: Any, orchestrator: TaskOrchestrator,
     engine = _get_engine()
     epc_client = EpcClientService(os.environ["OPEN_EPC_API_TOKEN"])
     geospatial = GeospatialS3Repository(_s3_parquet_reader())
-    solar_client = GoogleSolarApiClient(os.environ["GOOGLE_SOLAR_API_KEY"])
+    solar_client = GoogleSolarApiClient(
+        os.environ["GOOGLE_SOLAR_API_KEY"],
+        min_request_interval_seconds=_solar_min_request_interval_seconds(),
+    )
 
     with engine.connect() as conn:
         property_rows = conn.execute(
diff --git a/deployment/terraform/lambda/modelling_e2e/main.tf b/deployment/terraform/lambda/modelling_e2e/main.tf
index 826e83aa..4be3bd2a 100644
--- a/deployment/terraform/lambda/modelling_e2e/main.tf
+++ b/deployment/terraform/lambda/modelling_e2e/main.tf
@@ -13,6 +13,17 @@ data "aws_secretsmanager_secret_version" "db_credentials" {
 
 locals {
   db_credentials = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)
+
+  # Per-container minimum gap (seconds) between Google Solar API calls. The Solar
+  # API caps the whole GCP project at a hard 600 QPM shared across Building
+  # Insights + Data Layers. Each of the up-to-`maximum_concurrency` containers
+  # loops its ~50-property batch sequentially, so pacing each one to
+  # 0.8 (safety headroom) × 600 QPM ÷ 60 ÷ N calls/s keeps the fleet sum under the
+  # ceiling — eliminating the sustained 429 storm that per-process backoff alone
+  # can't ride out. Derived from `maximum_concurrency` so N has one source of
+  # truth. The gap is the inverse of that rate: at N=32 → 4.0s. Consumed by the
+  # handler's throttle (see infrastructure/solar/google_solar_api_client.py).
+  solar_min_request_interval_seconds = var.maximum_concurrency / (0.8 * 600 / 60)
 }
 
 module "lambda" {
@@ -42,6 +53,8 @@ module "lambda" {
     OPEN_EPC_API_TOKEN   = var.open_epc_api_token
     GOOGLE_SOLAR_API_KEY = var.google_solar_api_key
     DATA_BUCKET          = "retrofit-data-${var.stage}"
+
+    SOLAR_MIN_REQUEST_INTERVAL_SECONDS = tostring(local.solar_min_request_interval_seconds)
   }
 }
 
diff --git a/infrastructure/solar/google_solar_api_client.py b/infrastructure/solar/google_solar_api_client.py
index 6dcc2e18..fd3c39cf 100644
--- a/infrastructure/solar/google_solar_api_client.py
+++ b/infrastructure/solar/google_solar_api_client.py
@@ -1,3 +1,4 @@
+import time
 from typing import Any, Literal, Optional
 
 import requests
@@ -20,8 +21,41 @@ class GoogleSolarApiClient:
     MAX_BACKOFF_SECONDS: float = 60.0
     ENTITY_NOT_FOUND_ERROR: str = "Requested entity was not found."
 
-    def __init__(self, api_key: str) -> None:
+    def __init__(
+        self, api_key: str, min_request_interval_seconds: float = 0.0
+    ) -> None:
+        # Per-instance min-interval throttle. The Solar API caps the whole GCP
+        # project at a hard 600 QPM shared across Building Insights + Data Layers
+        # (https://developers.google.com/maps/documentation/solar/usage-and-billing).
+        # The modelling_e2e fleet runs up to `maximum_concurrency` (32) containers,
+        # each looping its ~50-property batch sequentially — so without pacing the
+        # fleet offers multiples of the quota and a steady fraction of calls 429
+        # for a sustained window that #1327's jittered backoff can't ride out.
+        # Each container holding >= quota/N seconds between its OWN consecutive
+        # calls keeps the fleet sum under the ceiling. Default 0.0 = no throttle,
+        # so the interactive backend path and tests are unchanged; the scaled
+        # caller (the Lambda) opts in via env.
         self._api_key = api_key
+        self._min_request_interval_seconds = min_request_interval_seconds
+        self._last_request_monotonic: Optional[float] = None
+
+    def _await_rate_limit(self) -> None:
+        """Block until at least ``min_request_interval_seconds`` has elapsed since
+        this instance's previous call, so the container never exceeds its share of
+        the fleet's 600 QPM Solar budget. A no-op when the interval is 0 (the
+        default / interactive path). The call's own latency counts toward the gap
+        (we sleep only the *remainder*), so we don't double-pay the request time."""
+        if self._min_request_interval_seconds <= 0:
+            return
+        now = time.monotonic()
+        if self._last_request_monotonic is not None:
+            wait = self._min_request_interval_seconds - (
+                now - self._last_request_monotonic
+            )
+            if wait > 0:
+                time.sleep(wait)
+                now = time.monotonic()
+        self._last_request_monotonic = now
 
     @staticmethod
     def _parse_retry_after(response: requests.Response) -> Optional[float]:
@@ -39,6 +73,7 @@ class GoogleSolarApiClient:
         latitude: float,
         required_quality: Literal["HIGH", "MEDIUM", "LOW"] = "MEDIUM",
     ) -> dict[str, Any]:
+        self._await_rate_limit()
         insights_url = f"{self.base_url}/buildingInsights:findClosest"
         params: dict[str, str] = {
             "location.latitude": f"{latitude:.5f}",
diff --git a/tests/applications/modelling_e2e/test_handler.py b/tests/applications/modelling_e2e/test_handler.py
index 860e5313..7f11c7d7 100644
--- a/tests/applications/modelling_e2e/test_handler.py
+++ b/tests/applications/modelling_e2e/test_handler.py
@@ -123,6 +123,63 @@ def _baseline_orchestrator() -> Iterator[MagicMock]:
     yield orchestrator
 
 
+# ---------------------------------------------------------------------------
+# Solar throttle: per-container call gap resolved from env (quota/N safety)
+# ---------------------------------------------------------------------------
+
+
+def test_solar_min_request_interval_falls_back_to_32_wide_default() -> None:
+    # Arrange — env var absent: the Lambda must still self-protect at the 32-wide
+    # gap so a missed terraform wiring can't reopen the 600 QPM over-quota storm.
+    from applications.modelling_e2e.handler import (
+        _DEFAULT_SOLAR_MIN_REQUEST_INTERVAL_SECONDS,
+        _solar_min_request_interval_seconds,
+    )
+
+    with patch.dict("os.environ", {}, clear=True):
+        # Act
+        result = _solar_min_request_interval_seconds()
+
+    # Assert
+    assert result == _DEFAULT_SOLAR_MIN_REQUEST_INTERVAL_SECONDS
+
+
+def test_solar_min_request_interval_reads_env_when_set() -> None:
+    # Arrange — terraform injects the computed gap (e.g. a smaller N → smaller gap).
+    from applications.modelling_e2e.handler import (
+        _solar_min_request_interval_seconds,
+    )
+
+    with patch.dict(
+        "os.environ", {"SOLAR_MIN_REQUEST_INTERVAL_SECONDS": "2.0"}, clear=True
+    ):
+        # Act
+        result = _solar_min_request_interval_seconds()
+
+    # Assert
+    assert result == 2.0
+
+
+def test_solar_min_request_interval_falls_back_on_unusable_values() -> None:
+    # Arrange — garbage, negative and non-finite values: ``float()`` parses
+    # "nan"/"inf", and a NaN or negative gap would silently disable the throttle,
+    # so every one of these must resolve to the 32-wide default.
+    from applications.modelling_e2e.handler import (
+        _DEFAULT_SOLAR_MIN_REQUEST_INTERVAL_SECONDS,
+        _solar_min_request_interval_seconds,
+    )
+
+    for raw in ("", "not-a-number", "-1", "nan", "inf", "-inf"):
+        with patch.dict(
+            "os.environ", {"SOLAR_MIN_REQUEST_INTERVAL_SECONDS": raw}, clear=True
+        ):
+            # Act
+            result = _solar_min_request_interval_seconds()
+
+        # Assert
+        assert result == _DEFAULT_SOLAR_MIN_REQUEST_INTERVAL_SECONDS, raw
+
+
 # ---------------------------------------------------------------------------
 # Trigger body validation
 # ---------------------------------------------------------------------------
diff --git a/tests/infrastructure/solar/test_google_solar_api_client.py b/tests/infrastructure/solar/test_google_solar_api_client.py
index 9deebc15..ce55cbc8 100644
--- a/tests/infrastructure/solar/test_google_solar_api_client.py
+++ b/tests/infrastructure/solar/test_google_solar_api_client.py
@@ -95,6 +95,59 @@ def test_get_building_insights_does_not_retry_a_400() -> None:
     assert mock_get.call_count == 1
 
 
+# ---------------------------------------------------------------------------
+# Throttle: per-instance min-interval keeps the fleet under the 600 QPM cap
+# ---------------------------------------------------------------------------
+
+
+def test_throttle_spaces_consecutive_calls_by_min_interval() -> None:
+    # Arrange — a client paced at a 4s minimum gap between its OWN Solar calls
+    # (quota/N at the 32-wide fleet: 0.8 * 600 QPM / 32 ≈ one call / 4s). The
+    # first call fires immediately; the second must wait out the remainder of the
+    # interval before hitting the API, so the fleet stays under the hard 600 QPM
+    # ceiling instead of sustaining the over-quota burst that 429'd portfolio 796.
+    client = GoogleSolarApiClient(
+        api_key="test-key", min_request_interval_seconds=4.0
+    )
+    ok = MagicMock()
+    ok.json.return_value = {"solarPotential": {}}
+    ok.raise_for_status.return_value = None
+
+    with patch("requests.get", return_value=ok):
+        with patch(
+            "infrastructure.solar.google_solar_api_client.time.sleep"
+        ) as sleep:
+            # Act — two back-to-back calls with no real work between them.
+            client.get_building_insights(longitude=-0.1278, latitude=51.5074)
+            client.get_building_insights(longitude=-0.1278, latitude=51.5074)
+
+    # Assert — first call doesn't wait; second waits ~the full interval (no time
+    # elapsed between them, so the remaining gap is essentially the whole 4s).
+    assert sleep.call_count == 1
+    (waited,), _ = sleep.call_args
+    assert 3.5 <= waited <= 4.0
+
+
+def test_throttle_off_by_default_does_not_sleep() -> None:
+    # Arrange — no interval configured (the interactive backend path): the client
+    # must not pace, preserving today's behaviour.
+    client = GoogleSolarApiClient(api_key="test-key")
+    ok = MagicMock()
+    ok.json.return_value = {"solarPotential": {}}
+    ok.raise_for_status.return_value = None
+
+    with patch("requests.get", return_value=ok):
+        with patch(
+            "infrastructure.solar.google_solar_api_client.time.sleep"
+        ) as sleep:
+            # Act
+            client.get_building_insights(longitude=-0.1278, latitude=51.5074)
+            client.get_building_insights(longitude=-0.1278, latitude=51.5074)
+
+    # Assert
+    sleep.assert_not_called()
+
+
 # ---------------------------------------------------------------------------
 # Slice 1: Successful response returns parsed JSON
 # ---------------------------------------------------------------------------