High model timeout can kill Flink TaskManagers
A high model timeout (CompiledProcessWithDeps#processTimeout) can cause task cancellation to stall:
"Source: TEST_PROCESS-[...]" Id=774 BLOCKED on java.lang.Object@4c77d96 owned by "Legacy Source Thread - Source: TEST_PROCESS-[...]" Id=775
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:492)
- blocked on java.lang.Object@4c77d96
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
(see https://github.com/apache/flink/blob/release-1.9.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L492)
because Nussknacker is still waiting for the interpretation result (https://github.com/TouK/nussknacker/blob/v0.2.0/engine/flink/process/src/main/scala/pl/touk/nussknacker/engine/process/FlinkProcessRegistrar.scala#L47):
"Legacy Source Thread - Source: TEST_PROCESS-[...]" Id=775 TIMED_WAITING on scala.concurrent.impl.Promise$CompletionLatch@6028e5a7
at sun.misc.Unsafe.park(Native Method)
- waiting on scala.concurrent.impl.Promise$CompletionLatch@6028e5a7
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
(... continuation from MAT dump)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(Lscala/Function0;Lscala/concurrent/CanAwait;)Ljava/lang/Object; (BlockContext.scala:53)
at scala.concurrent.Await$.result(Lscala/concurrent/Awaitable;Lscala/concurrent/duration/Duration;)Ljava/lang/Object; (package.scala:190)
at pl.touk.nussknacker.engine.process.FlinkProcessRegistrar$SyncInterpretationFunction.flatMap(Lpl/touk/nussknacker/engine/api/Context;Lorg/apache/flink/util/Collector;)V (FlinkProcessRegistrar.scala:47)
When this wait exceeds task.cancellation.timeout (180s by default), Flink's TaskCancelerWatchDog kills the entire TaskManager. If the job was cancelled because of a failure in its execution, it is then restarted on another TM with free slots and kills that one too.
I'm not sure what caused the process above to hang, but I believe this value should at least be validated to be a bit below the threshold that causes Flink to kill itself.
Possible solution: checking whether the model timeout is higher than the Flink timeout
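
A rough sketch of what such a check could look like, written in plain Java so it does not depend on Flink or Nussknacker internals. The class name ModelTimeoutCheck, the validate method and the safetyMargin parameter are all hypothetical and not part of either project; the caller is assumed to read the model timeout and task.cancellation.timeout from wherever they are configured. A cancellation timeout of 0 is treated as "watchdog disabled", which is how Flink documents that setting.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical helper, not an existing Nussknacker or Flink class.
public final class ModelTimeoutCheck {

    private ModelTimeoutCheck() {}

    /**
     * Returns an error message when processTimeout is not at least safetyMargin
     * below cancellationTimeout, and an empty Optional when the values are acceptable.
     */
    public static Optional<String> validate(Duration processTimeout,
                                            Duration cancellationTimeout,
                                            Duration safetyMargin) {
        // task.cancellation.timeout = 0 deactivates the watchdog, so nothing to check.
        if (cancellationTimeout.isZero()) {
            return Optional.empty();
        }
        // Highest model timeout that still leaves the requested margin.
        Duration limit = cancellationTimeout.minus(safetyMargin);
        if (processTimeout.compareTo(limit) > 0) {
            return Optional.of(String.format(
                "Model timeout (%d ms) must not exceed task.cancellation.timeout (%d ms) minus a margin of %d ms",
                processTimeout.toMillis(), cancellationTimeout.toMillis(), safetyMargin.toMillis()));
        }
        return Optional.empty();
    }
}
```

Called at job registration time with the 180s default, for example validate(Duration.ofSeconds(300), Duration.ofSeconds(180), Duration.ofSeconds(30)), this would return an error instead of letting the job start with a timeout that can outlive the watchdog. How large the margin should be is an open question.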