
[WIP] added initial redis cluster transport support code

Open auvipy opened this issue 5 years ago • 21 comments

working on new tests

auvipy avatar Mar 11 '19 06:03 auvipy

I've got a weird error when using SimpleQueue.get() with this new transport:

In [8]: m = q.get(timeout=1)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-8-662154a51631> in <module>
----> 1 m = q.get(timeout=1)

/usr/local/lib/python3.7/dist-packages/kombu/simple.py in get(self, block, timeout)
     59                 #    messages are sent over the same socket; also POSIX makes
     60                 #    no guarantees against socket calls returning early.
---> 61                 self.channel.connection.client.drain_events(timeout=remaining)
     62             except socket.timeout:
     63                 raise self.Empty()

/usr/local/lib/python3.7/dist-packages/kombu/connection.py in drain_events(self, **kwargs)
    313             socket.timeout: if the timeout is exceeded.
    314         """
--> 315         return self.transport.drain_events(self.connection, **kwargs)
    316
    317     def maybe_close_channel(self, channel):

/usr/local/lib/python3.7/dist-packages/kombu/transport/virtual/base.py in drain_events(self, connection, timeout)
    961         while 1:
    962             try:
--> 963                 get(self._deliver, timeout=timeout)
    964             except Empty:
    965                 if timeout is not None and monotonic() - time_start >= timeout:

TypeError: get() got multiple values for argument 'timeout'

It works fine when using kombu.transport.redis.

iwinux avatar Mar 20 '19 07:03 iwinux

Seems that MultiChannelPoller.get should have signature get(self, callback, timeout=None) instead of get(self, timeout=None)?
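To illustrate the mismatch, here is a minimal sketch that reproduces the same TypeError outside of kombu. `BrokenPoller`, `FixedPoller`, `deliver` and `drain` are hypothetical stand-ins for illustration only, not the actual kombu classes; the only thing taken from the traceback is the call shape `get(self._deliver, timeout=timeout)`.

```python
received = []


def deliver(message):
    # Stand-in for the transport's _deliver callback.
    received.append(message)


class BrokenPoller:
    # Hypothetical: mirrors the reported signature, with no callback parameter.
    def get(self, timeout=None):
        return timeout


class FixedPoller:
    # Hypothetical: the suggested signature, callback first and then timeout.
    def get(self, callback, timeout=None):
        callback("message")
        return timeout


def drain(poller, timeout):
    # Same call shape as the failing line in virtual/base.py.
    return poller.get(deliver, timeout=timeout)


try:
    drain(BrokenPoller(), 1)
except TypeError as exc:
    # The callback fills the `timeout` slot positionally, and the keyword
    # argument then supplies `timeout` a second time.
    error = str(exc)

result = drain(FixedPoller(), 1)
```

With the callback accepted as the first positional parameter, the keyword `timeout` no longer collides and the call goes through.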

iwinux avatar Mar 20 '19 07:03 iwinux

Thanks for testing and for the feedback. I will check and try to push a fix ASAP. Keep in mind it's a work in progress right now.

auvipy avatar Mar 20 '19 07:03 auvipy

I managed to trim down the code, inherit most stuff from kombu.transport.redis and fixed some corner cases (see #1025), resulting in a 250-line module. Some hardcoded things (e.g.: pipeline(True) in QoS) forced code duplication, though.

What works so far:

  1. celery worker seems to work well on Redis Cluster (3 masters + 3 replicas) with multiple producers + multiple consumers.

  2. all celery inspect commands are broken for now because the PUBSUB part is not compatible (yet).

  3. there're some memory leak issues related to heartbeat (might be PUBSUB's fault too) (see https://github.com/celery/celery/issues/5047).

Thank you for your PR very much. Otherwise I won't even know where to get started. 👍

iwinux avatar Mar 22 '19 01:03 iwinux

@iwinux thanks for trying this. If you don't mind, could you push your improvements to my branch? As we are both working on it, why should we duplicate the effort? Let's polish it together. Does that sound OK? We will both get the credit for this.

auvipy avatar Mar 23 '19 09:03 auvipy

That sounds great. I'll send you a pull request :)

iwinux avatar Mar 24 '19 02:03 iwinux

amazing! thanks!

auvipy avatar Mar 24 '19 04:03 auvipy

The pull request to this pull request is here: https://github.com/auvipy/kombu/pull/2

I've tried to simplify it further but got stuck here. I'm not sure how to implement a new transport properly - useful information is buried deep in existing code, obscured by event polling / callbacks, backend specifics and error handling etc.

Also, would it be a good idea to refactor kombu.transport.redis so that common code could be shared more cleanly, compared to current hacky method overrides?

iwinux avatar Mar 25 '19 12:03 iwinux

guys, I hope you didn't give up on that change :)

yakhira avatar Jul 16 '19 10:07 yakhira

This is on the list. Please donate to Celery.

auvipy avatar Jul 16 '19 12:07 auvipy

is this still active?

aboganas avatar Feb 12 '20 10:02 aboganas

> is this still active?

can you test this on local machine?

auvipy avatar Feb 12 '20 10:02 auvipy

> > is this still active?
>
> can you test this on local machine?

I wanted to test it on my machine, but it didn't work: package path errors, class name errors, and so on. After I fixed these simple errors, I got "ERROR sending 'cluster slots' command to redis server". So, is it still active?

youguanxinqing avatar Aug 23 '20 14:08 youguanxinqing

is this still active?

NullYing avatar Dec 18 '20 04:12 NullYing

I removed my personal fork, so this needs to be copy-pasted if someone wants to try it and contribute.

auvipy avatar Dec 18 '20 05:12 auvipy

Has anyone fixed this issue?

  File "/usr/local/lib/python3.6/site-packages/celery/worker/worker.py", line 205, in start
    self.blueprint.start(self)
  File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 596, in start
    c.loop(*c.loop_args())
  File "/usr/local/lib/python3.6/site-packages/celery/worker/loops.py", line 113, in synloop
    connection.drain_events(timeout=2.0)
  File "/usr/local/lib/python3.6/site-packages/kombu/connection.py", line 323, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/kombu/transport/virtual/base.py", line 963, in drain_events
    get(self._deliver, timeout=timeout)
TypeError: get() got multiple values for argument 'timeout'

ukm21 avatar May 18 '21 13:05 ukm21

is this still active ? would be really nice to have.

dligthart avatar Jul 09 '21 09:07 dligthart

@dligthart In my experience, the celery team is stretched pretty thin and @auvipy here is suggesting someone jump in and contribute to the project if they want this done.

Also, contributors seem to be more hesitant to get into the redis backend, as it needs more work.

jeffreybrowning avatar Jul 09 '21 13:07 jeffreybrowning

I might revisit it in the near future if no one takes this over.

auvipy avatar Jul 09 '21 14:07 auvipy

Is there a way to financially contribute to such a feature?

aboganas avatar Jul 09 '21 16:07 aboganas

> Is there a way to financially contribute to such a feature?

You can reach out to me at: [email protected]

auvipy avatar Jul 10 '21 06:07 auvipy

any news on this one ;) ?

pabclsn avatar Nov 08 '22 11:11 pabclsn

i would love to see this implemented, thanks for your work so far :)

benedikt-bartscher avatar Dec 01 '22 14:12 benedikt-bartscher

@auvipy I see this FR was added to milestones 5.1.0 and 6.0. Is it implemented somewhere else? This would be a really useful feature for many use cases.

haobibo avatar Dec 20 '22 19:12 haobibo

> @auvipy I see this FR added milestone 5.1.0 and 6.0. Is it implemented somewhere else? This will be a really useful feature for many use cases.

This will be moved somewhere else for now.

auvipy avatar Dec 21 '22 07:12 auvipy

Is this in progress or being tracked somewhere else in this repo?

Thanks

brunotacca avatar Feb 12 '24 16:02 brunotacca