
High model timeout can kill Flink TaskManagers

Open piotrp opened this issue 4 years ago • 1 comment

A high model timeout (CompiledProcessWithDeps#processTimeout) can cause task cancellation to stall:

"Source: TEST_PROCESS-[...]" Id=774 BLOCKED on java.lang.Object@4c77d96 owned by "Legacy Source Thread - Source: TEST_PROCESS-[...]" Id=775
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:492)
	-  blocked on java.lang.Object@4c77d96
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)

https://github.com/apache/flink/blob/release-1.9.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L492

because Nussknacker is still waiting for the interpretation result (https://github.com/TouK/nussknacker/blob/v0.2.0/engine/flink/process/src/main/scala/pl/touk/nussknacker/engine/process/FlinkProcessRegistrar.scala#L47):

"Legacy Source Thread - Source: TEST_PROCESS-[...]" Id=775 TIMED_WAITING on scala.concurrent.impl.Promise$CompletionLatch@6028e5a7
	at sun.misc.Unsafe.park(Native Method)
	-  waiting on scala.concurrent.impl.Promise$CompletionLatch@6028e5a7
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
	at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
        (... continuation from MAT dump)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(Lscala/Function0;Lscala/concurrent/CanAwait;)Ljava/lang/Object; (BlockContext.scala:53)
        at scala.concurrent.Await$.result(Lscala/concurrent/Awaitable;Lscala/concurrent/duration/Duration;)Ljava/lang/Object; (package.scala:190)
        at pl.touk.nussknacker.engine.process.FlinkProcessRegistrar$SyncInterpretationFunction.flatMap(Lpl/touk/nussknacker/engine/api/Context;Lorg/apache/flink/util/Collector;)V (FlinkProcessRegistrar.scala:47)
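
For illustration, here is a minimal sketch (assumed names, not the actual Nussknacker code) of the blocking pattern visible in the dump above: the source thread calls Await.result on the interpretation future with the model's processTimeout, so a high timeout keeps the thread parked long after Flink has asked the task to cancel.

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    object BlockingInterpretationSketch {
      // deliberately high model timeout, assumed for the example
      val processTimeout: FiniteDuration = 10.minutes

      // stand-in for the asynchronous interpretation of a single record
      def interpret(input: String): Future[String] =
        Future(input.toUpperCase)

      // rough equivalent of SyncInterpretationFunction#flatMap (FlinkProcessRegistrar.scala:47):
      // Await.result parks the calling (source) thread until the future completes or the
      // timeout elapses - the TIMED_WAITING frame on Promise$CompletionLatch shown above.
      def processElement(input: String): String =
        Await.result(interpret(input), processTimeout)
    }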

When this exceeds task.cancellation.timeout (180 s by default), Flink's TaskCancelerWatchDog kills the entire TaskManager. If the job was cancelled because of a failure during its execution, it gets rescheduled on another TM with free slots and kills that one too.
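
For context, the Flink side of this behaviour is controlled by a single configuration option (this is the real Flink key; the 180000 ms default matches the 180 s mentioned above), while the model timeout comes from the Nussknacker model configuration:

    # flink-conf.yaml: how long Flink waits for a task to cancel before
    # TaskCancelerWatchDog kills the whole TaskManager (milliseconds)
    task.cancellation.timeout: 180000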

I'm not sure what caused the process above to hang, but I believe this value should at least be validated to be a bit below the threshold that causes Flink to kill itself.

piotrp · Sep 07 '20 11:09

Possible solution: checking whether the model timeout is higher than the Flink timeout.

dswiecki · Jun 22 '21 10:06
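
A minimal sketch of such a check, assuming the model timeout is available as a FiniteDuration and the Flink Configuration can be consulted at deployment/registration time (the wiring into Nussknacker is not shown; TaskManagerOptions.TASK_CANCELLATION_TIMEOUT is Flink's existing option for task.cancellation.timeout):

    import org.apache.flink.configuration.{Configuration, TaskManagerOptions}
    import scala.concurrent.duration.FiniteDuration

    object ModelTimeoutValidator {
      // Rejects configurations whose model timeout is not safely below Flink's
      // task cancellation timeout; logging a warning would be a softer alternative.
      def validate(processTimeout: FiniteDuration, flinkConfig: Configuration): Unit = {
        val cancellationTimeoutMs = flinkConfig.getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT)
        require(
          processTimeout.toMillis < cancellationTimeoutMs,
          s"Model timeout ($processTimeout) must be lower than task.cancellation.timeout " +
            s"($cancellationTimeoutMs ms), otherwise a hung interpretation can get the TaskManager killed"
        )
      }
    }

Running a check like this when a scenario is deployed would fail fast at submission time instead of killing TaskManagers at runtime.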