Call to `get_user` causes TimeoutError

Open sbdchd opened this issue 4 years ago • 19 comments


When calling get_user() inside a consumer, sometimes the tests fail with a TimeoutError. The problem goes away when I remove this call and use self.user = self.scope["user"] instead.

However, in the intended use case the user session length is relatively short, so get_user() would be helpful.


I made a fresh Django project with Channels installed, available at:


poetry install

poetry run pytest mysite

The tests should fail, although I have noticed it sometimes takes a second run of the tests for the TimeoutError to occur.

The error looks as follows:

❯ poetry run pytest mysite                                                                       3.24s
========================================= test session starts =========================================
platform darwin -- Python 3.7.3, pytest-5.1.1, py-1.8.0, pluggy-0.12.0
Django settings: mysite.settings (from ini file)
rootdir: /Users/steve/Downloads/channels_tut, inifile: tox.ini
plugins: django-3.5.1, asyncio-0.10.0
collected 1 item                                                                                      

mysite/chat/ F                                                                 [100%]

============================================== FAILURES ===============================================
____________________________________________ test_consumer ____________________________________________

self = <channels.testing.websocket.WebsocketCommunicator object at 0x110b15860>, timeout = 1

    async def receive_output(self, timeout=1):
        Receives a single message from the application, with optional timeout.
        # Make sure there's not an exception to raise from the task
        if self.future.done():
        # Wait and receive the message
            async with async_timeout(timeout):
>               return await self.output_queue.get()

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Queue at 0x110d7c240 maxsize=0 tasks=1>

    async def get(self):
        """Remove and return an item from the queue.
        If queue is empty, wait until an item is available.
        while self.empty():
            getter = self._loop.create_future()
>               await getter
E               concurrent.futures._base.CancelledError

/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/ CancelledError

During handling of the above exception, another exception occurred:

    async def test_consumer():
        # 1. setup connections
        communicator = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="a"))
        connected, _ = await communicator.connect()
        assert connected
        comm_a = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="b"))
        connected, _ = await comm_a.connect()
        assert connected
        comm_b = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="c"))
        connected, _ = await comm_b.connect()
        assert connected
        # 3. subscribe to events
        subscribe = {"foo": "bar"}
        await communicator.send_json_to(subscribe)
        await comm_a.send_json_to(subscribe)
        await comm_b.send_json_to(subscribe)
        # 3. publish events
        channel_layer = get_channel_layer()
        await channel_layer.group_send(
                'type': 'broadcast_message',
        # 4. assert on received events
        expected_broadcast_msg = {"broadcast": "important update"}
        res = await communicator.receive_json_from()
        assert res == expected_broadcast_msg
>       res = await comm_a.receive_json_from()

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.venv/lib/python3.7/site-packages/channels/testing/ in receive_json_from
    payload = await self.receive_from(timeout)
.venv/lib/python3.7/site-packages/channels/testing/ in receive_from
    response = await self.receive_output(timeout)
.venv/lib/python3.7/site-packages/asgiref/ in receive_output
    raise e
.venv/lib/python3.7/site-packages/asgiref/ in receive_output
    return await self.output_queue.get()
.venv/lib/python3.7/site-packages/asgiref/ in __aexit__
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <asgiref.timeout.timeout object at 0x110de1cf8>
exc_type = <class 'concurrent.futures._base.CancelledError'>

    def _do_exit(self, exc_type: Type[BaseException]) -> None:
        if exc_type is asyncio.CancelledError and self._cancelled:
            self._cancel_handler = None
            self._task = None
>           raise asyncio.TimeoutError
E           concurrent.futures._base.TimeoutError

.venv/lib/python3.7/site-packages/asgiref/ TimeoutError
---------------------------------------- Captured stdout call -----------------------------------------
add me to the group plz a
add me to the group plz c
add me to the group plz b
========================================== 1 failed in 1.60s ==========================================
Task was destroyed but it is pending!
task: <Task pending coro=<double_to_single_callable.<locals>.new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object double_to_single_callable.<locals>.new_application at 0x110b137c8>
Traceback (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/", line 34, in new_application
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 183, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 41, in coroutine_call
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 59, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 57, in await_many_dispatch
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 688, in call_soon
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 480, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<Queue.get() running at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/]>
Task was destroyed but it is pending!
task: <Task pending coro=<double_to_single_callable.<locals>.new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object double_to_single_callable.<locals>.new_application at 0x110d62848>
Traceback (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/", line 34, in new_application
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 183, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 41, in coroutine_call
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 59, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 57, in await_many_dispatch
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 688, in call_soon
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 480, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<Queue.get() running at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/]>


OS: MacOS Python: Python 3.7.2
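
For anyone reading the traceback above: the final TimeoutError only says that nothing arrived on the communicator's output queue within the timeout. Here is a minimal stdlib-only sketch of that pattern. It uses asyncio.wait_for as a stand-in for the async_timeout context manager shown in the traceback, and the receive_output function below is a hypothetical helper, not the asgiref method.

```python
import asyncio


async def receive_output(queue: asyncio.Queue, timeout: float = 1.0):
    # Hypothetical helper. wait_for cancels the pending get() once the
    # deadline passes and surfaces that as asyncio.TimeoutError.
    return await asyncio.wait_for(queue.get(), timeout)


async def demo() -> str:
    queue = asyncio.Queue()  # nothing ever puts a message on this queue
    try:
        await receive_output(queue, timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"
    return "received"


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

So the interesting question is why the consumer never puts a message on the queue, not the timeout itself.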


sbdchd, Aug 28 '19 02:08

Hi @sbdchd. Good report. Thank you.

Don't suppose you can experiment in the debugger, can you? What happens if you increase the timeout? Where inside get_user() are we losing the time?

carltongibson, Aug 28 '19 07:08

I dug into this a bit more.

Increasing timeouts to exorbitant amounts e.g., 500 seconds didn't make a difference.

However, when replacing get_user() with a barebones version shown below, the issue still occurred, although it became more intermittent.

After applying the following diff and running poetry run pytest mysite a couple of times, the error should occur.

It seems the TimeoutErrors are related to @database_sync_to_async.

EDIT: using @sync_to_async instead of @database_sync_to_async also causes the TimeoutError

diff --git a/mysite/chat/ b/mysite/chat/
index 42fd816..a24ba1f 100644
--- a/mysite/chat/
+++ b/mysite/chat/
@@ -1,8 +1,12 @@
 from channels.generic.websocket import AsyncJsonWebsocketConsumer
-from channels.auth import get_user
+from channels.db import database_sync_to_async
+
+
+@database_sync_to_async
+def get_user(scope):
+    return None


 class ChatConsumer(AsyncJsonWebsocketConsumer):
     async def connect(self):
         await self.accept()
poetry run pytest mysite
========================================= test session starts =========================================
platform darwin -- Python 3.7.3, pytest-5.1.1, py-1.8.0, pluggy-0.12.0
Django settings: mysite.settings (from ini file)
rootdir: /Users/steve/Downloads/channels_tut, inifile: tox.ini
plugins: django-3.5.1, asyncio-0.10.0
collected 1 item                                                                                      

mysite/chat/ F                                                                 [100%]

============================================== FAILURES ===============================================
____________________________________________ test_consumer ____________________________________________

self = <channels.testing.websocket.WebsocketCommunicator object at 0x106fa93c8>, timeout = 1

    async def receive_output(self, timeout=1):
        Receives a single message from the application, with optional timeout.
        # Make sure there's not an exception to raise from the task
        if self.future.done():
        # Wait and receive the message
            async with async_timeout(timeout):
>               return await self.output_queue.get()

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Queue at 0x106df9470 maxsize=0 tasks=1>

    async def get(self):
        """Remove and return an item from the queue.
        If queue is empty, wait until an item is available.
        while self.empty():
            getter = self._loop.create_future()
>               await getter
E               concurrent.futures._base.CancelledError

/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/ CancelledError

During handling of the above exception, another exception occurred:

    async def test_consumer():
        # 1. setup connections
        communicator = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="a"))
        connected, _ = await communicator.connect()
        assert connected
        comm_a = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="b"))
        connected, _ = await comm_a.connect()
        assert connected
        comm_b = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="c"))
        connected, _ = await comm_b.connect()
        assert connected
        # 3. subscribe to events
        subscribe = {"foo": "bar"}
        await communicator.send_json_to(subscribe)
        await comm_a.send_json_to(subscribe)
        await comm_b.send_json_to(subscribe)
        # 3. publish events
        channel_layer = get_channel_layer()
        await channel_layer.group_send(
                'type': 'broadcast_message',
        # 4. assert on received events
        expected_broadcast_msg = {"broadcast": "important update"}
>       res = await communicator.receive_json_from()

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.venv/lib/python3.7/site-packages/channels/testing/ in receive_json_from
    payload = await self.receive_from(timeout)
.venv/lib/python3.7/site-packages/channels/testing/ in receive_from
    response = await self.receive_output(timeout)
.venv/lib/python3.7/site-packages/asgiref/ in receive_output
    raise e
.venv/lib/python3.7/site-packages/asgiref/ in receive_output
    return await self.output_queue.get()
.venv/lib/python3.7/site-packages/asgiref/ in __aexit__
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <asgiref.timeout.timeout object at 0x1070c7cf8>
exc_type = <class 'concurrent.futures._base.CancelledError'>

    def _do_exit(self, exc_type: Type[BaseException]) -> None:
        if exc_type is asyncio.CancelledError and self._cancelled:
            self._cancel_handler = None
            self._task = None
>           raise asyncio.TimeoutError
E           concurrent.futures._base.TimeoutError

.venv/lib/python3.7/site-packages/asgiref/ TimeoutError
========================================== 1 failed in 1.63s ==========================================
Task was destroyed but it is pending!
task: <Task pending coro=<double_to_single_callable.<locals>.new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object double_to_single_callable.<locals>.new_application at 0x106e43848>
Traceback (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/", line 34, in new_application
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 183, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 41, in coroutine_call
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 59, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 57, in await_many_dispatch
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 688, in call_soon
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 480, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<Queue.get() running at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/]>
Task was destroyed but it is pending!
task: <Task pending coro=<double_to_single_callable.<locals>.new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object double_to_single_callable.<locals>.new_application at 0x106ea60c8>
Traceback (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/", line 34, in new_application
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 183, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 41, in coroutine_call
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 59, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 57, in await_many_dispatch
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 688, in call_soon
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 480, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<Queue.get() running at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/]>

sbdchd, Aug 29 '19 03:08

> Increasing timeouts to exorbitant amounts e.g., 500 seconds didn't make a difference.

Grrr. Looks like a deadlock. We're being scheduled in a ThreadPoolExecutor that is never getting to run. TBH not sure why that is.

(How is pytest-asyncio setting things up? That's my first thought.)
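
A sketch of what that kind of starvation looks like in general, not a confirmed diagnosis of this issue: if every worker thread in an executor is occupied, a job submitted from the event loop never starts, and awaiting it times out no matter how long the timeout is or how trivial the function. The example uses only the standard library, and the single-worker pool is an assumption chosen to make the effect easy to reproduce.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


async def demo() -> str:
    loop = asyncio.get_running_loop()
    release = threading.Event()
    executor = ThreadPoolExecutor(max_workers=1)  # assumed pool size for the demo
    # Occupy the only worker thread until `release` is set.
    blocker = loop.run_in_executor(executor, release.wait)
    try:
        # This job is queued behind the blocker and never starts, so the
        # await times out even though the function does nothing.
        await asyncio.wait_for(
            loop.run_in_executor(executor, lambda: None), timeout=0.1
        )
        outcome = "ran"
    except asyncio.TimeoutError:
        outcome = "starved"
    finally:
        release.set()  # free the worker so the pool can shut down cleanly
        await blocker
        executor.shutdown(wait=True)
    return outcome


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Raising the timeout in this sketch changes nothing, which matches the "500 seconds didn't make a difference" observation, though other causes could produce the same symptom.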

carltongibson, Aug 29 '19 08:08

I added timeouts of 10 seconds and ran the tests under cProfile, and it seems the tests are stuck in select.kqueue (about 10 of the 11.7 seconds are spent in its control method):

         1433004 function calls (1400889 primitive calls) in 11.716 seconds

   Ordered by: internal time
   List reduced from 8069 to 50 due to restriction <50>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      125   10.022    0.080   10.022    0.080 {method 'control' of 'select.kqueue' objects}
      903    0.123    0.000    0.123    0.000 {built-in method marshal.loads}
    12049    0.092    0.000    0.092    0.000 {built-in method posix.lstat}
     5989    0.076    0.000    0.076    0.000 {built-in method posix.stat}
3273/3166    0.058    0.000    0.197    0.000 {built-in method builtins.__build_class__}
      903    0.034    0.000    0.044    0.000 <frozen importlib._bootstrap_external>:914(get_data)
    39/38    0.025    0.001    0.027    0.001 {built-in method _imp.create_dynamic}
 1459/335    0.025    0.000    0.075    0.000
    12423    0.023    0.000    0.037    0.000
      303    0.023    0.000    0.023    0.000 {function SQLiteCursorWrapper.execute at 0x1107bc510}
62666/62620    0.022    0.000    0.023    0.000 {built-in method builtins.getattr}
       10    0.022    0.002    0.022    0.002 {built-in method fcntl.fcntl}
   110609    0.022    0.000    0.024    0.000 {built-in method builtins.isinstance}
64727/64706    0.021    0.000    0.022    0.000 {built-in method builtins.hasattr}
      285    0.017    0.000    0.017    0.000 {built-in method posix.listdir}
     1674    0.017    0.000    0.102    0.000 <frozen importlib._bootstrap_external>:1356(find_spec)
      925    0.017    0.000    0.020    0.000
     1056    0.016    0.000    0.160    0.000
      151    0.014    0.000    0.014    0.000 {built-in method builtins.compile}
     2336    0.013    0.000    0.021    0.000
     1639    0.013    0.000    0.013    0.000 {method 'search' of 're.Pattern' objects}
    25810    0.013    0.000    0.013    0.000
 2443/308    0.012    0.000    0.037    0.000
     2675    0.012    0.000    0.019    0.000
      917    0.011    0.000    0.011    0.000 {method 'read' of '_io.FileIO' objects}
    37081    0.010    0.000    0.010    0.000 {method 'startswith' of 'str' objects}
    85693    0.010    0.000    0.010    0.000 {method 'append' of 'list' objects}
     1038    0.010    0.000    0.202    0.000 <frozen importlib._bootstrap>:882(_find_spec)
     8401    0.010    0.000    0.011    0.000 {built-in method __new__ of type object at 0x10dace030}
      793    0.009    0.000    0.014    0.000
    12049    0.009    0.000    0.103    0.000
      903    0.009    0.000    0.207    0.000 <frozen importlib._bootstrap_external>:793(get_code)
      829    0.008    0.000    0.069    0.000
      518    0.008    0.000    0.014    0.000
 2854/766    0.008    0.000    0.011    0.000
     9557    0.008    0.000    0.021    0.000 <frozen importlib._bootstrap_external>:56(_path_join)
  5311/96    0.008    0.000    0.018    0.000
 1260/326    0.007    0.000    0.963    0.003 <frozen importlib._bootstrap>:978(_find_and_load)
    22199    0.007    0.000    0.018    0.000
    44480    0.007    0.000    0.008    0.000 {method 'get' of 'dict' objects}
      401    0.007    0.000    0.015    0.000
18325/18261    0.007    0.000    0.009    0.000 {method 'join' of 'str' objects}
    11516    0.007    0.000    0.007    0.000 {method 'match' of 're.Pattern' objects}
     9557    0.007    0.000    0.010    0.000 <frozen importlib._bootstrap_external>:58(<listcomp>)
    15760    0.007    0.000    0.010    0.000
       95    0.007    0.000    0.007    0.000 {built-in method}
     1816    0.006    0.000    0.015    0.000 <frozen importlib._bootstrap_external>:271(cache_from_source)
      332    0.006    0.000    0.006    0.000
     9544    0.006    0.000    0.013    0.000
    33897    0.006    0.000    0.006    0.000 {built-in method posix.fspath}
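
For reference, a profile dominated by the selector usually means the event loop was idle, waiting for something to become ready, rather than busy in Python code. A small stdlib-only sketch that reproduces the same shape of profile is below. The function names are hypothetical, and the top entry is expected to be the platform's selector call (kqueue on macOS, typically epoll on Linux), which is an assumption about the default event loop.

```python
import asyncio
import cProfile
import pstats


async def idle_wait(seconds: float) -> None:
    # Nothing is runnable while this sleeps, so the loop blocks in its selector.
    await asyncio.sleep(seconds)


def profile_idle_loop(seconds: float = 0.3):
    """Profile an idle event loop and return the entry with the largest tottime."""
    profiler = cProfile.Profile()
    profiler.enable()
    asyncio.run(idle_wait(seconds))
    profiler.disable()
    stats = pstats.Stats(profiler)
    # stats.stats maps (file, line, name) -> (cc, nc, tottime, cumtime, callers)
    func, timings = max(stats.stats.items(), key=lambda item: item[1][2])
    return func, timings[2]


if __name__ == "__main__":
    func, tottime = profile_idle_loop()
    print(func, round(tottime, 3))
```

In other words, the profile above is consistent with the loop waiting on work that never completes, but it does not say which await is the one that is stuck.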

sbdchd, Aug 30 '19 00:08

Hey @andrewgodwin. Can I ask if you have any thoughts here? (Maybe you’ve seen this kind of thing before?)

I’m a bit like 😳 as to how to advise. Thanks.

carltongibson, Aug 30 '19 05:08

It's worth trying it on an older asgiref version - let's say 3.1.4 - and see if it persists there. I had someone say they might have a potential problem a bit like this with 3.2 but it was un-replicatable.

andrewgodwin, Aug 30 '19 05:08

Okay, I tested the following versions of asgiref and they exhibit the TimeoutError:

  • 3.2
  • 3.1.4
  • 3.0.0

sbdchd, Aug 30 '19 22:08

OK, then it's not asgiref, which is both good and bad.

Have you added logging to the stub get_user to see if it's getting called at all? Given you have three different consumers running at once, it's almost certainly a deadlock that's occurring, maybe due to the ThreadPoolExecutor being exhausted, but it should have a good 8-10 threads by default (and we have some anti-deadlock tests for that code already).

The best thing to do would be to log every step into and out of the get_user function, and the execution of the lines above and below the await, to work out exactly where it is getting stuck and what's run at that point. Tracebacks tend not to be very accurate when asyncio is involved.
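
One way to get that kind of step-by-step logging is a small decorator around the coroutine. This is a minimal sketch using only the standard library; the logger name, trace_async, and get_user_stub are hypothetical names for illustration, and the stub stands in for the real get_user.

```python
import asyncio
import functools
import logging

logger = logging.getLogger("consumer_trace")  # hypothetical logger name


def trace_async(func):
    """Log entry to and exit from a coroutine function, including failures."""

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        logger.info("about to call %s", func.__name__)
        try:
            result = await func(*args, **kwargs)
        except BaseException as exc:
            # CancelledError is a BaseException on Python 3.8+, so catch broadly.
            logger.info("%s raised %r", func.__name__, exc)
            raise
        logger.info("finished calling %s", func.__name__)
        return result

    return wrapper


@trace_async
async def get_user_stub(scope):
    # Stand-in for the stubbed get_user from this thread; does no real work.
    await asyncio.sleep(0)
    return None


def run_demo():
    """Run the stub once and return the log messages it produced, in order."""
    messages = []

    class ListHandler(logging.Handler):
        def emit(self, record):
            messages.append(record.getMessage())

    handler = ListHandler()
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    try:
        asyncio.run(get_user_stub({"user": None}))
    finally:
        logger.removeHandler(handler)
    return messages


if __name__ == "__main__":
    print(run_demo())
```

If an "about to call" line appears without a matching "finished" or "raised" line, the await inside that call is where execution stopped.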

andrewgodwin, Aug 31 '19 03:08

Thanks for the pointers.

I've added logging to the tests and the consumer, but the logging seems to paint a similar picture to the traceback.

INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ about to subscribe
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ sub:comm1
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ sub:comm_a
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ sub:comm_b
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ pub
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ about to recv comm
❯ PYTHONASYNCIODEBUG=1 poetry run pytest -o log_cli=true mysite
============================================================================================= test session starts ==============================================================================================
platform darwin -- Python 3.7.3, pytest-5.1.1, py-1.8.0, pluggy-0.12.0
Django settings: mysite.settings (from ini file)
rootdir: /Users/steve/Downloads/channels_tut, inifile: tox.ini
plugins: django-3.5.1, asyncio-0.10.0
collected 1 item                                                                                                                                                                                               

------------------------------------------------------------------------------------------------ live log call -------------------------------------------------------------------------------------------------
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ about to subscribe
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ sub:comm1
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ sub:comm_a
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ sub:comm_b
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ pub
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ about to recv comm
FAILED                                                                                                                                                                                                   [100%]

=================================================================================================== FAILURES ===================================================================================================
________________________________________________________________________________________________ test_consumer _________________________________________________________________________________________________

self = <channels.testing.websocket.WebsocketCommunicator object at 0x105600160>, timeout = 1

    async def receive_output(self, timeout=1):
        Receives a single message from the application, with optional timeout.
        # Make sure there's not an exception to raise from the task
        if self.future.done():
        # Wait and receive the message
            async with async_timeout(timeout):
>               return await self.output_queue.get()

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Queue at 0x105620358 maxsize=0 tasks=1>

    async def get(self):
        """Remove and return an item from the queue.
        If queue is empty, wait until an item is available.
        while self.empty():
            getter = self._loop.create_future()
>               await getter
E               concurrent.futures._base.CancelledError

/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/ CancelledError

During handling of the above exception, another exception occurred:

    async def test_consumer():
        # 1. setup connections
        communicator = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="a"))
        connected, _ = await communicator.connect()
        assert connected
        comm_a = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="b"))
        connected, _ = await comm_a.connect()
        assert connected
        comm_b = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="c"))
        connected, _ = await comm_b.connect()
        assert connected
   "about to subscribe")
        # 3. subscribe to events
        subscribe = {"foo": "bar"}"sub:comm1")
        await communicator.send_json_to(subscribe)"sub:comm_a")
        await comm_a.send_json_to(subscribe)"sub:comm_b")
        await comm_b.send_json_to(subscribe)
        # 3. publish events
        channel_layer = get_channel_layer()"pub")
        await channel_layer.group_send(
                'type': 'broadcast_message',
        # 4. assert on received events
        expected_broadcast_msg = {"broadcast": "important update"}
   "about to recv comm")
>       res = await communicator.receive_json_from()

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.7/site-packages/channels/testing/ in receive_json_from
    payload = await self.receive_from(timeout)
.venv/lib/python3.7/site-packages/channels/testing/ in receive_from
    response = await self.receive_output(timeout)
.venv/lib/python3.7/site-packages/asgiref/ in receive_output
    raise e
.venv/lib/python3.7/site-packages/asgiref/ in receive_output
    return await self.output_queue.get()
.venv/lib/python3.7/site-packages/asgiref/ in __aexit__
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <asgiref.timeout.timeout object at 0x105a38748>, exc_type = <class 'concurrent.futures._base.CancelledError'>

    def _do_exit(self, exc_type: Type[BaseException]) -> None:
        if exc_type is asyncio.CancelledError and self._cancelled:
            self._cancel_handler = None
            self._task = None
>           raise asyncio.TimeoutError
E           concurrent.futures._base.TimeoutError

.venv/lib/python3.7/site-packages/asgiref/ TimeoutError
-------------------------------------------------------------------------------------------- Captured stderr setup ---------------------------------------------------------------------------------------------
Creating test database for alias 'default'...
---------------------------------------------------------------------------------------------- Captured log call -----------------------------------------------------------------------------------------------
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ about to subscribe
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ sub:comm1
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ sub:comm_a
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ sub:comm_b
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ pub
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/ about to recv comm
------------------------------------------------------------------------------------------- Captured stderr teardown -------------------------------------------------------------------------------------------
Destroying test database for alias 'default'...
=============================================================================================== warnings summary ===============================================================================================
  /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/generic/ RuntimeWarning: coroutine 'get_user' was never awaited
    await self.receive_json(await self.decode_json(text_data), **kwargs)

-- Docs:
======================================================================================== 1 failed, 1 warnings in 1.69s =========================================================================================
Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/bin/pytest", line 10, in <module>
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/config/", line 77, in main
    return config.hook.pytest_cmdline_main(config=config)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 228, in pytest_cmdline_main
    return wrap_session(config, _main)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 191, in wrap_session
    session.exitstatus = doit(config, session) or 0
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 235, in _main
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 256, in pytest_runtestloop
    item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 72, in pytest_runtest_protocol
    runtestprotocol(item, nextitem=nextitem)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 87, in runtestprotocol
    reports.append(call_and_report(item, "call", log))
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 167, in call_and_report
    call = call_runtest_hook(item, when, **kwds)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 192, in call_runtest_hook
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 220, in from_call
    result = func()
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 192, in <lambda>
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 117, in pytest_runtest_call
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 1423, in runtest
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pytest_asyncio/", line 158, in pytest_pyfunc_call
    pyfuncitem.obj(**testargs), loop=event_loop))
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 571, in run_until_complete
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 539, in run_forever
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 1767, in _run_once
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 88, in _run, *self._args)
  File "/Users/steve/Downloads/channels_tut/mysite/chat/", line 35, in test_consumer
    comm_a = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="b"))
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/testing/", line 26, in __init__
    super().__init__(application, self.scope)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/", line 21, in __init__
    self.application(scope, self.input_queue.get, self.output_queue.put)
task: <Task pending coro=<double_to_single_callable.<locals>.new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/> wait_for=<Future cancelled created at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/> created at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/>
Exception ignored in: <coroutine object double_to_single_callable.<locals>.new_application at 0x1058704c8>
Traceback (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/", line 34, in new_application
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 183, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 41, in coroutine_call
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 59, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 57, in await_many_dispatch
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 688, in call_soon
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 480, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/bin/pytest", line 10, in <module>
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/config/", line 77, in main
    return config.hook.pytest_cmdline_main(config=config)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 228, in pytest_cmdline_main
    return wrap_session(config, _main)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 191, in wrap_session
    session.exitstatus = doit(config, session) or 0
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 235, in _main
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 256, in pytest_runtestloop
    item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 72, in pytest_runtest_protocol
    runtestprotocol(item, nextitem=nextitem)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 87, in runtestprotocol
    reports.append(call_and_report(item, "call", log))
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 167, in call_and_report
    call = call_runtest_hook(item, when, **kwds)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 192, in call_runtest_hook
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 220, in from_call
    result = func()
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 192, in <lambda>
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 117, in pytest_runtest_call
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 1423, in runtest
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pytest_asyncio/", line 158, in pytest_pyfunc_call
    pyfuncitem.obj(**testargs), loop=event_loop))
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 571, in run_until_complete
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 539, in run_forever
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 1767, in _run_once
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 88, in _run, *self._args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/", line 34, in new_application
    return await instance(receive, send)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 183, in __call__
    return await self.inner(receive, self.send)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 41, in coroutine_call
    await inner_instance(receive, send)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 59, in __call__
    [receive, self.channel_receive], self.dispatch
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 53, in await_many_dispatch
    tasks[i] = asyncio.ensure_future(consumer_callables[i]())
task: <Task pending coro=<Queue.get() running at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/> wait_for=<Future cancelled created at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/] created at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/>
Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/bin/pytest", line 10, in <module>
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/config/", line 77, in main
    return config.hook.pytest_cmdline_main(config=config)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 228, in pytest_cmdline_main
    return wrap_session(config, _main)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 191, in wrap_session
    session.exitstatus = doit(config, session) or 0
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 235, in _main
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 256, in pytest_runtestloop
    item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 72, in pytest_runtest_protocol
    runtestprotocol(item, nextitem=nextitem)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 87, in runtestprotocol
    reports.append(call_and_report(item, "call", log))
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 167, in call_and_report
    call = call_runtest_hook(item, when, **kwds)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 192, in call_runtest_hook
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 220, in from_call
    result = func()
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 192, in <lambda>
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 117, in pytest_runtest_call
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 1423, in runtest
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pytest_asyncio/", line 158, in pytest_pyfunc_call
    pyfuncitem.obj(**testargs), loop=event_loop))
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 571, in run_until_complete
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 539, in run_forever
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 1767, in _run_once
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 88, in _run, *self._args)
  File "/Users/steve/Downloads/channels_tut/mysite/chat/", line 39, in test_consumer
    comm_b = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="c"))
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/testing/", line 26, in __init__
    super().__init__(application, self.scope)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/", line 21, in __init__
    self.application(scope, self.input_queue.get, self.output_queue.put)
task: <Task pending coro=<double_to_single_callable.<locals>.new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/> wait_for=<Future cancelled created at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/> created at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/>
Exception ignored in: <coroutine object double_to_single_callable.<locals>.new_application at 0x1059f23c8>
Traceback (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/", line 34, in new_application
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 183, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 41, in coroutine_call
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 59, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 57, in await_many_dispatch
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 688, in call_soon
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 480, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/bin/pytest", line 10, in <module>
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/config/", line 77, in main
    return config.hook.pytest_cmdline_main(config=config)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 228, in pytest_cmdline_main
    return wrap_session(config, _main)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 191, in wrap_session
    session.exitstatus = doit(config, session) or 0
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 235, in _main
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 256, in pytest_runtestloop
    item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 72, in pytest_runtest_protocol
    runtestprotocol(item, nextitem=nextitem)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 87, in runtestprotocol
    reports.append(call_and_report(item, "call", log))
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 167, in call_and_report
    call = call_runtest_hook(item, when, **kwds)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 192, in call_runtest_hook
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 220, in from_call
    result = func()
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 192, in <lambda>
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 117, in pytest_runtest_call
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/", line 1423, in runtest
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pytest_asyncio/", line 158, in pytest_pyfunc_call
    pyfuncitem.obj(**testargs), loop=event_loop))
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 571, in run_until_complete
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 539, in run_forever
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 1767, in _run_once
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/", line 88, in _run, *self._args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/", line 34, in new_application
    return await instance(receive, send)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 183, in __call__
    return await self.inner(receive, self.send)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 41, in coroutine_call
    await inner_instance(receive, send)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 59, in __call__
    [receive, self.channel_receive], self.dispatch
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/", line 53, in await_many_dispatch
    tasks[i] = asyncio.ensure_future(consumer_callables[i]())
task: <Task pending coro=<Queue.get() running at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/> wait_for=<Future cancelled created at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/] created at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/>

sbdchd avatar Aug 31 '19 04:08 sbdchd
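
The `RuntimeWarning: coroutine 'get_user' was never awaited` line in the warnings summary suggests that, somewhere in the consumer, `get_user` was called without `await`. The sketch below uses only the standard library to show the difference. The `get_user` here is a hypothetical stand-in, not the channels implementation, and `receive_json_buggy` / `receive_json_fixed` are made-up names for illustration.

```python
import asyncio
import warnings


async def get_user(scope):
    # Hypothetical stand-in for an async user lookup (not channels.auth.get_user).
    await asyncio.sleep(0)
    return scope.get("user", "anonymous")


async def receive_json_buggy(scope):
    # Bug: this creates a coroutine object but never runs it.
    return get_user(scope)


async def receive_json_fixed(scope):
    # Awaiting the coroutine runs the lookup and returns the user value.
    return await get_user(scope)


def run_demo():
    scope = {"user": "alice"}
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", RuntimeWarning)
        buggy = asyncio.run(receive_json_buggy(scope))
        is_coroutine = asyncio.iscoroutine(buggy)
        buggy.close()  # clean up the unused coroutine object explicitly
    fixed = asyncio.run(receive_json_fixed(scope))
    return is_coroutine, fixed


if __name__ == "__main__":
    print(run_demo())
```

In a real consumer the equivalent change would presumably be `self.user = await get_user(self.scope)`, though the log alone does not show which call site triggered the warning.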

So, I've replicated this enough to work out it's a side effect specific to channels_redis. If I replace the broadcast mechanism with something else, it functions fine.

andrewgodwin avatar Sep 04 '19 01:09 andrewgodwin

OK, so in my digging, this looks like it's related to the channels_redis mechanism which tries to share connections across coroutines in the same process. That code wasn't super amazing before, and something about what sync_to_async is doing is upsetting it and causing the entire tower of cards to fall over.

@sbdchd I know this isn't a very helpful suggestion, but if you can eliminate your use of channels_redis you will stop encountering this problem. I know that's probably not a realistic option, but if you were using it as a bridge until you invested in a real message bus, maybe now is the time.

I don't have the mental space to go digging into the channels_redis connection-sharing code right now, as I barely understood it as I wrote it, and it's honestly a lot of effort to solve the small problem of connection sharing and frankly the whole thing should probably be torn down for something simpler but slightly less efficient if nobody is able to debug issues like this!

andrewgodwin avatar Sep 04 '19 02:09 andrewgodwin

Hey @andrewgodwin. Thanks for the work. Do you have a reproduction set-up that demonstrates the issue? (Maybe you don't... 😬)

carltongibson avatar Sep 04 '19 06:09 carltongibson

Yup, I cloned the demo repo above and set it up and it fails consistently.

andrewgodwin avatar Sep 04 '19 06:09 andrewgodwin

Well that’s easy... 😀

I kind of meant one that highlights the exact pain point, but don’t worry. Thank you for looking into it.

carltongibson avatar Sep 04 '19 07:09 carltongibson

Okay, I kind of see how it might come up with the channels_redis connection pool. I'll have a play and see if I can pin it down.

carltongibson avatar Sep 04 '19 14:09 carltongibson

I changed the config to use rabbitmq via however the tests still have issues with timeout errors.

It's possible that I don't have rabbitmq configured properly.

sbdchd avatar Sep 05 '19 23:09 sbdchd

That channel layer also uses a very complicated asyncio-based setup to share connections from a single process. Ideally asgiref would not disrupt whatever mechanism they're both using to work, but it needs to be pinned down to the exact failing call first before I can work out if I can make things compatible, unfortunately.
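
One way to narrow a hang like this down to a single call is to give each suspect await its own short, labelled timeout. The following is a minimal standard-library sketch of that idea; `timed`, `fast_call`, and `stuck_call` are hypothetical names made up for illustration and are not part of channels, channels_redis, or asgiref.

```python
import asyncio


async def timed(label, awaitable, timeout=0.2):
    """Await `awaitable`, naming the call in the error if it exceeds `timeout`."""
    try:
        return await asyncio.wait_for(awaitable, timeout)
    except asyncio.TimeoutError:
        # Re-raise with the label so the failing call is identifiable.
        raise asyncio.TimeoutError(
            f"{label} did not finish within {timeout}s"
        ) from None


async def fast_call():
    # Hypothetical stand-in for a call that completes normally.
    await asyncio.sleep(0)
    return "ok"


async def stuck_call():
    # Hypothetical stand-in for a call that never returns.
    await asyncio.Event().wait()


async def main():
    results = [await timed("fast_call", fast_call())]
    try:
        await timed("stuck_call", stuck_call())
    except asyncio.TimeoutError as exc:
        results.append(str(exc))
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Wrapping the channel-layer calls and the `sync_to_async` calls separately in this way would show which one stops making progress, rather than only surfacing the generic timeout from the test communicator.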

andrewgodwin avatar Sep 06 '19 01:09 andrewgodwin

I can confirm that using InMemoryChannelLayer also causes the problem.

ghost avatar Aug 25 '20 16:08 ghost

I'm also facing this issue while testing, and I'm not using channels_redis in my consumer.

I have written a custom auth middleware and wrapped it around URLRouter. The middleware queries the database to validate the token using @database_sync_to_async, which triggers the TimeoutError.


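from typing import Dict

from channels.db import database_sync_to_async
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application


@database_sync_to_async
def get_user(token: bytes):
    # validate token and return user or return AnonymousUser
    ...


class WebSocketAuthMiddleware:
    def __init__(self, app):
        # Store the ASGI application we were passed
        self.app = app

    def get_headers(self, scope) -> Dict[bytes, bytes]:
        return {h[0]: h[1] for h in scope.get("headers", ())}

    async def __call__(self, scope, receive, send):
        headers = self.get_headers(scope)
        scope["user"] = await get_user(headers.get(b"token", b""))
        # Hand the connection on to the wrapped application
        return await self.app(scope, receive, send)


application = ProtocolTypeRouter(
    {
        "http": get_asgi_application(),
        "websocket": WebSocketAuthMiddleware(URLRouter(websocket_urlpatterns)),
    }
)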

Let me know if you require a sample project to test this out. I'll upload it to Github and link it here.

Libraries and versions: channels 3.0.4, django 4.0.1, pytest 6.2.5, pytest-django 4.5.2, pytest-asyncio 0.17.2

Tested on: macOS Catalina 10.15.7 with Python 3.8.6
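
For anyone trying to isolate the middleware from the channel layer and the database, the same shape can be exercised with nothing but the standard library. This is only a sketch under stated assumptions: `TOKENS`, `lookup_user`, `HeaderAuthMiddleware`, `echo_user_app`, `run_once`, and the `test.user` message type are all hypothetical, and `run_in_executor` is used as a loose stand-in for running a blocking lookup off the event loop, not as a reproduction of what `database_sync_to_async` does internally.

```python
import asyncio
from typing import Dict

# Hypothetical token table standing in for a database lookup.
TOKENS = {b"secret": "alice"}


def lookup_user(token: bytes) -> str:
    # Blocking lookup; a real project would query the ORM here.
    return TOKENS.get(token, "anonymous")


class HeaderAuthMiddleware:
    def __init__(self, app):
        # Store the ASGI application we were passed.
        self.app = app

    async def __call__(self, scope, receive, send):
        headers: Dict[bytes, bytes] = dict(scope.get("headers", ()))
        loop = asyncio.get_running_loop()
        # Run the blocking lookup in a worker thread so the loop stays free.
        user = await loop.run_in_executor(
            None, lookup_user, headers.get(b"token", b"")
        )
        # Pass a copy of the scope with the user attached to the inner app.
        return await self.app(dict(scope, user=user), receive, send)


async def echo_user_app(scope, receive, send):
    # Minimal inner app: report which user the middleware attached.
    # "test.user" is a made-up message type, not part of the ASGI spec.
    await send({"type": "test.user", "user": scope["user"]})


async def run_once(headers):
    sent = []

    async def receive():
        return {"type": "websocket.connect"}

    async def send(message):
        sent.append(message)

    app = HeaderAuthMiddleware(echo_user_app)
    await app({"type": "websocket", "headers": headers}, receive, send)
    return sent


if __name__ == "__main__":
    print(asyncio.run(run_once([(b"token", b"secret")])))
```

If a stripped-down version like this passes while the real middleware times out, that points at the database or thread hand-off rather than at the middleware structure itself.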

kirankumbhar avatar Jan 31 '22 13:01 kirankumbhar