Model/infrastructure/solar/google_solar_api_client.py

50 lines
1.6 KiB
Python

import time
from typing import Any, Literal, Optional
import requests
class BuildingInsightsNotFoundError(Exception):
pass
class GoogleSolarApiClient:
base_url: str = "https://solar.googleapis.com/v1"
MAX_RETRIES: int = 5
ENTITY_NOT_FOUND_ERROR: str = "Requested entity was not found."
def __init__(self, api_key: str) -> None:
self._api_key = api_key
def get_building_insights(
self,
longitude: float,
latitude: float,
required_quality: Literal["HIGH", "MEDIUM", "LOW"] = "MEDIUM",
) -> dict[str, Any]:
insights_url = f"{self.base_url}/buildingInsights:findClosest"
params: dict[str, str] = {
"location.latitude": f"{latitude:.5f}",
"location.longitude": f"{longitude:.5f}",
"requiredQuality": required_quality,
"key": self._api_key,
}
last_exc: Optional[Exception] = None
for attempt in range(self.MAX_RETRIES):
try:
response = requests.get(insights_url, params=params)
response.raise_for_status()
result: dict[str, Any] = response.json()
return result
except requests.exceptions.RequestException as e:
if (
e.response is not None
and e.response.status_code == 404
and e.response.json()["error"]["message"] == self.ENTITY_NOT_FOUND_ERROR
):
raise BuildingInsightsNotFoundError() from e
last_exc = e
time.sleep(2 ** attempt)
assert last_exc is not None
raise last_exc