Model/backend/epc_client/_retry.py
Jun-te Kim c347865b9e retry
2026-05-13 09:34:51 +00:00

28 lines
823 B
Python

import time
from typing import Callable, TypeVar
from backend.epc_client.exceptions import EpcRateLimitError
# Return type of the wrapped callable; call_with_retry hands it back unchanged.
T = TypeVar("T")


def call_with_retry(
    fn: Callable[[], T],
    max_retries: int = 5,
    backoff_base: float = 1.0,
    backoff_multiplier: float = 2.0,
    max_backoff: float = 60.0,
) -> T:
    """Call *fn*, retrying while it raises ``EpcRateLimitError``.

    ``fn`` is always invoked at least once. After each rate-limit error that
    still has retry budget left, the function sleeps and tries again. Any
    other exception raised by ``fn`` propagates immediately.

    Args:
        fn: Zero-argument callable to invoke.
        max_retries: Number of additional attempts after the first call.
            Values below zero are treated as zero.
        backoff_base: Delay (seconds, as passed to ``time.sleep``) used for
            the first retry when the error carries no ``retry_after``.
        backoff_multiplier: Factor applied to the delay per further retry,
            i.e. ``backoff_base * backoff_multiplier ** attempt``.
        max_backoff: Upper bound on any single sleep. It also caps a
            ``retry_after`` value taken from the error.

    Returns:
        Whatever ``fn`` returns on its first successful call.

    Raises:
        EpcRateLimitError: The most recent error, once the retry budget is
            exhausted.
    """
    # A negative budget would otherwise skip calling fn altogether.
    retries = max(max_retries, 0)
    attempt = 0
    while True:
        try:
            return fn()
        except EpcRateLimitError as exc:
            if attempt >= retries:
                # Budget exhausted: surface the latest error unchanged.
                raise
            # Read the hint here; ``exc`` is unbound once the handler exits.
            delay = exc.retry_after
        if delay is None:
            try:
                delay = backoff_base * backoff_multiplier**attempt
            except OverflowError:
                # The exponential grew past what a float can hold; the cap
                # would have applied anyway.
                delay = max_backoff
        # time.sleep rejects negative values, so floor the capped delay at 0.
        time.sleep(max(0.0, min(delay, max_backoff)))
        attempt += 1