gazelle_plugin
java.lang.ClassCastException: org.apache.spark.sql.execution.vectorized.OnHeapColumnVector cannot be cast to com.intel.oap.vectorized.ArrowWritableColumnVector
Describe the bug
Lost task 4.0 in stage 0.0 (TID 4) (172.30.18.3 executor driver): java.lang.ClassCastException: org.apache.spark.sql.execution.vectorized.OnHeapColumnVector cannot be cast to com.intel.oap.vectorized.ArrowWritableColumnVector
    at com.intel.oap.expression.ColumnarConditionProjector$$anon$1.$anonfun$hasNext$4(ColumnarConditionProjector.scala:236)
    at com.intel.oap.expression.ColumnarConditionProjector$$anon$1.$anonfun$hasNext$4$adapted(ColumnarConditionProjector.scala:235)
    at scala.collection.immutable.List.map(List.scala:293)
    at com.intel.oap.expression.ColumnarConditionProjector$$anon$1.hasNext(ColumnarConditionProjector.scala:235)
    at com.intel.oap.vectorized.CloseableColumnBatchIterator.hasNext(CloseableColumnBatchIterator.scala:47)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.ColumnarShuffleWriter.write(ColumnarShuffleWriter.scala:96)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.ClassCastException: org.apache.spark.sql.execution.vectorized.OnHeapColumnVector cannot be cast to com.intel.oap.vectorized.ArrowWritableColumnVector
    at com.intel.oap.expression.ColumnarConditionProjector$$anon$1.$anonfun$hasNext$4(ColumnarConditionProjector.scala:236)
    at com.intel.oap.expression.ColumnarConditionProjector$$anon$1.$anonfun$hasNext$4$adapted(ColumnarConditionProjector.scala:235)
    at scala.collection.immutable.List.map(List.scala:293)
    at com.intel.oap.expression.ColumnarConditionProjector$$anon$1.hasNext(ColumnarConditionProjector.scala:235)
    at com.intel.oap.vectorized.CloseableColumnBatchIterator.hasNext(CloseableColumnBatchIterator.scala:47)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.ColumnarShuffleWriter.write(ColumnarShuffleWriter.scala:96)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
To Reproduce

PATH_TO_JAR=/home/legion/miniconda3/envs/oapenv/oap_jars
${SPARK_HOME}/bin/spark-sql --verbose \
  --driver-memory 10G \
  --conf spark.plugins=com.intel.oap.GazellePlugin \
  --conf spark.driver.extraClassPath=$PATH_TO_JAR/spark-arrow-datasource-standard-1.3.1-jar-with-dependencies.jar:$PATH_TO_JAR/spark-columnar-core-1.3.1-jar-with-dependencies.jar:$PATH_TO_JAR/spark-sql-columnar-shims-common-1.3.1.jar:$PATH_TO_JAR/spark-sql-columnar-shims-spark321-1.3.1.jar \
  --conf spark.executor.extraClassPath=$PATH_TO_JAR/spark-arrow-datasource-standard-1.3.1-jar-with-dependencies.jar:$PATH_TO_JAR/spark-columnar-core-1.3.1-jar-with-dependencies.jar:$PATH_TO_JAR/spark-sql-columnar-shims-common-1.3.1.jar:$PATH_TO_JAR/spark-sql-columnar-shims-spark321-1.3.1.jar \
  --conf spark.shuffle.manager=org.apache.spark.shuffle.sort.ColumnarShuffleManager \
  --conf spark.driver.cores=1 \
  --conf spark.executor.instances=12 \
  --conf spark.executor.cores=6 \
  --conf spark.executor.memory=20G \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=20G \
  --conf spark.task.cpus=1 \
  --conf spark.locality.wait=0s \
  --conf spark.sql.shuffle.partitions=72 \
  --conf spark.executorEnv.ARROW_LIBHDFS3_DIR="$PATH_TO_LIBHDFS3_DIR/" \
  --conf spark.executorEnv.LD_LIBRARY_PATH="$PATH_TO_LIBHDFS3_DEPENDENCIES_DIR" \
  --jars $PATH_TO_JAR/spark-arrow-datasource-standard-1.3.1-jar-with-dependencies.jar,$PATH_TO_JAR/spark-columnar-core-1.3.1-jar-with-dependencies.jar
Then run a simple query over 2 GB of TPC-H data:
select sum(l_extendedprice * (1 - l_discount)) from parq.lineitem_2g, parq.orders_2g where l_orderkey = o_orderkey and o_orderdate >= date '1994-01-01' and o_orderdate < date '1995-01-01';
Expected behavior
The query should run and return the revenue as output.
Additional context
When I disable whole-stage code generation (WSCG) with set spark.sql.codegen.wholeStage=false; the query works fine.
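For reference, the same workaround can be applied programmatically. Below is a minimal sketch; it assumes a SparkSession named spark that is already configured with GazellePlugin as in the repro command above:

```scala
// Workaround sketch: disabling whole-stage codegen makes Spark fall back to
// row-to-columnar conversion instead of feeding its own on-heap batches into
// Gazelle's columnar projection (per the discussion later in this thread).
spark.conf.set("spark.sql.codegen.wholeStage", "false")

val revenue = spark.sql(
  """select sum(l_extendedprice * (1 - l_discount))
    |from parq.lineitem_2g, parq.orders_2g
    |where l_orderkey = o_orderkey
    |  and o_orderdate >= date '1994-01-01'
    |  and o_orderdate < date '1995-01-01'""".stripMargin)
revenue.show()
```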
I resolved this issue by using the Arrow data source.
Here are my observations:
- If the Arrow data source is not enabled, disable whole-stage codegen with set spark.sql.codegen.wholeStage=false; (in spark-sql) in order to run Gazelle.
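Conversely, when the data is in Parquet, the Arrow data source can be used so that batches already arrive in Gazelle's Arrow-backed format. A minimal, hypothetical sketch follows; the "arrow" format name follows the gazelle docs, and the path and view name are placeholders:

```scala
// Sketch: read Parquet through Gazelle's Arrow data source so the scan
// produces ArrowWritableColumnVector batches directly, avoiding the
// incompatible OnHeapColumnVector batches from Spark's built-in reader.
val lineitem = spark.read
  .format("arrow")
  .load("hdfs:///tpch/parquet/lineitem") // placeholder path
lineitem.createOrReplaceTempView("lineitem_2g")
```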
@zhouyuan, could you please review my observations?
@Manoj-red-hat, sorry for this late reply! Is there any reason for not using the Arrow data source on your side? According to the given exception message, a Spark columnar batch was fed into Gazelle's columnar projection, but Spark's columnar format is not compatible with Gazelle's columnar format.
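To illustrate the mismatch, here is a simplified sketch of the failing pattern; the class names mirror the stack trace, but this is not the actual Gazelle source:

```scala
import org.apache.spark.sql.vectorized.ColumnarBatch
import com.intel.oap.vectorized.ArrowWritableColumnVector

// Gazelle's columnar projection assumes every vector in an incoming batch is
// Arrow-backed. When an upstream operator (e.g. Spark's vectorized scan under
// whole-stage codegen) emits OnHeapColumnVector instead, the cast throws.
def toArrowVectors(batch: ColumnarBatch): Seq[ArrowWritableColumnVector] =
  (0 until batch.numCols()).map { i =>
    // ClassCastException: OnHeapColumnVector cannot be cast to
    // ArrowWritableColumnVector
    batch.column(i).asInstanceOf[ArrowWritableColumnVector]
  }
```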
Hi @PHILO-HE ,
Yes, I understand you guys are working hard on this, as is evident from the daily commits.
Is there any reason for not using arrow data source on your side?
I have an ORC database, and the Arrow data source is only for Parquet. Earlier I was on Hive LLAP, which is why I have ORC-based data.
After going through the source code, I figured out this incompatibility, so I made another setup with Parquet-based TPC-DS data, and it works.
For the sake of the community: if others face this issue, it is better to go with Parquet; otherwise, use row-to-columnar conversion (by disabling spark.sql.codegen.wholeStage).
I just want to confirm with the community whether my understanding is correct. Thanks.
@Manoj-red-hat, thanks for your discussion.
I think the configuration spark.sql.codegen.wholeStage=false may just occasionally avoid this cast exception. You may need more tests to verify it.
BTW, the ORC format is already supported in Gazelle. Please refer to the source code below.
gazelle_plugin/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/Orc_TPCDSSuite.scala