backoff
Support for async iterators
I needed a way to restart an async iterator, but only if it hadn't yielded any items yet. The algorithm I needed was:
- If an exception occurs before any item has been yielded, restart.
- If an exception occurs after the first item has been yielded, propagate it regardless of the retry parameters.
The reasoning is that once the async iterator has handed an item out there is no way to "take it back", so an exception at that point has to follow the same rule and propagate rather than trigger a retry. (A minimal standalone sketch of just this rule follows below.)
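To make that concrete, here is a minimal sketch of just the restart rule, with none of backoff's wait/handler machinery (restart_before_first_item and make_iter are names I made up for the example):

async def restart_before_first_item(make_iter, max_tries=3):
    # make_iter is a zero-argument callable returning a fresh async
    # iterator. Retry only while nothing has been yielded yet; once an
    # item is out, any exception propagates unchanged.
    for attempt in range(1, max_tries + 1):
        got_one_item = False
        try:
            async for item in make_iter():
                got_one_item = True
                yield item
            return
        except Exception:
            if got_one_item or attempt == max_tries:
                raise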
I already use backoff in my project, so I thought I'd reuse the core async code you have already written to implement this, and I came up with the code below. As you can tell, it is about 99% copied from this library; the only important difference is inside retry_exception_itr, in the retry coroutine's while loop (the got_one_item bookkeeping).
Which leads me to: is this a feature you'd be interested in having in this library? If so, does the below approach make sense?
# (asyncio/datetime/functools/logging imports plus backoff's private
# helpers - _ensure_coroutine(s), _maybe_call, _init_wait_gen, _next_wait,
# _call_handlers, _prepare_logger, _config_handlers, _log_backoff,
# _log_giveup, full_jitter - are all used unchanged from this library.)
import asyncio
import datetime
import functools
import logging
from datetime import timedelta


def retry_exception_itr(target, wait_gen, exception,
                        max_tries, max_time, jitter, giveup,
                        on_success, on_backoff, on_giveup,
                        wait_gen_kwargs):
    on_success = _ensure_coroutines(on_success)
    on_backoff = _ensure_coroutines(on_backoff)
    on_giveup = _ensure_coroutines(on_giveup)
    giveup = _ensure_coroutine(giveup)

    # Easy to implement, please report if you need this.
    assert not asyncio.iscoroutinefunction(max_tries)
    assert not asyncio.iscoroutinefunction(jitter)

    @functools.wraps(target)
    async def retry(*args, **kwargs):
        # change names because python 2.x doesn't have nonlocal
        max_tries_ = _maybe_call(max_tries)
        max_time_ = _maybe_call(max_time)

        tries = 0
        start = datetime.datetime.now()
        wait = _init_wait_gen(wait_gen, wait_gen_kwargs)
        while True:
            tries += 1
            elapsed = timedelta.total_seconds(datetime.datetime.now() - start)
            details = (target, args, kwargs, tries, elapsed)

            got_one_item = False
            try:
                async for item in target(*args, **kwargs):
                    got_one_item = True
                    yield item
            except exception as e:
                # If we've already fed a result out of this generator,
                # we can't pull it back. So don't try to pull back/retry
                # the exception either.
                if got_one_item:
                    raise

                giveup_result = await giveup(e)
                max_tries_exceeded = (tries == max_tries_)
                max_time_exceeded = (max_time_ is not None
                                     and elapsed >= max_time_)

                if giveup_result or max_tries_exceeded or max_time_exceeded:
                    await _call_handlers(on_giveup, *details)
                    raise

                try:
                    seconds = _next_wait(wait, jitter, elapsed, max_time_)
                except StopIteration:
                    await _call_handlers(on_giveup, *details)
                    raise e

                await _call_handlers(on_backoff, *details, wait=seconds)

                # Note: there is no convenient way to pass an explicit
                # event loop to the decorator, so we assume that either
                # the default thread event loop is set and correct (it
                # mostly is by default), or Python >= 3.5.3 / >= 3.6 is
                # used, where get_event_loop() inside a coroutine is
                # guaranteed to return the correct loop.
                # See for details:
                # <https://groups.google.com/forum/#!topic/python-tulip/yF9C-rFpiKk>
                # <https://bugs.python.org/issue28613>
                await asyncio.sleep(seconds)
            else:
                await _call_handlers(on_success, *details)
                return

    return retry

def on_exception_itr(wait_gen,
                     exception,
                     max_tries=None,
                     max_time=None,
                     jitter=full_jitter,
                     giveup=lambda e: False,
                     on_success=None,
                     on_backoff=None,
                     on_giveup=None,
                     logger='backoff',
                     backoff_log_level=logging.INFO,
                     giveup_log_level=logging.ERROR,
                     **wait_gen_kwargs):
    def decorate(target):
        # change names because python 2.x doesn't have nonlocal
        logger_ = _prepare_logger(logger)

        on_success_ = _config_handlers(on_success)
        on_backoff_ = _config_handlers(
            on_backoff, _log_backoff, logger_, backoff_log_level
        )
        on_giveup_ = _config_handlers(
            on_giveup, _log_giveup, logger_, giveup_log_level
        )

        # (I dropped the sync/async dispatch that on_exception does at this
        # point - the copied "retry = backoff._async.retry_exception /
        # _sync.retry_exception" selection was dead code here, since the
        # target is always an async generator for this decorator.)
        return retry_exception_itr(target, wait_gen, exception,
                                   max_tries, max_time, jitter, giveup,
                                   on_success_, on_backoff_, on_giveup_,
                                   wait_gen_kwargs)

    # Return a function which decorates a target with a retry loop.
    return decorate
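For completeness, this is how I'd expect the decorator to be used - the same shape as on_exception, but the target is an async generator (connect_to_feed is just a stand-in here):

@on_exception_itr(backoff.expo, ConnectionError, max_tries=5)
async def stream_events():
    conn = await connect_to_feed()  # stand-in for real connection setup
    async for event in conn:
        yield event

# consumed like any plain async iterator:
#     async for event in stream_events():
#         ...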
As always, thanks for a really excellent library!
I think I'm having the same issue. I was trying to use async for ... with a function wrapped with backoff; the use case is handling websocket disconnects.
@awm33 - Would the algorithm above have handled it? Re-starting a sequence when it's already halfway done is... well... difficult.
@gordonwatts I used a wrapper within the wrapper. Here's a condensed version of what I was trying to do (session, getws_url, add_auth_header, ws_disconnecting and DisconnectError are defined elsewhere in my code):

async def events():
    url = getws_url()
    req_kwargs = {}
    add_auth_header(req_kwargs)

    # (aiohttp.WSCloseCode is an enum of close codes, not an exception
    # class, so it probably shouldn't be in this tuple)
    @backoff.on_exception(backoff.expo,
                          (aiohttp.ClientResponseError,
                           aiohttp.ClientConnectorError,
                           aiohttp.WebSocketError,
                           aiohttp.WSCloseCode,
                           DisconnectError))
    async def connect_wrapper():
        async with session.ws_connect(url, **req_kwargs) as ws:
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    yield msg.json()
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    break

        if not ws_disconnecting:  # tracks if the disconnect was intended
            raise DisconnectError()

    async for event in connect_wrapper():  # there is no "async yield for ..."
        yield event
Then:

async for event in self.events():
    print(event)
Ok - so this looks like the same pattern I had: you need to catch and recover from errors that occur in the ws_connect call, but once messages start coming back from the session you no longer need to catch errors there.
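To see the two behaviours side by side, here is a quick self-contained check using the small restart_before_first_item sketch from the top of the thread (the Flaky class is made up for the demo):

import asyncio

class Flaky:
    # Fails on the first connect attempt; after reconnecting it yields
    # one item and then fails again mid-stream.
    def __init__(self):
        self.connects = 0

    async def stream(self):
        self.connects += 1
        if self.connects == 1:
            raise ConnectionError("failed before any item")  # triggers a retry
        yield "item-1"
        raise ConnectionError("failed mid-stream")  # propagates to the caller

async def main():
    try:
        async for item in restart_before_first_item(Flaky().stream):
            print("got", item)  # prints: got item-1
    except ConnectionError as e:
        print("propagated:", e)  # prints: propagated: failed mid-stream

asyncio.run(main())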