
C++ exceptions are not correctly propagated by Dask

Open · vepadulano opened this issue on Jul 26 '22 · 1 comment

Describe the bug

If an exception is thrown on the C++ side during the execution of an RDF task within a Dask worker process, it is not properly propagated back to the user. In most cases this leads to some kind of unrecoverable error. Take the following example:

from dask.distributed import LocalCluster, Client
import ROOT
RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame

def create_connection():
    cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)
    client = Client(cluster)
    return client

if __name__ == "__main__":
    connection = create_connection()
    df = RDataFrame(100, daskclient=connection)
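    # "x" is a scalar column while "y" is a collection: Graph does not support mixing the two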
    df = df.Define("x", "1").Define("y", "ROOT::RVecF{1., 2., 3.}")
    g = df.Graph("x", "y")
    gv = g.GetValue()

The expected error here is that the graph cannot be filled with a mix of scalar and vector columns. Instead, running the code above gives:

$: python test.py
RDataFrame::Run: event loop was interrupted
Warning in <TBufferFile::WriteObjectAny>: since runtime_error has no public constructor
	which can be called without argument, objects of this class
	can not be read with the current library. You will need to
	add a default constructor before attempting to read it.
Warning in <TStreamerInfo::Build>: runtime_error: base class exception has no streamer or dictionary it will not be saved
Warning in <TStreamerInfo::Build>: runtime_error: __cow_string has no streamer or dictionary, data member "_M_msg" will not be saved
Error in <TClass::New>: cannot create object of class runtime_error
Error in <TBufferFile::ReadObject>: could not create object of class runtime_error
 *** Break *** bus error

This is followed by the usual gdb stack trace produced by ROOT. The crash happens because Dask tries to serialize the exception object in order to send it back to the user, but, as the warnings show, ROOT cannot serialize the std::runtime_error instance.
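A possible direction, sketched below on the assumption that the worker-side mapper can catch the cppyy-bound exception before Dask serializes anything: convert it into a plain Python exception that only carries a string. The names throwing_task and dask_mapper are illustrative placeholders, not the actual DistRDF code.

import cppyy

# Stand-in for the RDF event loop: a tiny C++ helper that throws.
cppyy.cppdef('''
#include <stdexcept>
void throwing_task() {
    throw std::runtime_error("Graph was applied to a mix of scalar values and collections.");
}
''')

def dask_mapper():
    try:
        cppyy.gbl.throwing_task()
    except cppyy.gbl.std.exception as err:
        # A plain RuntimeError carries only a string, which Dask can pickle;
        # "from None" keeps the unserializable cppyy exception object out of the chain.
        raise RuntimeError(
            f"C++ exception thrown:\n\t{type(err).__name__}: {err.what()}"
        ) from None

try:
    dask_mapper()
except RuntimeError as e:
    print(e)  # string-only payload, so it could travel back through the distributed machinery

Re-raising with "from None" matters here: it drops the reference to the original, non-serializable exception object from the exception chain that Dask would otherwise try to ship.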

Expected behavior

C++ exceptions raised on the Dask workers should be properly propagated to the user as Python exceptions.
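For comparison, here is a minimal sketch (assuming cppyy's usual translation of C++ exceptions that derive from std::exception) of the same computation without the Dask backend: run locally, the error surfaces directly as a catchable Python exception carrying the original message.

import cppyy
import ROOT

df = ROOT.RDataFrame(100)
df = df.Define("x", "1").Define("y", "ROOT::RVecF{1., 2., 3.}")
g = df.Graph("x", "y")
try:
    g.GetValue()  # the event loop runs and throws here
except cppyy.gbl.std.runtime_error as err:
    # The C++ exception reaches the user as a Python exception with the original message
    print("caught:", err.what())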

Setup

ROOT: master
Compiler: GCC 12
OS: Fedora 36

vepadulano · Jul 26 '22 07:07

The Spark backend doesn't suffer from this issue because Spark does not try to serialize the C++ error object instance before sending it back to the user: the worker-side Python traceback is reported as text, including the plain RuntimeError re-raised by the DistRDF mapper (Base.py, line 116 in the traceback below). The error is more verbose (it includes the full Java stack trace), but it correctly reports the original C++ error (a small pickle sketch after the traceback illustrates the serialization difference):

$: python test_spark.py
[...]
22/07/27 16:51:03 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/vpadulan/programs/rootproject/rootbuild/master-distrdf-debug/lib/DistRDF/Backends/Base.py", line 112, in distrdf_mapper
    mergeables = get_mergeable_values(rdf_plus.rdf, current_range.id, computation_graph_callable, optimized)
  File "/home/vpadulan/programs/rootproject/rootbuild/master-distrdf-debug/lib/DistRDF/Backends/Base.py", line 69, in get_mergeable_values
    resultptr_list = computation_graph_callable(starting_node, range_id)
  File "/home/vpadulan/programs/rootproject/rootbuild/master-distrdf-debug/lib/DistRDF/ComputationGraphGenerator.py", line 219, in trigger_computation_graph
    ROOT.Internal.RDF.TriggerRun(rnode)
cppyy.gbl.std.runtime_error: void ROOT::Internal::RDF::TriggerRun(ROOT::RDF::RInterface<ROOT::Detail::RDF::RNodeBase,void>& node) =>
    runtime_error: Graph was applied to a mix of scalar values and collections. This is not supported.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/vpadulan/.local/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/home/vpadulan/.local/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/vpadulan/.local/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/vpadulan/.local/lib/python3.10/site-packages/pyspark/rdd.py", line 995, in func
    except StopIteration:
  File "/home/vpadulan/.local/lib/python3.10/site-packages/pyspark/rdd.py", line 1152, in aggregatePartition
    acc = seqOp(acc, obj)
  File "/home/vpadulan/.local/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/home/vpadulan/programs/rootproject/rootbuild/master-distrdf-debug/lib/DistRDF/Backends/Spark/Backend.py", line -1, in spark_mapper
  File "/home/vpadulan/programs/rootproject/rootbuild/master-distrdf-debug/lib/DistRDF/Backends/Base.py", line 116, in distrdf_mapper
    raise RuntimeError(f"C++ exception thrown:\n\t{type(e).__name__}: {e.what()}")
RuntimeError: C++ exception thrown:
	runtime_error: Graph was applied to a mix of scalar values and collections. This is not supported.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:555)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:713)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:695)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
[...]
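To make the serialization difference concrete, a small sketch: the plain Python RuntimeError that ends up being raised on the Spark side carries only a string, so it round-trips through pickle without involving ROOT I/O at all, which is all the worker-to-client boundary needs. The message below is copied from the traceback above.

import pickle

# A string-only exception survives serialization unchanged
err = RuntimeError(
    "C++ exception thrown:\n\truntime_error: Graph was applied to a mix of scalar "
    "values and collections. This is not supported."
)
restored = pickle.loads(pickle.dumps(err))
print(type(restored).__name__, restored)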

vepadulano · Jul 27 '22 15:07