mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-30 13:10:47 +00:00
Merge pull request #1327 from Hestia-Homes/fix/solar-429-backoff-jitter
fix(solar): ride out Google Solar 429s with jittered backoff
This commit is contained in:
commit
c1869eb5db
4 changed files with 195 additions and 22 deletions
|
|
@ -12,6 +12,7 @@ propagate; postcodes.io soft-fails to the seed postcode).
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
import random
|
||||
import time
|
||||
from typing import Callable, Optional, TypeVar
|
||||
|
||||
|
|
@ -35,12 +36,20 @@ def call_with_retry(
|
|||
backoff_base: float = 1.0,
|
||||
backoff_multiplier: float = 2.0,
|
||||
max_backoff: float = 60.0,
|
||||
jitter: bool = False,
|
||||
) -> T:
|
||||
"""Retry ``fn`` on transient failures — ``TransientHttpError`` (e.g. a 429)
|
||||
and ``httpx.TransportError`` (read/connect timeouts, connection resets) —
|
||||
backing off exponentially, or by the error's ``retry_after`` when it carries
|
||||
one. Non-transient failures propagate immediately; the last transient error
|
||||
is re-raised once ``max_retries`` is exhausted."""
|
||||
is re-raised once ``max_retries`` is exhausted.
|
||||
|
||||
``jitter`` applies **full jitter** to the *computed* backoff (a server-advised
|
||||
``retry_after`` is honoured exactly): the delay becomes a uniform random draw
|
||||
in ``[0, computed_backoff]``. This de-synchronises retries across many
|
||||
concurrent callers so they don't re-collide in lockstep — the "synchronised
|
||||
requests look like a DDoS" failure mode Google's Solar API best-practices warn
|
||||
about, and the cause of the 429 storm under 32 concurrent containers."""
|
||||
last_exc: Optional[Exception] = None
|
||||
for attempt in range(max_retries + 1):
|
||||
try:
|
||||
|
|
@ -52,9 +61,11 @@ def call_with_retry(
|
|||
exc.retry_after if isinstance(exc, TransientHttpError) else None
|
||||
)
|
||||
if retry_after is not None:
|
||||
delay = retry_after
|
||||
delay = min(retry_after, max_backoff)
|
||||
else:
|
||||
delay = backoff_base * (backoff_multiplier**attempt)
|
||||
time.sleep(min(delay, max_backoff))
|
||||
delay = min(backoff_base * (backoff_multiplier**attempt), max_backoff)
|
||||
if jitter:
|
||||
delay = random.uniform(0.0, delay)
|
||||
time.sleep(delay)
|
||||
assert last_exc is not None
|
||||
raise last_exc
|
||||
|
|
|
|||
|
|
@ -1,8 +1,9 @@
|
|||
import time
|
||||
from typing import Any, Literal, Optional
|
||||
|
||||
import requests
|
||||
|
||||
from infrastructure.http_retry import TransientHttpError, call_with_retry
|
||||
|
||||
|
||||
class BuildingInsightsNotFoundError(Exception):
    """Signals that a building-insights lookup matched no entity.

    Carries no extra state; callers distinguish it purely by type.
    """
|
||||
|
|
@ -10,12 +11,28 @@ class BuildingInsightsNotFoundError(Exception):
|
|||
|
||||
class GoogleSolarApiClient:
|
||||
base_url: str = "https://solar.googleapis.com/v1"
|
||||
MAX_RETRIES: int = 5
|
||||
# Bounded retries (NOT Google's infinite-loop example). With the 60s max
|
||||
# backoff below, 6 retries span ~2 of the Solar API's per-minute (600 QPM)
|
||||
# windows — enough to ride out a transient 429 burst, then give up so a
|
||||
# stuck request can't blow the container timeout; the subtask fails and is
|
||||
# re-triggered.
|
||||
MAX_RETRIES: int = 6
|
||||
MAX_BACKOFF_SECONDS: float = 60.0
|
||||
ENTITY_NOT_FOUND_ERROR: str = "Requested entity was not found."
|
||||
|
||||
def __init__(self, api_key: str) -> None:
|
||||
self._api_key = api_key
|
||||
|
||||
@staticmethod
def _parse_retry_after(response: requests.Response) -> Optional[float]:
    """Return the ``Retry-After`` delay in seconds, or ``None`` if unusable.

    Only the delay-seconds form of the header is understood. ``None`` is
    returned when the header is absent, when its value does not parse as a
    number (which includes the HTTP-date form), or when the number is
    negative or non-finite — so a caller never receives a delay it cannot
    safely sleep for.

    Args:
        response: The HTTP response whose headers are inspected.

    Returns:
        A finite, non-negative number of seconds, or ``None``.
    """
    header = response.headers.get("Retry-After")
    if header is None:
        return None
    try:
        seconds = float(header)
    except (TypeError, ValueError):
        return None
    # ``float`` happily parses "-5", "nan" and "inf". ``time.sleep`` raises
    # ValueError on a negative or NaN delay, so treat such values as "no
    # usable advice" rather than letting a bad header crash the retry path.
    # (The chained comparison is False for NaN as well.)
    if not 0.0 <= seconds < float("inf"):
        return None
    return seconds
|
||||
|
||||
def get_building_insights(
|
||||
self,
|
||||
longitude: float,
|
||||
|
|
@ -29,22 +46,40 @@ class GoogleSolarApiClient:
|
|||
"requiredQuality": required_quality,
|
||||
"key": self._api_key,
|
||||
}
|
||||
last_exc: Optional[Exception] = None
|
||||
for attempt in range(self.MAX_RETRIES):
|
||||
|
||||
def _fetch() -> dict[str, Any]:
|
||||
try:
|
||||
response = requests.get(insights_url, params=params)
|
||||
response.raise_for_status()
|
||||
result: dict[str, Any] = response.json()
|
||||
return result
|
||||
except requests.exceptions.RequestException as e:
|
||||
response = e.response
|
||||
if response is None:
|
||||
# No response = a transport-level failure (connection reset /
|
||||
# read or connect timeout) — transient, retry.
|
||||
raise TransientHttpError(f"Solar API transport error: {e}") from e
|
||||
status = response.status_code
|
||||
if (
|
||||
e.response is not None
|
||||
and e.response.status_code == 404
|
||||
and e.response.json()["error"]["message"] == self.ENTITY_NOT_FOUND_ERROR
|
||||
status == 404
|
||||
and response.json().get("error", {}).get("message")
|
||||
== self.ENTITY_NOT_FOUND_ERROR
|
||||
):
|
||||
raise BuildingInsightsNotFoundError() from e
|
||||
last_exc = e
|
||||
time.sleep(2 ** attempt)
|
||||
if status == 429 or status >= 500:
|
||||
# Rate-limit (429) and server errors (5xx) are transient.
|
||||
# Honour the server's Retry-After when present (jittered
|
||||
# exponential backoff otherwise — see `call_with_retry`).
|
||||
raise TransientHttpError(
|
||||
f"Solar API {status}",
|
||||
retry_after=self._parse_retry_after(response),
|
||||
) from e
|
||||
# Other 4xx are the caller's fault — non-transient, propagate.
|
||||
raise
|
||||
|
||||
assert last_exc is not None
|
||||
raise last_exc
|
||||
return call_with_retry(
|
||||
_fetch,
|
||||
max_retries=self.MAX_RETRIES,
|
||||
max_backoff=self.MAX_BACKOFF_SECONDS,
|
||||
jitter=True,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -1,15 +1,100 @@
|
|||
from typing import Any
|
||||
from unittest.mock import MagicMock, patch
|
||||
from typing import Any, Optional
|
||||
from unittest.mock import MagicMock, call, patch
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
from infrastructure.http_retry import TransientHttpError
|
||||
from infrastructure.solar.google_solar_api_client import (
|
||||
BuildingInsightsNotFoundError,
|
||||
GoogleSolarApiClient,
|
||||
)
|
||||
|
||||
|
||||
def _http_error(
    status_code: int, *, headers: Optional[dict[str, str]] = None
) -> requests.exceptions.HTTPError:
    """Build an ``HTTPError`` whose ``response`` is a mock with the given status.

    The mock response exposes ``status_code`` and ``headers`` (an empty dict
    when none are supplied); the error message is the status code as text.
    """
    fake_response = MagicMock(status_code=status_code, headers=headers or {})
    error = requests.exceptions.HTTPError(str(status_code))
    error.response = fake_response
    return error
|
||||
|
||||
|
||||
def _raising_response(err: Exception) -> MagicMock:
    """Return a mock response whose ``raise_for_status()`` raises ``err``."""
    response = MagicMock()
    response.configure_mock(**{"raise_for_status.side_effect": err})
    return response
|
||||
|
||||
|
||||
def test_get_building_insights_retries_429_then_succeeds() -> None:
    """A 429 followed by a good response yields the payload after one retry."""
    # Arrange — first call is rate-limited (429), second call succeeds.
    expected: dict[str, Any] = {"solarPotential": {"maxArrayPanelsCount": 20}}
    success = MagicMock()
    success.raise_for_status.return_value = None
    success.json.return_value = expected
    responses = [_raising_response(_http_error(429)), success]
    solar = GoogleSolarApiClient(api_key="test-key")

    # Act — sleep is patched out so the backoff does not slow the test.
    with patch("requests.get", side_effect=responses) as mock_get, patch(
        "infrastructure.http_retry.time.sleep"
    ):
        result = solar.get_building_insights(longitude=-0.1278, latitude=51.5074)

    # Assert — the payload comes back and exactly two HTTP calls were made.
    assert result == expected
    assert mock_get.call_count == 2
|
||||
|
||||
|
||||
def test_get_building_insights_honours_retry_after_on_429() -> None:
    """A persistent 429 with ``Retry-After: 2`` sleeps 2s each time, then gives up."""
    # Arrange — every call is rate-limited and advertises Retry-After: 2s.
    client = GoogleSolarApiClient(api_key="test-key")
    err = _http_error(429, headers={"Retry-After": "2"})

    with patch("requests.get", return_value=_raising_response(err)) as mock_get:
        with patch("infrastructure.http_retry.time.sleep") as sleep:
            # Act / Assert — exhaustion raises a TransientHttpError, not the raw HTTPError.
            with pytest.raises(TransientHttpError):
                client.get_building_insights(longitude=-0.1278, latitude=51.5074)

    # Bounded: MAX_RETRIES retries + the initial attempt.
    assert mock_get.call_count == GoogleSolarApiClient.MAX_RETRIES + 1
    # Retry-After honoured exactly: *every* backoff must be the advised 2s.
    # A bare membership check would still pass if only one sleep matched and
    # the rest were jittered computed backoffs.
    assert sleep.call_args_list, "expected at least one backoff sleep"
    assert all(recorded == call(2.0) for recorded in sleep.call_args_list)
|
||||
|
||||
|
||||
def test_get_building_insights_retries_on_500() -> None:
    """A 5xx followed by a good response yields the payload after one retry."""
    # Arrange — first call fails with 503, second call succeeds.
    expected: dict[str, Any] = {"solarPotential": {}}
    success = MagicMock()
    success.raise_for_status.return_value = None
    success.json.return_value = expected
    responses = [_raising_response(_http_error(503)), success]
    solar = GoogleSolarApiClient(api_key="test-key")

    # Act — sleep is patched out so the backoff does not slow the test.
    with patch("requests.get", side_effect=responses) as mock_get, patch(
        "infrastructure.http_retry.time.sleep"
    ):
        result = solar.get_building_insights(longitude=-0.1278, latitude=51.5074)

    # Assert
    assert result == expected
    assert mock_get.call_count == 2
|
||||
|
||||
|
||||
def test_get_building_insights_does_not_retry_a_400() -> None:
    """A 400 is a non-transient client error: it propagates after one call."""
    # Arrange — every call answers 400 (bad request).
    bad_request = _raising_response(_http_error(400))
    solar = GoogleSolarApiClient(api_key="test-key")

    # Act / Assert — the raw HTTPError surfaces without any retry.
    with patch("requests.get", return_value=bad_request) as mock_get, patch(
        "infrastructure.http_retry.time.sleep"
    ):
        with pytest.raises(requests.exceptions.HTTPError):
            solar.get_building_insights(longitude=-0.1278, latitude=51.5074)

    assert mock_get.call_count == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Slice 1: Successful response returns parsed JSON
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
@ -94,17 +179,19 @@ def test_get_building_insights_raises_on_entity_not_found() -> None:
|
|||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_get_building_insights_propagates_exception_after_retry_exhaustion() -> None:
|
||||
# Arrange
|
||||
def test_get_building_insights_fails_bounded_after_retry_exhaustion() -> None:
|
||||
# Arrange — a persistent transport failure (no response). It's transient, so
|
||||
# the client retries up to the bound, then surfaces a TransientHttpError for
|
||||
# the caller to fail the subtask on (re-triggerable) — NOT an infinite loop.
|
||||
client = GoogleSolarApiClient(api_key="test-key")
|
||||
|
||||
error = requests.exceptions.ConnectionError("persistent failure")
|
||||
error.response = None # type: ignore[attr-defined]
|
||||
|
||||
with patch("requests.get", side_effect=error) as mock_get:
|
||||
with patch("time.sleep"):
|
||||
with patch("infrastructure.http_retry.time.sleep"):
|
||||
# Act / Assert
|
||||
with pytest.raises(requests.exceptions.ConnectionError):
|
||||
with pytest.raises(TransientHttpError):
|
||||
client.get_building_insights(longitude=-0.1278, latitude=51.5074)
|
||||
|
||||
assert mock_get.call_count == GoogleSolarApiClient.MAX_RETRIES
|
||||
assert mock_get.call_count == GoogleSolarApiClient.MAX_RETRIES + 1
|
||||
|
|
|
|||
|
|
@ -109,6 +109,46 @@ def test_non_transient_error_propagates_without_retry(_no_sleep: MagicMock) -> N
|
|||
_no_sleep.assert_not_called()
|
||||
|
||||
|
||||
def test_jitter_full_randomizes_the_computed_backoff(_no_sleep: MagicMock) -> None:
    """With ``jitter=True`` the sleep is the random draw over ``[0, backoff]``."""
    # Arrange — the first call fails transiently (no Retry-After); the second succeeds.
    attempts = 0

    def flaky() -> str:
        nonlocal attempts
        attempts += 1
        if attempts == 1:
            raise TransientHttpError("429")
        return "ok"

    # Act — pin the random draw so the resulting sleep is predictable.
    with patch(
        "infrastructure.http_retry.random.uniform", return_value=0.4
    ) as uniform_mock:
        outcome = call_with_retry(flaky, jitter=True)

    # Assert — the draw spans [0, 1.0] (the first computed backoff) and the
    # drawn value is what gets slept.
    assert outcome == "ok"
    uniform_mock.assert_called_once_with(0.0, 1.0)
    _no_sleep.assert_called_once_with(0.4)
|
||||
|
||||
|
||||
def test_jitter_does_not_apply_to_a_server_retry_after(_no_sleep: MagicMock) -> None:
    """A server-advised ``retry_after`` is slept verbatim even when ``jitter=True``."""
    # Arrange — one queued failure carrying Retry-After 7s; after it, success.
    pending = [TransientHttpError("429", retry_after=7.0)]

    def flaky() -> str:
        if pending:
            raise pending.pop()
        return "ok"

    # Act
    with patch("infrastructure.http_retry.random.uniform") as uniform_mock:
        call_with_retry(flaky, jitter=True)

    # Assert — no random draw happened and the advised delay was used as-is.
    uniform_mock.assert_not_called()
    _no_sleep.assert_called_once_with(7.0)
|
||||
|
||||
|
||||
def test_retry_after_is_capped_by_max_backoff(_no_sleep: MagicMock) -> None:
|
||||
# Arrange
|
||||
raised = {"done": False}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue