celery-message-consumer
Can celery-message-consumer run with concurrency?
Hello,
I would like to know whether celery-message-consumer can run concurrently with multiple workers, and if so, how I can set that up.
Best,
It's just a Celery worker, so the answer is exactly the same as for Celery (yes, it can).
Actually, I noticed your consumer is not making use of the Celery pool. This means that even if you set --concurrency, the processes will get created but the other processes will never be used.
You can update your __call__ function in AMQPRetryHandler to do something like this:
    self.pool.apply_async(
        func,
        args=(msg,),
        # accept_callback=self.on_accepted,
        # timeout_callback=self.on_timeout,
        # callback=message.ack,
        # error_callback=self.on_failure,
        # soft_timeout=soft_time_limit or task.soft_time_limit,
        # timeout=time_limit or task.time_limit,
        # correlation_id=task_id,
    )
You can get the pool in the start method of AMQPRetryConsumerStep:

    def start(self, c):
        self.pool = c.pool
        ...
This would also be valid for any other type of concurrency.
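To illustrate the shape of that change without a broker or a running worker, here is a minimal sketch that uses the standard library's multiprocessing.pool.ThreadPool as a stand-in for c.pool. It is not the library's code: FakeMessage, PoolDispatchingHandler, process and run_demo are hypothetical names, the (body, message) call signature is an assumption, and Celery's own pool accepts additional keyword arguments (such as the commented-out ones above) that the stand-in does not have.

```python
from multiprocessing.pool import ThreadPool


class FakeMessage:
    """Hypothetical stand-in for a broker message; records whether it was acked."""

    def __init__(self, body):
        self.body = body
        self.acked = False

    def ack(self):
        self.acked = True


class PoolDispatchingHandler:
    """Hypothetical handler that hands each message to a pool instead of
    running the wrapped function in the consuming process."""

    def __init__(self, func, pool):
        self.func = func
        self.pool = pool
        self.errors = []

    def __call__(self, body, message):
        # Return the AsyncResult so the caller can wait on it if needed.
        return self.pool.apply_async(
            self.func,
            args=(body,),
            # Ack only after the function has finished successfully.
            callback=lambda _result: message.ack(),
            # Record failures; a real handler would retry or archive here.
            error_callback=self.errors.append,
        )


def process(body):
    """Hypothetical message-processing function."""
    return body.upper()


def run_demo():
    # ThreadPool stands in for the worker pool (c.pool) in this sketch.
    with ThreadPool(processes=4) as pool:
        handler = PoolDispatchingHandler(process, pool)
        messages = [FakeMessage(b) for b in ("a", "b", "c")]
        results = [handler(m.body, m) for m in messages]
        values = [r.get(timeout=5) for r in results]
    return values, [m.acked for m in messages]


if __name__ == "__main__":
    print(run_demo())
```

The point of the sketch is the ordering: the consuming side returns immediately after apply_async, and the message is acknowledged from the success callback rather than inline, so several messages can be in flight at once.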
@nucklehead I ran into the same issue just now. It seems the library doesn't use the worker pool; instead, all of the messages are handled in the main process. That makes the worker_signals useless: if you connect to and disconnect from your database in those signals, the connections are not present in the main process, which leads to errors and bugs.
So if this lib does not actually use the Celery worker pool, what is its use? It looks like all it does is set up a kombu consumer and then let Celery drain the connection.
Well, without concurrency it is quite worthless. Strange that Celery doesn't support topic exchanges and wildcard bindings out of the box :(