Redis transport causes multiple Celery job runs if eta exceeds visibility timeout
Caveat - I initially tracked down and patched this problem over a year ago, but never opened an Issue on it. Looking at the Kombu 3 code it appears to still be present, and I'd like to figure out if it can be fixed or if I've misunderstood the code.
Problem:
When moving a job from the queue into the unacked sorted set, the Redis transport sets the "score" to the current time (time.time()).
In restore_visible(), the transport uses this score to determine when to move a job from the unacked queue back into the regular queue, effectively with the heuristic if enqueue_time < time.time() - visibility_timeout.
This makes sense for jobs which don't have an ETA (aren't scheduled for the future, that is). For jobs that do have an ETA, if that ETA is further in the future from the time of enqueue than the visibility timeout, the job will be copied back into the regular queue (after visibility_timeout seconds), despite still being resident in some worker's memory, from whence it can be pulled by another worker and eventually executed twice.
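Roughly, the behavior described above can be sketched like this with redis-py (a simplified illustration, not kombu's actual code; the `unacked`/`unacked_index` key names and the `celery` queue name are just for illustration):

```python
import time

import redis  # redis-py client, assumed available

r = redis.Redis()
VISIBILITY_TIMEOUT = 3600  # kombu's default is 1 hour

def move_to_unacked(delivery_tag, payload):
    # On delivery, the score recorded for the message is the *delivery*
    # time, not the task's ETA.
    r.zadd('unacked_index', {delivery_tag: time.time()})
    r.hset('unacked', delivery_tag, payload)

def restore_visible():
    # Anything delivered more than visibility_timeout ago is pushed back
    # onto the main queue, even if a live worker is still holding it while
    # waiting for the ETA.
    ceiling = time.time() - VISIBILITY_TIMEOUT
    for tag, _score in r.zrangebyscore('unacked_index', 0, ceiling,
                                       withscores=True):
        payload = r.hget('unacked', tag)
        if payload is not None:
            r.rpush('celery', payload)
        r.hdel('unacked', tag)
        r.zrem('unacked_index', tag)
```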
Consider the following timeline:
visibility_timeout=2
T0: job1 is enqueued with ETA = T5.
T1: job1 is pulled from the queue by worker1; it goes into the unacked queue with a score of T1 (the current time), and then sits around in worker1's memory.
T3: worker2 (or is it the master process? never been clear on this) calls restore_visible, looking for jobs with a score of <= T1 (the current time minus the visibility timeout). It finds job1 and moves it back to the regular queue.
T4: worker2 pulls job1 from the regular queue (also sending it to the unacked queue).
T5: worker1 and worker2, both with job1 in their internal queue, execute the job simultaneously.
If you have a reasonably short visibility timeout and a reasonably long ETA, this can cause the same job to be executed hundreds of times. This violates the property one expects by *not* using acks_late: instead of a guaranteed zero-or-one execution, the job can be executed zero times, one time, or multiple times.
The fix I came up with was patching the Redis transport to set the score in the unacked sorted set relative to the ETA rather than to the current time. This required calling message.decode(), introspecting on the ETA and the timezone thereof - which I'm sure violates all sorts of barriers between Celery and Kombu. But it was all I could think of at the time: 9df3dcaf2461a8266a6691ca2c089518649cd9d5.
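In spirit, the patch amounts to something like the following sketch (not the actual commit; the ETA handling is simplified, and `message.decode()` is exactly the layering violation mentioned above):

```python
import time
from datetime import datetime

def unacked_score(message):
    # Sketch only: score the unacked entry by the task's ETA (when present)
    # instead of the delivery time, so restore_visible() leaves the job
    # alone until visibility_timeout past the ETA.
    payload = message.decode()  # crossing the kombu/Celery boundary
    eta = payload.get('eta') if isinstance(payload, dict) else None
    if eta:
        # Celery serializes the ETA as an ISO-8601 string; timezone
        # handling is glossed over here.
        return datetime.fromisoformat(eta).timestamp()
    return time.time()
```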
It did just occur to me that another possible solution was to have the workers lock around the unacked queue itself - if a worker is able to successfully remove the job from the unacked queue (sorted set, technically) then it has permission to execute the job, otherwise it does not. Not sure if that even makes sense, just spitballing.
Thanks for the detailed report!
This is a known issue, mentioned in the Celery documentation: http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#caveats
I have been considering using the ETA too, but haven't had the time to implement it yet. As you mention, unpacking the message is not acceptable, as that would violate kombu's boundaries with the application layer. I was thinking the message could simply have a visibility_timeout header that Celery sets when sending an ETA task, which at least keeps the separation.
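A rough sketch of that header idea; the header name and both helpers are hypothetical, nothing here is an existing Celery or kombu API:

```python
import time

# Producer side: when sending an ETA task, Celery could attach a per-message
# visibility timeout that covers the full delay until the ETA.
def eta_headers(eta_timestamp, default_timeout=3600):
    return {'visibility_timeout': (eta_timestamp - time.time()) + default_timeout}

# Transport side: when deciding whether an unacked message should be
# restored, honour the per-message header over the transport-wide default,
# so kombu only ever inspects headers, never the Celery payload.
def is_restorable(delivered_at, headers, default_timeout=3600):
    timeout = headers.get('visibility_timeout', default_timeout)
    return delivered_at < time.time() - timeout
```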
Also to clarify: Only the main process handles messages, the main process is the consumer that reserves, acknowledges and delegates tasks to the pool workers.
And secondly, there are no at-least-once, at-most-once or exactly-once delivery guarantees; any such requirement will require effort from the user to implement and also depends on the transport used.
For the time being, kombu could issue a logger.warn if the ETA is bigger than the visibility_timeout.
Yeah, but that may not even be enough: even if you increase the visibility timeout in your configuration, you could start something like a celery flower instance that doesn't use the same configuration, e.g.:
celery flower -b amqp://broker
vs
celery -A proj flower
and flower would be restoring messages based on the default visibility timeout, since flower is also a consumer and they all help with this.
The worker also does not know about visibility_timeouts per se, but I guess we already have some transport-specific hacks in there based on the transport.driver_type attribute ;)
did anyone ever make any progress on this issue? We've been forward-porting this hack I made to new versions of Kombu for like... 3 years.
@bigo Just got burned by this myself -- though fortunately it didn't affect any end users. It looks like the suggestion elsewhere is to use RabbitMQ with celery to avoid this ETA/visibility timeout issue. For my part, I am just having the problematic task grab a 5-minute Lock first (so other copies of the task fail; it also means the task should take far less than 5 minutes to execute!). Another devops-ish option could be to ensure a (rolling?) graceful restart of all workers at an interval less than the visibility_timeout so that they give up their tasks naturally.
I do think Celery/Kombu should at least throw an error if an ETA exceeds the set visibility timeout when registering the task, however.
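For reference, a minimal sketch of that lock-based workaround using redis-py's built-in lock; the task name and lock key are hypothetical:

```python
import redis
from celery import shared_task

r = redis.Redis()

@shared_task
def send_reminder(user_id):
    # Take a 5-minute lock keyed on the task's arguments so a duplicate
    # delivery becomes a no-op instead of repeating the side effect.
    lock = r.lock(f'send-reminder:{user_id}', timeout=300)
    if not lock.acquire(blocking=False):
        return  # another copy of this task already holds (or held) the lock
    # Do the actual work here; it must finish well within 5 minutes.  The
    # lock is deliberately left to expire rather than released, so later
    # duplicates inside the window are also dropped.
    ...
```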
@ask Hello. I see "Not funded" label on this issue.
How much for a fix? :)
Why is there not a fix for this yet? I just sent about 1500 texts to a bunch of end users -.-
we have to figure out the design decision first. btw, the more you donate, the more dedicated time core team members like me can allocate to celery :)
@auvipy are you talking about https://opencollective.com/celery or is there a separate channel for kombu ?
yes!! this is all inclusive!!
is there any downside to increasing the visibility_timeout value?
@btai24 The downside is that if your worker crashes it will take 'visibility_timeout' time for the message to be redelivered to another worker. Quite a huge downside if you have weeks'-ahead tasks :(
@auvipy
If you folks come up with a design decision, I'm happy to implement it. The alternative for us is polling at an interval shorter than visibility_timeout and launching the tasks whose ETAs fall within the visibility_timeout during the poll.
I'd much rather fix celery.
Is there a working demo for the bug? ~It seems I've just had the issue in production, but I had no luck reproducing it locally.~
~It's fired a single ETA task about 40 times, and that might be related with how many times I restarted flower within the same ETA window, so I tried emulating that with visibility_timeout = 1.~
The live config is extremely simple: redis + all-default celery config (i.e. I believe acks_late is False) with a single worker + flower.
Edit:
I was looking for a magic number that could explain the number of accumulated tasks, and it turns out one copy is created every 5 minutes since the task was first enqueued, which is exactly equal to unacked_mutex_expire.
How does RabbitMQ handle this scenario?
@DylanYoung it's not an issue for RabbitMQ – it has ACK support out of the box, whereas Redis transport has to emulate it.
@killthekitten Yeah I get that. My question is how ("How does RabbitMQ handle this scenario?"). If you can give me the low-down on ack-support in RabbitMQ and how it works, then we can move into designing a suitable replacement for redis.
@DylanYoung so, here's how I understand it after reading through code and debugging, keep in mind reality might be different.
RabbitMQ transport
You could compare rabbitmq to TCP: until it receives a confirmation of delivery back from the consumer, it retries the delivery. It uses heartbeats and other means to detect network failure or consumer crashes, and guarantees "at least once" delivery. There's more to it but I never actually used it 😅
Redis transport
Kombu uses Redis pubsub as a transport, and it is different. If one of the subscribers fails to process the message for whatever reason, redis doesn't care. The tradeoff, then, is to have a visibility_timeout (defaults to 1 hour) and a sorted set of unacked tasks, so a task can be redistributed to a different worker once the timeout is reached.
This is why the implementation we have here is so bulky. I think it's possible to implement a lighter redis transport, using BRPOP for example, but at the price of dropping support for many celery features.
This is also something @bigo was proposing above.
ETA
Now, neither RabbitMQ nor redis has native support for ETA tasks anyway. So what celery does is use QoS: workers can prefetch tasks and run them when the time comes. For the transport this means a worker can grab a handful of tasks and hold them unacknowledged for up to visibility_timeout (i.e. 1 hour).
If the ETA is larger than that, the worker does not ack the task for more than a visibility_timeout, and in the case of redis the task simply gets rescheduled as if it were a lost message.
It is not a problem for rabbit, since it doesn't need the visibility_timeout workaround and has native QoS support. You can read another conversation around this here.
Ok, after reading a bit more about this, it seems like there are two options (though do correct me if I'm wrong):
- Use a custom header to specify the delay (there are a number of ways to implement this). This is the simplest and most robust option, and is natively supported by RabbitMQ via dead letter exchanges (though not amqp) as well as a (newish) plugin.
- Try to emulate RabbitMQ's QoS. In particular, we need to be able to detect when a worker dies so that we can expire its "claim" on the task (in redis transport lingo: move the task from the unacked queue to the main queue). A naive approach (basically copying RabbitMQ's) would be to generate a uid that represents the "TCP Channel" (in RabbitMQ terminology) and use a heartbeat approach. If the channel heartbeat isn't updated in a (configurable) amount of time, then the tasks associated with that channel are restored from unacked to the main queue.
Any problems with this approach? There are some celery-level implications: the workers need to be written in such a way that they can issue their heartbeat even when running tasks and, moreover, need to be able to detect themselves when the "task-running part" goes down and stop the heartbeat (if it isn't already).
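Very roughly, the bookkeeping for option (2) might look like this on the transport side; all key names, the heartbeat TTL, and the `celery` queue name are assumptions, not existing kombu code:

```python
import time
import redis

r = redis.Redis()
HEARTBEAT_TTL = 60  # seconds a channel may go silent before its claims lapse

def heartbeat(channel_id):
    # Each consumer refreshes its liveness key on a timer, even while its
    # pool is busy running tasks.
    r.set(f'channel-alive:{channel_id}', 1, ex=HEARTBEAT_TTL)

def claim(channel_id, delivery_tag):
    # Record which channel currently owns which unacked delivery.
    r.hset('unacked_owner', delivery_tag, channel_id)

def restore_dead_channels():
    # Only deliveries whose owning channel has stopped heartbeating are
    # restored; a live worker holding an ETA task keeps its claim.
    for tag, owner in r.hgetall('unacked_owner').items():
        if not r.exists(f"channel-alive:{owner.decode()}"):
            payload = r.hget('unacked', tag)
            if payload is not None:
                r.rpush('celery', payload)
                r.hdel('unacked', tag)
            r.hdel('unacked_owner', tag)
```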
I can think of one alternative solution using nacks:
- Instead of the worker timing the task to be executed at ETA, wrap the task in a check_and_reschedule task that is timed to execute at something like min(now()+visibility_timeout, ETA). Then send a NACK if the task is not ready to execute, returning it to the main queue to be grabbed by another worker, which will do the same. If the task is ready to execute, ACK and execute as usual. NOTE: probably want to always NACK when acks_late is True to guarantee the task has at least visibility_timeout time to execute before being requeued. (See the sketch below.)
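A sketch of that wrapper at the application level, re-enqueueing with apply_async instead of a raw NACK (which is easier to show outside the transport); check_and_reschedule and its signature are illustrative, not an existing API:

```python
from datetime import datetime, timedelta, timezone

from celery import shared_task

VISIBILITY_TIMEOUT = timedelta(hours=1)

@shared_task(bind=True)
def check_and_reschedule(self, task_name, args, kwargs, eta_iso):
    eta = datetime.fromisoformat(eta_iso)
    now = datetime.now(timezone.utc)
    if now >= eta:
        # Time has come: dispatch the real task for immediate execution.
        self.app.tasks[task_name].apply_async(args=args, kwargs=kwargs)
        return
    # Not due yet: hop forward by at most one visibility_timeout, so the
    # wrapper is always (re)delivered and acknowledged well before the
    # transport would consider it lost and restore it.
    next_hop = min(now + VISIBILITY_TIMEOUT, eta)
    check_and_reschedule.apply_async(
        args=(task_name, args, kwargs, eta_iso), eta=next_hop)
```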
I don't think I'm qualified currently to implement (1). (2) I could probably do with some assistance.
(3) is easy pickings provided the redis transport already emulates nack support.
@killthekitten Thanks for the pointers!
The downside of (3) is that it adds a lot of overhead to ETA tasks (the task gets sent to a worker roughly (ETA-now())/visibility_timeout times), but considering they don't really work right now (for any reasonable definition of "work"), that might be acceptable.
With a bit of hacking on the transport (I'm not sure exactly what the barriers are between Kombu and Celery), this overhead could be reduced via a reclaim action, which would simply update the expire time on the unacked entry to now() + visibility_timeout. So we still have the overhead of a worker running check_and_reschedule roughly (ETA-now())/visibility_timeout times, but we no longer have to send the task to a new worker (unless the current worker crashes).
Note that this solution is very much like a task-specific heartbeat :)
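A tiny sketch of that reclaim against the sorted set described earlier (the key name is illustrative; this is not an existing kombu API):

```python
import time
import redis

r = redis.Redis()

def reclaim(delivery_tag):
    # Bump the entry's delivery-time score to "now", extending its
    # visibility window by another visibility_timeout from this moment.
    # xx=True only touches members that still exist, so a tag that was
    # already restored and re-delivered elsewhere is not resurrected.
    return r.zadd('unacked_index', {delivery_tag: time.time()}, xx=True)
```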
Solution 3 would also solve https://github.com/celery/celery/issues/2541
can you come up with a PR?
No movement on this one in a while. Is there any PR or plans for one in the near future?
@ahopkins someone has to take over this, as no active development is happening at the moment.
It has a long history of discussion if you'd like to dive deeper, worth noting https://github.com/celery/celery/issues/3274 and https://github.com/celery/celery/issues/2541.
Sounds good. I have a bit on my plate now, but if the idea is to go with the polling task option, that doesn't sound terribly difficult to implement.
@ahopkins I haven't had any time to devote to this, but if you ever go to look at it and need another brain to bounce things off of, I'm sure I can make time for that.
We worked around this in Django land by intercepting tasks that were scheduled past the visibility timeout and throwing them onto a db queue:
https://github.com/ZeitOnline/celery_longterm_scheduler/issues/4#issuecomment-824471330
TBH I'm happier with this for the future anyway than with a 'proper' Celery solution, since then the tasks don't have to be held in the workers while we're waiting for them to come up.
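As an illustration of that interception (not celery_longterm_scheduler's actual API; the dispatcher and save_to_db stub below are hypothetical):

```python
from datetime import datetime, timedelta, timezone

VISIBILITY_TIMEOUT = timedelta(hours=1)

def save_to_db(task_name, args, kwargs, eta):
    # Stand-in for a durable store (e.g. a Django model row); hypothetical.
    print('deferred to db:', task_name, args, kwargs, eta)

def schedule(task, *, eta, args=(), kwargs=None):
    # Anything due after the visibility timeout is parked in the durable
    # store instead of the broker; a periodic job later re-submits rows
    # whose ETA has come close enough for the broker to handle safely.
    if eta - datetime.now(timezone.utc) > VISIBILITY_TIMEOUT:
        save_to_db(task.name, args, kwargs or {}, eta)
    else:
        task.apply_async(args=args, kwargs=kwargs, eta=eta)
```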
have to recheck whether we can borrow some ideas from the long term scheduler into celery