mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
34 lines
1.1 KiB
Python
34 lines
1.1 KiB
Python
import time
|
|
from typing import Callable, Optional, TypeVar
|
|
|
|
from infrastructure.epc.exceptions import EpcRateLimitError
|
|
|
|
T = TypeVar("T")
|
|
|
|
|
|
def call_with_retry(
|
|
fn: Callable[[], T],
|
|
max_retries: int = 5,
|
|
backoff_base: float = 1.0,
|
|
backoff_multiplier: float = 2.0,
|
|
max_backoff: float = 60.0,
|
|
) -> T:
|
|
"""Call ``fn``, retrying on EpcRateLimitError with exponential backoff.
|
|
|
|
Honours the API's ``Retry-After`` header when present, otherwise backs off
|
|
``backoff_base * backoff_multiplier ** attempt`` (capped at ``max_backoff``).
|
|
"""
|
|
last_exc: Optional[EpcRateLimitError] = None
|
|
for attempt in range(max_retries + 1):
|
|
try:
|
|
return fn()
|
|
except EpcRateLimitError as exc:
|
|
last_exc = exc
|
|
if attempt < max_retries:
|
|
if exc.retry_after is not None:
|
|
delay = exc.retry_after
|
|
else:
|
|
delay = backoff_base * (backoff_multiplier**attempt)
|
|
time.sleep(min(delay, max_backoff))
|
|
assert last_exc is not None
|
|
raise last_exc
|