Call to `get_user` causes TimeoutError

Open sbdchd opened this issue 4 years ago • 19 comments

Problem

When calling get_user() inside a consumer, sometimes the tests fail with a TimeoutError. The problem goes away when I remove this call and use self.user = self.scope["user"] instead.

However, in the intended use case the user session length is relatively short, so get_user() would be helpful.

Reproduction

I made a fresh django project with channels installed available at: https://github.com/sbdchd/django-channels-bug-repo

setup:

poetry install

poetry run pytest mysite

The tests should fail, although I have noticed that it sometimes takes a second run for the TimeoutError to occur.

The error looks as follows:

poetry run pytest mysite
❯ poetry run pytest mysite                                                                       3.24s
========================================= test session starts =========================================
platform darwin -- Python 3.7.3, pytest-5.1.1, py-1.8.0, pluggy-0.12.0
Django settings: mysite.settings (from ini file)
rootdir: /Users/steve/Downloads/channels_tut, inifile: tox.ini
plugins: django-3.5.1, asyncio-0.10.0
collected 1 item                                                                                      

mysite/chat/test_consumers.py F                                                                 [100%]

============================================== FAILURES ===============================================
____________________________________________ test_consumer ____________________________________________

self = <channels.testing.websocket.WebsocketCommunicator object at 0x110b15860>, timeout = 1

    async def receive_output(self, timeout=1):
        """
        Receives a single message from the application, with optional timeout.
        """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
            async with async_timeout(timeout):
>               return await self.output_queue.get()

.venv/lib/python3.7/site-packages/asgiref/testing.py:75: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Queue at 0x110d7c240 maxsize=0 tasks=1>

    async def get(self):
        """Remove and return an item from the queue.
    
        If queue is empty, wait until an item is available.
        """
        while self.empty():
            getter = self._loop.create_future()
            self._getters.append(getter)
            try:
>               await getter
E               concurrent.futures._base.CancelledError

/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/queues.py:159: CancelledError

During handling of the above exception, another exception occurred:

    @pytest.mark.django_db(transaction=True)
    @pytest.mark.asyncio
    async def test_consumer():
        # 1. setup connections
        communicator = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="a"))
        connected, _ = await communicator.connect()
        assert connected
    
        comm_a = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="b"))
        connected, _ = await comm_a.connect()
        assert connected
    
        comm_b = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="c"))
        connected, _ = await comm_b.connect()
        assert connected
    
        # 3. subscribe to events
        subscribe = {"foo": "bar"}
        await communicator.send_json_to(subscribe)
        await comm_a.send_json_to(subscribe)
        await comm_b.send_json_to(subscribe)
    
        # 3. publish events
        channel_layer = get_channel_layer()
        await channel_layer.group_send(
            GROUP_NAME,
            {
                'type': 'broadcast_message',
            }
        )
    
        # 4. assert on received events
        expected_broadcast_msg = {"broadcast": "important update"}
    
        res = await communicator.receive_json_from()
        assert res == expected_broadcast_msg
    
>       res = await comm_a.receive_json_from()

mysite/chat/test_consumers.py:59: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.venv/lib/python3.7/site-packages/channels/testing/websocket.py:93: in receive_json_from
    payload = await self.receive_from(timeout)
.venv/lib/python3.7/site-packages/channels/testing/websocket.py:72: in receive_from
    response = await self.receive_output(timeout)
.venv/lib/python3.7/site-packages/asgiref/testing.py:86: in receive_output
    raise e
.venv/lib/python3.7/site-packages/asgiref/testing.py:75: in receive_output
    return await self.output_queue.get()
.venv/lib/python3.7/site-packages/asgiref/timeout.py:68: in __aexit__
    self._do_exit(exc_type)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <asgiref.timeout.timeout object at 0x110de1cf8>
exc_type = <class 'concurrent.futures._base.CancelledError'>

    def _do_exit(self, exc_type: Type[BaseException]) -> None:
        if exc_type is asyncio.CancelledError and self._cancelled:
            self._cancel_handler = None
            self._task = None
>           raise asyncio.TimeoutError
E           concurrent.futures._base.TimeoutError

.venv/lib/python3.7/site-packages/asgiref/timeout.py:105: TimeoutError
---------------------------------------- Captured stdout call -----------------------------------------
add me to the group plz a
add me to the group plz c
add me to the group plz b
========================================== 1 failed in 1.60s ==========================================
Task was destroyed but it is pending!
task: <Task pending coro=<double_to_single_callable.<locals>.new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py:34> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object double_to_single_callable.<locals>.new_application at 0x110b137c8>
Traceback (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 57, in await_many_dispatch
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 688, in call_soon
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 480, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<Queue.get() running at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/queues.py:159> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:440]>
Task was destroyed but it is pending!
task: <Task pending coro=<double_to_single_callable.<locals>.new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py:34> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object double_to_single_callable.<locals>.new_application at 0x110d62848>
Traceback (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 57, in await_many_dispatch
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 688, in call_soon
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 480, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<Queue.get() running at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/queues.py:159> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:440]>
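
For readers following the traceback: the failure reduces to awaiting an empty queue under a timeout. The sketch below is a stdlib-only illustration, not the asgiref code. It uses asyncio.wait_for in place of asgiref's timeout context manager, and the receive_output function here is a hypothetical stand-in, not the communicator method.

```python
import asyncio


async def receive_output(queue: asyncio.Queue, timeout: float = 1.0):
    """Hypothetical stand-in for the communicator's receive step."""
    # wait_for cancels the pending queue.get() when the timeout expires
    # and raises asyncio.TimeoutError to the caller.
    return await asyncio.wait_for(queue.get(), timeout)


async def main() -> str:
    queue: asyncio.Queue = asyncio.Queue()  # nothing is ever put on it
    try:
        await receive_output(queue, timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"
    return "received"


result = asyncio.run(main())
print(result)
```

The TimeoutError only says that no message reached the queue in time. It does not say why the application never produced one.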

Specs

OS: macOS
Python: 3.7.2

Deps

poetry show
❯ poetry show
aioredis                   1.2.0    asyncio (PEP 3156) Redis support
appdirs                    1.4.3    A small Python module for determining appropriate platform-specific dirs, e.g. a "user data dir".
appnope                    0.1.0    Disable App Nap on OS X 10.9
argh                       0.26.2   An unobtrusive argparse wrapper with natural syntax
asgiref                    3.2.1    ASGI specs, helper code, and adapters
asn1crypto                 0.24.0   Fast ASN.1 parser and serializer with definitions for private keys, public keys, certificates, CRL, OCSP, CMS, PKCS#3, PKCS#7, PKCS#8, PKCS#12, PKCS#5, X.509 and TSP
astroid                    2.2.5    An abstract syntax tree for Python with inference support.
async-timeout              3.0.1    Timeout context manager for asyncio programs
atomicwrites               1.3.0    Atomic file writes.
attrs                      19.1.0   Classes Without Boilerplate
autobahn                   19.8.1   WebSocket client & server library, WAMP real-time framework
automat                    0.7.0    Self-service finite-state machines for the programmer on the go.
backcall                   0.1.0    Specifications for callback functions passed in to an API
black                      18.9b0   The uncompromising code formatter.
certifi                    2019.3.9 Python package for providing Mozilla's CA Bundle.
cffi                       1.12.3   Foreign Function Interface for Python calling C code.
channels                   2.2.0    Brings async, event-driven capabilities to Django. Django 1.11 and up only.
channels-redis             2.4.0    Redis-backed ASGI channel layer implementation
chardet                    3.0.4    Universal encoding detector for Python 2 and 3
click                      7.0      Composable command line interface toolkit
colorama                   0.4.1    Cross-platform colored terminal text.
constantly                 15.1.0   Symbolic constants in Python
coreapi                    2.3.3    Python client library for Core API.
coreschema                 0.0.4    Core Schema.
coverage                   4.5.2    Code coverage measurement for Python
cryptography               2.7      cryptography is a package which provides cryptographic recipes and primitives to Python developers.
daphne                     2.3.0    Django ASGI (HTTP/WebSocket) server
dataclasses                0.6      A backport of the dataclasses module for Python 3.6
decorator                  4.3.2    Better living through Python with decorators
defusedxml                 0.5.0    XML bomb protection for Python stdlib modules
dj-database-url            0.4.2    Use Database URLs in your Django Application.
django                     2.2      A high-level Python Web framework that encourages rapid development and clean, pragmatic design.
django-allauth             0.39.1   Integrated set of Django applications addressing authentication, registration, account management as well as 3rd party (social) account authentication.
django-softdelete          0.9.0    Soft delete support for Django ORM, with undelete.
django-user-sessions       1.6.0    Django sessions with a foreign key to the user
djangorestframework        3.9.2    Web APIs for Django, made easy.
docopt                     0.6.2    Pythonic argument parser, that will make you smile
drf-nested-routers         0.90.0   Nested resources for the Django Rest Framework
drf-yasg                   1.16.0   Automated generation of real Swagger/OpenAPI 2.0 schemas from Django Rest Framework code.
entrypoints                0.3      Discover and load entry points from installed packages.
flake8                     3.7.7    the modular source code checker: pep8, pyflakes and co
flake8-comprehensions      2.1.0    A flake8 plugin to help you write better list/set/dict comprehensions.
flake8-formatter-junit-xml 0.0.5    JUnit XML Formatter for flake8
flake8-pie                 0.4.0    A flake8 extension that implements misc. lints
flake8-print               3.1.0    print statement checker plugin for flake8
gunicorn                   19.7.1   WSGI HTTP Server for UNIX
h11                        0.8.1    A pure-Python, bring-your-own-I/O implementation of HTTP/1.1
hiredis                    1.0.0    Python wrapper for hiredis
honcho                     1.0.1    Honcho: a Python clone of Foreman. For managing Procfile-based applications.
httptools                  0.0.13   A collection of framework independent HTTP protocol utils.
hyperlink                  19.0.0   A featureful, immutable, and correct URL for Python.
icdiff                     1.9.1    improved colored diff
idna                       2.8      Internationalized Domain Names in Applications (IDNA)
incremental                17.5.0   
inflection                 0.3.1    A port of Ruby on Rails inflector to Python
ipdb                       0.11     IPython-enabled pdb
ipython                    7.3.0    IPython: Productive Interactive Computing
ipython-genutils           0.2.0    Vestigial utilities from IPython
isort                      4.3.15   A Python utility / library to sort Python imports.
itypes                     1.1.0    Simple immutable types for python.
jedi                       0.13.3   An autocompletion tool for Python that can be used for text editors.
jinja2                     2.10.1   A small but fast and easy to use stand-alone template engine written in pure python.
junit-xml                  1.8      Creates JUnit XML test result documents that can be read by tools such as Jenkins
lazy-object-proxy          1.3.1    A fast and thorough lazy object proxy.
markupsafe                 1.1.1    Safely add untrusted strings to HTML/XML markup.
mccabe                     0.6.1    McCabe checker, plugin for flake8
more-itertools             7.0.0    More routines for operating on iterables, beyond itertools
msgpack                    0.6.1    MessagePack (de)serializer.
mypy                       0.720    Optional static typing for Python
mypy-extensions            0.4.1    Experimental type system extensions for programs checked with the mypy typechecker.
oauthlib                   3.0.1    A generic, spec-compliant, thorough implementation of the OAuth request-signing logic
packaging                  19.0     Core utilities for Python packages
parso                      0.3.4    A Python Parser
pathtools                  0.1.2    File system general utilities
pexpect                    4.6.0    Pexpect allows easy control of interactive console applications.
pickleshare                0.7.5    Tiny 'shelve'-like database with concurrency support
pint                       0.8.1    Physical quantities module
pluggy                     0.9.0    plugin and hook calling mechanisms for python
pprintpp                   0.4.0    A drop-in replacement for pprint that's actually pretty
prompt-toolkit             2.0.9    Library for building powerful interactive command lines in Python
psycopg2                   2.7.3.2  psycopg2 - Python-PostgreSQL Database Adapter
ptyprocess                 0.6.0    Run a subprocess in a pseudo terminal
py                         1.8.0    library with cross-python path, ini-parsing, io, code, log facilities
pycodestyle                2.5.0    Python style guide checker
pycparser                  2.19     C parser in Python
pydantic                   0.32.2   Data validation and settings management using python 3.6 type hinting
pyflakes                   2.1.1    passive checker of Python programs
pygments                   2.3.1    Pygments is a syntax highlighting package written in Python.
pyhamcrest                 1.9.0    Hamcrest framework for matcher objects
pylint                     2.3.1    python code static checker
pyparsing                  2.3.1    Python parsing module
pytest                     3.10.1   pytest: simple powerful testing with Python
pytest-asyncio             0.10.0   Pytest support for asyncio.
pytest-cov                 2.6.1    Pytest plugin for measuring coverage.
pytest-django              3.4.8    A Django plugin for pytest.
pytest-icdiff              0.2      use icdiff for better error messages in pytest assertions
pytest-sugar               0.9.2    pytest-sugar is a plugin for pytest that changes the default look and feel of pytest (e.g. progressbar, show tests that fail instantly).
pytest-testmon             0.9.16   take TDD to a new level with py.test and testmon
pytest-watch               4.2.0    Local continuous test runner with pytest and watchdog.
python-dotenv              0.10.1   Add .env support to your django/flask apps in development and deployments
python-json-logger         0.1.8    A python library adding a json log formatter
python3-openid             3.1.0    OpenID support for modern servers and consumers.
pytz                       2018.9   World timezone definitions, modern and historical
pywatchman                 1.4.1    Watchman client for python
pyyaml                     5.1      YAML parser and emitter for Python
requests                   2.21.0   Python HTTP for Humans.
requests-oauthlib          1.2.0    OAuthlib authentication support for Requests.
rope                       0.14.0   a python refactoring library...
ruamel.yaml                0.15.97  ruamel.yaml is a YAML parser/emitter that supports roundtrip preservation of comments, seq/map flow style, and map key order
sentry-sdk                 0.7.6    Python client for Sentry (https://getsentry.com)
six                        1.12.0   Python 2 and 3 compatibility utilities
sqlparse                   0.2.4    Non-validating SQL parser
termcolor                  1.1.0    ANSII Color formatting for output in terminal.
toml                       0.10.0   Python Library for Tom's Obvious, Minimal Language
traitlets                  4.3.2    Traitlets Python config system
twisted                    19.7.0   An asynchronous networking framework written in Python
txaio                      18.8.1   Compatibility API between asyncio/Twisted/Trollius
typed-ast                  1.4.0    a fork of Python 2 and 3 ast modules with type comment support
typing                     3.7.4    Type Hints for Python
typing-extensions          3.7.4    Backported and Experimental Type Hints for Python 3.5+
uritemplate                3.0.0    URI templates
urllib3                    1.24.1   HTTP library with thread-safe connection pooling, file post, and more.
uvicorn                    0.8.6    The lightning-fast ASGI server.
uvloop                     0.12.2   Fast implementation of asyncio event loop on top of libuv
watchdog                   0.9.0    Filesystem events monitoring
wcwidth                    0.1.7    Measures number of Terminal column cells of wide-character codes
websockets                 7.0      An implementation of the WebSocket Protocol (RFC 6455 & 7692)
wrapt                      1.11.1   Module for decorators, wrappers and monkey patching.
zope.interface             4.6.0    Interfaces for Python

sbdchd avatar Aug 28 '19 02:08 sbdchd

Hi @sbdchd. Good report. Thank you.

I don't suppose you can experiment in the debugger, can you? What happens if you increase the timeout? Where inside get_user() are we losing the time?

carltongibson avatar Aug 28 '19 07:08 carltongibson

I dug into this a bit more.

Increasing timeouts to exorbitant amounts (e.g., 500 seconds) didn't make a difference.

However, when replacing get_user() with the barebones version shown below, the issue still occurred, although it became more intermittent.

After applying the following diff, running poetry run pytest mysite a couple of times should reproduce the error.

It seems like the TimeoutErrors are related to @database_sync_to_async.

EDIT: using @sync_to_async instead of @database_sync_to_async also causes the TimeoutError

diff --git a/mysite/chat/consumers.py b/mysite/chat/consumers.py
index 42fd816..a24ba1f 100644
--- a/mysite/chat/consumers.py
+++ b/mysite/chat/consumers.py
@@ -1,8 +1,12 @@
 from channels.generic.websocket import AsyncJsonWebsocketConsumer
-from channels.auth import get_user
+from channels.db import database_sync_to_async
 
 GROUP_NAME = "FOO_GROUP"
 
+@database_sync_to_async
+def get_user(scope):
+    return None
+
 class ChatConsumer(AsyncJsonWebsocketConsumer):
     async def connect(self):
         await self.accept()
poetry run pytest mysite
========================================= test session starts =========================================
platform darwin -- Python 3.7.3, pytest-5.1.1, py-1.8.0, pluggy-0.12.0
Django settings: mysite.settings (from ini file)
rootdir: /Users/steve/Downloads/channels_tut, inifile: tox.ini
plugins: django-3.5.1, asyncio-0.10.0
collected 1 item                                                                                      

mysite/chat/test_consumers.py F                                                                 [100%]

============================================== FAILURES ===============================================
____________________________________________ test_consumer ____________________________________________

self = <channels.testing.websocket.WebsocketCommunicator object at 0x106fa93c8>, timeout = 1

    async def receive_output(self, timeout=1):
        """
        Receives a single message from the application, with optional timeout.
        """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
            async with async_timeout(timeout):
>               return await self.output_queue.get()

.venv/lib/python3.7/site-packages/asgiref/testing.py:75: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Queue at 0x106df9470 maxsize=0 tasks=1>

    async def get(self):
        """Remove and return an item from the queue.
    
        If queue is empty, wait until an item is available.
        """
        while self.empty():
            getter = self._loop.create_future()
            self._getters.append(getter)
            try:
>               await getter
E               concurrent.futures._base.CancelledError

/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/queues.py:159: CancelledError

During handling of the above exception, another exception occurred:

    @pytest.mark.django_db(transaction=True)
    @pytest.mark.asyncio
    async def test_consumer():
        # 1. setup connections
        communicator = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="a"))
        connected, _ = await communicator.connect()
        assert connected
    
        comm_a = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="b"))
        connected, _ = await comm_a.connect()
        assert connected
    
        comm_b = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="c"))
        connected, _ = await comm_b.connect()
        assert connected
    
        # 3. subscribe to events
        subscribe = {"foo": "bar"}
        await communicator.send_json_to(subscribe)
        await comm_a.send_json_to(subscribe)
        await comm_b.send_json_to(subscribe)
    
        # 3. publish events
        channel_layer = get_channel_layer()
        await channel_layer.group_send(
            GROUP_NAME,
            {
                'type': 'broadcast_message',
            }
        )
    
        # 4. assert on received events
        expected_broadcast_msg = {"broadcast": "important update"}
    
>       res = await communicator.receive_json_from()

mysite/chat/test_consumers.py:56: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.venv/lib/python3.7/site-packages/channels/testing/websocket.py:93: in receive_json_from
    payload = await self.receive_from(timeout)
.venv/lib/python3.7/site-packages/channels/testing/websocket.py:72: in receive_from
    response = await self.receive_output(timeout)
.venv/lib/python3.7/site-packages/asgiref/testing.py:86: in receive_output
    raise e
.venv/lib/python3.7/site-packages/asgiref/testing.py:75: in receive_output
    return await self.output_queue.get()
.venv/lib/python3.7/site-packages/asgiref/timeout.py:68: in __aexit__
    self._do_exit(exc_type)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <asgiref.timeout.timeout object at 0x1070c7cf8>
exc_type = <class 'concurrent.futures._base.CancelledError'>

    def _do_exit(self, exc_type: Type[BaseException]) -> None:
        if exc_type is asyncio.CancelledError and self._cancelled:
            self._cancel_handler = None
            self._task = None
>           raise asyncio.TimeoutError
E           concurrent.futures._base.TimeoutError

.venv/lib/python3.7/site-packages/asgiref/timeout.py:105: TimeoutError
========================================== 1 failed in 1.63s ==========================================
Task was destroyed but it is pending!
task: <Task pending coro=<double_to_single_callable.<locals>.new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py:34> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object double_to_single_callable.<locals>.new_application at 0x106e43848>
Traceback (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 57, in await_many_dispatch
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 688, in call_soon
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 480, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<Queue.get() running at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/queues.py:159> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:440]>
Task was destroyed but it is pending!
task: <Task pending coro=<double_to_single_callable.<locals>.new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py:34> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object double_to_single_callable.<locals>.new_application at 0x106ea60c8>
Traceback (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 57, in await_many_dispatch
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 688, in call_soon
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 480, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<Queue.get() running at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/queues.py:159> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:440]>
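
A side note on the "Task was destroyed but it is pending!" lines: they appear to be a consequence of the test raising before the application tasks were shut down, so those tasks were still pending when the event loop closed. The stdlib-only sketch below shows the general cleanup pattern under that assumption. The application coroutine is hypothetical, and this is not how channels or pytest-asyncio manage their tasks.

```python
import asyncio


async def application(queue: asyncio.Queue) -> None:
    """Hypothetical long-running app task that waits for messages forever."""
    while True:
        await queue.get()


async def main() -> bool:
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.ensure_future(application(queue))
    try:
        # The "test body": it times out, like the failing assertion step.
        await asyncio.wait_for(asyncio.sleep(1), timeout=0.05)
    except asyncio.TimeoutError:
        pass
    finally:
        # Cancel and await the task so it is finished before the loop closes.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
    return task.cancelled()


cancelled = asyncio.run(main())
print(cancelled)
```

Cleaning up this way only silences the secondary warnings. It does not address the TimeoutError itself.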

sbdchd avatar Aug 29 '19 03:08 sbdchd

Increasing timeouts to exorbitant amounts (e.g., 500 seconds) didn't make a difference.

Grrr. Looks like a deadlock. We're being scheduled in a ThreadPoolExecutor that is never getting to run. TBH, I'm not sure why that is.

(How is pytest-asyncio setting things up? That's my first thought.)
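
To make the hypothesis concrete, here is a minimal stdlib-only sketch of how work queued on a thread pool that never gets to run it shows up as a timeout in the awaiting coroutine. It does not establish that this is what happens in the issue. The single-worker executor, the blocking job, and get_user_stub are all assumptions made for illustration, and loop.run_in_executor stands in for sync_to_async.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def get_user_stub():
    """Hypothetical stand-in for a sync function wrapped by sync_to_async."""
    return None


async def main() -> str:
    loop = asyncio.get_running_loop()
    release = threading.Event()
    executor = ThreadPoolExecutor(max_workers=1)
    try:
        # Occupy the only worker thread with a job that blocks until released.
        blocker = loop.run_in_executor(executor, release.wait)
        # This job is queued behind the blocker and cannot start.
        starved = loop.run_in_executor(executor, get_user_stub)
        try:
            await asyncio.wait_for(starved, timeout=0.1)
            outcome = "completed"
        except asyncio.TimeoutError:
            outcome = "timed out"
    finally:
        release.set()  # free the worker so the executor can shut down
        await blocker
        executor.shutdown(wait=True)
    return outcome


outcome = asyncio.run(main())
print(outcome)
```

In this sketch a longer timeout would only delay the failure, since the queued job cannot start until the worker is freed. That would be consistent with the 500-second observation above.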

carltongibson avatar Aug 29 '19 08:08 carltongibson

I added timeouts of 10 seconds and ran the tests under cProfile, and it seems the tests are stuck at select.kqueue.


         1433004 function calls (1400889 primitive calls) in 11.716 seconds

   Ordered by: internal time
   List reduced from 8069 to 50 due to restriction <50>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      125   10.022    0.080   10.022    0.080 {method 'control' of 'select.kqueue' objects}
      903    0.123    0.000    0.123    0.000 {built-in method marshal.loads}
    12049    0.092    0.000    0.092    0.000 {built-in method posix.lstat}
     5989    0.076    0.000    0.076    0.000 {built-in method posix.stat}
3273/3166    0.058    0.000    0.197    0.000 {built-in method builtins.__build_class__}
      903    0.034    0.000    0.044    0.000 <frozen importlib._bootstrap_external>:914(get_data)
    39/38    0.025    0.001    0.027    0.001 {built-in method _imp.create_dynamic}
 1459/335    0.025    0.000    0.075    0.000 sre_parse.py:475(_parse)
    12423    0.023    0.000    0.037    0.000 posixpath.py:75(join)
      303    0.023    0.000    0.023    0.000 {function SQLiteCursorWrapper.execute at 0x1107bc510}
62666/62620    0.022    0.000    0.023    0.000 {built-in method builtins.getattr}
       10    0.022    0.002    0.022    0.002 {built-in method fcntl.fcntl}
   110609    0.022    0.000    0.024    0.000 {built-in method builtins.isinstance}
64727/64706    0.021    0.000    0.022    0.000 {built-in method builtins.hasattr}
      285    0.017    0.000    0.017    0.000 {built-in method posix.listdir}
     1674    0.017    0.000    0.102    0.000 <frozen importlib._bootstrap_external>:1356(find_spec)
      925    0.017    0.000    0.020    0.000 sre_compile.py:276(_optimize_charset)
     1056    0.016    0.000    0.160    0.000 posixpath.py:400(_joinrealpath)
      151    0.014    0.000    0.014    0.000 {built-in method builtins.compile}
     2336    0.013    0.000    0.021    0.000 posixpath.py:338(normpath)
     1639    0.013    0.000    0.013    0.000 {method 'search' of 're.Pattern' objects}
    25810    0.013    0.000    0.013    0.000 sre_parse.py:233(__next)
 2443/308    0.012    0.000    0.037    0.000 sre_compile.py:71(_compile)
     2675    0.012    0.000    0.019    0.000 pathlib.py:62(parse_parts)
      917    0.011    0.000    0.011    0.000 {method 'read' of '_io.FileIO' objects}
    37081    0.010    0.000    0.010    0.000 {method 'startswith' of 'str' objects}
    85693    0.010    0.000    0.010    0.000 {method 'append' of 'list' objects}
     1038    0.010    0.000    0.202    0.000 <frozen importlib._bootstrap>:882(_find_spec)
     8401    0.010    0.000    0.011    0.000 {built-in method __new__ of type object at 0x10dace030}
      793    0.009    0.000    0.014    0.000 __init__.py:133(__init__)
    12049    0.009    0.000    0.103    0.000 posixpath.py:168(islink)
      903    0.009    0.000    0.207    0.000 <frozen importlib._bootstrap_external>:793(get_code)
      829    0.008    0.000    0.069    0.000 rewrite.py:142(_early_rewrite_bailout)
      518    0.008    0.000    0.014    0.000 __init__.py:398(deconstruct)
 2854/766    0.008    0.000    0.011    0.000 sre_parse.py:174(getwidth)
     9557    0.008    0.000    0.021    0.000 <frozen importlib._bootstrap_external>:56(_path_join)
  5311/96    0.008    0.000    0.018    0.000 copy.py:132(deepcopy)
 1260/326    0.007    0.000    0.963    0.003 <frozen importlib._bootstrap>:978(_find_and_load)
    22199    0.007    0.000    0.018    0.000 sre_parse.py:254(get)
    44480    0.007    0.000    0.008    0.000 {method 'get' of 'dict' objects}
      401    0.007    0.000    0.015    0.000 functional.py:125(__prepare_class__)
18325/18261    0.007    0.000    0.009    0.000 {method 'join' of 'str' objects}
    11516    0.007    0.000    0.007    0.000 {method 'match' of 're.Pattern' objects}
     9557    0.007    0.000    0.010    0.000 <frozen importlib._bootstrap_external>:58(<listcomp>)
    15760    0.007    0.000    0.010    0.000 sre_parse.py:164(__getitem__)
       95    0.007    0.000    0.007    0.000 {built-in method posix.open}
     1816    0.006    0.000    0.015    0.000 <frozen importlib._bootstrap_external>:271(cache_from_source)
      332    0.006    0.000    0.006    0.000 base.py:48(subclass_exception)
     9544    0.006    0.000    0.013    0.000 ast.py:184(iter_child_nodes)
    33897    0.006    0.000    0.006    0.000 {built-in method posix.fspath}
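
The comment above doesn't say exactly how the profile was collected. As a rough sketch only, assuming the standard-library cProfile and pstats modules, a listing ordered by internal time and cut to 50 rows could be produced along these lines. `slow_wait` and `workload` are made-up stand-ins, not code from the repro project.

```python
import cProfile
import io
import pstats
import time


def slow_wait():
    # Hypothetical stand-in for a blocking wait (e.g. the selector call
    # an idle event loop sits in).
    time.sleep(0.2)


def workload():
    # Hypothetical stand-in for the test run being profiled.
    slow_wait()
    return sum(range(1000))


profiler = cProfile.Profile()
profiler.enable()
workload()
profiler.disable()

buffer = io.StringIO()
stats = pstats.Stats(profiler, stream=buffer)
# "tottime" sorts by time spent inside each function itself
# (reported as "internal time"); 50 restricts the listing to 50 rows.
stats.sort_stats("tottime").print_stats(50)
report = buffer.getvalue()
print(report)
```

A blocking wait shows up at the top of such a listing with a large tottime, which is what the select.kqueue row looks like here: the process is mostly idle, waiting for something to become ready.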

sbdchd avatar Aug 30 '19 00:08 sbdchd

Hey @andrewgodwin. Can I ask if you have any thoughts here? (Maybe you’ve seen this kind of thing before?)

I’m a bit like 😳 as to how to advise. Thanks.

carltongibson avatar Aug 30 '19 05:08 carltongibson

It's worth trying it on an older asgiref version - let's say 3.1.4 - and see if it persists there. I had someone say they might have a potential problem a bit like this with 3.2 but it was un-replicatable.

andrewgodwin avatar Aug 30 '19 05:08 andrewgodwin

Okay, I tested the following versions of asgiref and they exhibit the TimeoutError:

  • 3.2
  • 3.1.4
  • 3.0.0

sbdchd avatar Aug 30 '19 22:08 sbdchd

OK, then it's not asgiref, which is both good and bad.

Have you added logging to the stub get_user to see if it's getting called at all? Given you have three different consumers running at once, it's almost certainly a deadlock that's occurring, maybe due to the ThreadPoolExecutor being exhausted, but it should have a good 8-10 threads by default (and we have some anti-deadlock tests for that code already).
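
For anyone unsure what an exhausted executor looks like, here is a minimal sketch in plain concurrent.futures. It is not the Channels or asgiref code, and it is not a claim that this is what happens in the repro. The pool size of 1 and the names `inner` and `outer` are assumptions chosen to make the effect deterministic.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout

# A deliberately tiny pool so that a single running job exhausts it.
pool = ThreadPoolExecutor(max_workers=1)


def inner():
    return "inner done"


def outer():
    # outer occupies the only worker thread, so inner can only sit in the
    # queue. Without the timeout this wait would never finish.
    future = pool.submit(inner)
    try:
        return future.result(timeout=0.5)
    except FutureTimeout:
        return "starved"


outcome = pool.submit(outer).result()
pool.shutdown()
print(outcome)
```

With more workers than jobs waiting on each other, the same code completes normally, which is why a default pool of several threads should make this hard to hit with only three consumers.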

The best thing to do would be to log every step into and out of the get_user function, and the execution of the lines above and below the await, to work out exactly where it is getting stuck and what's run at that point. Tracebacks tend not to be very accurate when asyncio is involved.
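
A minimal sketch of that kind of logging, under stated assumptions: `fake_get_user` is a hypothetical stand-in for the real `get_user`, and `receive_json` here is a bare coroutine rather than a consumer method.

```python
import asyncio
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("consumer-debug")


async def fake_get_user(scope):
    # Hypothetical stand-in for get_user: log on the way in and out.
    log.info("get_user: enter")
    await asyncio.sleep(0)  # yield to the event loop once
    log.info("get_user: exit")
    return scope.get("user")


async def receive_json(scope):
    # Log the lines directly above and below the await.
    log.info("about to call get_user")
    user = await fake_get_user(scope)
    log.info("finished calling get_user")
    return user


result = asyncio.run(receive_json({"user": "alice"}))
```

The enter/exit lines inside the function are the useful part. If only the outer pair appears, the body never ran: calling a coroutine function without `await` returns a coroutine object immediately and executes nothing.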

andrewgodwin avatar Aug 31 '19 03:08 andrewgodwin

Thanks for the pointers.

I've added logging to the tests and the consumer, but the logs paint much the same picture as the traceback:

INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:43 about to subscribe
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:47 sub:comm1
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:49 sub:comm_a
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:51 sub:comm_b
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:56 pub
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:67 about to recv comm
Full traceback
❯ poetry run pytest -o log_cli=true mysite
========================================= test session starts =========================================
platform darwin -- Python 3.7.3, pytest-5.1.1, py-1.8.0, pluggy-0.12.0
Django settings: mysite.settings (from ini file)
rootdir: /Users/steve/Downloads/channels_tut, inifile: tox.ini
plugins: django-3.5.1, asyncio-0.10.0
collected 1 item                                                                                      

mysite/chat/test_consumers.py::test_consumer 
-------------------------------------------- live log call --------------------------------------------
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:43 about to subscribe
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:47 sub:comm1
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:49 sub:comm_a
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:51 sub:comm_b
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:56 pub
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:67 about to recv comm
FAILED                                                                                          [100%]

============================================== FAILURES ===============================================
____________________________________________ test_consumer ____________________________________________

self = <channels.testing.websocket.WebsocketCommunicator object at 0x105ff15c0>, timeout = 1

    async def receive_output(self, timeout=1):
        """
        Receives a single message from the application, with optional timeout.
        """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
            async with async_timeout(timeout):
>               return await self.output_queue.get()

.venv/lib/python3.7/site-packages/asgiref/testing.py:75: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Queue at 0x105fb1940 maxsize=0 tasks=1>

    async def get(self):
        """Remove and return an item from the queue.
    
        If queue is empty, wait until an item is available.
        """
        while self.empty():
            getter = self._loop.create_future()
            self._getters.append(getter)
            try:
>               await getter
E               concurrent.futures._base.CancelledError

/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/queues.py:159: CancelledError

During handling of the above exception, another exception occurred:

    @pytest.mark.django_db(transaction=True)
    @pytest.mark.asyncio
    async def test_consumer():
        # 1. setup connections
        communicator = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="a"))
        connected, _ = await communicator.connect()
        assert connected
    
        comm_a = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="b"))
        connected, _ = await comm_a.connect()
        assert connected
    
        comm_b = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="c"))
        connected, _ = await comm_b.connect()
        assert connected
    
        log.info("about to subscribe")
    
        # 3. subscribe to events
        subscribe = {"foo": "bar"}
        log.info("sub:comm1")
        await communicator.send_json_to(subscribe)
        log.info("sub:comm_a")
        await comm_a.send_json_to(subscribe)
        log.info("sub:comm_b")
        await comm_b.send_json_to(subscribe)
    
        # 3. publish events
        channel_layer = get_channel_layer()
        log.info("pub")
        await channel_layer.group_send(
            GROUP_NAME,
            {
                'type': 'broadcast_message',
            }
        )
    
        # 4. assert on received events
        expected_broadcast_msg = {"broadcast": "important update"}
    
        log.info("about to recv comm")
>       res = await communicator.receive_json_from()

mysite/chat/test_consumers.py:68: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.venv/lib/python3.7/site-packages/channels/testing/websocket.py:93: in receive_json_from
    payload = await self.receive_from(timeout)
.venv/lib/python3.7/site-packages/channels/testing/websocket.py:72: in receive_from
    response = await self.receive_output(timeout)
.venv/lib/python3.7/site-packages/asgiref/testing.py:86: in receive_output
    raise e
.venv/lib/python3.7/site-packages/asgiref/testing.py:75: in receive_output
    return await self.output_queue.get()
.venv/lib/python3.7/site-packages/asgiref/timeout.py:68: in __aexit__
    self._do_exit(exc_type)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <asgiref.timeout.timeout object at 0x10622e908>
exc_type = <class 'concurrent.futures._base.CancelledError'>

    def _do_exit(self, exc_type: Type[BaseException]) -> None:
        if exc_type is asyncio.CancelledError and self._cancelled:
            self._cancel_handler = None
            self._task = None
>           raise asyncio.TimeoutError
E           concurrent.futures._base.TimeoutError

.venv/lib/python3.7/site-packages/asgiref/timeout.py:105: TimeoutError
---------------------------------------- Captured stderr setup ----------------------------------------
Creating test database for alias 'default'...
------------------------------------------ Captured log call ------------------------------------------
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:43 about to subscribe
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:47 sub:comm1
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:49 sub:comm_a
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:51 sub:comm_b
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:56 pub
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:67 about to recv comm
-------------------------------------- Captured stderr teardown ---------------------------------------
Destroying test database for alias 'default'...
========================================== warnings summary ===========================================
mysite/chat/test_consumers.py::test_consumer
  /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/generic/websocket.py:259: RuntimeWarning: coroutine 'get_user' was never awaited
    await self.receive_json(await self.decode_json(text_data), **kwargs)

-- Docs: https://docs.pytest.org/en/latest/warnings.html
==================================== 1 failed, 1 warnings in 1.71s ====================================
Task was destroyed but it is pending!
task: <Task pending coro=<double_to_single_callable.<locals>.new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py:34> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object double_to_single_callable.<locals>.new_application at 0x105f84c48>
Traceback (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 57, in await_many_dispatch
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 688, in call_soon
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 480, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<Queue.get() running at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/queues.py:159> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:440]>
Task was destroyed but it is pending!
task: <Task pending coro=<double_to_single_callable.<locals>.new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py:34> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object double_to_single_callable.<locals>.new_application at 0x105ff9248>
Traceback (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 57, in await_many_dispatch
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 688, in call_soon
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 480, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<Queue.get() running at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/queues.py:159> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:440]>
Full traceback with `PYTHONASYNCIODEBUG=1`

❯ PYTHONASYNCIODEBUG=1 poetry run pytest -o log_cli=true mysite
============================================================================================= test session starts ==============================================================================================
platform darwin -- Python 3.7.3, pytest-5.1.1, py-1.8.0, pluggy-0.12.0
Django settings: mysite.settings (from ini file)
rootdir: /Users/steve/Downloads/channels_tut, inifile: tox.ini
plugins: django-3.5.1, asyncio-0.10.0
collected 1 item                                                                                                                                                                                               

mysite/chat/test_consumers.py::test_consumer 
------------------------------------------------------------------------------------------------ live log call -------------------------------------------------------------------------------------------------
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:43 about to subscribe
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:47 sub:comm1
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:49 sub:comm_a
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:51 sub:comm_b
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:56 pub
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:67 about to recv comm
FAILED                                                                                                                                                                                                   [100%]

=================================================================================================== FAILURES ===================================================================================================
________________________________________________________________________________________________ test_consumer _________________________________________________________________________________________________

self = <channels.testing.websocket.WebsocketCommunicator object at 0x105600160>, timeout = 1

    async def receive_output(self, timeout=1):
        """
        Receives a single message from the application, with optional timeout.
        """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
            async with async_timeout(timeout):
>               return await self.output_queue.get()

.venv/lib/python3.7/site-packages/asgiref/testing.py:75: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Queue at 0x105620358 maxsize=0 tasks=1>

    async def get(self):
        """Remove and return an item from the queue.
    
        If queue is empty, wait until an item is available.
        """
        while self.empty():
            getter = self._loop.create_future()
            self._getters.append(getter)
            try:
>               await getter
E               concurrent.futures._base.CancelledError

/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/queues.py:159: CancelledError

During handling of the above exception, another exception occurred:

    @pytest.mark.django_db(transaction=True)
    @pytest.mark.asyncio
    async def test_consumer():
        # 1. setup connections
        communicator = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="a"))
        connected, _ = await communicator.connect()
        assert connected
    
        comm_a = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="b"))
        connected, _ = await comm_a.connect()
        assert connected
    
        comm_b = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="c"))
        connected, _ = await comm_b.connect()
        assert connected
    
        log.info("about to subscribe")
    
        # 3. subscribe to events
        subscribe = {"foo": "bar"}
        log.info("sub:comm1")
        await communicator.send_json_to(subscribe)
        log.info("sub:comm_a")
        await comm_a.send_json_to(subscribe)
        log.info("sub:comm_b")
        await comm_b.send_json_to(subscribe)
    
        # 3. publish events
        channel_layer = get_channel_layer()
        log.info("pub")
        await channel_layer.group_send(
            GROUP_NAME,
            {
                'type': 'broadcast_message',
            }
        )
    
        # 4. assert on received events
        expected_broadcast_msg = {"broadcast": "important update"}
    
        log.info("about to recv comm")
>       res = await communicator.receive_json_from()

mysite/chat/test_consumers.py:68: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.7/site-packages/channels/testing/websocket.py:93: in receive_json_from
    payload = await self.receive_from(timeout)
.venv/lib/python3.7/site-packages/channels/testing/websocket.py:72: in receive_from
    response = await self.receive_output(timeout)
.venv/lib/python3.7/site-packages/asgiref/testing.py:86: in receive_output
    raise e
.venv/lib/python3.7/site-packages/asgiref/testing.py:75: in receive_output
    return await self.output_queue.get()
.venv/lib/python3.7/site-packages/asgiref/timeout.py:68: in __aexit__
    self._do_exit(exc_type)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <asgiref.timeout.timeout object at 0x105a38748>, exc_type = <class 'concurrent.futures._base.CancelledError'>

    def _do_exit(self, exc_type: Type[BaseException]) -> None:
        if exc_type is asyncio.CancelledError and self._cancelled:
            self._cancel_handler = None
            self._task = None
>           raise asyncio.TimeoutError
E           concurrent.futures._base.TimeoutError

.venv/lib/python3.7/site-packages/asgiref/timeout.py:105: TimeoutError
-------------------------------------------------------------------------------------------- Captured stderr setup ---------------------------------------------------------------------------------------------
Creating test database for alias 'default'...
---------------------------------------------------------------------------------------------- Captured log call -----------------------------------------------------------------------------------------------
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:43 about to subscribe
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:47 sub:comm1
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:49 sub:comm_a
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:51 sub:comm_b
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:56 pub
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:67 about to recv comm
------------------------------------------------------------------------------------------- Captured stderr teardown -------------------------------------------------------------------------------------------
Destroying test database for alias 'default'...
=============================================================================================== warnings summary ===============================================================================================
mysite/chat/test_consumers.py::test_consumer
  /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/generic/websocket.py:259: RuntimeWarning: coroutine 'get_user' was never awaited
    await self.receive_json(await self.decode_json(text_data), **kwargs)

-- Docs: https://docs.pytest.org/en/latest/warnings.html
======================================================================================== 1 failed, 1 warnings in 1.69s =========================================================================================
Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/bin/pytest", line 10, in <module>
    sys.exit(main())
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/config/__init__.py", line 77, in main
    return config.hook.pytest_cmdline_main(config=config)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 228, in pytest_cmdline_main
    return wrap_session(config, _main)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 191, in wrap_session
    session.exitstatus = doit(config, session) or 0
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 235, in _main
    config.hook.pytest_runtestloop(session=session)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 256, in pytest_runtestloop
    item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 72, in pytest_runtest_protocol
    runtestprotocol(item, nextitem=nextitem)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 87, in runtestprotocol
    reports.append(call_and_report(item, "call", log))
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 167, in call_and_report
    call = call_runtest_hook(item, when, **kwds)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 192, in call_runtest_hook
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 220, in from_call
    result = func()
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 192, in <lambda>
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 117, in pytest_runtest_call
    item.runtest()
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/python.py", line 1423, in runtest
    self.ihook.pytest_pyfunc_call(pyfuncitem=self)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pytest_asyncio/plugin.py", line 158, in pytest_pyfunc_call
    pyfuncitem.obj(**testargs), loop=event_loop))
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 571, in run_until_complete
    self.run_forever()
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 539, in run_forever
    self._run_once()
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 1767, in _run_once
    handle._run()
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py", line 35, in test_consumer
    comm_a = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="b"))
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/testing/websocket.py", line 26, in __init__
    super().__init__(application, self.scope)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/testing.py", line 21, in __init__
    self.application(scope, self.input_queue.get, self.output_queue.put)
task: <Task pending coro=<double_to_single_callable.<locals>.new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py:34> wait_for=<Future cancelled created at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py:396> created at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/testing.py:21>
Exception ignored in: <coroutine object double_to_single_callable.<locals>.new_application at 0x1058704c8>
Traceback (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 57, in await_many_dispatch
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 688, in call_soon
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 480, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/bin/pytest", line 10, in <module>
    sys.exit(main())
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/config/__init__.py", line 77, in main
    return config.hook.pytest_cmdline_main(config=config)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 228, in pytest_cmdline_main
    return wrap_session(config, _main)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 191, in wrap_session
    session.exitstatus = doit(config, session) or 0
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 235, in _main
    config.hook.pytest_runtestloop(session=session)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 256, in pytest_runtestloop
    item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 72, in pytest_runtest_protocol
    runtestprotocol(item, nextitem=nextitem)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 87, in runtestprotocol
    reports.append(call_and_report(item, "call", log))
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 167, in call_and_report
    call = call_runtest_hook(item, when, **kwds)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 192, in call_runtest_hook
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 220, in from_call
    result = func()
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 192, in <lambda>
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 117, in pytest_runtest_call
    item.runtest()
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/python.py", line 1423, in runtest
    self.ihook.pytest_pyfunc_call(pyfuncitem=self)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pytest_asyncio/plugin.py", line 158, in pytest_pyfunc_call
    pyfuncitem.obj(**testargs), loop=event_loop))
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 571, in run_until_complete
    self.run_forever()
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 539, in run_forever
    self._run_once()
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 1767, in _run_once
    handle._run()
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application
    return await instance(receive, send)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__
    return await self.inner(receive, self.send)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call
    await inner_instance(receive, send)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__
    [receive, self.channel_receive], self.dispatch
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 53, in await_many_dispatch
    tasks[i] = asyncio.ensure_future(consumer_callables[i]())
task: <Task pending coro=<Queue.get() running at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/queues.py:159> wait_for=<Future cancelled created at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py:396> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:440] created at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py:53>
Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/bin/pytest", line 10, in <module>
    sys.exit(main())
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/config/__init__.py", line 77, in main
    return config.hook.pytest_cmdline_main(config=config)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 228, in pytest_cmdline_main
    return wrap_session(config, _main)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 191, in wrap_session
    session.exitstatus = doit(config, session) or 0
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 235, in _main
    config.hook.pytest_runtestloop(session=session)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 256, in pytest_runtestloop
    item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 72, in pytest_runtest_protocol
    runtestprotocol(item, nextitem=nextitem)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 87, in runtestprotocol
    reports.append(call_and_report(item, "call", log))
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 167, in call_and_report
    call = call_runtest_hook(item, when, **kwds)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 192, in call_runtest_hook
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 220, in from_call
    result = func()
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 192, in <lambda>
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 117, in pytest_runtest_call
    item.runtest()
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/python.py", line 1423, in runtest
    self.ihook.pytest_pyfunc_call(pyfuncitem=self)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pytest_asyncio/plugin.py", line 158, in pytest_pyfunc_call
    pyfuncitem.obj(**testargs), loop=event_loop))
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 571, in run_until_complete
    self.run_forever()
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 539, in run_forever
    self._run_once()
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 1767, in _run_once
    handle._run()
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py", line 39, in test_consumer
    comm_b = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="c"))
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/testing/websocket.py", line 26, in __init__
    super().__init__(application, self.scope)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/testing.py", line 21, in __init__
    self.application(scope, self.input_queue.get, self.output_queue.put)
task: <Task pending coro=<double_to_single_callable.<locals>.new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py:34> wait_for=<Future cancelled created at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py:396> created at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/testing.py:21>
Exception ignored in: <coroutine object double_to_single_callable.<locals>.new_application at 0x1059f23c8>
Traceback (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 57, in await_many_dispatch
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 688, in call_soon
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 480, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/bin/pytest", line 10, in <module>
    sys.exit(main())
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/config/__init__.py", line 77, in main
    return config.hook.pytest_cmdline_main(config=config)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 228, in pytest_cmdline_main
    return wrap_session(config, _main)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 191, in wrap_session
    session.exitstatus = doit(config, session) or 0
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 235, in _main
    config.hook.pytest_runtestloop(session=session)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 256, in pytest_runtestloop
    item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 72, in pytest_runtest_protocol
    runtestprotocol(item, nextitem=nextitem)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 87, in runtestprotocol
    reports.append(call_and_report(item, "call", log))
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 167, in call_and_report
    call = call_runtest_hook(item, when, **kwds)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 192, in call_runtest_hook
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 220, in from_call
    result = func()
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 192, in <lambda>
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 117, in pytest_runtest_call
    item.runtest()
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/python.py", line 1423, in runtest
    self.ihook.pytest_pyfunc_call(pyfuncitem=self)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall
    res = hook_impl.function(*args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pytest_asyncio/plugin.py", line 158, in pytest_pyfunc_call
    pyfuncitem.obj(**testargs), loop=event_loop))
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 571, in run_until_complete
    self.run_forever()
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 539, in run_forever
    self._run_once()
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 1767, in _run_once
    handle._run()
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application
    return await instance(receive, send)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__
    return await self.inner(receive, self.send)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call
    await inner_instance(receive, send)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__
    [receive, self.channel_receive], self.dispatch
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 53, in await_many_dispatch
    tasks[i] = asyncio.ensure_future(consumer_callables[i]())
task: <Task pending coro=<Queue.get() running at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/queues.py:159> wait_for=<Future cancelled created at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py:396> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:440] created at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py:53>

sbdchd avatar Aug 31 '19 04:08 sbdchd

So, I've replicated this enough to work out it's a side effect specific to channels_redis. If I replace the broadcast mechanism with something else, it functions fine.
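For anyone wanting to check the same thing locally, one way to take channels_redis out of the picture is to point the test settings at a different channel layer. The snippet below is a minimal sketch, assuming the test run can override `CHANNEL_LAYERS` and that the in-memory layer that ships with channels is good enough for the test. It is not necessarily the replacement used in the comment above.

```python
# Hypothetical test-only settings override (for example in a settings module
# used only by pytest). Assumption: the tests run in a single process.
CHANNEL_LAYERS = {
    "default": {
        # In-memory layer bundled with channels; it does not talk to Redis.
        "BACKEND": "channels.layers.InMemoryChannelLayer",
    }
}
```

The in-memory layer only works within one process and is not intended for production, so this is a way to narrow down the failure rather than a fix.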

andrewgodwin avatar Sep 04 '19 01:09 andrewgodwin

OK, so in my digging, this looks like it's related to the channels_redis mechanism which tries to share connections across coroutines in the same process. That code wasn't super amazing before, and something about what sync_to_async is doing is upsetting it and causing the entire house of cards to fall over.
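To make "sharing connections across coroutines" concrete, here is a generic, stdlib-only illustration of the idea. It is not the channels_redis code: `FakeConnection`, `_connections`, and `get_connection` are hypothetical names, and the assumption that the cache is keyed by event loop is made purely for the sake of the sketch.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a real network connection."""

    def __init__(self, loop):
        # Remember which event loop created this connection.
        self.loop = loop


# Hypothetical cache: one shared connection per event loop.
_connections = {}


async def get_connection():
    """Return the connection shared by every coroutine on the running loop."""
    loop = asyncio.get_running_loop()
    if loop not in _connections:
        _connections[loop] = FakeConnection(loop)
    return _connections[loop]


async def main():
    first = await get_connection()
    second = await get_connection()
    # Coroutines on the same loop receive the same object.
    return first, second


if __name__ == "__main__":
    first, second = asyncio.run(main())
    print(first is second)
```

In this sketch each `asyncio.run` call creates a fresh event loop, so a second call gets a different `FakeConnection`. Code that ends up running on another loop would not see the connection cached for the first one.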

@sbdchd I know this isn't a very helpful suggestion, but if you can eliminate your use of channels_redis you will stop encountering this problem. I know that's probably not a realistic option, but if you were using it as a bridge until you invested in a real message bus, maybe now is the time.

I don't have the mental space to go digging into the channels_redis connection-sharing code right now, as I barely understood it as I wrote it. It's honestly a lot of effort to solve the small problem of connection sharing, and frankly the whole thing should probably be torn down for something simpler but slightly less efficient if nobody is able to debug issues like this!

andrewgodwin avatar Sep 04 '19 02:09 andrewgodwin

Hey @andrewgodwin. Thanks for the work. Do you have a reproduction set-up that demonstrates the issue? (Maybe you don't... 😬)

carltongibson avatar Sep 04 '19 06:09 carltongibson

Yup, I cloned the demo repo above and set it up and it fails consistently.

andrewgodwin Sep 04 '19 06:09

Well that’s easy... 😀

I kind of meant one that highlights the exact pain point, but don’t worry. Thank you for looking into it.

carltongibson Sep 04 '19 07:09

Okay, I kind of see how it might come up with the channels_redis connection pool. I’ll have a play and see if I can pin it down.

carltongibson Sep 04 '19 14:09

I changed the config to use RabbitMQ via https://github.com/CJWorkbench/channels_rabbitmq/, but the tests still fail with timeout errors.

It's possible that I don't have rabbitmq configured properly.

poetry run pytest mysite output and stack trace
❯ poetry run pytest mysite
========================================= test session starts =========================================
platform darwin -- Python 3.7.3, pytest-5.1.1, py-1.8.0, pluggy-0.12.0
Django settings: mysite.settings (from ini file)
rootdir: /Users/steve/Downloads/channels_tut, inifile: tox.ini
plugins: django-3.5.1, asyncio-0.10.0
collected 1 item                                                                                      

mysite/chat/test_consumers.py F                                                                 [100%]

============================================== FAILURES ===============================================
____________________________________________ test_consumer ____________________________________________

self = <channels.testing.websocket.WebsocketCommunicator object at 0x105df7908>, timeout = 1

    async def receive_output(self, timeout=1):
        """
        Receives a single message from the application, with optional timeout.
        """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
            async with async_timeout(timeout):
>               return await self.output_queue.get()

.venv/lib/python3.7/site-packages/asgiref/testing.py:75: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Queue at 0x105d9e0f0 maxsize=0 tasks=1>

    async def get(self):
        """Remove and return an item from the queue.
    
        If queue is empty, wait until an item is available.
        """
        while self.empty():
            getter = self._loop.create_future()
            self._getters.append(getter)
            try:
>               await getter
E               concurrent.futures._base.CancelledError

/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/queues.py:159: CancelledError

During handling of the above exception, another exception occurred:

    @pytest.mark.django_db(transaction=True)
    @pytest.mark.asyncio
    async def test_consumer():
        # 1. setup connections
        communicator = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="a"))
        connected, _ = await communicator.connect()
        assert connected
    
        comm_a = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="b"))
        connected, _ = await comm_a.connect()
        assert connected
    
        comm_b = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="c"))
        connected, _ = await comm_b.connect()
        assert connected
    
        # 3. subscribe to events
        subscribe = {"foo": "bar"}
        await communicator.send_json_to(subscribe)
        await comm_a.send_json_to(subscribe)
        await comm_b.send_json_to(subscribe)
    
        # 3. publish events
        channel_layer = get_channel_layer()
        await channel_layer.group_send(
            GROUP_NAME,
            {
                'type': 'broadcast_message',
            }
        )
    
        # 4. assert on received events
        expected_broadcast_msg = {"broadcast": "important update"}
    
>       res = await communicator.receive_json_from()

mysite/chat/test_consumers.py:56: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.venv/lib/python3.7/site-packages/channels/testing/websocket.py:93: in receive_json_from
    payload = await self.receive_from(timeout)
.venv/lib/python3.7/site-packages/channels/testing/websocket.py:72: in receive_from
    response = await self.receive_output(timeout)
.venv/lib/python3.7/site-packages/asgiref/testing.py:86: in receive_output
    raise e
.venv/lib/python3.7/site-packages/asgiref/testing.py:75: in receive_output
    return await self.output_queue.get()
.venv/lib/python3.7/site-packages/asgiref/timeout.py:68: in __aexit__
    self._do_exit(exc_type)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <asgiref.timeout.timeout object at 0x105f06cc0>
exc_type = <class 'concurrent.futures._base.CancelledError'>

    def _do_exit(self, exc_type: Type[BaseException]) -> None:
        if exc_type is asyncio.CancelledError and self._cancelled:
            self._cancel_handler = None
            self._task = None
>           raise asyncio.TimeoutError
E           concurrent.futures._base.TimeoutError

.venv/lib/python3.7/site-packages/asgiref/timeout.py:105: TimeoutError
------------------------------------------ Captured log call ------------------------------------------
WARNING  aioamqp.protocol:protocol.py:190 only PLAIN login_method is supported, falling back to AMQPLAIN
========================================== 1 failed in 1.64s ==========================================
Task was destroyed but it is pending!
task: <Task pending coro=<double_to_single_callable.<locals>.new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py:34> wait_for=<Future cancelled>>
Task was destroyed but it is pending!
task: <Task pending coro=<double_to_single_callable.<locals>.new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py:34> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object double_to_single_callable.<locals>.new_application at 0x105d31ac8>
Traceback (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 57, in await_many_dispatch
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 688, in call_soon
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 480, in _check_closed
RuntimeError: Event loop is closed
Task exception was never retrieved
future: <Task finished coro=<RabbitmqChannelLayer.receive() done, defined at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels_rabbitmq/core.py:149> exception=ChannelClosed(0, 'Channel is closed')>
Traceback (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels_rabbitmq/core.py", line 162, in receive
    return await connection.receive(channel)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels_rabbitmq/connection.py", line 780, in receive
    return await self._incoming_messages.get(asgi_channel)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels_rabbitmq/connection.py", line 283, in get
    item = await self._out[asgi_channel].get()
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels_rabbitmq/connection.py", line 126, in get
    await getter  # raises ChannelClosed
aioamqp.exceptions.ChannelClosed: (0, 'Channel is closed')
Task was destroyed but it is pending!
task: <Task pending coro=<Queue.get() running at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/queues.py:159> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:440]>
Exception ignored in: <coroutine object double_to_single_callable.<locals>.new_application at 0x105ee72c8>
Traceback (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 57, in await_many_dispatch
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 688, in call_soon
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 480, in _check_closed
RuntimeError: Event loop is closed
Task exception was never retrieved
future: <Task finished coro=<RabbitmqChannelLayer.receive() done, defined at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels_rabbitmq/core.py:149> exception=ChannelClosed(0, 'Channel is closed')>
Traceback (most recent call last):
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels_rabbitmq/core.py", line 162, in receive
    return await connection.receive(channel)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels_rabbitmq/connection.py", line 780, in receive
    return await self._incoming_messages.get(asgi_channel)
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels_rabbitmq/connection.py", line 283, in get
    item = await self._out[asgi_channel].get()
  File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels_rabbitmq/connection.py", line 126, in get
    await getter  # raises ChannelClosed
aioamqp.exceptions.ChannelClosed: (0, 'Channel is closed')
Task was destroyed but it is pending!
task: <Task pending coro=<Queue.get() running at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/queues.py:159> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:440]>

edit: I pushed my changes to the rabbitmq-layer branch on https://github.com/sbdchd/django-channels-bug-repo
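
For anyone reading the trace above: the final TimeoutError only says that nothing reached the communicator's output queue within the one-second timeout, so it does not by itself identify which layer stalled. A minimal sketch of that shape, assuming plain asyncio and using `asyncio.wait_for` in place of the `async_timeout` helper seen in the trace (the `receive_output` and `demo` functions here are hypothetical stand-ins, not the asgiref code):

```python
import asyncio


async def receive_output(queue: asyncio.Queue, timeout: float = 1.0):
    """Wait for a single item from the queue, giving up after `timeout` seconds."""
    # wait_for cancels the pending get() and raises asyncio.TimeoutError
    # when no producer has put anything on the queue in time.
    return await asyncio.wait_for(queue.get(), timeout)


async def demo() -> str:
    # Nothing is ever put on this queue, which mimics a consumer
    # that never sends a message back to the test.
    queue: asyncio.Queue = asyncio.Queue()
    try:
        await receive_output(queue, timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"
    return "received"


result = asyncio.run(demo())
print(result)
```

The inner CancelledError in the trace is the cancelled `get()`; the timeout wrapper then turns it into the TimeoutError that pytest reports.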

sbdchd Sep 05 '19 23:09

That channel layer also uses a very complicated asyncio-based setup to share connections from a single process. Ideally asgiref would not disrupt whatever mechanism they're both using to work, but it needs to be pinned down to the exact failing call first before I can work out if I can make things compatible, unfortunately.

andrewgodwin Sep 06 '19 01:09

I can confirm that using InMemoryChannelLayer also causes the problem.

ghost Aug 25 '20 16:08

I'm also facing this issue while testing, and I'm not using channels_redis in my consumer.

I have written a custom auth middleware and wrapped it around URLRouter. The middleware queries the database to validate the token using @database_sync_to_async, which triggers the TimeoutError.

Middleware:


from typing import Dict

from channels.db import database_sync_to_async


@database_sync_to_async
def get_user(token: bytes):
    # Validate the token and return the matching user, or AnonymousUser.
    pass


class WebSocketAuthMiddleware:
    def __init__(self, app):
        # Store the ASGI application we were passed
        self.app = app

    def get_headers(self, scope) -> Dict[bytes, bytes]:
        return {h[0]: h[1] for h in scope.get("headers", ())}

    async def __call__(self, scope, receive, send):
        headers = self.get_headers(scope)
        scope["user"] = await get_user(headers.get(b"token", b""))

Asgi:

application = ProtocolTypeRouter(
    {
        "http": get_asgi_application(),
        "websocket": WebSocketAuthMiddleware(URLRouter(websocket_urlpatterns)),
    }
)
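
One thing worth checking: the `__call__` shown above ends after setting `scope["user"]`. If that is the whole method rather than an excerpt, the wrapped application is never invoked, so the connection would never be accepted and the test would time out. A minimal sketch of the forwarding pattern, assuming only the standard library: `lookup_user`, `ForwardingAuthMiddleware`, and the in-memory `receive`/`send` callables are hypothetical stand-ins, and `run_in_executor` takes the place of `@database_sync_to_async`.

```python
import asyncio


def lookup_user(token: bytes) -> str:
    # Hypothetical blocking lookup; a real one would query the database.
    return "alice" if token == b"secret" else "anonymous"


class ForwardingAuthMiddleware:
    def __init__(self, app):
        # Store the ASGI application we were passed
        self.app = app

    async def __call__(self, scope, receive, send):
        headers = dict(scope.get("headers", ()))
        loop = asyncio.get_running_loop()
        # Run the blocking lookup in a worker thread (stand-in for
        # database_sync_to_async, which is an assumption of this sketch).
        user = await loop.run_in_executor(
            None, lookup_user, headers.get(b"token", b"")
        )
        # Copy the scope rather than mutating the caller's dict.
        scope = dict(scope, user=user)
        # Hand control to the wrapped application.
        return await self.app(scope, receive, send)


async def demo():
    seen, sent = [], []

    async def inner_app(scope, receive, send):
        # Record the user the middleware attached, then accept.
        seen.append(scope["user"])
        await send({"type": "websocket.accept"})

    async def receive():
        return {"type": "websocket.connect"}

    async def send(message):
        sent.append(message)

    app = ForwardingAuthMiddleware(inner_app)
    scope = {"type": "websocket", "headers": [(b"token", b"secret")]}
    await app(scope, receive, send)
    return seen, sent


seen, sent = asyncio.run(demo())
print(seen, sent)
```

If the real middleware already forwards to `self.app` and the snippet was just trimmed, this does not apply and the timeout has another cause.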

Let me know if you need a sample project to test this out. I'll upload it to GitHub and link it here.

Libraries and versions: channels 3.0.4, django 4.0.1, pytest 6.2.5, pytest-django 4.5.2, pytest-asyncio 0.17.2

Tested on: macOS Catalina 10.15.7 with Python 3.8.6

kirankumbhar Jan 31 '22 13:01