WARNING - server - Application instance took too long to shut down and was killed

Open sohamnavadiya opened this issue 5 years ago • 37 comments

The first message succeeds, but further messages throw this exception:

2018-08-09 15:25:32,937 - WARNING - server - Application instance <Task pending coro=<__call__() running at /home/soham/PycharmProjects/lead-generation/venv/lib/python3.5/site-packages/channels/sessions.py:175> wait_for=<Future pending cb=[Task._wakeup()]>> for connection <WebSocketProtocol client=['127.0.0.1', 35996] path=b'/lead/'> took too long to shut down and was killed.

My consumer class

class LeadConsumer(AsyncWebsocketConsumer):
    async def websocket_connect(self, event):
        print("Connected", event)
        self.room_group_name = 'lead_list'

        # join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )
        await self.accept()

    async def websocket_receive(self, event):
        print("Receive", event)
        await self.send({
            "type": "websocket.send",
            "text": "From receive..."
        })

    async def websocket_disconnect(self, event):
        print("Disconnect", event)
        await self.send({
            "type": "websocket.close"
        })

    async def lead_list(self, event):
        _contact = event['contact']
        _campaign = event['campaign']
        _company = event['company']
        _contact_person = event['contact_person']

        await self.send(text_data=json.dumps({
            'contact': _contact,
            'campaign': _campaign,
            'contact_person': _contact_person,
            'company': _company
        }))

requirements.txt

aioredis==1.1.0
asgiref==2.3.2
async-timeout==2.0.1
attrs==18.1.0
autobahn==18.7.1
Automat==0.7.0
certifi==2018.4.16
channels==2.1.2
channels-redis==2.2.1
chardet==3.0.4
constantly==15.1.0
daphne==2.2.1
diff-match-patch==20121119
Django==2.0.7
django-import-export==1.0.1
django-rest-framework==0.1.0
djangorestframework==3.8.2
docutils==0.14
et-xmlfile==1.0.1
hiredis==0.2.0
hyperlink==18.0.0
idna==2.7
incremental==17.5.0
jdcal==1.4
msgpack==0.5.6
odfpy==1.3.6
openpyxl==2.5.4
PyHamcrest==1.9.0
python-decouple==3.1
pytz==2018.5
PyYAML==3.13
requests==2.19.1
six==1.11.0
tablib==0.12.1
Twisted==18.7.0
txaio==18.7.1
unicodecsv==0.14.1
urllib3==1.23
xlrd==1.1.0
xlwt==1.3.0
zope.interface==4.5.0

How can I solve this issue?
Thanks in advance.

sohamnavadiya avatar Aug 09 '18 15:08 sohamnavadiya

Here is my log:

System check identified no issues (0 silenced).
August 09, 2018 - 15:38:13
Django version 2.0.7, using settings 'leadgeneration.settings'
Starting ASGI/Channels version 2.1.2 development server at http://0.0.0.0:8082/
Quit the server with CONTROL-C.
2018-08-09 15:38:13,066 - INFO - server - HTTP/2 support not enabled (install the http2 and tls Twisted extras)
2018-08-09 15:38:13,066 - INFO - server - Configuring endpoint tcp:port=8082:interface=0.0.0.0
2018-08-09 15:38:13,067 - INFO - server - Listening on TCP address 0.0.0.0:8082
[2018/08/09 15:38:19] WebSocket HANDSHAKING /lead/ [127.0.0.1:45618]
Connected {'type': 'websocket.connect'}
[2018/08/09 15:38:19] WebSocket CONNECT /lead/ [127.0.0.1:45618]
[2018/08/09 15:38:21] HTTP GET /lead-call/XXXXXXXXXX/ 200 [0.03, 127.0.0.1:45650]
[2018/08/09 15:38:21] WebSocket DISCONNECT /lead/ [127.0.0.1:45618]
Disconnect {'type': 'websocket.disconnect', 'code': 1001}
2018-08-09 15:38:32,087 - WARNING - server - Application instance <Task pending coro=<__call__() running at /home/soham/PycharmProjects/lead-generation/venv/lib/python3.5/site-packages/channels/sessions.py:175> wait_for=<Future pending cb=[Task._wakeup()]>> for connection <WebSocketProtocol client=['127.0.0.1', 45618] path=b'/lead/'> took too long to shut down and was killed.

sohamnavadiya avatar Aug 09 '18 15:08 sohamnavadiya

This means something in your consumer is not exiting after the disconnect (you don't need to send websocket.close, so get rid of that) - it's not a problem in Channels from what I can tell.

If you can trace it down to a clear problem in channels with steps to reproduce from a fresh project, please re-open the bug - you can see more about where to ask questions at http://channels.readthedocs.io/en/latest/support.html

andrewgodwin avatar Aug 09 '18 22:08 andrewgodwin

In my case, the problem was with Redis.

Check whether there are TCP connections in the CLOSE_WAIT state:

netstat | grep 6379 | fgrep -c CLOSE_WAIT

In my case there were more than 50 connections in that state. I had to update Redis.
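
The same check can be sketched in Python. This is only an illustration: count_close_wait is a hypothetical helper, and SAMPLE is made-up text in the shape that netstat typically prints, not output from a real server.

```python
def count_close_wait(netstat_output: str, port: int = 6379) -> int:
    """Count socket lines that involve the given port and are in CLOSE_WAIT."""
    needle = f":{port}"
    count = 0
    for line in netstat_output.splitlines():
        fields = line.split()
        # Match the port on either the local or the remote address column.
        has_port = any(field.endswith(needle) for field in fields)
        if has_port and "CLOSE_WAIT" in fields:
            count += 1
    return count


# Made-up sample for illustration only.
SAMPLE = """\
tcp        0      0 127.0.0.1:51234         127.0.0.1:6379          CLOSE_WAIT
tcp        0      0 127.0.0.1:51236         127.0.0.1:6379          ESTABLISHED
tcp        1      0 127.0.0.1:51240         127.0.0.1:6379          CLOSE_WAIT
tcp        0      0 127.0.0.1:6380          127.0.0.1:40000         CLOSE_WAIT
"""
```

Calling count_close_wait(SAMPLE) counts only the two lines that both touch port 6379 and are in CLOSE_WAIT. Matching on the address suffix avoids counting unrelated ports such as 16379, which a plain grep for 6379 would include.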

annshress avatar Oct 03 '19 04:10 annshress

I'll mention this here as it may help others. In my case the problem was just a silly mistake. In the code I was maintaining, websocket_disconnect was defined without a call to super(), so StopConsumer() was never raised. The right way is to override the disconnect method, which AsyncWebsocketConsumer calls from inside websocket_disconnect. Simply not overriding websocket_disconnect solved my problem.

In the example above the same pattern exists. However, I am not sure whether that was already the case back in 2018 or whether this is a newer style in Channels' code.
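
A minimal sketch of the pattern described in this comment, using only the standard library. BaseConsumer and StopConsumer below are simplified stand-ins written for illustration, not the real Channels classes. The sketch assumes, as the comment says, that the generic consumer's websocket_disconnect calls disconnect() and then raises StopConsumer to end the dispatch loop.

```python
import asyncio


class StopConsumer(Exception):
    """Stand-in for the exception that ends a consumer's dispatch loop."""


class BaseConsumer:
    """Toy model of a generic consumer (an assumption, not Channels code)."""

    async def websocket_disconnect(self, message):
        await self.disconnect(message["code"])
        raise StopConsumer()

    async def disconnect(self, code):
        pass

    async def run(self, queue):
        # Dispatch messages one by one until a handler raises StopConsumer.
        try:
            while True:
                message = await queue.get()
                handler = getattr(self, message["type"].replace(".", "_"))
                await handler(message)
        except StopConsumer:
            return "stopped"


class BrokenConsumer(BaseConsumer):
    async def websocket_disconnect(self, message):
        # Overrides the wrapper without calling super(): nothing stops the loop.
        pass


class FixedConsumer(BaseConsumer):
    async def disconnect(self, code):
        # Cleanup goes here; the inherited wrapper still raises StopConsumer.
        pass


async def simulate(consumer, timeout=0.2):
    queue = asyncio.Queue()
    queue.put_nowait({"type": "websocket.disconnect", "code": 1001})
    try:
        return await asyncio.wait_for(consumer.run(queue), timeout)
    except asyncio.TimeoutError:
        # Plays the role of the server giving up on the application instance.
        return "killed"
```

Running asyncio.run(simulate(BrokenConsumer())) ends in the timeout branch, while FixedConsumer exits on its own. The timeout here only models the server's shutdown deadline; the real value and mechanism belong to the server.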

smohsensh avatar Aug 18 '20 12:08 smohsensh

I have a similar situation, but not the same. I have:

  • django== 2.2.9
  • channels == 2.4.0
  • daphne == 2.5.0

I am using Redis as backend with the code:

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [(CELERY_BROKER_URL)],
            'capacity': 300
        },
    },
}
ASGI_THREADS = 1000

I am using Daphne for both HTTP and WebSocket.

My asgi file:

import os

import django

# Point Django at the settings module before django.setup() runs.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings')
django.setup()

# Import routing only after setup, since it may load models.
import chat.routing
from channels.http import AsgiHandler
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter

application = ProtocolTypeRouter({
    'websocket': AuthMiddlewareStack(
        URLRouter(
            chat.routing.websocket_urlpatterns
        )
    ),
})

Here is the error message:

Application instance <Task pending coro=<AsgiHandler.__call__() running at /usr/local/lib/python3.6/site-packages/channels/http.py:192> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.6/asyncio/futures.py:403, <TaskWakeupMethWrapper object at 0x7f301c57f738>()]>> for connection <WebRequest at 0x7f301c56a710 method=POST uri=/api/v1/analytics/divided/meteo/ clientproto=HTTP/1.0> took too long to shut down and was killed.

This happens when the front end sends requests to Django. Sometimes it returns the response immediately, sometimes it takes a long time, and frequently it throws the above error. I thought it was caused by the concurrency of the worker that is responsible for HTTP requests.

It is not the same case as mentioned above, because:

  1. I am using the generic "WebsocketConsumer" because I query data from the database via Django's ORM.
  2. The error occurs in AsgiHandler.__call__() on an HTTP request. The WebSocket connection is not even established.

Can you explain why I have this issue? I thought the reason was a Django version incompatibility.

ReadMost avatar Nov 04 '20 13:11 ReadMost

I'm facing the same issue in my project.

ThreshHNS avatar Nov 16 '20 14:11 ThreshHNS

In my particular case asgiref==3.3.1 was the culprit. I rolled back to 3.2.10 and it works well! I have daphne 2.5.0/channels 2.4.0/django 3.0.10/djangorestframework 3.11.1.

MrVhek avatar Nov 17 '20 18:11 MrVhek

I could not solve the problem as I wanted. Instead, I run two containers:

  1. The first container runs Django without ASGI (in WSGI mode) via the command ./manage.py runserver --noasgi
  2. The second container runs Django for WebSocket traffic only, using Daphne via the command daphne -u /tmp/daphne.sock -b 0.0.0.0 -p 8088 app.asgi:application -v 2

I then used Nginx as the reverse proxy to route all requests with the "ws/" path to the second container, and the rest to the first container. Here is my conf.template code:

map $http_upgrade $connection_upgrade {
    default upgrade;
    '' close;
}

upstream websocket {
    server app_websocket:8088;
}

server {
    ...............
    location / {
        proxy_pass ${PROXY_PASS};
        proxy_set_header Host ${DOMAIN_NAME};
        #client_max_body_size 100M;
        #proxy_buffering on;
        #proxy_buffers 256 4k;
        #proxy_busy_buffers_size 4k;
        #proxy_read_timeout 60s;
        #proxy_send_timeout 60s;
        #proxy_max_temp_file_size 1024m;
        #proxy_temp_file_write_size 4k;
    }

    location /ws/ {
        proxy_pass http://websocket;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
        proxy_set_header Host ${DOMAIN_NAME};
    }
    ............
}

ReadMost avatar Nov 17 '20 21:11 ReadMost

In my particular case asgiref==3.3.1 was the culprit. I rolled back to 3.2.10 and it works well! I have daphne 2.5.0/channels 2.4.0/django 3.0.10/djangorestframework 3.11.1.

This solved the issue for us. The app had started timing out requests and emitting those warnings.

Pinning versions with pip freeze seems to be a must.
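
One way to notice an unintended upgrade such as the asgiref one is to compare installed versions against the pins at startup. The sketch below is an assumption about how that could look, not part of Channels or pip: find_mismatches and installed_version are hypothetical helpers built on the standard importlib.metadata module (Python 3.8+).

```python
from importlib import metadata


def installed_version(name):
    """Return the installed version of a distribution, or None if absent."""
    try:
        return metadata.version(name)
    except metadata.PackageNotFoundError:
        return None


def find_mismatches(pins, lookup=installed_version):
    """Return (name, pinned, found) for every pin that is not satisfied."""
    mismatches = []
    for name, pinned in sorted(pins.items()):
        found = lookup(name)
        if found != pinned:
            mismatches.append((name, pinned, found))
    return mismatches
```

For example, find_mismatches({"asgiref": "3.2.10"}) reports a tuple when a different asgiref version (or none) is installed, and an empty list when the pin matches. The lookup parameter exists only so the check can be exercised without touching the real environment.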

fbove avatar Dec 23 '20 16:12 fbove

I have the same issue. I tried the solution from @MrVhek and unfortunately it didn't work. Any other hints? Thanks.

hafid-d avatar Jan 15 '21 18:01 hafid-d

@andrewgodwin I can reproduce this by changing the chat example in the documentation:

import json
import asyncio
import logging
from channels.generic.websocket import AsyncWebsocketConsumer


class ChatConsumer(AsyncWebsocketConsumer):
    def __init__(self, *args, **kwargs):
        super(ChatConsumer, self).__init__(*args, **kwargs)
        self.room_name = None
        self.room_group_name = None
        self.disconnected = True

    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'

        await self.channel_layer.group_add(self.room_group_name, self.channel_name)
        await self.accept()
        self.disconnected = False  # connected, set flag to False

    async def disconnect(self, code):
        await self.channel_layer.group_discard(self.room_group_name, self.channel_name)
        self.disconnected = True  # disconnected, set flag to True

    async def receive(self, text_data=None, bytes_data=None):
        text_json = json.loads(text_data)
        message = text_json['message']

        await self.channel_layer.group_send(self.room_group_name, {
            'type': 'chat_message',
            'message': message,
        })

    async def chat_message(self, event):
        message = event['message']

        # now simulate subscribe to redis, get the newest state, and send to user
        # if disconnected, stop sending states
        i = 0
        while not self.disconnected:
            await asyncio.sleep(1)  # simulate sub to redis and wait new state
            i += 1
            logging.error('sending...')
            await self.send(text_data=json.dumps({
                'message': f'new state {i}'
            }))

and then

  1. start the dev server
  2. start a client and connect to a chat room
  3. send a chat message
  4. the server starts to push new states
  5. close the client
  6. wait several seconds and the exception happens

It seems that the server push code (here chat_message) prevents the call to disconnect(), so the self.disconnected flag has no chance of being set to True.

And here is my log output:

System check identified no issues (0 silenced).
January 31, 2021 - 17:48:26
Django version 3.1.5, using settings 'light.settings'
Starting ASGI/Channels version 3.0.3 development server at http://127.0.0.1:8000/
Quit the server with CONTROL-C.
WebSocket HANDSHAKING /ws/chat/cc/ [127.0.0.1:43922]
WebSocket CONNECT /ws/chat/cc/ [127.0.0.1:43922]
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
WebSocket DISCONNECT /ws/chat/cc/ [127.0.0.1:43922]          <<<<<=== Client closed here. but still sending states to client
INFO:django.channels.server:WebSocket DISCONNECT /ws/chat/cc/ [127.0.0.1:43922]
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
WARNING:daphne.server:Application instance <Task pending name='Task-1' coro=<StaticFilesWrapper.__call__() running at /home/felix/PycharmProjects/light/venv/lib/python3.9/site-packages/channels/staticfiles.py:44> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f36dc9dea30>()]>> for connection <WebSocketProtocol client=['127.0.0.1', 43922] path=b'/ws/chat/cc/'> took too long to shut down and was killed.
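
The blocking behaviour in this report can be modelled without Channels. The sketch below is a toy, standard-library model that assumes messages for one connection are handled strictly one at a time. SequentialConsumer is a hypothetical class, not the real consumer.

```python
import asyncio


class SequentialConsumer:
    """Toy model: messages are handled strictly one after another."""

    def __init__(self):
        self.disconnected = False
        self.sent = 0

    async def chat_message(self, message):
        # Waits for a flag that only websocket_disconnect can set.
        while not self.disconnected:
            await asyncio.sleep(0.01)
            self.sent += 1

    async def websocket_disconnect(self, message):
        self.disconnected = True

    async def run(self, queue):
        while not self.disconnected:
            message = await queue.get()
            handler = getattr(self, message["type"].replace(".", "_"))
            # The next message is not read until this handler returns.
            await handler(message)


async def simulate(timeout=0.1):
    consumer = SequentialConsumer()
    queue = asyncio.Queue()
    queue.put_nowait({"type": "chat.message"})
    queue.put_nowait({"type": "websocket.disconnect"})
    try:
        await asyncio.wait_for(consumer.run(queue), timeout)
    except asyncio.TimeoutError:
        pass  # models the server killing the instance after a deadline
    return consumer, queue
```

After asyncio.run(simulate()), the disconnect message is still sitting in the queue and the flag is still False, which mirrors the log above: the handler keeps sending until the instance is killed.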


tinylambda avatar Jan 31 '21 17:01 tinylambda

@tinylambda thanks for the extra info. Not sure yet if this is a usage issue we should document or something else, but I will reopen to investigate

carltongibson avatar Jan 31 '21 18:01 carltongibson

@carltongibson Thanks for the reply. In my use case, I want to use django-channels as an access layer and keep sending new game state to the user, either at a fixed interval (every 1 second) or on state changes from the Redis server. I think this is a typical use case for WebSockets.

tinylambda avatar Jan 31 '21 18:01 tinylambda

If the coroutine that handles a message (here chat_message) takes too long to complete (to broadcast messages, for example), it blocks every later message of the same type until the previous coroutine completes (which may be never).

As a potential solution, how about dispatching every handler coroutine (chat_message) as a task (loop.create_task), tracking every task instance in a central place (per consumer instance), checking the tasks at some point (when and how?) to clear completed ones, and, when the WebSocket disconnects, cancelling all tasks that are still active?

We can add a MAX_ACTIVE_TASKS setting to limit how many tasks can be created, to avoid piling up too many slow tasks.

With MAX_ACTIVE_TASKS=1, we can use the django-channels server as a broadcast service. With MAX_ACTIVE_TASKS > 1, we can process more messages of the same type concurrently.

It's comparable: creating more tasks to handle a type of message in a consumer instance (for one stateful connection) is like creating more threads or processes to handle user requests in an HTTP web server (for all stateless user requests).

tinylambda avatar Feb 02 '21 18:02 tinylambda

I implemented a consumer that dispatches message handlers as tasks (this only affects user-defined handlers):

import asyncio
import copy
import collections
import functools
import json
from channels.consumer import get_handler_name
from channels.generic.websocket import AsyncWebsocketConsumer


class ChatConsumer(AsyncWebsocketConsumer):
    MAX_ACTIVE_TASKS = 2

    def __init__(self, *args, **kwargs):
        super(ChatConsumer, self).__init__(*args, **kwargs)
        self.handler_tasks = collections.defaultdict(list)
        self.joined_groups = set()

        self.room_name = None
        self.room_group_name = None

    def complete_task(self, task_instance, handler_name):
        print(f'Complete task for handler {handler_name}, task instance {task_instance}')
        self.handler_tasks[handler_name].remove(task_instance)
        print(
            f'There are still {len(self.handler_tasks[handler_name])} active tasks for'
            f' handler {handler_name}'
        )

    async def dispatch(self, message):
        handler_name = get_handler_name(message)
        handler = getattr(self, handler_name, None)
        if handler:
            if handler_name.startswith('chat_'):
                # Create a task to process message
                loop = asyncio.get_event_loop()
                if len(self.handler_tasks[handler_name]) >= self.MAX_ACTIVE_TASKS:
                    await self.send(text_data=json.dumps({
                        'message': 'MAX_ACTIVE_TASKS reached'
                    }))
                else:
                    handler_task = loop.create_task(handler(message))
                    # don't forget to remove the task from self.handler_tasks
                    # when task completed
                    handler_task.add_done_callback(
                        functools.partial(self.complete_task, handler_name=handler_name)
                    )
                    self.handler_tasks[handler_name].append(handler_task)
            else:
                # The old way to process message
                await handler(message)
        else:
            raise ValueError("No handler for message type %s" % message["type"])

    async def clear_handler_tasks(self):
        for handler_name in self.handler_tasks:
            task_instances = self.handler_tasks[handler_name]
            for task_instance in task_instances:
                task_instance.cancel()
                # try:
                #     await task_instance
                # except asyncio.CancelledError:
                #     print('Cancelled handler task', task_instance)

    async def disconnect(self, code):
        joined_groups = copy.copy(self.joined_groups)
        for group_name in joined_groups:
            await self.leave_group(group_name)
        self.joined_groups.clear()
        await self.clear_handler_tasks()

    async def leave_group(self, group_name):
        await self.channel_layer.group_discard(
            group_name, self.channel_name
        )
        self.joined_groups.remove(group_name)

    async def join_group(self, group_name):
        await self.channel_layer.group_add(
            group_name, self.channel_name
        )
        self.joined_groups.add(group_name)

    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'

        await self.join_group(self.room_group_name)
        await self.accept()

    async def receive(self, text_data=None, bytes_data=None):
        text_json = json.loads(text_data)
        message = text_json['message'].strip()
        if message.endswith('1'):

            await self.channel_layer.group_send(self.room_group_name, {
                'type': 'chat_message',
                'message': message,
            })
        elif message.endswith('2'):
            await self.channel_layer.group_send(self.room_group_name, {
                'type': 'chat_message2',
                'message': message,
            })
        else:
            await self.send(text_data=json.dumps({
                'message': 'invalid data'
            }))

    async def chat_message(self, event):
        message = event['message']

        while True:
            print('sending...')
            await self.send(text_data=json.dumps({
                'message': message
            }))
            await asyncio.sleep(1)

    async def chat_message2(self, event):
        message = event['message']
        await self.send(text_data=json.dumps({
            'message': message
        }))

In this implementation, you can send a message such as "g1" to start the long-running handler (chat_message), and you can still send "g2" to start the chat_message2 handler. If you send other data that does not end with 1 or 2, you will receive an "invalid data" reply.

Client input and output:

(venv) localhost:light Felix$ python manage.py chat_client
connected
g1
Received:  {"message": "g1"}
gReceived:  {"message": "g1"}
Received:  {"message": "g1"}
2
Received:  {"message": "g2"}
Received:  {"message": "g1"}
g2
Received:  {"message": "g2"}
Received:  {"message": "g1"}
g2
Received:  {"message": "g2"}
Received:  {"message": "g1"}
g1
Received:  {"message": "MAX_ACTIVE_TASKS reached"}
Received:  {"message": "g1"}
gReceived:  {"message": "g1"}
Received:  {"message": "g1"}
g1
Received:  {"message": "MAX_ACTIVE_TASKS reached"}
Received:  {"message": "g1"}
gReceived:  {"message": "g1"}
x
Received:  {"message": "invalid data"}
Received:  {"message": "g1"}
gxReceived:  {"message": "g1"}

Received:  {"message": "invalid data"}
Received:  {"message": "g1"}
Received:  {"message": "g1"}
^CDone

Server input and output:


Django version 3.1.5, using settings 'light.settings'
Starting ASGI/Channels version 3.0.3 development server at http://127.0.0.1:8000/
Quit the server with CONTROL-C.
WebSocket HANDSHAKING /ws/chat/cc/ [127.0.0.1:57667]
WebSocket CONNECT /ws/chat/cc/ [127.0.0.1:57667]
sending...
sending...
sending...
Complete task for handler chat_message2, task instance <Task finished name='Task-17' coro=<ChatConsumer.chat_message2() done, defined at /Users/Felix/PycharmProjects/light/chat/consumers.py:116> result=None>
There are still 0 active tasks for handler chat_message2
sending...
Complete task for handler chat_message2, task instance <Task finished name='Task-23' coro=<ChatConsumer.chat_message2() done, defined at /Users/Felix/PycharmProjects/light/chat/consumers.py:116> result=None>
There are still 0 active tasks for handler chat_message2
sending...
Complete task for handler chat_message2, task instance <Task finished name='Task-29' coro=<ChatConsumer.chat_message2() done, defined at /Users/Felix/PycharmProjects/light/chat/consumers.py:116> result=None>
There are still 0 active tasks for handler chat_message2
sending...
sending...
sending...
sending...
sending...
sending...
sending...
sending...
sending...
sending...
WebSocket DISCONNECT /ws/chat/cc/ [127.0.0.1:57667]
Complete task for handler chat_message, task instance <Task cancelled name='Task-11' coro=<ChatConsumer.chat_message() done, defined at /Users/Felix/PycharmProjects/light/chat/consumers.py:106>>
There are still 0 active tasks for handler chat_message
Cancelled handler task <Task cancelled name='Task-11' coro=<ChatConsumer.chat_message() done, defined at /Users/Felix/PycharmProjects/light/chat/consumers.py:106>>

It's a full-duplex WebSocket consumer now: we can send data to the WebSocket server at any time and get a response, and the server can send data at any time too. The long-running handler no longer blocks the consumer in the new implementation.
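
The task bookkeeping in that consumer can also be looked at on its own. The sketch below is a standard-library variation, not the code above: HandlerTasks, push_forever and demo are hypothetical names. It differs in two ways that are design assumptions: tasks are kept in a set and removed with discard(), and cancel_all() waits for the cancelled tasks with asyncio.gather instead of leaving the await commented out.

```python
import asyncio


class HandlerTasks:
    """Tracks handler tasks for one connection (hypothetical helper)."""

    def __init__(self, max_active=2):
        self.max_active = max_active
        self.active = set()

    def start(self, coro):
        """Schedule coro as a task, or refuse it when the limit is reached."""
        if len(self.active) >= self.max_active:
            coro.close()  # never started, so close it to avoid a warning
            return None
        task = asyncio.get_running_loop().create_task(coro)
        self.active.add(task)
        # discard() receives the finished task and forgets it.
        task.add_done_callback(self.active.discard)
        return task

    async def cancel_all(self):
        """Cancel every active task and wait until each one has finished."""
        tasks = list(self.active)
        for task in tasks:
            task.cancel()
        # return_exceptions=True collects CancelledError instead of raising it.
        return await asyncio.gather(*tasks, return_exceptions=True)


async def push_forever():
    while True:
        await asyncio.sleep(0.01)


async def demo():
    registry = HandlerTasks(max_active=2)
    first = registry.start(push_forever())
    second = registry.start(push_forever())
    refused = registry.start(push_forever())  # over the limit
    await asyncio.sleep(0.03)
    results = await registry.cancel_all()
    return first, second, refused, results, registry
```

Waiting in cancel_all() means that any cleanup inside a cancelled handler has finished before the caller moves on. Whether that matters depends on what the handlers do.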

So, are you interested in implementing this at the Channels framework level? @carltongibson @andrewgodwin

tinylambda avatar Feb 03 '21 08:02 tinylambda

In my particular case asgiref==3.3.1 was the culprit. I rolled back to 3.2.10 and it works well! I have daphne 2.5.0/channels 2.4.0/django 3.0.10/djangorestframework 3.11.1.

You saved me from a lot of trouble. May I ask why asgiref is the issue and how you found it?

jahanzeb-pixarsart avatar May 19 '21 06:05 jahanzeb-pixarsart

In my particular case asgiref==3.3.1 was the culprit. I rolled back to 3.2.10 and it works well! I have daphne 2.5.0/channels 2.4.0/django 3.0.10/djangorestframework 3.11.1.

Thanks!! This worked for me.

codingfarhan avatar Jun 30 '21 12:06 codingfarhan

In my particular case asgiref==3.3.1 was the culprit. I rolled back to 3.2.10 and it works well! I have daphne 2.5.0/channels 2.4.0/django 3.0.10/djangorestframework 3.11.1.

#205: I ran into the same issue. Maybe it has something to do with djangorestframework + channels + django + asgiref (new version).

belongwqz avatar Jul 02 '21 02:07 belongwqz

We're having the same problem using GraphQL so I'd remove djangorestframework from the picture 🤔

lgelmi avatar Jul 22 '21 18:07 lgelmi

Experiencing the same with GraphQL 😭

benedictkioko avatar Jul 28 '21 14:07 benedictkioko

Has this been solved? I have the same issue and I haven't been able to fix it.

husam-lune avatar Aug 29 '21 04:08 husam-lune

In my case the disk was full. I freed up some space and the problem was solved!

mohsenkamini avatar Oct 25 '21 10:10 mohsenkamini

For anyone else with this error: my solution was to downgrade aioredis to 1.3.0. The newest version (2.0) is a complete rewrite and requires restructuring code, which I believe channels-redis has not taken into account in its aioredis requirement.

neethan avatar Dec 01 '21 13:12 neethan

I have a Django project. I'm getting CPU spikes to 100% and a great many of these logs at the same time:

2021-12-05 13:18:34,912 WARNING  Application instance <Task pending coro=<AsgiHandler.__call__() running at /usr/local/lib/python3.6/site-packages/channels/http.py:192> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.6/asyncio/futures.py:403, <TaskWakeupMethWrapper object at 0x7fe6bf3625e8>()]>> for connection <WebRequest at 0x7fe6bf361898 method=GET uri=/********** clientproto=HTTP/1.1> took too long to shut down and was killed.

2021-12-05 13:18:39,921 WARNING  Application instance <Task pending coro=<AsgiHandler.__call__() running at /usr/local/lib/python3.6/site-packages/channels/http.py:192> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.6/asyncio/futures.py:403, <TaskWakeupMethWrapper object at 0x7fe6be55b6a8>()]>> for connection <WebRequest at 0x7fe6be55a898 method=GET uri=/********** clientproto=HTTP/1.1> took too long to shut down and was killed.

2021-12-05 13:18:39,922 WARNING  Application instance <Task pending coro=<AsgiHandler.__call__() running at /usr/local/lib/python3.6/site-packages/channels/http.py:192> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.6/asyncio/futures.py:403, <TaskWakeupMethWrapper object at 0x7fe6be300ac8>()]>> for connection <WebRequest at 0x7fe6be30fd68 method=GET uri=/********** clientproto=HTTP/1.1> took too long to shut down and was killed.

requirements.txt

django==2.2
djangorestframework
mysqlclient==1.3.13
django-push-notifications
markdown
python-dotenv
twilio==6.10.0
gunicorn==19.8.1
django-modeladmin-reorder
django-cors-headers==3.2.1
python-dateutil==2.8.1
channels==2.4.0
channels-redis==2.4.2
daphne==2.4.1
redis
pycountry==19.8.18
country-list==1.0.0
django-admin-rangefilter==0.6.0
django-import-export==2.1.0
django-storages==1.9
Pillow==8.2.0
boto3==1.17.102
apns2==0.5.0

pip freeze

aioredis==1.3.1
apns2==0.5.0
asgiref==3.4.0
async-timeout==3.0.1
attrs==21.2.0
autobahn==21.2.1
Automat==20.2.0
boto3==1.17.102
botocore==1.20.102
certifi==2021.5.30
cffi==1.14.5
channels==2.4.0
channels-redis==2.4.2
chardet==4.0.0
constantly==15.1.0
country-list==1.0.0
cryptography==3.4.7
daphne==2.4.1
defusedxml==0.7.1
diff-match-patch==20200713
Django==2.2
django-admin-rangefilter==0.6.0
django-cors-headers==3.2.1
django-import-export==2.1.0
django-modeladmin-reorder==0.3.1
django-push-notifications==2.0.0
django-storages==1.9
djangorestframework==3.12.4
et-xmlfile==1.1.0
gunicorn==19.8.1
h2==2.6.2
hiredis==2.0.0
hpack==3.0.0
http-ece==1.1.0
hyper==0.7.0
hyperframe==3.2.0
hyperlink==21.0.0
idna==2.10
importlib-metadata==4.6.0
incremental==21.3.0
jmespath==0.10.0
Markdown==3.3.4
MarkupPy==1.14
msgpack==0.6.2
mysqlclient==1.3.13
odfpy==1.4.1
openpyxl==3.0.7
Pillow==8.2.0
py-vapid==1.8.2
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycountry==19.8.18
pycparser==2.20
PyJWT==2.1.0
pyOpenSSL==20.0.1
PySocks==1.7.1
python-dateutil==2.8.1
python-dotenv==0.18.0
pytz==2021.1
pywebpush==1.13.0
PyYAML==5.4.1
redis==3.5.3
requests==2.25.1
s3transfer==0.4.2
service-identity==21.1.0
six==1.16.0
sqlparse==0.4.1
tablib==3.0.0
twilio==6.10.0
Twisted==21.2.0
txaio==21.2.1
typing-extensions==3.10.0.0
urllib3==1.26.6
xlrd==2.0.1
xlwt==1.3.0
zipp==3.4.1
zope.interface==5.4.0

Please help me out over here, Thanks!

Avramo avatar Dec 05 '21 14:12 Avramo

I have a similar situation, but not the same. I have:

django == 2.2.9
channels == 2.4.0
daphne == 2.5.0

I am using Redis as the backend, with this configuration:

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [(CELERY_BROKER_URL)],
            'capacity': 300
        },
    },
}
ASGI_THREADS = 1000

I am using Daphne for both HTTP and WebSocket.

My asgi file:

import django
django.setup()
import chat.routing
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings')
from channels.http import AsgiHandler
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter

application = ProtocolTypeRouter({
    'websocket': AuthMiddlewareStack(
        URLRouter(
            chat.routing.websocket_urlpatterns
        )
    ),
})

Here is the error message:

Application instance <Task pending coro=<AsgiHandler.__call__() running at /usr/local/lib/python3.6/site-packages/channels/http.py:192> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.6/asyncio/futures.py:403, <TaskWakeupMethWrapper object at 0x7f301c57f738>()]>> for connection <WebRequest at 0x7f301c56a710 method=POST uri=/api/v1/analytics/divided/meteo/ clientproto=HTTP/1.0> took too long to shut down and was killed.

This happens when the front end sends requests to Django. Sometimes it returns the response immediately, sometimes it takes a long time, and frequently it throws the above error. I thought it was because of the concurrency of the worker that is responsible for HTTP requests.

It is not the same case as mentioned above, because:

  1. I am using the generic "WebsocketConsumer", because I query data from the database via Django's ORM.
  2. The error occurs in AsgiHandler.__call__() on an HTTP request. The WebSocket connection is not even established.

Can you explain why I have this issue? I thought the reason was a Django version incompatibility.

@ReadMost I have been facing the same problem for some weeks. How did you solve it?

alvaro-alonso avatar Dec 07 '21 07:12 alvaro-alonso

@alvaro-alonso I have written up my solution further up in this thread, or here is the link

ReadMost avatar Dec 07 '21 09:12 ReadMost

I managed to resolve this issue in local dev without needing to use both wsgi and asgi servers together (channels==3.0.4, asgiref==3.5.0, Django==4.0.3). I tried a combination of:

  1. using Reconnecting WebSockets on the client side, as suggested by destelio. For example:
const options = {
  WebSocket: WebSocket,
  connectionTimeout: 10000,
  maxRetries: 3,
  minUptime: 2000,
  maxEnqueuedMessages: 1000000,
};

const newSocket = new ReconnectingWebSocket(
  'ws://' +
  window.location.host +
  '/ws/somepath/' +
  roomName +
  '/' +
  '?' +
  queryString,
  [],
  options
);
  2. Setting thread_sensitive to False globally, and setting it to True manually for ORM operations, i.e. using @sync_to_async(thread_sensitive=True) as a decorator kwarg for sync functions that write to the database.

  3. If you have a connect() method that involves long-running tasks, simply call await self.accept() as early as possible within the method, right below await self.channel_layer.group_add().

Initially, this worked fine for connection timeouts. However, in my particular case, my disconnect method requires a relatively lengthy ORM operation involving celery workers. Therefore, after switching to a remote database, the disconnect method became an issue once again.

Fortunately, there is a simple workaround.

Instead of using the development server (i.e. python manage.py runserver), run it in production mode with Daphne. That way, you can specify a timeout interval via the --application-close-timeout flag.

Simply run:

daphne -t 60 --application-close-timeout 60 your_project.asgi:application

The -t flag is for --http-timeout. It may not be necessary.

Of course, with this approach, you will need to serve any local static files through STATIC_ROOT. Whitenoise can be used to serve static files. It will work with an ASGI server (within Django anyway; outside of Django I don't think it supports ASGI). If you don't want file caching, replace their suggested STATICFILES_STORAGE with:

STATICFILES_STORAGE = 'whitenoise.storage.CompressedStaticFilesStorage'
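
For anyone wiring this up, here is a rough sketch of how the relevant settings might fit together. It is only a fragment: the BASE_DIR path and the "staticfiles" directory name are placeholders I made up, and the middleware list is truncated. The WhiteNoise middleware path and its position directly after SecurityMiddleware follow the WhiteNoise docs.

```python
# settings.py (fragment, not a complete settings file)
from pathlib import Path

# Hypothetical path; use the BASE_DIR your own settings.py already defines.
BASE_DIR = Path("/path/to/your_project")

MIDDLEWARE = [
    "django.middleware.security.SecurityMiddleware",
    # WhiteNoise's docs place it directly after SecurityMiddleware.
    "whitenoise.middleware.WhiteNoiseMiddleware",
    # ... the rest of your middleware goes here ...
]

STATIC_URL = "/static/"
# Directory that `python manage.py collectstatic` copies files into
# (the directory name is an assumption).
STATIC_ROOT = BASE_DIR / "staticfiles"
# Compression without the hashed-filename caching layer.
STATICFILES_STORAGE = "whitenoise.storage.CompressedStaticFilesStorage"
```

Run python manage.py collectstatic once before starting Daphne so that STATIC_ROOT is populated.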

Hope this helps someone. Thanks to everyone for such an awesome project 🙏 💯

Saran33 avatar Mar 24 '22 16:03 Saran33

We're having the same problem using GraphQL so I'd remove djangorestframework from the picture 🤔

Experiencing the same with GraphQL 😭

@benedictkioko I have the same problem using graphene_django + channels. I solved it by using the command "daphne -b 0.0.0.0 -p 8000 --application-close-timeout 60 --proxy-headers core.asgi:application". Hope this can help someone.

daphne help doc:

--application-close-timeout APPLICATION_CLOSE_TIMEOUT
    The number of seconds an ASGI application has to exit after client disconnect before it is killed
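
To make the help text above a bit more concrete, here is a small model of what a close timeout means, written with plain asyncio. This is not Daphne's actual implementation, just a sketch of the idea: after the client disconnects, the application task gets a fixed number of seconds to finish, and is cancelled if it does not. The function names and the short durations are made up for the demo.

```python
import asyncio


async def slow_disconnect_handler(cleanup_seconds):
    # Hypothetical application instance whose cleanup takes a while
    # (e.g. a slow ORM call inside disconnect()).
    await asyncio.sleep(cleanup_seconds)
    return "finished cleanly"


async def shut_down(app_task, close_timeout):
    """Give the task close_timeout seconds to exit, then cancel it."""
    done, _pending = await asyncio.wait({app_task}, timeout=close_timeout)
    if app_task in done:
        return app_task.result()
    # The task is still running after the grace period: kill it.
    app_task.cancel()
    try:
        await app_task
    except asyncio.CancelledError:
        pass
    return "took too long to shut down and was killed"


async def demo():
    fast = asyncio.create_task(slow_disconnect_handler(0.05))
    slow = asyncio.create_task(slow_disconnect_handler(1.0))
    return (
        await shut_down(fast, close_timeout=0.5),
        await shut_down(slow, close_timeout=0.5),
    )


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this model, raising close_timeout above the cleanup time makes the second task finish cleanly too, which is the effect the flag has in the commands posted in this thread.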

Davelyb avatar Jun 23 '22 03:06 Davelyb

In my particular case asgiref==3.3.1 was the culprit. I rolled back to 3.2.10 and it works well! I have daphne 2.5.0/channels 2.4.0/django 3.0.10/djangorestframework 3.11.1

Changing from asgiref==3.2.7 to asgiref==3.2.10 worked for me on Django 2.2.

razzaghi avatar Jun 23 '22 11:06 razzaghi

I am not sure if my issue is the same, but it results in a similar error. In my case, I am running my Django Channels ASGI app using Daphne, and I have a couple API calls that call an external API. I found that when that external API errored out and I did not have a timeout set on the requests call, it would hang the Daphne server. While I did have a flaw in my code, I do not think that such a flaw should render the entire server unresponsive and not able to be restarted through systemctl. Note that when I switched to uvicorn, I no longer had an issue even with the misbehaving code. In my real application, I have of course fixed my external API calls to have an explicit timeout. Here is a simplified repro case based on my real application: https://github.com/jessamynsmith/daphne_hang/

Here is my exact error:

2023-02-14 05:01:42,735 WARNING Application instance <Task pending name='Task-6' coro=<ProtocolTypeRouter.__call__() running at /Users/jessamynsmith/Development/daphne_hang/venv/lib/python3.11/site-packages/channels/routing.py:62> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /Users/jessamynsmith/.pyenv/versions/3.11.0/lib/python3.11/asyncio/futures.py:387, Task.task_wakeup()]>> for connection <WebRequest at 0x108603890 method=GET uri=/api/v1/hang/ clientproto=HTTP/1.1> took too long to shut down and was killed.
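
For readers hitting the same hang, here is a minimal sketch of the explicit-timeout fix described above. It uses only the standard library so it runs anywhere; with requests, the equivalent is passing the timeout= keyword argument to the call. The silent local socket stands in for a misbehaving external API, and the function names and the 0.5 second value are made up for the demo.

```python
import socket
import time
import urllib.request

# Ignore any proxy environment variables so the demo talks to localhost directly.
_opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))


def fetch_with_timeout(url, timeout_seconds):
    """Return the response body, or None if the call fails or times out."""
    try:
        with _opener.open(url, timeout=timeout_seconds) as response:
            return response.read()
    except OSError:
        # Socket timeouts and urllib.error.URLError are both OSError subclasses.
        return None


def demo():
    # Hypothetical stand-in for a hung external API: a socket that
    # completes the TCP handshake but never sends a response.
    silent_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    silent_server.bind(("127.0.0.1", 0))
    silent_server.listen(1)
    port = silent_server.getsockname()[1]
    started = time.monotonic()
    try:
        body = fetch_with_timeout(f"http://127.0.0.1:{port}/", timeout_seconds=0.5)
    finally:
        silent_server.close()
    return body, time.monotonic() - started


if __name__ == "__main__":
    print(demo())
```

Without the timeout argument, the same call would block until the remote side closed the connection, which matches the hang described in this comment.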

jessamynsmith avatar Feb 25 '23 15:02 jessamynsmith