
Pending task destroyed warning

Open JanBednarik opened this issue 10 years ago • 10 comments

When I run this script:

import asyncio
import asyncio_redis

loop = asyncio.get_event_loop()
conn = loop.run_until_complete(asyncio_redis.Connection.create('localhost', 6379))
conn.close()

I get this warning:

Task was destroyed but it is pending!
task: <Task pending coro=<_reader_coroutine() running at /Users/honza/dev/envs/test/lib/python3.4/site-packages/asyncio_redis/protocol.py:919> wait_for=<Future pending cb=[Task._wakeup()]>>

Environment:

  • Python 3.4.2
  • asyncio-redis 0.13.4

The asyncio docs say it's probably a bug: https://docs.python.org/3/library/asyncio-dev.html#pending-task-destroyed

JanBednarik avatar Nov 20 '14 17:11 JanBednarik

Thanks for reporting! I'll have a look. I wonder whether RedisProtocol.connection_lost is called in this case.

jonathanslenders avatar Nov 21 '14 11:11 jonathanslenders

What does this line do?

loop.run_until_complete(asyncio_redis.Connection.create('localhost', 6379))

First, it starts the asyncio event loop and waits for the result of Connection.create. Then the event loop stops running.

What does Connection.create do?

  1. It opens a socket, performs some handshake logic, and starts a reader task that parses Redis responses from incoming bytes and sends them to the Futures waiting for results. Concept code is something like this:

@asyncio.coroutine
def _reader_coroutine(self):
    while True:
        data = yield from socket.read()  # pseudocode: read raw bytes from the socket
        response = parser.parse(data)
        if response is not None:
            self.futures[0].set_result(response)
            self.futures = self.futures[1:]

  2. It returns an active Redis connection whose reader task is already parsing responses.

When you call conn.close(), it cancels the reader coroutine and closes the socket.

When do you receive the warning, before or after conn.close()? If before, it's normal: the reader coroutine is stopped but not yet finished (the event loop is not running). If after, it may be a bug that conn.close doesn't cancel the reader coroutine.

What does the following code do for you?


import asyncio
import asyncio_redis

@asyncio.coroutine
def test():
    conn = yield from asyncio_redis.Connection.create('localhost', 6379)
    conn.close()

# I'm not sure if the next line is completely correct
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.Task(test()))

tumb1er avatar Nov 21 '14 12:11 tumb1er

That code prints the same warning.

It happens at the end of the script, when the loop is being closed and garbage collected. You can test/force GC like this:

import asyncio
import asyncio_redis
import gc

loop = asyncio.get_event_loop()
print("A")
conn = loop.run_until_complete(asyncio_redis.Connection.create('localhost', 6379))
conn.close()
del conn
gc.collect()
print("B")
loop.close()
del loop
gc.collect()
print("C")

Output:

A
B
Task was destroyed but it is pending!
task: <Task pending coro=<_reader_coroutine() running at /Users/honza/dev/envs/test/lib/python3.4/site-packages/asyncio_redis/protocol.py:919> wait_for=<Future pending cb=[Task._wakeup()]>>
C

I'm new to asyncio stuff, so I'm not sure what's going on in there.

JanBednarik avatar Nov 21 '14 13:11 JanBednarik

I have the same problem; I do this:

import asyncio
import asyncio_redis

@asyncio.coroutine
def callback(coro, event_loop):
    result = yield from asyncio.wait_for(coro, timeout=None, loop=event_loop)
    yield from asyncio.sleep(1)

@asyncio.coroutine
def test(event_loop=None):
    conn = yield from asyncio_redis.Connection.create('localhost', 6379, loop=event_loop)
    conn.close()

event_loop = asyncio.get_event_loop()
coro = test(event_loop)
event_loop.run_until_complete(callback(coro, event_loop))

AttentionZ avatar Apr 06 '15 06:04 AttentionZ

TL;DR: close is working correctly. It just doesn't wait for all tasks to complete, because it's not a coroutine.

As @jonathanslenders suggested, the pending-task warning appears because connection_lost has not been called by the time the event loop stops. Here is a minimal example that shows this behavior:

import asyncio
import asyncio_redis

@asyncio.coroutine
def test():
    conn = yield from asyncio_redis.Connection.create('localhost', 6379)
    conn.close()

    # uncomment to avoid error
    # yield

asyncio.get_event_loop().run_until_complete(test())

The reason for the error is that _reader_coroutine is still running (waiting for connection_lost). The _reader_coroutine task is scheduled in parallel to the main coroutine (test in my example). It is possible for the connection to wait for it to complete, but doing so is pretty hackish:

# Connection.close
if self.protocol.transport:
    self.protocol.transport.close()
    yield from self.protocol._reader_f   # wait for the reader task

and doing so would make close a coroutine, changing its interface. So, very yuck.

One possible solution, which works if the transport is a selector transport on a Unix system, is simply to add one more trip back to the scheduler in the main coroutine (see my first example) by adding a yield. This gives the callback in asyncio/selector_events.py:568, which is what triggers connection_lost and frees the reader, time to run.

Ultimately, I don't think this bug can be fixed without tying a particular protocol to the Connection class. If we always know we have the RedisProtocol, then we can write a wait_close coroutine method for the Connection class and document that close only schedules closure but doesn't tear down resources/tasks, whereas a wait_close coroutine would give the caller the option to wait until the connection is fully closed rather than just closing.

So, I think there is a design issue here. Personally, I would add a coroutine hook or two to the base protocol for redis, so that protocol implementers would be required to implement these hooks to cancel any running tasks that they start up. Then the connection could simply choose to either schedule those coroutines with asyncio.async (thereby giving us the current behavior of close) or yield from them inside of a wait_close, roughly as sketched below.
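
A rough sketch of that idea; the hook and method names (cancel_tasks, wait_close) are hypothetical, not part of asyncio-redis, and _reader_f stands for the spawned reader task:

import asyncio

class RedisProtocol(asyncio.Protocol):
    # ... existing protocol code ...

    @asyncio.coroutine
    def cancel_tasks(self):
        # Hypothetical hook: cancel the reader task, then wait until it
        # has actually processed its CancelledError.
        self._reader_f.cancel()
        try:
            yield from self._reader_f
        except asyncio.CancelledError:
            pass

class Connection:
    def close(self):
        # Current behavior: schedule the teardown and return immediately.
        asyncio.async(self.protocol.cancel_tasks(), loop=self._loop)

    @asyncio.coroutine
    def wait_close(self):
        # Alternative: let the caller wait for the teardown to complete.
        yield from self.protocol.cancel_tasks()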

I hope this helps people understand why this is happening. Ultimately, this is really not a bug in my opinion. It's just part of the system that close doesn't block until the connection is closed. It's really a "please close the connection eventually."

housleyjk avatar Apr 16 '15 22:04 housleyjk

I created a PR that allows you to optionally wait until the closing is complete: https://github.com/jonathanslenders/asyncio-redis/pull/68

mrdon avatar May 14 '15 17:05 mrdon

I've faced similar problems when playing with coroutines. As @housleyjk noted, letting the main event loop progress once more usually makes things clean up properly. This is particularly true when there are coroutines waiting on socket reads that still need a chance to check/consume the final result: an empty string (e.g., the standard library's socket) or an exception representing a closed connection (e.g., aiozmq's stream-closed error). A quick fix is to insert loop.run_until_complete(asyncio.sleep(0)) between closing any socket connections (including asyncio_redis' Connection/Pool) and loop.close().
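
For example, applied to the original script from this issue, the quick fix looks like this:

import asyncio
import asyncio_redis

loop = asyncio.get_event_loop()
conn = loop.run_until_complete(asyncio_redis.Connection.create('localhost', 6379))
conn.close()
# One extra trip through the scheduler lets the reader coroutine
# observe the closed transport before the loop goes away.
loop.run_until_complete(asyncio.sleep(0))
loop.close()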

achimnol avatar Aug 02 '15 17:08 achimnol

Yes, there is no simple solution. A task can be cancelled, but that doesn't mean it's destroyed.

In [5]: import asyncio
   ...: @asyncio.coroutine
   ...: def corot():
   ...:     pass
   ...: f = asyncio.async(corot())

In [6]: f.cancelled()
Out[6]: False

In [7]: f.cancel()
Out[7]: True

In [8]: f.cancelled()
Out[8]: False

Cancelling the _reader_f task in connection_lost doesn't guarantee anything yet. It only informs the scheduler that this coroutine should be cancelled. It's still pending until the next round trip to the scheduler, where a CancelledError will be sent into it.

To make a correct blocking "close", we would probably have to create a Future which is set only after all of the spawned coroutines have received/processed the CancelledError, for instance through an add_done_callback (_reader_f.add_done_callback). (The patch of @mrdon is close, but missing this part.)
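
A sketch of what that could look like (close_blocking is a hypothetical method name; _reader_f is the reader task spawned by the protocol):

@asyncio.coroutine
def close_blocking(self):
    closed = asyncio.Future(loop=self._loop)
    # Fires only once the reader task has actually processed its CancelledError.
    self.protocol._reader_f.add_done_callback(lambda f: closed.set_result(None))
    self.close()        # schedules cancellation of the reader task
    yield from closed   # resolves after the task is really done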

I'm not sure this is the design pattern that we want to use for asyncio software in general. (I could be wrong.)

However, the simple solution is indeed to call loop._run_once just once before closing the loop. By doing this we say: "Give every cancelled coroutine one more chance to handle that CancelledError before really closing."

The patch for reducing the noise in the unit tests would be this:

diff --git a/tests.py b/tests.py
index 24afd98..e33e8b6 100755
--- a/tests.py
+++ b/tests.py
@@ -90,6 +90,7 @@ def redis_test(function):
                 transport.close()

         self.loop.run_until_complete(c())
+        self.loop._run_once()
     return wrapper

Jonathan

jonathanslenders avatar Aug 02 '15 20:08 jonathanslenders

I noticed that there is a _closed flag in the Connection class, and that flag is set to False during initialization and never updated afterwards. Would it be correct to implement it like this?

...
class Connection:
    ...
    def create(cls, ...):
        ...
        connection._closing = False
        connection._closed = False
        ...
        def connection_lost():
            if connection._auto_reconnect and not connection._closing:
                asyncio.ensure_future(connection._reconnect(), loop=connection._loop)
            else:  # << here the new code
                connection._closed = True
    ...
    @asyncio.coroutine
    def close_wait(self):
        self.close()
        while not self._closed:
            yield from asyncio.sleep(0)

This works for me, but I am not sure whether it works because it is correct, or just because of the extra yield round trip.
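
For what it's worth, usage of that hypothetical close_wait would look something like this:

@asyncio.coroutine
def main():
    conn = yield from asyncio_redis.Connection.create('localhost', 6379)
    yield from conn.close_wait()  # returns only after connection_lost has run

asyncio.get_event_loop().run_until_complete(main())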

MarSoft avatar Mar 12 '16 18:03 MarSoft

Maybe StackOverflow - Please explain "Task was destroyed but it is pending!" sheds some light (TL;DR: perhaps loop.stop()?). I just came across this issue and found it still open. Note that in Python 3.7 it doesn't seem to happen.

Lonami avatar Dec 21 '18 09:12 Lonami