hyx
hyx copied to clipboard
Introduce circuit breaker manager
As brought up in #85, I would be great to have a single component that manages seamlessly all breakers in the service.
The manager could be defined somewhere as a singleton and passed around the places (via dependency injection, for example):
from typing import Any
import httpx
class CurrencyRate:
...
class CurrencyRates:
def __init__(self, breaker_manager: CircuitBreakerManager) -> None:
self._breaker_manager = breaker_manager
async def get_rates(self) -> list[CurrencyRate]:
with self.breaker_manager("currency.rates", exceptions=(httpx.NetworkError,)): # manager creates/retrieves and activates breaker
async with httpx.AsyncClient() as client:
response = await client.get("https://api.github.com/events")
return self._map_rates(response.json())
def _map_rates(self, data: list[dict[str, Any]]) -> list[CurrencyRate]:
...