celery-message-consumer icon indicating copy to clipboard operation
celery-message-consumer copied to clipboard

Can celery-message-consumer run concurrency

Open nhapq opened this issue 5 years ago • 5 comments

Hello,

I would like to know if celery-message-consumer can run concurrency with multiple workers, and if so, how can I do it?

Best,

nhapq avatar Apr 09 '19 08:04 nhapq

It's just a celery worker, the answer is exactly the same as for Celery (yes it can)

anentropic avatar Apr 09 '19 09:04 anentropic

Actually I noticed your consumer is not making use of the Celery pool. This means even if you set --concurrency, the processes will get created but the other processes will never be used. You can update your __call__ function in AMQPRetryHandler to do something like this

                    self.pool.apply_async(
                        func,
                        args=(msg,),
                        # accept_callback=self.on_accepted,
                        # timeout_callback=self.on_timeout,
                        # callback=message.ack,
                        # error_callback=self.on_failure,
                        # soft_timeout=soft_time_limit or task.soft_time_limit,
                        # timeout=time_limit or task.time_limit,
                        # correlation_id=task_id,
                    )

You can get pool in the start method of AMQPRetryConsumerStep

        def start(self, c):
            self.pool = c.pool
            ...

This would also be valid for any other type of concurrency

nucklehead avatar Oct 09 '20 01:10 nucklehead

@nucklehead I had the same issue right now, it seems to be the library doesn't user the workers pool and instead all of the messages are handled on the main process, which makes the worker_signals useless, and if you connect and disconnect to your database in those signals, the connections are not present on the main process which leads to errors and bugs.

victorct-pronto avatar Dec 10 '20 21:12 victorct-pronto

So if this lib actually does not use the Celery worker pool, what is its use? It looks like all it does is setup a kombu consumer and then let Celery drain the connection.

erdnaxeli avatar Apr 08 '21 16:04 erdnaxeli

Well without concurrency it is quite worthless. Strange that celery doesn't support topic exchanges and wildcard bindings out of the box :(

DennyWeinberg avatar Sep 29 '23 08:09 DennyWeinberg