Broadcasting with Redis broker

Open andreif opened this issue 10 years ago • 22 comments

Hi, I am looking into the possibility of broadcasting with Redis and wonder whether anything has changed since Ask's answer two years ago:

On Tuesday, May 28, 2013 at 5:03:19 PM UTC+2, Ask Solem wrote:

On May 27, 2013, at 5:04 PM, Maxime Verger wrote:

Hi everybody,

I'm trying to broadcast a task on every worker. Here is my configuration:

celery==3.0.12 gevent==0.13.8 redis==2.7.2

CELERY_CREATE_MISSING_QUEUES = True
CELERY_QUEUES = (Broadcast('broadcast_tasks'), )
CELERY_ROUTES = {
    'tasks.reload_cache': {'queue': 'broadcast_tasks'},
}

I'm launching celery workers using this command:

./manage.py celeryd_multi start celery1 celery2 celery3 priority export -Q:celery1 celery -Q:celery2 celery ..snip..

Moreover I saw that redis supports PUB / SUB mode for fanout. Maybe it's not working because of that, but I don't know how to configure it, if someone has a clue, I'd be happy !

I don't think the fanout exchange type works with Redis and Celery at this time. The fanout exchanges must be consumed from a different connection (to use LISTEN), and the worker does not currently do this. It only does so for the remote control commands; I guess fanout task queues could share the same connection as remote control.

Is broadcasting with Redis theoretically possible? If yes, what needs to be done? If no, why?

andreif avatar Jun 01 '15 19:06 andreif

Also curious about this. I tried it and it didn't appear to be functioning.

ericholscher avatar Jun 02 '15 15:06 ericholscher

BTW, the docs say "The RabbitMQ and Redis broker transports are feature complete", which probably needs to be changed. Is broadcasting the only missing part, or are there more things Redis cannot support at the moment?

andreif avatar Jun 03 '15 08:06 andreif

@ask Is gossip already implemented for Redis?

thedrow avatar Jul 11 '15 09:07 thedrow

Any word on this? It still doesn't seem to be working.

tekneeq avatar Dec 13 '15 08:12 tekneeq

+1, just stumbled into not being able to send broadcast jobs to celery+redis.

colbygk avatar Jan 27 '16 21:01 colbygk

Any updates on this?

genxstylez avatar May 27 '16 20:05 genxstylez

My workaround for Redis: Queue('bcast', Exchange('bcast', type='fanout'), routing_key='bcast')

bsavelev avatar Jun 01 '16 17:06 bsavelev

Closing this, as we don't have the resources to complete this task.

ask avatar Jun 24 '16 00:06 ask

I just realised that I need broadcast with redis broker.

I have played with configuration at length - config looks good/makes sense, but I can't get tasks to actually enqueue. What I don't really understand is that Broadcast in Kombu looks like it should work... I don't really understand where it is going wrong.

DDevine avatar Aug 07 '17 05:08 DDevine

Can we please at least update the docs to say that currently Redis does not support broadcast? 🤔

EDIT: typo.

lzfelix avatar Oct 07 '19 13:10 lzfelix

@lzfelix PRs welcome :)

thedrow avatar Oct 12 '19 17:10 thedrow

Sure, thanks! I'll add that to my todo list =)

lzfelix avatar Oct 13 '19 18:10 lzfelix

It seems that broadcast messages with the Redis broker work with Celery 4.4.2 and Redis 6.0.3 (the versions I used).

zentavr avatar May 21 '20 03:05 zentavr

That's great to know! Did you try it?

auvipy avatar May 21 '20 08:05 auvipy

@auvipy yes, I'm building the app with Flask and Celery right now and tested that 12 hrs ago.

I did find one problem: if the Redis broker crashes while a Celery worker is running, the worker no longer receives broadcast tasks after it reconnects to the same broker.
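One possible explanation, which is an assumption and not something confirmed in this thread: Redis Pub/Sub subscriptions belong to the connection, so they disappear when the broker goes away, and a client that reconnects without subscribing again receives nothing. The sketch below models that in plain Python with no Redis or Celery involved. `FakeBroker`, `Worker` and the `resubscribe` flag are hypothetical names made up for the illustration.

```python
class FakeBroker:
    """Toy model of Pub/Sub: subscriptions are stored per connection."""

    def __init__(self):
        self._subscriptions = {}  # connection id -> set of channel names
        self._next_id = 0

    def connect(self):
        self._next_id += 1
        self._subscriptions[self._next_id] = set()
        return self._next_id

    def subscribe(self, conn_id, channel):
        self._subscriptions[conn_id].add(channel)

    def restart(self):
        # A crash drops every connection and, with it, every subscription.
        self._subscriptions.clear()

    def publish(self, channel):
        # Like Redis PUBLISH, report how many clients received the message.
        return sum(1 for chans in self._subscriptions.values() if channel in chans)


class Worker:
    """Hypothetical worker that may or may not resubscribe on reconnect."""

    def __init__(self, broker, channel, resubscribe):
        self.broker = broker
        self.channel = channel
        self.resubscribe = resubscribe
        self.conn_id = broker.connect()
        broker.subscribe(self.conn_id, channel)

    def reconnect(self):
        self.conn_id = self.broker.connect()
        if self.resubscribe:
            self.broker.subscribe(self.conn_id, self.channel)


def receivers_after_crash(resubscribe):
    broker = FakeBroker()
    worker = Worker(broker, "broadcast_tasks", resubscribe)
    before = broker.publish("broadcast_tasks")
    broker.restart()
    worker.reconnect()
    after = broker.publish("broadcast_tasks")
    return before, after
```

If this model matches what happens in the worker, a fix would need to restore the fanout subscriptions as part of the reconnect path.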

zentavr avatar May 21 '20 15:05 zentavr

would be great if you can find out a solution and send an initial PR

auvipy avatar May 21 '20 15:05 auvipy

@auvipy is this issue of "broadcasting with redis" fixed?

Himanshi1997 avatar Feb 28 '21 13:02 Himanshi1997

@auvipy is this issue of "broadcasting with redis" fixed?

I am not sure, but I don't think so. You can check https://github.com/celery/celery/issues/2638#issuecomment-631862957

auvipy avatar Mar 02 '21 07:03 auvipy

We haven't actively worked on this. We'd appreciate some help on the matter.

thedrow avatar Mar 29 '21 08:03 thedrow

@thedrow Are you able to scope this work? I ask because there are quite a few open questions about what functionality this ticket is actually asking for.

I think there is a big problem with documentation surrounding broadcast (really just a fanout exchange) and what that means for redis. I don't mind submitting a PR to update documentation, but also don't want to go wasting a bunch of time if the consensus is that it isn't a problem.

My biggest complaint is that, because Redis doesn't adhere to the AMQP protocol, talking about "exchanges" and "queues" with Redis as the broker creates a lot of confusion: Kombu tries to mask these AMQP concepts on top of Redis. That's where Pub/Sub and the fanout/topic exchanges diverge in terminology. The concepts are similar but not the same. This is why this line breaks broadcast queues for the Redis backend, but not for RabbitMQ or other backends that adhere more closely to the AMQP standard.
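To make the "similar but not the same" point concrete, here is a rough sketch in plain Python (no Redis or Kombu involved) of the two delivery models being conflated: a list-style work queue where one consumer takes each message, and a Pub/Sub-style channel where every subscriber gets a copy. `WorkQueue` and `FanoutChannel` are hypothetical names for illustration only.

```python
from collections import deque


class WorkQueue:
    """Models a list-backed queue: each message goes to exactly one consumer."""

    def __init__(self):
        self._messages = deque()

    def publish(self, message):
        self._messages.append(message)

    def consume(self):
        # Whichever worker pops first takes the message; nobody else sees it.
        return self._messages.popleft() if self._messages else None


class FanoutChannel:
    """Models Pub/Sub: every current subscriber gets its own copy."""

    def __init__(self):
        self._inboxes = {}

    def subscribe(self, worker_name):
        self._inboxes[worker_name] = deque()

    def publish(self, message):
        for inbox in self._inboxes.values():
            inbox.append(message)

    def consume(self, worker_name):
        inbox = self._inboxes[worker_name]
        return inbox.popleft() if inbox else None


def demo():
    workers = ["celery1", "celery2", "celery3"]

    queue = WorkQueue()
    queue.publish("reload_cache")
    queue_results = [queue.consume() for _ in workers]

    fanout = FanoutChannel()
    for name in workers:
        fanout.subscribe(name)
    fanout.publish("reload_cache")
    fanout_results = [fanout.consume(name) for name in workers]

    return queue_results, fanout_results
```

With the work queue only the first worker gets `reload_cache`; with the fanout channel all three do, which is the behaviour a broadcast task needs.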

The workaround provided by @bsavelev works great and is probably the easiest "solution" for anyone who wants fanout exchange functionality. If someone really needs to use the Broadcast class (I don't see why they would), they could create the producer ahead of time and declare the exchange like so:

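from kombu import Connection, Producer
from kombu.common import Broadcast, maybe_declare

broadcast_queue = Broadcast(name='broadcast_tasks', queue='bcast', unique=False)
producer = Producer(Connection('redis://:**@localhost:6379/10'))
# Declare the fanout exchange up front, before any task is published.
maybe_declare(broadcast_queue.exchange, producer.channel)

do_some_task.apply_async(producer=producer)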

If you are open to the idea of switching up the logic in amqp.py

if declare is None and queue and not isinstance(queue, Broadcast): declare = [queue]

to something like

if declare is None and queue and (not isinstance(queue, Broadcast) or isinstance(conn.transport, redis.Transport)): declare = [queue]

or something to that effect, then I can open a PR, but I'm no Celery core dev, so I don't know all the implications involved.
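For anyone weighing the change, here is a small standalone sketch of the two conditions side by side, so the difference can be checked as a truth table. The classes are hypothetical stand-ins, not the real Kombu ones, and `proposed_rule` is only my reading of the suggestion above, not code from Celery.

```python
class Queue:
    """Hypothetical stand-in for kombu.Queue."""


class Broadcast(Queue):
    """Hypothetical stand-in for kombu.common.Broadcast."""


class RedisTransport:
    """Hypothetical stand-in for the Redis transport class."""


class AmqpTransport:
    """Hypothetical stand-in for an AMQP transport class."""


def current_rule(declare, queue):
    # Existing condition: never auto-declare a Broadcast queue.
    return declare is None and bool(queue) and not isinstance(queue, Broadcast)


def proposed_rule(declare, queue, transport):
    # Suggested condition: also auto-declare Broadcast queues on Redis.
    return declare is None and bool(queue) and (
        not isinstance(queue, Broadcast) or isinstance(transport, RedisTransport)
    )
```

The only case that changes is a Broadcast queue on the Redis transport with no explicit `declare`; ordinary queues and Broadcast queues on other transports behave as before.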

flipyap avatar Jul 26 '22 04:07 flipyap

You can come up with a proof-of-concept draft PR.

auvipy avatar Jul 26 '22 16:07 auvipy

solution:

first solution

 broadcast_tasks_queue = Broadcast('broadcast_tasks')
 app.conf.task_queues = (broadcast_tasks_queue,)
 app.conf.task_routes = {
     'tasks.broadcast_fn': {
         'queue': 'broadcast_tasks',
         'exchange': 'broadcast_tasks',
+        'routing_key': 'celery'
     }
 }
 app.conf.worker_pool = 'solo'
 app.conf.worker_concurrency = 1

send task:

broadcast_fn.apply_async(declare=[broadcast_tasks_queue])

second solution

 broadcast_tasks_queue = Broadcast('broadcast_tasks')
 app.conf.task_queues = (broadcast_tasks_queue,)
 app.conf.task_routes = {
     'tasks.broadcast_fn': {
         'queue': 'broadcast_tasks',
         'exchange': 'broadcast_tasks',
+        'routing_key': 'celery'
     }
 }
 app.conf.worker_pool = 'solo'
 app.conf.worker_concurrency = 1
+# The following code only needs to be defined in the sender, not required in the task receiver
+with app.producer_or_acquire() as p:
+    broadcast_tasks_queue.declare(channel=p.channel)

send task:

broadcast_fn.delay()

akkuman avatar Feb 04 '24 09:02 akkuman