
Support for async iterators

Open gordonwatts opened this issue 3 years ago • 4 comments

I needed to restart an async iterator if it hadn't returned any items yet. The algorithm I needed was:

  • If an exception occurred before any item was returned, restart
  • If an exception occurred after the first item was returned, propagate the exception no matter the retry parameters

The logic here is basically that once the async iterator has returned an item there is no way to "get it back" - so the exception should follow the same logic.
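That rule can be sketched as a small standalone helper (the names here are illustrative, not part of backoff's API):

```python
import asyncio

async def retry_until_first_item(make_iter, max_tries=3, delay=0.0):
    """Restart an async iterator on failure, but only while it has
    yielded nothing; once an item is out, exceptions propagate."""
    for attempt in range(1, max_tries + 1):
        got_one = False
        try:
            async for item in make_iter():
                got_one = True
                yield item
            return  # iterator finished normally
        except Exception:
            # Can't "take back" yielded items, so don't retry after the
            # first one; also give up once we're out of tries.
            if got_one or attempt == max_tries:
                raise
            await asyncio.sleep(delay)

# demo: fails twice before the first item, then succeeds
calls = 0

async def flaky():
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("boom")
    yield 1
    yield 2

async def main():
    return [x async for x in retry_until_first_item(flaky)]

print(asyncio.run(main()))  # [1, 2]
```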

I used backoff in my project already, so I thought I'd just re-use the core async code you have already written to implement this. I came up with the code below. As you can tell, it is about 99% copied from this library. The only important difference comes in the retry_exception_itr function, in the retry function, in the while loop (so about 20 lines down from the top).

Which leads me to: is this a feature you'd be interested in having in this library? If so, does the below approach make sense?

```python
def retry_exception_itr(target, wait_gen, exception,
                        max_tries, max_time, jitter, giveup,
                        on_success, on_backoff, on_giveup,
                        wait_gen_kwargs):
    on_success = _ensure_coroutines(on_success)
    on_backoff = _ensure_coroutines(on_backoff)
    on_giveup = _ensure_coroutines(on_giveup)
    giveup = _ensure_coroutine(giveup)

    # Easy to implement, please report if you need this.
    assert not asyncio.iscoroutinefunction(max_tries)
    assert not asyncio.iscoroutinefunction(jitter)

    @functools.wraps(target)
    async def retry(*args, **kwargs):
        # change names because python 2.x doesn't have nonlocal
        max_tries_ = _maybe_call(max_tries)
        max_time_ = _maybe_call(max_time)

        tries = 0
        start = datetime.datetime.now()
        wait = _init_wait_gen(wait_gen, wait_gen_kwargs)
        while True:
            tries += 1
            elapsed = timedelta.total_seconds(datetime.datetime.now() - start)
            details = (target, args, kwargs, tries, elapsed)

            got_one_item = False
            try:
                async for item in target(*args, **kwargs):
                    got_one_item = True
                    yield item
            except exception as e:
                # If we've already fed a result out of this method,
                # we can't pull it back. So don't try to pull back/retry
                # the exception either.
                if got_one_item:
                    raise

                giveup_result = await giveup(e)
                max_tries_exceeded = (tries == max_tries_)
                max_time_exceeded = (max_time_ is not None
                                     and elapsed >= max_time_)

                if giveup_result or max_tries_exceeded or max_time_exceeded:
                    await _call_handlers(on_giveup, *details)
                    raise

                try:
                    seconds = _next_wait(wait, jitter, elapsed, max_time_)
                except StopIteration:
                    await _call_handlers(on_giveup, *details)
                    raise e

                await _call_handlers(on_backoff, *details, wait=seconds)

                # Note: there is no convenient way to pass explicit event
                # loop to decorator, so here we assume that either default
                # thread event loop is set and correct (it mostly is
                # by default), or Python >= 3.5.3 or Python >= 3.6 is used
                # where loop.get_event_loop() in coroutine guaranteed to
                # return correct value.
                # See for details:
                #   <https://groups.google.com/forum/#!topic/python-tulip/yF9C-rFpiKk>
                #   <https://bugs.python.org/issue28613>
                await asyncio.sleep(seconds)
            else:
                await _call_handlers(on_success, *details)
                return
    return retry


def on_exception_itr(wait_gen,
                     exception,
                     max_tries=None,
                     max_time=None,
                     jitter=full_jitter,
                     giveup=lambda e: False,
                     on_success=None,
                     on_backoff=None,
                     on_giveup=None,
                     logger='backoff',
                     backoff_log_level=logging.INFO,
                     giveup_log_level=logging.ERROR,
                     **wait_gen_kwargs):
    def decorate(target):
        # change names because python 2.x doesn't have nonlocal
        logger_ = _prepare_logger(logger)

        on_success_ = _config_handlers(on_success)
        on_backoff_ = _config_handlers(
            on_backoff, _log_backoff, logger_, backoff_log_level
        )
        on_giveup_ = _config_handlers(
            on_giveup, _log_giveup, logger_, giveup_log_level
        )

        retry = None
        if sys.version_info[:2] >= (3, 5):   # pragma: python=3.5
            import asyncio

            if asyncio.iscoroutinefunction(target):
                import backoff._async
                retry = backoff._async.retry_exception

        if retry is None:
            retry = _sync.retry_exception

        return retry_exception_itr(target, wait_gen, exception,
                                   max_tries, max_time, jitter, giveup,
                                   on_success_, on_backoff_, on_giveup_,
                                   wait_gen_kwargs)

    # Return a function which decorates a target with a retry loop.
    return decorate
```

As always, thanks for a really excellent library!

gordonwatts · Jul 29 '21 07:07

I think I'm having the same issue. I was trying to use async for ... with a function wrapped using backoff. The use case is handling websocket disconnects.

awm33 · Aug 03 '21 02:08

@awm33 - Would the algorithm above have handled it? Re-starting something when the sequence is halfway done is... well... difficult.
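A quick illustration of why mid-stream restarts are hard: if the source fails after yielding and you naively restart it from the beginning, the consumer sees the early items twice (all names here are made up for the demo):

```python
import asyncio

async def source(state):
    """Yields "a", fails once mid-stream, then yields "a", "b" cleanly."""
    yield "a"
    if not state["failed_once"]:
        state["failed_once"] = True
        raise ConnectionError("dropped mid-stream")
    yield "b"

async def naive_restart(state):
    # Retries on *any* failure, even after items have been delivered.
    while True:
        try:
            async for item in source(state):
                yield item
            return
        except ConnectionError:
            continue  # restart from the beginning

async def main():
    return [x async for x in naive_restart({"failed_once": False})]

print(asyncio.run(main()))  # ['a', 'a', 'b'] — "a" is duplicated
```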

gordonwatts · Aug 03 '21 09:08

@gordonwatts I used a wrapper within the wrapper. Here's a condensed version of what I was trying to do:

```python
async def events():
    url = getws_url()
    req_kwargs = {}
    add_auth_header(req_kwargs)

    @backoff.on_exception(backoff.expo,
                          (aiohttp.ClientResponseError,
                           aiohttp.ClientConnectorError,
                           aiohttp.WebSocketError,
                           aiohttp.WSCloseCode,
                           DisconnectError))
    async def connect_wrapper():
        async with session.ws_connect(url, **req_kwargs) as ws:
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    yield msg.json()
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    break

            if not ws_disconnecting:  # tracks if the disconnect was intended
                raise DisconnectError()

    async for event in connect_wrapper():  # there is no "async yield for ..."
        yield event
```

Then

```python
async for event in self.events():
    print(event)
```
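The `async for event in connect_wrapper(): yield event` line exists because Python has no `yield from` for async generators - delegation has to be spelled out with a loop. A minimal standalone illustration:

```python
import asyncio

async def inner():
    yield 1
    yield 2

async def outer():
    # no "async yield from inner()" exists; delegate item by item
    async for item in inner():
        yield item
    yield 3

async def main():
    return [x async for x in outer()]

print(asyncio.run(main()))  # [1, 2, 3]
```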

awm33 · Aug 03 '21 17:08

Ok - so this looks like the same pattern I had: you need to catch and recover from errors that occurred in the ws_connect call, but once messages started coming back from the session you didn't need to catch errors there.

gordonwatts · Aug 04 '21 06:08