kombu
kombu copied to clipboard
[WIP] added initial redis cluster transport support code
working on new tests
I've got a weird error when using SimpleQueue.get()
with this new transport:
In [8]: m = q.get(timeout=1)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-8-662154a51631> in <module>
----> 1 m = q.get(timeout=1)
/usr/local/lib/python3.7/dist-packages/kombu/simple.py in get(self, block, timeout)
59 # messages are sent over the same socket; also POSIX makes
60 # no guarantees against socket calls returning early.
---> 61 self.channel.connection.client.drain_events(timeout=remaining)
62 except socket.timeout:
63 raise self.Empty()
/usr/local/lib/python3.7/dist-packages/kombu/connection.py in drain_events(self, **kwargs)
313 socket.timeout: if the timeout is exceeded.
314 """
--> 315 return self.transport.drain_events(self.connection, **kwargs)
316
317 def maybe_close_channel(self, channel):
/usr/local/lib/python3.7/dist-packages/kombu/transport/virtual/base.py in drain_events(self, connection, timeout)
961 while 1:
962 try:
--> 963 get(self._deliver, timeout=timeout)
964 except Empty:
965 if timeout is not None and monotonic() - time_start >= timeout:
TypeError: get() got multiple values for argument 'timeout'
It works fine when using kombu.transport.redis
.
Seems that MultiChannelPoller.get
should have signature get(self, callback, timeout=None)
instead of get(self, timeout=None)
?
thanks for testing and feedback. I will check and try to push a fix ASAP. keep in mind its work in progress right now
I managed to trim down the code, inherit most stuff from kombu.transport.redis
and fixed some corner cases (see #1025), resulting in a 250-line module. Some hardcoded things (e.g.: pipeline(True)
in QoS
) forced code duplication, though.
What works so far:
-
celery worker
seems to work well on Redis Cluster (3 masters + 3 replica) with multi producers + multi consumers. -
all
celery inspect
commands are broken for now because the PUBSUB part is not compatible (yet). -
there're some memory leak issues related to heartbeat (might be PUBSUB's fault too) (see https://github.com/celery/celery/issues/5047).
Thank you for your PR very much. Otherwise I won't even know where to get started. 👍
@iwinux thanks for trying this. If you don't mind could you push your improvements in my branch? as we both are working with it, why should duplicate the effort? lets polish it together? does that sound ok? we will get the credit for this.
That sounds great. I'll send you a pull request :)
amazing! thanks!
The pull request to this pull request is here: https://github.com/auvipy/kombu/pull/2
I've tried to simplify it further but got stuck here. I'm not sure how to implement a new transport properly - useful information is buried deep in existing code, obscured by event polling / callbacks, backend specifics and error handling etc.
Also, would it be a good idea to refactor kombu.transport.redis
so that common code could be shared more cleanly, compared to current hacky method overrides?
guys, i hope you didn't give up with that change :)
this is on list, plz donate for celery
is this still active?
is this still active?
can you test this on local machine?
is this still active?
can you test this on local machine?
I want to test it on my machine, but I find it didn't work: package path error、class name error and so on. After that I tried to fix these simple errors, it occurs that "ERROR sending 'cluster slots' command to redis server" . So, is it still active?
is this still active?
I removed my personal fork so this need copy-paste if someone wants to try & contribute
Has anyone fixed this issue
File "/usr/local/lib/python3.6/site-packages/celery/worker/worker.py", line 205, in start self.blueprint.start(self) File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start step.start(parent) File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 369, in start return self.obj.start() File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 318, in start blueprint.start(self) File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start step.start(parent) File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 596, in start c.loop(*c.loop_args()) File "/usr/local/lib/python3.6/site-packages/celery/worker/loops.py", line 113, in synloop connection.drain_events(timeout=2.0) File "/usr/local/lib/python3.6/site-packages/kombu/connection.py", line 323, in drain_events return self.transport.drain_events(self.connection, **kwargs) File "/usr/local/lib/python3.6/site-packages/kombu/transport/virtual/base.py", line 963, in drain_events get(self._deliver, timeout=timeout) TypeError: get() got multiple values for argument 'timeout'
is this still active ? would be really nice to have.
@dligthart In my experience, the celery team is stretched pretty thin and @auvipy here is suggesting someone jump in and contribute to the project if they want this done.
Also, contributors seem to be more hesitant to get into the redis backend, as it needs more work.
i might revisit it in near future If no one take this over
Is there a way to financially contribute to such a feature?
Is there a way to financially contribute to such a feature?
you can reach me out at: [email protected]
any news on this one ;) ?
i would love to see this implemented, thanks for your work so far :)
@auvipy I see this FR added milestone 5.1.0 and 6.0. Is it implemented somewhere else? This will be a really useful feature for many use cases.
@auvipy I see this FR added milestone 5.1.0 and 6.0. Is it implemented somewhere else? This will be a really useful feature for many use cases.
this will be moved to somewhere else for now
Is this in progress or being tracked somewhere else in this repo?
Thanks