Broadcasting with Redis broker
Hi, I am looking into the possibility of broadcasting with the Redis broker, and I wonder if anything has changed since Ask's answer two years ago:
On Tuesday, May 28, 2013 at 5:03:19 PM UTC+2, Ask Solem wrote:
On May 27, 2013, at 5:04 PM, Maxime Verger wrote:
Hi everybody,
I'm trying to broadcast a task on every worker. Here is my configuration:
celery==3.0.12
gevent==0.13.8
redis==2.7.2
CELERY_CREATE_MISSING_QUEUES = True
CELERY_QUEUES = (Broadcast('broadcast_tasks'),)
CELERY_ROUTES = {
    'tasks.reload_cache': {'queue': 'broadcast_tasks'},
}
I'm launching celery workers using this command:
./manage.py celeryd_multi start celery1 celery2 celery3 priority export -Q:celery1 celery -Q:celery2 celery ..snip..
Moreover, I saw that Redis supports PUB/SUB mode for fanout. Maybe it's not working because of that, but I don't know how to configure it; if someone has a clue, I'd be happy!
I don't think the fanout exchange type works with Redis and Celery at this time. The fanout exchanges must be consumed from a different connection (to use LISTEN), and the worker does not currently do this (it only does so for the remote control commands; I guess fanout task queues could share the same connection as remote control).
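(For context, a minimal redis-py sketch of the constraint described above; the broker address and channel name are illustrative. Once a connection enters subscribe mode it can only process pub/sub traffic, so consuming a fanout queue needs a connection of its own.)

import redis

r = redis.Redis()            # illustrative local broker
p = r.pubsub()               # pub/sub claims a dedicated connection
p.subscribe('bcast')         # after SUBSCRIBE, this connection handles pub/sub only
for message in p.listen():   # blocks; normal commands (e.g. BRPOP) can't run here
    print(message)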
Is broadcasting with Redis theoretically possible? If yes, what needs to be done? If no, why?
Also curious about this. I tried it and it didn't appear to be functioning.
BTW, the docs say "The RabbitMQ and Redis broker transports are feature complete", which probably needs to be changed. Is broadcasting the only missing part, or are there more things Redis cannot support at the moment?
@ask Is gossip already implemented for Redis?
Any word on this? It still doesn't seem to be working.
+1, just stumbled into not being able to send broadcast jobs to celery+redis.
Any updates on this?
My workaround for Redis:
Queue('bcast', Exchange('bcast', type='fanout'), routing_key='bcast')
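(A minimal sketch of how that workaround might slot into a Celery config; the app name, broker URL, and task name are assumptions, not from the thread.)

from celery import Celery
from kombu import Exchange, Queue

app = Celery('tasks', broker='redis://localhost:6379/0')  # assumed broker URL

# A plain Queue bound to a fanout exchange, instead of kombu's Broadcast helper.
bcast_queue = Queue('bcast', Exchange('bcast', type='fanout'), routing_key='bcast')

app.conf.task_queues = (bcast_queue,)
app.conf.task_routes = {
    'tasks.reload_cache': {'queue': 'bcast'},  # hypothetical task name
}

With the Redis transport, kombu emulates fanout exchanges via Redis PUB/SUB, so every worker consuming this queue should receive its own copy of each message even though the queue name is shared.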
Closing this, as we don't have the resources to complete this task.
I just realised that I need broadcast with the Redis broker.
I have played with the configuration at length; it looks good and makes sense, but I can't get tasks to actually enqueue. What confuses me is that Broadcast in Kombu looks like it should work, so I don't understand where it is going wrong.
Can we please at least update the docs to say that currently Redis does not support broadcast? 🤔
EDIT: typo.
@lzfelix PRs welcome :)
Sure, thanks! I'll add that to my todo list =)
It seems like broadcast messages with the Redis broker work with Celery 4.4.2 and Redis 6.0.3 (the versions I used).
that's great to know! did you try it?
@auvipy yes, I'm building the app with Flask and Celery right now and tested that 12 hrs ago.
There is one problem I found: if the Redis broker crashes while a Celery worker is running, the worker won't be able to receive broadcast tasks after it reconnects to the same broker.
Would be great if you could find a solution and send an initial PR.
@auvipy is this issue of "broadcasting with redis" fixed?
I am not sure, but I don't think so. You can check https://github.com/celery/celery/issues/2638#issuecomment-631862957
We haven't actively worked on this. We'd appreciate some help on the matter.
@thedrow Are you able to scope this work? I ask because there are quite a few questions surrounding the functionality this ticket is asking for.
I think there is a big problem with the documentation surrounding broadcast (really just a fanout exchange) and what that means for Redis. I don't mind submitting a PR to update the documentation, but I also don't want to waste a bunch of time if the consensus is that it isn't a problem.
My biggest complaint would be that because Redis doesn't adhere to the AMQP protocol, it creates a lot of confusion to talk about "exchanges" and "queues" when Redis is the broker, since Kombu tries to mask these AMQP concepts on top of Redis. That's where Pub/Sub and the fanout/topic exchanges stop matching up terminology-wise: the concepts are similar but not the same. This is why this line breaks broadcast queues for the Redis backend, but not for RabbitMQ or other backends that adhere more closely to the AMQP standard.
The workaround provided by @bsavelev works great and is probably the easiest "solution" for anyone who wants fanout exchange functionality. If someone needs to use the Broadcast class (which I don't see a reason for), they could create the producer ahead of time and declare the exchange like so:
from kombu import Connection, Producer
from kombu.common import Broadcast, maybe_declare

broadcast_queue = Broadcast(name='broadcast_tasks', queue='bcast', unique=False)
producer = Producer(Connection('redis://:**@localhost:6379/10'))
maybe_declare(broadcast_queue.exchange, producer.channel)
do_some_task.apply_async(producer=producer)
If you are open to the idea of switching up the logic in amqp.py
if declare is None and queue and not isinstance(queue, Broadcast):
    declare = [queue]
to something like
if declare is None and queue and (not isinstance(queue, Broadcast)
                                  or isinstance(conn.transport, redis.Transport)):
    declare = [queue]
or something to that effect, then I can open a PR, but I'm no Celery core dev so I don't know all the implications involved.
You can come up with a proof-of-concept draft PR.
Solutions:
First solution:
broadcast_tasks_queue = Broadcast('broadcast_tasks')
app.conf.task_queues = (broadcast_tasks_queue,)
app.conf.task_routes = {
    'tasks.broadcast_fn': {
        'queue': 'broadcast_tasks',
        'exchange': 'broadcast_tasks',
+       'routing_key': 'celery',
    }
}
app.conf.worker_pool = 'solo'
app.conf.worker_concurrency = 1
send task:
broadcast_fn.apply_async(declare=[broadcast_tasks_queue])
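(For completeness, a minimal sketch of the task module both solutions assume; tasks.broadcast_fn is never shown in the thread, so its body here is made up.)

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')  # assumed broker URL

@app.task(name='tasks.broadcast_fn')
def broadcast_fn():
    # Illustrative body: every subscribed worker runs this when the broadcast arrives.
    print('broadcast received')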
Second solution:
broadcast_tasks_queue = Broadcast('broadcast_tasks')
app.conf.task_queues = (broadcast_tasks_queue,)
app.conf.task_routes = {
    'tasks.broadcast_fn': {
        'queue': 'broadcast_tasks',
        'exchange': 'broadcast_tasks',
+       'routing_key': 'celery',
    }
}
app.conf.worker_pool = 'solo'
app.conf.worker_concurrency = 1

+# The following only needs to run on the sender side; it is not required in the task receiver.
+with app.producer_or_acquire() as p:
+    broadcast_tasks_queue.declare(channel=p.channel)
send task:
broadcast_fn.delay()
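(Either way, each worker has to consume the broadcast queue; since task_queues is set above, workers pick it up by default, so starting two of them might look like the following. The module name tasks is an assumption.)

celery -A tasks worker --loglevel=info -n worker1@%h
celery -A tasks worker --loglevel=info -n worker2@%h

If the fanout setup works, a single broadcast_fn.delay() should then run once on every worker.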