kestra
Gracefully close realtime trigger when possible
Issue description
When possible, realtime triggers must be closed gracefully to be able to release resources.
Currently, realtime triggers use an infinite loop, and when they are killed or restarted their thread is interrupted, which leads to resources such as client connections not being released properly.
For example, the Redis realtime trigger logs this warning:
2024-05-15 10:01:48,986 WARN Finalizer i.l.c.r.DefaultClientResources io.lettuce.core.resource.DefaultClientResources was not shut down properly, shutdown() was not called before it's garbage-collected. Call shutdown() or shutdown(long,long,TimeUnit)
Ideally, all realtime triggers should evolve so that:
- They do not use an infinite loop, but a loop guarded by a boolean that can be used to signal that the trigger must stop.
- When we want to restart a realtime trigger, the restart is done gracefully and not with an interrupt.
- When we want to stop the worker, it can be discussed whether or not we need to interrupt the thread.
There is a slight risk that a realtime trigger is "blocked" and never sees the stop signal, so we may investigate whether we want to stop with a timeout and interrupt the thread only as a last resort.
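To make the timeout idea concrete, here is a minimal sketch, not Kestra code: PollingTrigger, GracefulStopSketch and stopGracefully are hypothetical names invented for the illustration, and the sleep stands in for polling a real client. It signals the loop to stop, waits up to a timeout, and only interrupts the thread if the loop did not exit by itself.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class GracefulStopSketch {

    /** Hypothetical stand-in for a realtime trigger; not a Kestra class. */
    static class PollingTrigger implements Runnable {
        private volatile boolean running = true;
        private final AtomicBoolean released = new AtomicBoolean(false);

        @Override
        public void run() {
            try {
                // A loop guarded by a flag instead of while (true).
                while (running) {
                    Thread.sleep(10); // stands in for polling a client
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt status
            } finally {
                released.set(true); // stands in for closing client connections
            }
        }

        /** Non-blocking: only flips the flag read by the loop. */
        void stop() {
            running = false;
        }

        boolean isReleased() {
            return released.get();
        }
    }

    /**
     * Sends the stop signal, waits up to timeoutMillis (must be > 0) for the
     * worker thread to end, then interrupts it as a last resort.
     * Returns true only if the thread ended without being interrupted.
     */
    static boolean stopGracefully(Runnable stopSignal, Thread worker, long timeoutMillis) {
        stopSignal.run();
        try {
            worker.join(timeoutMillis);
            if (!worker.isAlive()) {
                return true;
            }
            worker.interrupt();         // last resort for a blocked trigger
            worker.join(timeoutMillis); // bounded wait so shutdown cannot hang
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

    public static void main(String[] args) {
        PollingTrigger trigger = new PollingTrigger();
        Thread worker = new Thread(trigger, "realtime-trigger");
        worker.start();
        boolean graceful = stopGracefully(trigger::stop, worker, 5_000);
        System.out.println("graceful=" + graceful + ", released=" + trigger.isReleased());
    }
}
```

The return value lets the caller log or count the triggers that had to be interrupted, which may help decide whether the timeout is long enough.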
We could add a new stop method to the new interface WorkerJobLifecycle with a default noop implementation. Then, all realtime triggers should implement that method (IMHO: quick win). The stop method must be non-blocking (e.g. just doing isStopped.set(true) on an AtomicBoolean).
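A rough sketch of that proposal follows. The WorkerJobLifecycle interface here is redeclared locally with an assumed shape, since the real one may carry other methods and a different signature; LegacyTrigger, StoppableTrigger and StopMethodSketch are hypothetical names used only for the illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class StopMethodSketch {

    /** Assumed shape only; the actual Kestra interface may differ. */
    interface WorkerJobLifecycle {
        // Default no-op, so existing implementations keep compiling unchanged.
        default void stop() {
        }
    }

    /** A job that has not been migrated yet: it inherits the no-op. */
    static class LegacyTrigger implements WorkerJobLifecycle {
    }

    /** A realtime trigger that opts in to graceful stopping. */
    static class StoppableTrigger implements WorkerJobLifecycle {
        private final AtomicBoolean isStopped = new AtomicBoolean(false);

        @Override
        public void stop() {
            // Non-blocking: only records the request; the trigger's own loop
            // is expected to check the flag and release its resources.
            isStopped.set(true);
        }

        boolean isStopped() {
            return isStopped.get();
        }
    }

    public static void main(String[] args) {
        new LegacyTrigger().stop(); // safe to call, does nothing

        StoppableTrigger trigger = new StoppableTrigger();
        trigger.stop();
        System.out.println("stopped=" + trigger.isStopped());
    }
}
```

With a default method, the worker can call stop() on every job without checking its type, and triggers can be migrated one at a time.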
In addition, I think that if realtime triggers are not properly stopped on worker shutdown, there is a high risk of data loss. Furthermore, for the Kafka realtime trigger, if the consumer is not properly closed, this will delay the time at which its currently assigned partitions become available for consumption again (the default value is 5 minutes). It could be the same for Pulsar.
@fhussonnois I think you can update all the existing realtime triggers and I will do this for Debezium, so that we can close this issue.