Allow the fail behaviour of the Parallel Scheduler to be configurable
Expected Behaviour
The parallel scheduler should be configurable to respond to an action failure in one of the following ways (see the sketch after this list):
- Fail all actions currently in progress (actions are terminated mid-execution)
- Allow currently running parallel actions to finish before failing the flow (semi-graceful)
- Possibly allow all DAGs independent of the one the failure occurred on to finish before failing the flow (graceful)
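For illustration, here is one shape such a configuration could take. Every identifier below is hypothetical and not part of the current waimak API:

```scala
// Hypothetical sketch only; none of these names exist in waimak today.
sealed trait OnFailureBehaviour

object OnFailureBehaviour {

  /** Interrupt every running action immediately (the current behaviour). */
  case object FailFast extends OnFailureBehaviour

  /** Schedule no new actions, let those already running finish,
    * then fail the flow (semi-graceful). */
  case object SemiGraceful extends OnFailureBehaviour

  /** Additionally let DAGs independent of the failed one run to
    * completion before failing the flow (graceful). */
  case object Graceful extends OnFailureBehaviour
}
```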
Actual Behaviour
As soon as an action fails, all other currently executing actions are terminated mid-execution.
I'm not sure the current fail behaviour works 100% of the time:
```
19/04/26 11:26:12 INFO dataflow.ParallelActionScheduler: Waiting for an action to finish to continue. Running actions: 20
Exception in thread "pool-990-thread-3" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Arrays.copyOfRange(Arrays.java:3664)
    at java.lang.StringBuffer.toString(StringBuffer.java:669)
    at java.net.URI.toString(URI.java:1945)
    at java.net.URI.<init>(URI.java:742)
    at org.apache.hadoop.fs.Path.<init>(Path.java:109)
    at org.apache.hadoop.fs.Path.<init>(Path.java:94)
    at org.apache.hadoop.fs.adl.AdlFileSystem.toFileStatuses(AdlFileSystem.java:583)
    at org.apache.hadoop.fs.adl.AdlFileSystem.listStatus(AdlFileSystem.java:468)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1566)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1609)
    at com.coxautodata.waimak.filesystem.FSUtils$.removeSubFoldersPresentInList(FSUtils.scala:169)
    at com.coxautodata.waimak.filesystem.FSUtils$.moveAll(FSUtils.scala:191)
    ...........
    at com.coxautodata.waimak.dataflow.spark.SimpleAction$$anonfun$performAction$1.apply(SimpleAction.scala:32)
    at com.coxautodata.waimak.dataflow.spark.SimpleAction$$anonfun$performAction$1.apply(SimpleAction.scala:30)
    at scala.util.Try$.apply(Try.scala:192)
    at com.coxautodata.waimak.dataflow.spark.SimpleAction.performAction(SimpleAction.scala:30)
    at com.coxautodata.waimak.dataflow.spark.SparkDataFlowAction$class.performAction(SparkDataFlowAction.scala:10)
    at com.coxautodata.waimak.dataflow.spark.SimpleAction.performAction(SimpleAction.scala:21)
    at com.coxautodata.waimak.dataflow.ParallelActionScheduler$$anonfun$9$$anonfun$10.apply(ParallelActionScheduler.scala:104)
    at com.coxautodata.waimak.dataflow.ParallelActionScheduler$$anonfun$9$$anonfun$10.apply(ParallelActionScheduler.scala:104)
    at scala.util.Try$.apply(Try.scala:192)
    at com.coxautodata.waimak.dataflow.ParallelActionScheduler$$anonfun$9.apply(ParallelActionScheduler.scala:104)
    at com.coxautodata.waimak.dataflow.ParallelActionScheduler$$anonfun$9.apply(ParallelActionScheduler.scala:100)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
```
I'm not sure uncaught exceptions in child threads are actually raised to the parent; I have a job hanging, waiting on this failed action to finish.
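That hang would be consistent with how `scala.util.Try` and `Future` treat fatal errors: `Try.apply` only catches `NonFatal` throwables, so an `OutOfMemoryError` escapes the `Try` at ParallelActionScheduler.scala:104, kills the pool thread (hence the `Exception in thread ...` line above), and on Scala 2.11/2.12 leaves the `Future`'s promise incomplete forever, so anything awaiting it never returns. A minimal sketch of that behaviour, with a custom `Error` standing in for a real OOM:

```scala
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try

object FatalErrorDemo extends App {
  // Stand-in for OutOfMemoryError: a fatal Throwable (subclass of Error).
  class SimulatedOOM extends Error("simulated OOM")

  // 1. Try only catches NonFatal throwables, so the Error escapes it
  //    instead of becoming a Failure.
  try Try(throw new SimulatedOOM)
  catch { case e: SimulatedOOM => println(s"escaped Try: $e") }

  // 2. Inside a Future the same escape means the promise is never
  //    completed (on Scala 2.11/2.12), so any Await on it can only
  //    time out -- mirroring the hanging job described above.
  val f = Future[Int] { throw new SimulatedOOM }
  try Await.result(f, 2.seconds)
  catch { case _: TimeoutException => println("future never completed") }
}
```

One option might be for the scheduler to catch `Throwable` around the action body (or register a `Thread.UncaughtExceptionHandler`) so that even fatal errors complete the promise and unblock the wait loop.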
@alexeipab @vavison what do you think?