support redis cluster transport
Attempt to address https://github.com/celery/kombu/pull/1021
Thank you very much for your code; it helps us a lot. @auvipy
We use redis-py instead of redis-py-cluster because redis-py-cluster has been merged into redis-py. Celery works fine on our cluster with multiple producers and multiple consumers; when a node goes down, it fails over automatically.
celery==5.4.0
Django==4.1
django-celery-beat==2.7.0
django-celery-results==2.5.1
django-filter==24.3
django-redis==5.4.0
django-split-settings==0.3.0
django-timezone-field==5.1
djangorestframework==3.15.2
gunicorn==22.0.0
pyOpenSSL==21.0.0
redis==5.0.0
requests==2.22.0
uWSGI==2.0.18
app = Celery('celery_test', broker='rediscluster://:<password>@<host>:7024')
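For context, here is a minimal sketch (the node address and key are placeholders, not from this PR) showing that redis-py ships the cluster client that used to live in redis-py-cluster:

```python
# Minimal sketch, assuming redis-py >= 4.1 (which absorbed redis-py-cluster).
from redis.cluster import RedisCluster

# Any reachable cluster node can serve as the startup node (placeholder address).
rc = RedisCluster(host="127.0.0.1", port=7024)
rc.set("healthcheck", "ok")   # the client routes keys to the owning shard
print(rc.get("healthcheck"))  # b'ok'
```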
Codecov Report
Attention: Patch coverage is 86.76123% with 56 lines in your changes missing coverage. Please review.
Project coverage is 81.89%. Comparing base (13e6938) to head (543cc93). Report is 12 commits behind head on main.
| Files with missing lines | Patch % | Lines |
|---|---|---|
| kombu/transport/rediscluster.py | 86.76% | 34 Missing and 22 partials :warning: |
Additional details and impacted files
@@ Coverage Diff @@
## main #2204 +/- ##
==========================================
+ Coverage 81.60% 81.89% +0.28%
==========================================
Files 77 78 +1
Lines 9540 9963 +423
Branches 1162 1238 +76
==========================================
+ Hits 7785 8159 +374
- Misses 1563 1591 +28
- Partials 192 213 +21
Only one test is failing, due to a connection failure.
@auvipy @Nusnus ping :) I fixed the issues I found recently, and now this PR is ready. I'm not a kombu expert, so any suggestions are welcome.
Check linter error :)
Sorry for the failure; I will fix it and improve the test coverage.
I am following it and have already reviewed it twice. I will do an in-depth review again tomorrow. No worries, and thanks for picking up my work.
https://github.com/celery/kombu/actions/runs/12492830091/job/34860983164?pr=2204
Added docs for kombu.transport.rediscluster; now I'm sure there won't be any more problems.
Would this also support Redis Cluster for the result backend?
@zs-neo would love to see this land 🙏🏾
Hey @zs-neo, I found an issue after using your patch in production for a few days.
The observation is that when a worker process dies, a new worker process is forked and the controller process then gets stuck: both the new worker process and the controller process end up reading the same socket created by BRPOP.
How to reproduce the issue:
- Start the celery server:
  celery -A celery_app worker --concurrency=2 --loglevel=INFO --prefetch-multiplier=1 --queues=<queue-name> --without-mingle -Ofair --max-tasks-per-child=3
- Send a ping message:
  celery -A celery_app inspect ping -d celery@$HOSTNAME
- Enqueue a few messages.
Below is the stack trace I captured in the newly forked worker:
File "/usr/local/lib/python3.11/site-packages/celery/concurrency/asynpool.py", line 496, in _event_process_exit
self.maintain_pool()
File "/usr/local/lib/python3.11/site-packages/billiard/pool.py", line 1351, in maintain_pool
self._maintain_pool()
File "/usr/local/lib/python3.11/site-packages/billiard/pool.py", line 1343, in _maintain_pool
self._repopulate_pool(joined)
File "/usr/local/lib/python3.11/site-packages/billiard/pool.py", line 1328, in _repopulate_pool
self._create_worker_process(self._avail_index())
File "/usr/local/lib/python3.11/site-packages/celery/concurrency/asynpool.py", line 491, in _create_worker_process
return super()._create_worker_process(i)
File "/usr/local/lib/python3.11/site-packages/billiard/pool.py", line 1158, in _create_worker_process
w.start()
File "/usr/local/lib/python3.11/site-packages/billiard/process.py", line 120, in start
self._popen = self._Popen(self)
File "/usr/local/lib/python3.11/site-packages/billiard/context.py", line 331, in _Popen
return Popen(process_obj)
File "/usr/local/lib/python3.11/site-packages/billiard/popen_fork.py", line 22, in __init__
self._launch(process_obj)
File "/usr/local/lib/python3.11/site-packages/billiard/popen_fork.py", line 77, in _launch
code = process_obj._bootstrap()
File "/usr/local/lib/python3.11/site-packages/billiard/process.py", line 316, in _bootstrap
util._run_after_forkers()
File "/usr/local/lib/python3.11/multiprocessing/util.py", line 170, in _run_after_forkers
func(obj)
File "/usr/local/lib/python3.11/site-packages/kombu/resource.py", line 21, in _after_fork_cleanup_resource
resource.force_close_all()
File "/usr/local/lib/python3.11/site-packages/kombu/resource.py", line 181, in force_close_all
self.collect_resource(res)
File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 1082, in collect_resource
return resource.collect(socket_timeout)
File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 384, in collect
self._do_close_self()
File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 354, in _do_close_self
self.maybe_close_channel(self._default_channel)
File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 346, in maybe_close_channel
channel.close()
File "/usr/local/lib/python3.11/site-packages/kombu/transport/rediscluster.py", line 597, in close
self._brpop_read(**{"conn": conn})
You can see that the after-fork hook ends up calling the rediscluster Channel's close(), which calls _brpop_read.
I think the problem is that the after-fork cleanup does not clear _in_poll_connections. I propose the following fix.
What do you think?
$ git --no-pager diff -U10
diff --git a/kombu/transport/rediscluster.py b/kombu/transport/rediscluster.py
index d0653fcf..549edf32 100644
--- a/kombu/transport/rediscluster.py
+++ b/kombu/transport/rediscluster.py
@@ -454,20 +454,21 @@ class Channel(RedisChannel):
     def _after_fork(self):
         self._disconnect_pools()
 
     def _disconnect_pools(self):
         client = self._client
         if client is not None:
             client.disconnect_connection_pools()
             client.close()
             self._client = None
+            self._in_poll_connections.clear()
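To illustrate the idea behind the fix, here is a minimal sketch of the general after-fork cleanup pattern; the set name mirrors the diff above, but everything else is illustrative rather than kombu's actual code:

```python
# Minimal sketch, not kombu's implementation: a forked child inherits the
# parent's sockets, so a connection with an outstanding BRPOP must not be
# read from (or drained) in the child. Forgetting the bookkeeping lets the
# child open fresh connections instead of competing with the parent.
import os

_in_poll_connections = set()  # connections that issued BRPOP in this process

def _after_fork_in_child():
    # Drop inherited in-flight connections; never try to finish their reads.
    _in_poll_connections.clear()

os.register_at_fork(after_in_child=_after_fork_in_child)
```

The proposed clear() in the diff follows the same principle: reset per-process poll state after fork instead of attempting a _brpop_read on a socket that belongs to the parent.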
@zs-neo @auvipy Could you please give a status update on this PR? We already use this code in production, but it would be great to see it in an upcoming release. Thanks for your work!
It is scheduled for the v5.7 release.
It would be nice to see this land soon, as I'm trying to use AWS serverless Valkey, which runs in cluster mode, so Celery won't run on it. It's going to become more common for people to want to run their projects on serverless offerings, since they tend to provide scalable storage rather than the fixed amount you get with a specific instance type.
I guess my workaround for now will be to spin up a normal instance just for Celery to use.
Hello team, any update on this one? It looks like it's very close to being shipped in the next release.
@auvipy Looks like it's a documentation issue; is it transient, or is something else needed?
This is slated for the v5.7.0 release.
