
Update to v4 results in "RuntimeError: Event loop is closed"

stumpylog opened this issue 3 years ago

After upgrading to channels-redis==4.0.0, our celery tasks are all reporting the following traceback:

future: <Task finished name='Task-9' coro=<Connection.disconnect() done, defined at /usr/local/lib/python3.9/site-packages/redis/asyncio/connection.py:819> exception=RuntimeError('Event loop is closed')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/redis/asyncio/connection.py", line 828, in disconnect
    self._writer.close()  # type: ignore[union-attr]
  File "/usr/local/lib/python3.9/asyncio/streams.py", line 353, in close
    return self._transport.close()
  File "/usr/local/lib/python3.9/asyncio/selector_events.py", line 698, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 751, in call_soon
    self._check_closed()
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 515, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
[2022-10-12 07:30:31,972] [ERROR] [asyncio] Task exception was never retrieved

Downgrading the image to channels-redis==3.4.1 resolves the issue, so I'm starting out here. This is probably related to #312.

Image OS is Debian Bullseye, amd64. The Django application is running with gunicorn.

Probably related packages:

celery==5.2.7
channels==3.0.5
channels-redis==4.0.0
hiredis==2.0.0
redis==4.3.4

Full Pipfile.lock: https://github.com/paperless-ngx/paperless-ngx/blob/dev/Pipfile.lock

stumpylog avatar Oct 12 '22 15:10 stumpylog

I’m seeing the exact same thing. Seems like the same issue as reported in https://github.com/django/channels_redis/issues/312

ljodal avatar Oct 12 '22 15:10 ljodal

There was lots of discussion on the other issue. If you could investigate, that would be handy.

A minimal reproduce would help too. It's hard to say anything without one. 🤔

carltongibson avatar Oct 12 '22 16:10 carltongibson

The application is inherited, so I'm not really sure how the whole channels thing works yet or what connects where.

So far, the best I can do is narrow it down to the only task which uses async_to_sync to send status updates. It is called multiple times (roughly 10) to update a status from 0% to 100%.

stumpylog avatar Oct 12 '22 17:10 stumpylog

If I use ahaltindis's workaround detailed here, my code works. So it definitely seems to be a conflict/issue with async_to_sync.

import asyncio

# channel_layer, group_name, and context are assumed to be defined elsewhere
loop = asyncio.get_event_loop()
coroutine = channel_layer.group_send(
    group_name,
    {
        'type': 'task.message',
        'text': context
    })
loop.run_until_complete(coroutine)

Perhaps the channels or channels_redis documentation could be improved to provide an example of how users should trigger a group_send from a synchronous context (such that several calls can be made)? Perhaps a 'sync' version of group_send could be made available?
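
For illustration, here is a minimal sketch of what such a sync helper might look like (hypothetical, not an API offered by channels; like the workaround above, it relies on the calling thread already having a usable event loop):

import asyncio

from channels.layers import get_channel_layer


def sync_group_send(group_name, message):
    # Reuse the thread's existing event loop so the channel layer's
    # cached connections stay bound to a live loop across repeated calls.
    loop = asyncio.get_event_loop()
    channel_layer = get_channel_layer()
    loop.run_until_complete(channel_layer.group_send(group_name, message))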

aaronmader avatar Oct 12 '22 19:10 aaronmader

I'm pretty sure the issue we have is very similar to what's reported above. We call async_to_sync(channel_layer.group_send)(group_name, channel_layer_payload) from a sync gunicorn worker, and I think that's where this stems from. The stack trace doesn't go back to our code, though, so it's hard to tell exactly.

ljodal avatar Oct 13 '22 08:10 ljodal

I also get this issue after upgrading to 4.0.0.

nblam1994 avatar Oct 19 '22 20:10 nblam1994

Same here, and sometimes I also get:

Task exception was never retrieved
future: <Task finished name='Task-378' coro=<Connection.disconnect() done, defined at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:819> exception=RuntimeError("Task <Task pending name='Task-378' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop")>
Traceback (most recent call last):
  File "/home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 831, in disconnect
    await self._writer.wait_closed()  # type: ignore[union-attr]
  File "/usr/lib/python3.10/asyncio/streams.py", line 344, in wait_closed
    await self._protocol._get_close_waiter(self)
RuntimeError: Task <Task pending name='Task-378' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop
(The same traceback repeats for Task-381 and Task-382.)

Elabbasy00 avatar Oct 29 '22 18:10 Elabbasy00

Any chance of a minimal reproduce? It's hard to say anything with just a traceback

carltongibson avatar Oct 29 '22 19:10 carltongibson

asgiref==3.5.2
channels==4.0.0
channels-redis==4.0.0
daphne==4.0.0

views.py

# passing request.data to serializer
if serializer.is_valid():
    serializer.save(interpreter=request.user)
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        "admins",
        {"type": "chat_message", "message": {"command": "admin_new_ticket", "ticket": serializer.data}},
    )
    return Response(serializer.data, status=status.HTTP_200_OK)

and just a normal consumer:


import json
from channels.generic.websocket import AsyncWebsocketConsumer


class AdminsConsumer(AsyncWebsocketConsumer):

    async def connect(self):
        self.room_group_name = 'admins'

        # Join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )

        await self.accept()

    async def disconnect(self, close_code):
        # Leave room group
        await self.channel_layer.group_discard(self.room_group_name, self.channel_name)

    # Receive message from WebSocket
    async def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json["message"]

        # Send message to room group
        await self.channel_layer.group_send(
            self.room_group_name, {"type": "chat_message", "message": message}
        )

    # Receive message from room group
    async def chat_message(self, event):
        message = event["message"]

        # Send message to WebSocket
        await self.send(text_data=json.dumps({"message": message}))

Elabbasy00 avatar Oct 29 '22 19:10 Elabbasy00

same issue here,

Task exception was never retrieved
future: <Task finished name='Task-90' coro=<Connection.disconnect() done, defined at /usr/local/lib/python3.8/site-packages/redis/asyncio/connection.py:819> exception=RuntimeError('Event loop is closed')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/redis/asyncio/connection.py", line 828, in disconnect
    self._writer.close()  # type: ignore[union-attr]
  File "/usr/local/lib/python3.8/asyncio/streams.py", line 353, in close
    return self._transport.close()
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 692, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 719, in call_soon
    self._check_closed()
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 508, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

dependencies:

channels==4.0.0
channels-redis==4.0.0
daphne==4.0.0
Django==3.2.16
django-redis==5.0.0
redis==4.3.4

code:

async_to_sync(channel_layer.group_send)(
    group_name,
    {"type": "store.manager.handout", "data": {"type": request.POST["message_type"]}}
)

this comment did not work either; I'm getting:

There is no current event loop in thread 'ThreadPoolExecutor-0_0'.

abtinmo avatar Nov 06 '22 17:11 abtinmo

Any update on this issue? I'm still facing it.

bachloxo avatar Nov 07 '22 02:11 bachloxo

As a workaround, downgrading channels-redis to 3.4.1 works for me.

Elabbasy00 avatar Nov 07 '22 03:11 Elabbasy00

Any update on this issue? I'm still facing it.

me too

ilysenko avatar Nov 14 '22 16:11 ilysenko

@carltongibson You can also reproduce the error using this repo:

https://github.com/realsuayip/zaida/tree/490e0c5a49a750bc56a63f9cba5c9514ed91eee4

Steps to reproduce:

1 - Clone the repo
2 - Run "python3 docker.py up"
3 - Once all the containers are running, run "python3 docker.py test"

Hope it helps.

realsuayip avatar Nov 14 '22 19:11 realsuayip

Hoping we can resolve this soon

dvf avatar Nov 19 '22 04:11 dvf

Can we stop with the "me too" and "any update" comments please.

If you have significant new information to add then please do. (Great!) Otherwise it's just noise.

I'm planning new releases over the new year, and looking into this is part of that.

carltongibson avatar Nov 19 '22 08:11 carltongibson

@carltongibson this is a major breaking issue, so I'm just trying to bring more attention to it.

Tomorrow I'll go through the source and try to make a contribution.

dvf avatar Nov 19 '22 08:11 dvf

I am experiencing this issue within a daphne instance which only runs consumers for websockets, using a customized (derived) version of RedisChannelLayer.

The code being executed does not (or at least should not) make any calls to async_to_sync, but it uses database_sync_to_async (multiple times while processing a single message).
The code which is execute does (shall?) not have any calls to async_to_sync but it uses database_sync_to_async (multiple times while processing a single message).

Here are my requirements:

# python 3.9
asgiref==3.5.2
Django==3.2.16
channels==4.0.0
channels-redis==4.0.0

The produced error message is very similar to the one reported in https://github.com/aio-libs/aioredis-py/issues/1103 (from what I have seen, the fix for that problem was also ported to redis-py).

Maybe the problem is caused by the way this layer tries to close/clear the connection pool.

sevdog avatar Nov 23 '22 16:11 sevdog

For anyone else with this issue, it doesn't occur when using the newer RedisPubSubChannelLayer. It's not in the Channels docs because it's still in beta.

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.pubsub.RedisPubSubChannelLayer",
        "CONFIG": {
            "hosts": ["rediss://***:***@***.com:6379"],
        },
    },
}

dvf avatar Nov 23 '22 19:11 dvf

CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": ['redis://:[email protected]:6379/2'],
        }
    }
}

Not sure if this is related, but with the above Redis broker config I get the following RuntimeError:

Task exception was never retrieved
future: <Task finished name='Task-127' coro=<Connection.disconnect() done, defined at myapp/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:687> exception=RuntimeError("Task <Task pending name='Task-127' coro=<Connection.disconnect() running at myapp/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:700>> got Future <Future pending> attached to a different loop")>
Traceback (most recent call last):
  File "myapp/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 700, in disconnect
    await self._writer.wait_closed()  # type: ignore[union-attr]
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/streams.py", line 344, in wait_closed
    await self._protocol._get_close_waiter(self)
RuntimeError: Task <Task pending name='Task-127' coro=<Connection.disconnect() running at myapp/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:700>> got Future <Future pending> attached to a different loop

However, with Redis sentinels for the channel layer config:

CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [
                {
                    "sentinels": [('myserver1.example.com', 26379), ('myserver2.example.com'', 26379), ('myserver3.example.com'', 26379)],
                    "master_name": "mymaster",
                    "sentinel_kwargs": {'password': 'mysentinelpassword'},
                    "password": 'myredispassword',
                    "db": 2
                }

            ]

        }
    }
}

I get the following error:

Traceback (most recent call last):
  File "myapp/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 567, in connect
    await self.retry.call_with_retry(
  File "myapp/venv/lib/python3.10/site-packages/redis/asyncio/retry.py", line 59, in call_with_retry
    return await do()
  File "myapp/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 604, in _connect
    reader, writer = await asyncio.open_connection(
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/streams.py", line 47, in open_connection
    transport, _ = await loop.create_connection(
  File "uvloop/loop.pyx", line 2039, in create_connection
  File "uvloop/loop.pyx", line 2016, in uvloop.loop.Loop.create_connection
ConnectionRefusedError: [Errno 61] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "myapp/./server/viewsets/product.py", line 132, in post
    tasks.update_product_quantity(product_id=product_obj.id,
  File "myapp/./server/tasks.py", line 57, in update_product_quantity
    loop.run_until_complete(coroutine)
  File "uvloop/loop.pyx", line 1517, in uvloop.loop.Loop.run_until_complete
  File "myapp/venv/lib/python3.10/site-packages/channels_redis/core.py", line 548, in group_send
    await connection.zremrangebyscore(
  File "myapp/venv/lib/python3.10/site-packages/redis/asyncio/client.py", line 502, in execute_command
    conn = self.connection or await pool.get_connection(command_name, **options)
  File "myapp/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 1363, in get_connection
    await connection.connect()
  File "myapp/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 575, in connect
    raise ConnectionError(self._error_message(e))
redis.exceptions.ConnectionError: Error 61 connecting to localhost:6379. 61.

The above error occurs when I try to group_send a message:

channel_layer = get_channel_layer()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

coroutine = channel_layer.group_send('my_message',
    {
        'type': 'send_update',
        'message': message
    })

loop.run_until_complete(coroutine)

I'm not sure why with sentinel it targets localhost and ignores the Redis sentinel config I provide (when this failure occurs).

K-MTG avatar Dec 29 '22 02:12 K-MTG

I'm not sure why with sentinel it targets localhost and ignores the Redis sentinel config I provide (when this failure occurs).

Did some digging, the reason we see a different error with Sentinel is due to the config getting modified (per #341). If I perform a deep copy (host = copy.deepcopy(self.hosts[index])) in create_pool then I get the same RuntimeError with Sentinel.

K-MTG avatar Dec 29 '22 02:12 K-MTG

The issue here is that channel_layer.close_pools() is not being called.

See https://github.com/django/channels/issues/1966#issuecomment-1400903327

I've tracked it down. The move to redis-py requires closing connections explicitly

Utilizing asyncio Redis requires an explicit disconnect of the connection ...

So to get the shell example to pass cleanly we need to do something like this:

>>> import channels.layers
>>> from asgiref.sync import async_to_sync
>>>
>>> channel_layer = channels.layers.get_channel_layer()
>>>
>>> async def closing_send(channel_layer, channel, message):
...     await channel_layer.send(channel, message)
...     await channel_layer.close_pools()
>>>
>>> async_to_sync(closing_send)(channel_layer,'test_channel', {'type': 'hello'})
>>> async_to_sync(channel_layer.receive)('test_channel')
{'type': 'hello'}

Even then we should do the same for the receive call really. (A further call would hit the same issue.)

If you're using async_to_sync() you'll need to wrap calls in a function that ensures the close_pools() call is made at the end, since the event loop is shut down when the concurrent context ends.
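
For reuse outside the shell, the same pattern can be wrapped once in a helper, along these lines (a sketch; closing_group_send is a hypothetical name, and it assumes the default channel layer):

import channels.layers
from asgiref.sync import async_to_sync


def closing_group_send(group, message):
    # Run the send and the close_pools() cleanup inside one coroutine,
    # so both execute on the event loop that async_to_sync creates.
    channel_layer = channels.layers.get_channel_layer()

    async def _send():
        await channel_layer.group_send(group, message)
        await channel_layer.close_pools()

    async_to_sync(_send)()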

carltongibson avatar Jan 23 '23 20:01 carltongibson

Thanks @carltongibson I can confirm this fixes the problem

glenaddict avatar Jan 24 '23 21:01 glenaddict

@carltongibson thank you for the suggestion. However, I am a bit concerned that forcing users to call this method when using async_to_sync is a bit dirty, because the close_pools method is only defined in RedisChannelLayer and is not part of any specification.

Having to make a direct call to the .close_pools method is going to be a problem when someone is willing to move to another layer implementation (i.e. from RedisChannelLayer to RedisPubSubChannelLayer).

I believe that RedisChannelLayer should have a method like the one defined in the pub-sub layer to intercept the loop close event: https://github.com/django/channels_redis/blob/a7094c58a15cbf6e621b2129253060fc80cfdfad/channels_redis/pubsub.py#L15-L27
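
Roughly, the idea is to wrap the event loop's close() so the layer's connections are cleaned up first; a sketch of the technique (my paraphrase, not the actual pubsub.py source):

import types


def wrap_loop_close(layer, loop):
    original_close = loop.close

    def _wrapper(self, *args, **kwargs):
        # Close the layer's connection pools while the loop is still
        # usable, then restore and call the original close().
        self.run_until_complete(layer.close_pools())
        self.close = original_close
        return self.close(*args, **kwargs)

    loop.close = types.MethodType(_wrapper, loop)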

This way it will be transparent to users and more robust.

sevdog avatar Jan 25 '23 10:01 sevdog

@sevdog That may be worth it yes 🤔 — have you got a cycle to put that in a PR? (As per progress here, I've begun working towards the next releases, so it would be timely.)

carltongibson avatar Jan 25 '23 10:01 carltongibson

@carltongibson I will see if I can get a time slot for this and open a PR.

I would like to find a way to merge the connection management algorithms from RedisPubSubChannelLayer and RedisChannelLayer (IMO it does not make sense to have two completely different implementations of that logic), but it could take more time to handle.

sevdog avatar Jan 25 '23 11:01 sevdog

@sevdog That would be nice. There's no urgency though. poco a poco

carltongibson avatar Jan 25 '23 11:01 carltongibson

I have the same issue when sending messages over channels. I tried @carltongibson's solution, but it implies that I need to re-create the connection each time before sending a message. Is this correct? Thanks for the help!

btel avatar Feb 08 '23 09:02 btel

The channel layer encapsulates the connection handling, but connections are per event loop, and async_to_sync() creates a new event loop each time, so...

(It shouldn't really be an issue... If it's performance critical, it's likely you're already looking at avoiding async_to_sync() entirely.)
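
A quick way to see the new-loop-per-call behaviour (a sketch; ids can occasionally be reused after garbage collection, but each call does get a fresh loop):

import asyncio

from asgiref.sync import async_to_sync


async def which_loop():
    return id(asyncio.get_running_loop())


print(async_to_sync(which_loop)())
print(async_to_sync(which_loop)())  # usually prints a different id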

carltongibson avatar Feb 08 '23 10:02 carltongibson

Thanks for the quick reply. So if I understood correctly, to send a message through the channel layer outside a consumer, I should do something like:

async def closing_send(channel, message):
    channel_layer = channels.layers.get_channel_layer()
    await channel_layer.send(channel, message)
    await channel_layer.close_pools()

async_to_sync(closing_send)("test-channel", {"type": "hello"})

but the layers returned from get_channel_layer are cached (in the channels.layers.channel_layers object), so if there are other threads using the connection (like websockets), they will also be disconnected.

I tested this and it seems to work fine:

async def closing_send(channel, message):
    channel_layer = channels.layers.channel_layers.make_backend(DEFAULT_CHANNEL_LAYER)
    await channel_layer.send(channel, message)
    await channel_layer.close_pools()

async_to_sync(closing_send)("test-channel", {"type": "hello"})

Did I get it right?

btel avatar Feb 08 '23 12:02 btel