
Support Redis Cluster transport

Open · zs-neo opened this issue 1 year ago · 17 comments

Attempt to address https://github.com/celery/kombu/pull/1021

Thank you very much for your code, it helps us a lot. @auvipy

We use redis-py instead of redis-py-cluster because redis-py-cluster has been merged into redis-py. Celery works fine on our cluster with multiple producers and multiple consumers; when a node goes down, it automatically switches over.

celery==5.4.0
Django==4.1
django-celery-beat==2.7.0
django-celery-results==2.5.1
django-filter==24.3
django-redis==5.4.0
django-split-settings==0.3.0
django-timezone-field==5.1
djangorestframework==3.15.2
gunicorn==22.0.0
pyOpenSSL==21.0.0
redis==5.0.0
requests==2.22.0
uWSGI==2.0.18
app = Celery('celery_test', broker='rediscluster://:[email protected]:7024')
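
For anyone trying this out, a slightly fuller configuration might look like the sketch below. It is only a sketch, assuming the rediscluster:// broker scheme added by this PR; the host, port, password and task name are placeholders, not values from this thread. Pointing the broker at any single reachable cluster node should be enough, since the cluster client discovers the remaining nodes.

from celery import Celery

# Minimal sketch only: 'redis-node-1', port 7024 and the password are
# placeholders. Any one cluster node can serve as the startup node.
app = Celery(
    'celery_test',
    broker='rediscluster://:change-me@redis-node-1:7024',
)

@app.task
def ping_task(i):
    # Trivial task used to exercise the broker.
    return i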

zs-neo avatar Dec 10 '24 13:12 zs-neo

Codecov Report

Attention: Patch coverage is 86.76123% with 56 lines in your changes missing coverage. Please review.

Project coverage is 81.89%. Comparing base (13e6938) to head (543cc93). Report is 12 commits behind head on main.

Files with missing lines          Patch %    Lines
kombu/transport/rediscluster.py   86.76%     34 Missing and 22 partials :warning:
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2204      +/-   ##
==========================================
+ Coverage   81.60%   81.89%   +0.28%     
==========================================
  Files          77       78       +1     
  Lines        9540     9963     +423     
  Branches     1162     1238      +76     
==========================================
+ Hits         7785     8159     +374     
- Misses       1563     1591      +28     
- Partials      192      213      +21     

:umbrella: View full report in Codecov by Sentry.
:loudspeaker: Have feedback on the report? Share it here.

codecov[bot] avatar Dec 10 '24 13:12 codecov[bot]

Only one test is failing, due to a connection failure

auvipy avatar Dec 25 '24 07:12 auvipy

@auvipy @Nusnus ping :) I fixed the issues I found recently, and this PR is now ready. I'm not a kombu expert, so any suggestions are welcome.

zs-neo avatar Dec 25 '24 08:12 zs-neo

Check the linter error :)

Nusnus avatar Dec 25 '24 09:12 Nusnus

Sorry for the failure; I will fix it and improve the test coverage.

zs-neo avatar Dec 25 '24 09:12 zs-neo

I am following it and have already reviewed it twice. I will do an in-depth review again tomorrow. No worries. Thanks for picking up my work.

auvipy avatar Dec 25 '24 10:12 auvipy

https://github.com/celery/kombu/actions/runs/12492830091/job/34860983164?pr=2204 (screenshot of the failing CI job)

Nusnus avatar Dec 25 '24 12:12 Nusnus

https://github.com/celery/kombu/actions/runs/12492830091/job/34860983164?pr=2204 (screenshot of the failing CI job)

Added docs for kombu.transport.rediscluster; now I'm sure there won't be any more problems.

zs-neo avatar Dec 26 '24 03:12 zs-neo

Would this also support Redis Cluster as the result backend?

lan17 avatar Jan 18 '25 21:01 lan17

@zs-neo would love to see this land 🙏🏾

scouredimage avatar Apr 04 '25 20:04 scouredimage

Hey @zs-neo, I found an issue after using your patch in production for a few days. The observation is that when a worker process dies, a new worker process is forked, and then the controller process gets stuck: both the new worker process and the controller process end up reading the same socket created by BRPOP.

How to reproduce the issue?

  1. Start the Celery worker: celery -A celery_app worker --concurrency=2 --loglevel=INFO --prefetch-multiplier=1 --queues=<queue-name> --without-mingle -Ofair --max-tasks-per-child=3
  2. Send a ping message: celery -A celery_app inspect ping -d celery@$HOSTNAME
  3. Enqueue a few messages (for example with a snippet like the one sketched just below this list).
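
For step 3, a hypothetical way to enqueue a few messages might be the snippet below; celery_app.app, the ping_task name and <queue-name> are assumptions mirroring the placeholders in the commands above.

# Hypothetical helper for step 3; the task name is an assumption and
# <queue-name> should be replaced with the real queue, as in the commands above.
from celery_app import app

for i in range(5):
    app.send_task('celery_app.ping_task', args=(i,), queue='<queue-name>')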

Below is the stack trace I captured in the newly forked worker:

  File "/usr/local/lib/python3.11/site-packages/celery/concurrency/asynpool.py", line 496, in _event_process_exit
    self.maintain_pool()
  File "/usr/local/lib/python3.11/site-packages/billiard/pool.py", line 1351, in maintain_pool
    self._maintain_pool()
  File "/usr/local/lib/python3.11/site-packages/billiard/pool.py", line 1343, in _maintain_pool
    self._repopulate_pool(joined)
  File "/usr/local/lib/python3.11/site-packages/billiard/pool.py", line 1328, in _repopulate_pool
    self._create_worker_process(self._avail_index())
  File "/usr/local/lib/python3.11/site-packages/celery/concurrency/asynpool.py", line 491, in _create_worker_process
    return super()._create_worker_process(i)
  File "/usr/local/lib/python3.11/site-packages/billiard/pool.py", line 1158, in _create_worker_process
    w.start()
  File "/usr/local/lib/python3.11/site-packages/billiard/process.py", line 120, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.11/site-packages/billiard/context.py", line 331, in _Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.11/site-packages/billiard/popen_fork.py", line 22, in __init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.11/site-packages/billiard/popen_fork.py", line 77, in _launch
    code = process_obj._bootstrap()
  File "/usr/local/lib/python3.11/site-packages/billiard/process.py", line 316, in _bootstrap
    util._run_after_forkers()
  File "/usr/local/lib/python3.11/multiprocessing/util.py", line 170, in _run_after_forkers
    func(obj)
  File "/usr/local/lib/python3.11/site-packages/kombu/resource.py", line 21, in _after_fork_cleanup_resource
    resource.force_close_all()
  File "/usr/local/lib/python3.11/site-packages/kombu/resource.py", line 181, in force_close_all
    self.collect_resource(res)
  File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 1082, in collect_resource
    return resource.collect(socket_timeout)
  File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 384, in collect
    self._do_close_self()
  File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 354, in _do_close_self
    self.maybe_close_channel(self._default_channel)
  File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 346, in maybe_close_channel
    channel.close()
  File "/usr/local/lib/python3.11/site-packages/kombu/transport/rediscluster.py", line 597, in close
    self._brpop_read(**{"conn": conn})

You can see that the after-fork hook ends up calling the rediscluster Channel's close(), which has logic that performs _brpop_read.

I think the problem is that the _after_fork handler does not clean up _in_poll_connections. I propose the following fix. What do you think?

$ git --no-pager diff -U10
diff --git a/kombu/transport/rediscluster.py b/kombu/transport/rediscluster.py 
index d0653fcf..549edf32 100644
--- a/kombu/transport/rediscluster.py
+++ b/kombu/transport/rediscluster.py
@@ -454,20 +454,21 @@ class Channel(RedisChannel):

     def _after_fork(self):
         self._disconnect_pools()

     def _disconnect_pools(self):
         client = self._client
         if client is not None:
             client.disconnect_connection_pools()
             client.close()
         self._client = None
+        self._in_poll_connections.clear()
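
To make the failure mode concrete, here is a standalone, Unix-only sketch (not kombu code; FakeChannel, its attributes and child_main are illustrative) showing that an after-fork hook registered with multiprocessing.util.register_after_fork runs in the forked child before its target, so clearing the tracked in-poll connections there prevents the child from blocking on a read of the inherited BRPOP socket.

import multiprocessing as mp
import multiprocessing.util as mp_util
import socket


class FakeChannel:
    """Illustrative stand-in for the rediscluster Channel; not kombu code."""

    def __init__(self, conn):
        # Assumed meaning: connections that still have a BRPOP in flight.
        self._in_poll_connections = {conn}

    def _after_fork(self):
        # The proposed fix: forget the parent's in-flight connections so the
        # child never tries to read replies from the inherited socket.
        self._in_poll_connections.clear()

    def close(self):
        # Without the fix this read would block, competing with the parent
        # for the same BRPOP reply on the shared socket.
        for conn in self._in_poll_connections:
            conn.recv(1)


def child_main(channel):
    # In a real worker this close happens via kombu's after-fork resource
    # cleanup; the registered after-fork hook has already run at this point.
    channel.close()
    print("child: close() returned without blocking")


if __name__ == '__main__':
    mp.set_start_method('fork')  # fork so the child inherits the registry and socket
    parent_sock, _peer = socket.socketpair()
    channel = FakeChannel(parent_sock)
    mp_util.register_after_fork(channel, FakeChannel._after_fork)

    worker = mp.Process(target=child_main, args=(channel,))
    worker.start()
    worker.join()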

dingxiong avatar Jun 30 '25 18:06 dingxiong

@zs-neo @auvipy Could you please provide a status update on this PR? We already use this code in production, but it would be great to see it in an upcoming release. Thanks for your work!

artemvang avatar Aug 04 '25 15:08 artemvang

It is scheduled for the v5.7 release.

auvipy avatar Aug 05 '25 07:08 auvipy

It would be nice to see this land soon, as I'm trying to use AWS serverless Valkey, which runs in cluster mode, so Celery won't run on it. It's going to become more common for people to want to run their projects on serverless offerings, since they tend to provide scalable storage rather than the fixed amount you get with a specific instance type.

I guess my workaround for now will be to spin up a regular (non-cluster) instance just for Celery to use.

rfenner avatar Sep 11 '25 02:09 rfenner

Hello team, any update on this one? It looks like it's very close to being shipped in the next release?

Ragnarow avatar Oct 16 '25 20:10 Ragnarow

@auvipy It looks like it's a documentation issue; is it transient, or is something else needed?

Ragnarow avatar Oct 28 '25 11:10 Ragnarow

This is slated for the v5.7.0 release.

auvipy avatar Oct 29 '25 15:10 auvipy