C++ exceptions are not correctly propagated by Dask
Describe the bug
If an exception is thrown on the C++ side during the execution of an RDF task within a Dask worker process, it is not properly propagated. In most cases it leads to an unrecoverable error. Take the following example:
from dask.distributed import LocalCluster, Client
import ROOT
RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame

def create_connection():
    cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)
    client = Client(cluster)
    return client

if __name__ == "__main__":
    connection = create_connection()
    df = RDataFrame(100, daskclient=connection)
    df = df.Define("x", "1").Define("y", "ROOT::RVecF{1., 2., 3.}")
    g = df.Graph("x", "y")
    gv = g.GetValue()
The expected error here is that the graph cannot be filled with columns of mixed scalar/vector types. Running the code above gives:
$: python test.py
RDataFrame::Run: event loop was interrupted
Warning in <TBufferFile::WriteObjectAny>: since runtime_error has no public constructor
which can be called without argument, objects of this class
can not be read with the current library. You will need to
add a default constructor before attempting to read it.
Warning in <TStreamerInfo::Build>: runtime_error: base class exception has no streamer or dictionary it will not be saved
Warning in <TStreamerInfo::Build>: runtime_error: __cow_string has no streamer or dictionary, data member "_M_msg" will not be saved
Error in <TClass::New>: cannot create object of class runtime_error
Error in <TBufferFile::ReadObject>: could not create object of class runtime_error
*** Break *** bus error
This is followed by the usual gdb stack trace produced by ROOT. The crash happens because Dask tries to serialize the exception object in order to send it back to the user, but ROOT cannot serialize the exception, as stated in the warnings above.
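To illustrate the failure mode without needing ROOT or Dask, here is a minimal sketch that checks whether an exception object survives a serialization round trip. It assumes the round trip is done with pickle, which the report does not confirm for Dask. FakeCppException and can_round_trip are hypothetical names made up for this example; the real object is a cppyy proxy of std::runtime_error whose serialization goes through ROOT I/O.

```python
import pickle


class FakeCppException(Exception):
    """Hypothetical stand-in for a C++ exception proxy that cannot be serialized."""

    def __init__(self, message):
        super().__init__(message)
        self._message = message

    def what(self):
        # Mirrors the what() accessor of std::exception.
        return self._message

    def __reduce__(self):
        # Simulate an object whose serialization fails.
        raise TypeError("cannot serialize FakeCppException")


def can_round_trip(exc):
    """Return True if the exception survives pickle.dumps followed by pickle.loads."""
    try:
        restored = pickle.loads(pickle.dumps(exc))
    except Exception:
        return False
    return str(restored) == str(exc)


cpp_error = FakeCppException("Graph was applied to a mix of scalar values and collections.")
plain_error = RuntimeError(cpp_error.what())

print(can_round_trip(cpp_error))    # the stand-in C++ exception
print(can_round_trip(plain_error))  # a plain Python exception carrying the same message
```

In this sketch the stand-in object fails the round trip while the plain RuntimeError with the same message passes it. This matches the situation described above: the message is available on the worker, but the exception object carrying it cannot be shipped back.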
Expected behavior
The C++ exceptions should be properly propagated to the user.
Setup
ROOT master, GCC 12, Fedora 36
The Spark backend doesn't suffer from this issue because Spark does not try to serialize the C++ error object instance before sending it to the user. The error raised is more verbose (it includes the full Java stacktrace), but correctly reports the original C++ error:
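The Spark traceback in this report ends with a plain Python RuntimeError raised from distrdf_mapper, built from the type name and what() of the C++ exception. A minimal sketch of that translation pattern, runnable without ROOT, could look as follows. FakeCppError, failing_task and run_and_translate are hypothetical names for illustration only, and duck-typing on what() is an assumption of this sketch; it does not show how DistRDF actually catches the exception.

```python
import pickle


class FakeCppError(Exception):
    """Hypothetical stand-in for a cppyy-proxied std::runtime_error."""

    def what(self):
        return self.args[0]


def failing_task():
    # Plays the role of the computation that throws on the C++ side.
    raise FakeCppError(
        "Graph was applied to a mix of scalar values and collections. "
        "This is not supported."
    )


def run_and_translate(task):
    """Run task; re-raise C++-like exceptions as a plain RuntimeError."""
    try:
        return task()
    except Exception as e:
        what = getattr(e, "what", None)
        if callable(what):
            # Keep only the type name and message, not the original object.
            raise RuntimeError(
                f"C++ exception thrown:\n\t{type(e).__name__}: {what()}"
            )
        raise


try:
    run_and_translate(failing_task)
except RuntimeError as err:
    # The translated error is an ordinary Python exception, so it can be pickled.
    restored = pickle.loads(pickle.dumps(err))
    print(restored)
```

Only strings leave the worker function here, so whatever transport the backend uses never has to handle the C++ object itself.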
$: python test_spark.py
[...]
22/07/27 16:51:03 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/vpadulan/programs/rootproject/rootbuild/master-distrdf-debug/lib/DistRDF/Backends/Base.py", line 112, in distrdf_mapper
mergeables = get_mergeable_values(rdf_plus.rdf, current_range.id, computation_graph_callable, optimized)
File "/home/vpadulan/programs/rootproject/rootbuild/master-distrdf-debug/lib/DistRDF/Backends/Base.py", line 69, in get_mergeable_values
resultptr_list = computation_graph_callable(starting_node, range_id)
File "/home/vpadulan/programs/rootproject/rootbuild/master-distrdf-debug/lib/DistRDF/ComputationGraphGenerator.py", line 219, in trigger_computation_graph
ROOT.Internal.RDF.TriggerRun(rnode)
cppyy.gbl.std.runtime_error: void ROOT::Internal::RDF::TriggerRun(ROOT::RDF::RInterface<ROOT::Detail::RDF::RNodeBase,void>& node) =>
runtime_error: Graph was applied to a mix of scalar values and collections. This is not supported.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/vpadulan/.local/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
process()
File "/home/vpadulan/.local/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
serializer.dump_stream(out_iter, outfile)
File "/home/vpadulan/.local/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/home/vpadulan/.local/lib/python3.10/site-packages/pyspark/rdd.py", line 995, in func
except StopIteration:
File "/home/vpadulan/.local/lib/python3.10/site-packages/pyspark/rdd.py", line 1152, in aggregatePartition
acc = seqOp(acc, obj)
File "/home/vpadulan/.local/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
return f(*args, **kwargs)
File "/home/vpadulan/programs/rootproject/rootbuild/master-distrdf-debug/lib/DistRDF/Backends/Spark/Backend.py", line -1, in spark_mapper
File "/home/vpadulan/programs/rootproject/rootbuild/master-distrdf-debug/lib/DistRDF/Backends/Base.py", line 116, in distrdf_mapper
raise RuntimeError(f"C++ exception thrown:\n\t{type(e).__name__}: {e.what()}")
RuntimeError: C++ exception thrown:
runtime_error: Graph was applied to a mix of scalar values and collections. This is not supported.
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:555)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:713)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:695)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[...]