gazelle_plugin

java.lang.ClassCastException: org.apache.spark.sql.execution.vectorized.OnHeapColumnVector cannot be cast to com.intel.oap.vectorized.ArrowWritableColumnVector

Open Manoj-red-hat opened this issue 2 years ago • 5 comments

Describe the bug
Lost task 4.0 in stage 0.0 (TID 4) (172.30.18.3 executor driver): java.lang.ClassCastException: org.apache.spark.sql.execution.vectorized.OnHeapColumnVector cannot be cast to com.intel.oap.vectorized.ArrowWritableColumnVector
    at com.intel.oap.expression.ColumnarConditionProjector$$anon$1.$anonfun$hasNext$4(ColumnarConditionProjector.scala:236)
    at com.intel.oap.expression.ColumnarConditionProjector$$anon$1.$anonfun$hasNext$4$adapted(ColumnarConditionProjector.scala:235)
    at scala.collection.immutable.List.map(List.scala:293)
    at com.intel.oap.expression.ColumnarConditionProjector$$anon$1.hasNext(ColumnarConditionProjector.scala:235)
    at com.intel.oap.vectorized.CloseableColumnBatchIterator.hasNext(CloseableColumnBatchIterator.scala:47)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.ColumnarShuffleWriter.write(ColumnarShuffleWriter.scala:96)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.ClassCastException: org.apache.spark.sql.execution.vectorized.OnHeapColumnVector cannot be cast to com.intel.oap.vectorized.ArrowWritableColumnVector
    at com.intel.oap.expression.ColumnarConditionProjector$$anon$1.$anonfun$hasNext$4(ColumnarConditionProjector.scala:236)
    at com.intel.oap.expression.ColumnarConditionProjector$$anon$1.$anonfun$hasNext$4$adapted(ColumnarConditionProjector.scala:235)
    at scala.collection.immutable.List.map(List.scala:293)
    at com.intel.oap.expression.ColumnarConditionProjector$$anon$1.hasNext(ColumnarConditionProjector.scala:235)
    at com.intel.oap.vectorized.CloseableColumnBatchIterator.hasNext(CloseableColumnBatchIterator.scala:47)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.ColumnarShuffleWriter.write(ColumnarShuffleWriter.scala:96)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

To Reproduce
PATH_TO_JAR=/home/legion/miniconda3/envs/oapenv/oap_jars ; \
${SPARK_HOME}/bin/spark-sql --verbose \
  --driver-memory 10G \
  --conf spark.plugins=com.intel.oap.GazellePlugin \
  --conf spark.driver.extraClassPath=$PATH_TO_JAR/spark-arrow-datasource-standard-1.3.1-jar-with-dependencies.jar:$PATH_TO_JAR/spark-columnar-core-1.3.1-jar-with-dependencies.jar:$PATH_TO_JAR/spark-sql-columnar-shims-common-1.3.1.jar:$PATH_TO_JAR/spark-sql-columnar-shims-spark321-1.3.1.jar \
  --conf spark.executor.extraClassPath=$PATH_TO_JAR/spark-arrow-datasource-standard-1.3.1-jar-with-dependencies.jar:$PATH_TO_JAR/spark-columnar-core-1.3.1-jar-with-dependencies.jar:$PATH_TO_JAR/spark-sql-columnar-shims-common-1.3.1.jar:$PATH_TO_JAR/spark-sql-columnar-shims-spark321-1.3.1.jar \
  --conf spark.shuffle.manager=org.apache.spark.shuffle.sort.ColumnarShuffleManager \
  --conf spark.driver.cores=1 \
  --conf spark.executor.instances=12 \
  --conf spark.executor.cores=6 \
  --conf spark.executor.memory=20G \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=20G \
  --conf spark.task.cpus=1 \
  --conf spark.locality.wait=0s \
  --conf spark.sql.shuffle.partitions=72 \
  --conf spark.executorEnv.ARROW_LIBHDFS3_DIR="$PATH_TO_LIBHDFS3_DIR/" \
  --conf spark.executorEnv.LD_LIBRARY_PATH="$PATH_TO_LIBHDFS3_DEPENDENCIES_DIR" \
  --jars $PATH_TO_JAR/spark-arrow-datasource-standard-1.3.1-jar-with-dependencies.jar,$PATH_TO_JAR/spark-columnar-core-1.3.1-jar-with-dependencies.jar
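For completeness, roughly the same Gazelle-related session settings expressed programmatically. This is only a sketch: the app name and paths are illustrative, and the extraClassPath/--jars options above still have to be supplied at launch time, since they cannot be changed after the JVM starts.

  import org.apache.spark.sql.SparkSession

  // Minimal sketch: enable the Gazelle plugin and columnar shuffle manager.
  // The jar/classpath options from the spark-sql command above must still be
  // passed on the command line; they cannot be set from here.
  val spark = SparkSession.builder()
    .appName("gazelle-tpch-repro")  // illustrative name
    .config("spark.plugins", "com.intel.oap.GazellePlugin")
    .config("spark.shuffle.manager",
      "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "20G")
    .config("spark.sql.shuffle.partitions", "72")
    .getOrCreate()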

A simple query on 2 GB TPC-H data:

select sum(l_extendedprice * (1 - l_discount)) from parq.lineitem_2g, parq.orders_2g where l_orderkey = o_orderkey and o_orderdate >= date '1994-01-01' and o_orderdate < date '1995-01-01';

Expected behavior
The query should run and return the revenue as output.


Manoj-red-hat · Apr 13 '22 17:04

When I disable WSCG with set spark.sql.codegen.wholeStage=false; in spark-sql, the query works fine.
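For reference, the same workaround applied programmatically; a minimal sketch assuming a SparkSession obtained as usual:

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().getOrCreate()

  // Workaround reported in this thread: disabling whole-stage codegen made the
  // query run. See the discussion below about whether this is a reliable fix.
  spark.conf.set("spark.sql.codegen.wholeStage", "false")
  // Equivalent in the spark-sql shell:
  //   SET spark.sql.codegen.wholeStage=false;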

Manoj-red-hat · Apr 13 '22 18:04

I resolved this issue by using the Arrow data source (a rough sketch is included below).

Here is my observation:

  1. If the Arrow data source is not enabled, disable whole-stage codegen with set spark.sql.codegen.wholeStage=false; (in spark-sql) in order to run Gazelle.
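A minimal sketch of what using the Arrow data source can look like, assuming the spark-arrow-datasource-standard jar registers the short format name "arrow"; the paths and view names below are placeholders, not the exact setup used here.

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().getOrCreate()

  // Read the Parquet files through Gazelle's Arrow data source so that
  // downstream columnar operators receive Arrow-backed vectors instead of
  // Spark's OnHeapColumnVector. Paths are placeholders.
  val lineitem = spark.read.format("arrow").load("/path/to/tpch/parquet/lineitem")
  val orders   = spark.read.format("arrow").load("/path/to/tpch/parquet/orders")
  lineitem.createOrReplaceTempView("lineitem_2g")
  orders.createOrReplaceTempView("orders_2g")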

@zhouyuan, could you please review my observation?

Manoj-red-hat · May 16 '22 06:05

@Manoj-red-hat, sorry for this late reply! Is there any reason for not using the Arrow data source on your side? According to the given exception message, Spark columnar batch data was fed into Gazelle's columnar projection. But Spark's columnar format is not compatible with Gazelle's Arrow-based columnar format.
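To illustrate the mechanism, here is a sketch of my reading of the stack trace; it is not the actual plugin code, just the kind of cast that fails:

  import org.apache.spark.sql.vectorized.ColumnarBatch
  import com.intel.oap.vectorized.ArrowWritableColumnVector

  // Gazelle's columnar projection assumes every vector in the incoming batch
  // is Arrow-backed. A batch whose columns are OnHeapColumnVector (produced by
  // a vanilla Spark operator, e.g. under whole-stage codegen) fails this cast
  // with exactly the ClassCastException reported above.
  def toArrowVectors(batch: ColumnarBatch): Seq[ArrowWritableColumnVector] =
    (0 until batch.numCols()).map { i =>
      batch.column(i).asInstanceOf[ArrowWritableColumnVector]
    }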

PHILO-HE · May 16 '22 14:05

Hi @PHILO-HE ,

Yeah, I understand you guys are working hard on this, as is evident from the daily commits.

Is there any reason for not using arrow data source on your side?

I have an ORC database, and the Arrow data source is only for Parquet. Earlier I was on Hive LLAP, which is why I have ORC-based data.

After going through the source code, I figured out this incompatibility and thus made another setup of Parquet-based tpds data, and it works.

For the sake of the community, I just want to note: if others face such an issue, it is better to go for Parquet; otherwise, use row-to-columnar conversion (by disabling spark.sql.codegen.wholeStage).

I just want to confirm with the community whether my understanding is correct, thanks.

Manoj-red-hat · May 16 '22 15:05

@Manoj-red-hat, thanks for your discussion.
I think setting spark.sql.codegen.wholeStage=false may just occasionally avoid this cast exception; you may need more tests to verify it. BTW, the ORC format is already supported in Gazelle. Please refer to the source code below:
gazelle_plugin/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/Orc_TPCDSSuite.scala

PHILO-HE · May 17 '22 13:05