channels_redis
(under load) getting CancelledError with latest version of channels/redis acquiring lock
django==2.2.13 channels==3.0.3 channels-redis==3.2.0 daphne==3.0.2
NGINX for HTTP and DAPHNE to WSS, Python 3.6.4 AWS Linux
We are sporadically seeing exceptions in the middle of the day (our tenants' busiest time) that break our Daphne WebSocket implementation. The exception is:
Exception inside application: Lock is not acquired.
Traceback (most recent call last):
File "/opt/toogo/releases/r121.4.20.5315/env/lib/python3.6/site-packages/channels_redis/core.py", line 469, in receive
real_channel
File "/opt/toogo/releases/r121.4.20.5315/env/lib/python3.6/site-packages/channels_redis/core.py", line 518, in receive_single
await self.receive_clean_locks.acquire(channel_key)
File "/opt/toogo/releases/r121.4.20.5315/env/lib/python3.6/site-packages/channels_redis/core.py", line 160, in acquire
return await self.locks[channel].acquire()
File "/opt/python3.6/lib/python3.6/asyncio/locks.py", line 176, in acquire
yield from fut
concurrent.futures._base.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/toogo/releases/r121.4.20.5315/env/lib/python3.6/site-packages/channels/routing.py", line 71, in __call__
return await application(scope, receive, send)
File "/opt/toogo/releases/r121.4.20.5315/env/lib/python3.6/site-packages/channels/sessions.py", line 47, in __call__
return await self.inner(dict(scope, cookies=cookies), receive, send)
File "/opt/toogo/releases/r121.4.20.5315/env/lib/python3.6/site-packages/channels/sessions.py", line 254, in __call__
return await self.inner(wrapper.scope, receive, wrapper.send)
File "/opt/toogo/releases/r121.4.20.5315/env/lib/python3.6/site-packages/channels/auth.py", line 181, in __call__
return await super().__call__(scope, receive, send)
File "/opt/toogo/releases/r121.4.20.5315/env/lib/python3.6/site-packages/channels/middleware.py", line 26, in __call__
return await self.inner(scope, receive, send)
File "/opt/toogo/releases/r121.4.20.5315/env/lib/python3.6/site-packages/channels/routing.py", line 160, in __call__
send,
File "/opt/toogo/releases/r121.4.20.5315/env/lib/python3.6/site-packages/channels/consumer.py", line 94, in app
return await consumer(scope, receive, send)
File "/opt/toogo/releases/r121.4.20.5315/env/lib/python3.6/site-packages/channels/consumer.py", line 59, in __call__
[receive, self.channel_receive], self.dispatch
File "/opt/toogo/releases/r121.4.20.5315/env/lib/python3.6/site-packages/channels/utils.py", line 58, in await_many_dispatch
await task
File "/opt/toogo/releases/r121.4.20.5315/env/lib/python3.6/site-packages/channels_redis/core.py", line 481, in receive
self.receive_lock.release()
File "/opt/python3.6/lib/python3.6/asyncio/locks.py", line 201, in release
raise RuntimeError('Lock is not acquired.')
RuntimeError: Lock is not acquired.
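For anyone trying to reason about the traceback above, here is a minimal sketch of how a CancelledError during `acquire()` followed by `RuntimeError: Lock is not acquired.` can arise with a plain `asyncio.Lock`. It is not the channels_redis code; `holder`, `unsafe_waiter`, `safe_waiter`, and `demo` are hypothetical names, and the assumption is that a `finally` block calls `release()` even when the `acquire()` never completed. Because `asyncio.Lock` does not track ownership, the cancelled task's `release()` unlocks the lock held by another task, and that task's own `release()` then fails.

```python
import asyncio


async def holder(lock, release_now):
    # Legitimately holds the lock until told to let go.
    await lock.acquire()
    try:
        await release_now.wait()
    finally:
        # Raises RuntimeError if another task already released the lock.
        lock.release()


async def unsafe_waiter(lock):
    # Hypothetical buggy pattern: release() runs even if acquire() was cancelled.
    try:
        await lock.acquire()
    finally:
        lock.release()


async def safe_waiter(lock):
    # `async with` only releases the lock if the acquire actually succeeded.
    async with lock:
        pass


async def demo(waiter_fn):
    lock = asyncio.Lock()
    release_now = asyncio.Event()

    holder_task = asyncio.create_task(holder(lock, release_now))
    await asyncio.sleep(0)  # let holder take the lock

    waiter_task = asyncio.create_task(waiter_fn(lock))
    await asyncio.sleep(0)  # let the waiter block inside acquire()

    # Simulate a client disconnect: cancel the task while it waits for the lock.
    waiter_task.cancel()
    await asyncio.gather(waiter_task, return_exceptions=True)

    release_now.set()
    (holder_result,) = await asyncio.gather(holder_task, return_exceptions=True)
    return holder_result  # an exception instance, or None if holder finished cleanly


if __name__ == "__main__":
    print("unsafe:", repr(asyncio.run(demo(unsafe_waiter))))
    print("safe:  ", repr(asyncio.run(demo(safe_waiter))))
```

With the unsafe pattern, the holder ends with the same RuntimeError seen in the traceback; with `async with`, it finishes cleanly. Whether this is what happens inside `channels_redis/core.py` is an assumption that would need to be checked against the library source.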
I have also seen this error:
*** BEGIN MESSAGE ***
Jun 24 11:08:05 internal-applications-web1 toogo: [pid 7002] [version 121.4.20.5315] [tenant_id -] [domain_name -] [pathname /opt/toogo/releases/r121.4.20.5315/env/lib/python3.6/site-packages/daphne/server.py] [lineno 293] [priority ERROR] [funcname application_checker] [request_path -] [request_method -] [request_data -] [request_user -] [request_stack -] Exception inside application: Lock is not acquired.
Traceback (most recent call last):
File "/opt/toogo/releases/r121.4.20.5315/env/lib/python3.6/site-packages/channels_redis/core.py", line 469, in receive
real_channel
File "/opt/toogo/releases/r121.4.20.5315/env/lib/python3.6/site-packages/channels_redis/core.py", line 524, in receive_single
index, channel_key, timeout=self.brpop_timeout
File "/opt/toogo/releases/r121.4.20.5315/env/lib/python3.6/site-packages/channels_redis/core.py", line 361, in _brpop_with_clean
result = await connection.bzpopmin(channel, timeout=timeout)
concurrent.futures._base.CancelledError
I am able to reproduce this error using the following code:
import sys
import asyncio
import websockets

async def test_socket(n, uri):
    print('Coro {0}'.format(n))
    async with websockets.connect(uri, close_timeout=5, ping_interval=5, ping_timeout=5) as websocket:
        payload = '{"stream":"ping", "payload":{}}'
        await websocket.send(payload)
        response = await asyncio.wait_for(websocket.recv(), timeout=5)
        print('Channels/daphne for {0} responded {1}'.format(uri, response))

async def main():
    uri = sys.argv[1]
    num = 100
    funcs = []
    for i in range(0, num):
        funcs.append(asyncio.create_task(test_socket(i, uri)))
    await asyncio.gather(*funcs)

asyncio.run(main())
Has anyone seen this and can shed some light on the problem?