
[TPCDS Q4] Wrong Answer on 300G TPCDS partitioned Data v1.4.0

Open Manoj-red-hat opened this issue 2 years ago • 14 comments

Describe the bug [TPCDS Q4] Wrong Answer on 300G TPCDS partitioned Data v1.4.0

To Reproduce

spark.serializer                 org.apache.spark.serializer.KryoSerializer
# gazelle and oap parameters
spark.sql.execution.arrow.maxRecordsPerBatch 64512
spark.sql.parquet.columnarReaderBatchSize 64512
spark.sql.inMemoryColumnarStorage.batchSize 64512
spark.oap.sql.columnar.tmp_dir /hadoop/tmp
# shuffle and joins
spark.shuffle.manager           org.apache.spark.shuffle.sort.ColumnarShuffleManager
spark.sql.shuffle.partitions 90
spark.sql.join.preferSortMergeJoin false
spark.sql.dynamicPartitionPruning.enabled true
spark.sql.optimizer.dynamicPartitionPruning.enforceBroadcastReuse true
spark.sql.adaptive.advisoryPartitionSizeInBytes 100M
spark.sql.autoBroadcastJoinThreshold 100M
spark.sql.files.maxPartitionBytes 500M
spark.oap.sql.columnar.shuffle.preferSpill false
spark.oap.sql.columnar.wholestagecodegen.breakdownTime false

If we set spark.sql.join.preferSortMergeJoin=true we get the correct answer; otherwise the answer is wrong.

spark-sql> set spark.sql.join.preferSortMergeJoin;
set spark.sql.join.preferSortMergeJoin
spark.sql.join.preferSortMergeJoin      true
Time taken: 0.019 seconds, Fetched 1 row(s)

Expected behavior The answer should be correct in both cases.

Additional context [Correct plan] correct_q4 q4_correct.pdf

[Wrong plan] wrong_q4_plan wrong_q4_plan.pdf

Manoj-red-hat avatar Aug 26 '22 08:08 Manoj-red-hat

Hi @weiting-chen @PHILO-HE

[Analysis]

spark.sql.join.preferSortMergeJoin

When this setting is false (the case that produces the wrong answer), Spark favours shuffled hash join. Before the join it tries to coalesce the shuffled batches, and the issue seems to be in the coalesce operator: it does not honour the max batch size limit of 64512.

As a result, the uint16 limit is hit during the join and a wrong result is produced.

Please validate the above analysis and let me know if anything else is required.
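
To make the suspected failure mode concrete, here is a minimal sketch (an assumption for illustration, not gazelle-plugin code) of how a row index stored as an unsigned 16-bit integer wraps around once a coalesced batch grows past 65535 rows; the batch sizes are made up:

// Minimal sketch, not gazelle-plugin code: shows why a coalesced batch larger
// than 65535 rows corrupts a join that stores row indices as uint16 values.
object Uint16IndexOverflow {
  // store a row id the way a 16-bit index buffer would (keep only the low 16 bits)
  def toUint16(rowId: Int): Int = rowId & 0xFFFF

  def main(args: Array[String]): Unit = {
    val maxRecordsPerBatch = 64512 // the configured limit, honoured by the scan
    val coalescedRows      = 70000 // hypothetical oversized batch produced by coalesce

    println(toUint16(maxRecordsPerBatch - 1)) // 64511 -> still addressable
    println(toUint16(coalescedRows - 1))      // 4463  -> wraps around to the wrong row
  }
}

Under that assumption, any batch above 65535 rows silently aliases row indices, which would explain the wrong join output.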

image

Manoj-red-hat avatar Aug 26 '22 08:08 Manoj-red-hat

Q6 is also impacted; it looks like multiple queries are affected by this coalesce issue. wrong q6

Manoj-red-hat avatar Aug 26 '22 09:08 Manoj-red-hat

@PHILO-HE Could you please validate my analysis?

Manoj-red-hat avatar Aug 31 '22 14:08 Manoj-red-hat

@Manoj-red-hat, sorry for the late reply. We have noted this bug and have a patch in the main branch that displays a warning message. See link. Could you try with the main branch? @zhixingheyi-tian, please leave your comment if needed.

PHILO-HE avatar Sep 07 '22 03:09 PHILO-HE

Hi @PHILO-HE, @weiting-chen, I get your point about the warning message.

In my opinion there are only two possible fixes:

  1. Remove the 64k limit on batch size; but for this we would need to support disk-based sorting, or the memory requirement will increase.
  2. Put a limit on the coalesce operator so that it always produces batches of fewer than 64k rows (a rough sketch follows below).

Only these two give us a proper solution. If you want, we can discuss this, and I can fix the issue and raise a PR for you.
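
A rough sketch of option 2, assuming a simplified Batch type and a hypothetical merge helper (this is not the actual gazelle coalesce operator):

// Sketch of fix 2, under assumed types: merge input batches but emit whenever
// adding the next batch would exceed the 64512-row cap, so downstream joins
// never receive a batch that overflows a uint16 row index.
object CappedCoalesce {
  // hypothetical stand-in for an Arrow columnar batch
  case class Batch(numRows: Int)

  // hypothetical merge of two columnar batches
  def merge(a: Batch, b: Batch): Batch = Batch(a.numRows + b.numRows)

  def coalesceWithCap(batches: Iterator[Batch], maxRows: Int = 64512): Iterator[Batch] =
    new Iterator[Batch] {
      private val in = batches.buffered
      def hasNext: Boolean = in.hasNext
      def next(): Batch = {
        var acc = in.next()
        // keep merging while the next batch still fits under the cap
        while (in.hasNext && acc.numRows + in.head.numRows <= maxRows)
          acc = merge(acc, in.next())
        acc
      }
    }
}

With such a cap, every emitted batch stays at or below 64512 rows, matching spark.sql.execution.arrow.maxRecordsPerBatch.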

Manoj-red-hat avatar Sep 07 '22 05:09 Manoj-red-hat

@Manoj-red-hat, thanks for your feedback! Could you please try again with the main branch? I was told this issue has been fixed there. BTW, code contributions are always welcome! Thanks!

PHILO-HE avatar Sep 07 '22 07:09 PHILO-HE

@PHILO-HE I am on it and will update you.

One more point: could you please give an update on the driver-side hash table build?

Because of that driver-side issue, some queries are getting 10X slower compared to vanilla Spark.

Manoj-red-hat avatar Sep 07 '22 11:09 Manoj-red-hat

@PHILO-HE Q4 is working fine using the main branch. I will do complete testing across multiple TPCDS queries; once all are validated, I will close this issue.

Manoj-red-hat avatar Sep 08 '22 14:09 Manoj-red-hat

Nice to hear that. Thanks for your feedback!

PHILO-HE avatar Sep 09 '22 01:09 PHILO-HE

@PHILO-HE

When I compiled the recent main branch, I got the jar gazelle-plugin-1.5.0-SNAPSHOT-spark-3.2.1.jar.

When I try to run it, I am getting the following error.

Could you please help me get this running?

spark-sql> source /shared/tpcds/q04.sql;

WITH year_total AS (
 SELECT c_customer_id customer_id,
        c_first_name customer_first_name,
        c_last_name customer_last_name,
        c_preferred_cust_flag customer_preferred_cust_flag,
        c_birth_country customer_birth_country,
        c_login customer_login,
        c_email_address customer_email_address,
        d_year dyear,
        sum(((ss_ext_list_price-ss_ext_wholesale_cost-ss_ext_discount_amt)+ss_ext_sales_price)/2) year_total,
        's' sale_type
 FROM customer, store_sales, date_dim
 WHERE c_customer_sk = ss_customer_sk AND ss_sold_date_sk = d_date_sk
 GROUP BY c_customer_id,
          c_first_name,
          c_last_name,
          c_preferred_cust_flag,
          c_birth_country,
          c_login,
          c_email_address,
          d_year
 UNION ALL
 SELECT c_customer_id customer_id,
        c_first_name customer_first_name,
        c_last_name customer_last_name,
        c_preferred_cust_flag customer_preferred_cust_flag,
        c_birth_country customer_birth_country,
        c_login customer_login,
        c_email_address customer_email_address,
        d_year dyear,
        sum((((cs_ext_list_price-cs_ext_wholesale_cost-cs_ext_discount_amt)+cs_ext_sales_price)/2) ) year_total,
        'c' sale_type
 FROM customer, catalog_sales, date_dim
 WHERE c_customer_sk = cs_bill_customer_sk AND cs_sold_date_sk = d_date_sk
 GROUP BY c_customer_id,
          c_first_name,
          c_last_name,
          c_preferred_cust_flag,
          c_birth_country,
          c_login,
          c_email_address,
          d_year
 UNION ALL
 SELECT c_customer_id customer_id
       ,c_first_name customer_first_name
       ,c_last_name customer_last_name
       ,c_preferred_cust_flag customer_preferred_cust_flag
       ,c_birth_country customer_birth_country
       ,c_login customer_login
       ,c_email_address customer_email_address
       ,d_year dyear
       ,sum((((ws_ext_list_price-ws_ext_wholesale_cost-ws_ext_discount_amt)+ws_ext_sales_price)/2) ) year_total
       ,'w' sale_type
 FROM customer, web_sales, date_dim
 WHERE c_customer_sk = ws_bill_customer_sk AND ws_sold_date_sk = d_date_sk
 GROUP BY c_customer_id,
          c_first_name,
          c_last_name,
          c_preferred_cust_flag,
          c_birth_country,
          c_login,
          c_email_address,
          d_year)
 SELECT
   t_s_secyear.customer_id,
   t_s_secyear.customer_first_name,
   t_s_secyear.customer_last_name,
   t_s_secyear.customer_preferred_cust_flag
 FROM year_total t_s_firstyear, year_total t_s_secyear, year_total t_c_firstyear,
      year_total t_c_secyear, year_total t_w_firstyear, year_total t_w_secyear
 WHERE t_s_secyear.customer_id = t_s_firstyear.customer_id
   and t_s_firstyear.customer_id = t_c_secyear.customer_id
   and t_s_firstyear.customer_id = t_c_firstyear.customer_id
   and t_s_firstyear.customer_id = t_w_firstyear.customer_id
   and t_s_firstyear.customer_id = t_w_secyear.customer_id
   and t_s_firstyear.sale_type = 's'
   and t_c_firstyear.sale_type = 'c'
   and t_w_firstyear.sale_type = 'w'
   and t_s_secyear.sale_type = 's'
   and t_c_secyear.sale_type = 'c'
   and t_w_secyear.sale_type = 'w'
   and t_s_firstyear.dyear = 2001
   and t_s_secyear.dyear = 2001+1
   and t_c_firstyear.dyear = 2001
   and t_c_secyear.dyear = 2001+1
   and t_w_firstyear.dyear = 2001
   and t_w_secyear.dyear = 2001+1
   and t_s_firstyear.year_total > 0
   and t_c_firstyear.year_total > 0
   and t_w_firstyear.year_total > 0
   and case when t_c_firstyear.year_total > 0 then t_c_secyear.year_total / t_c_firstyear.year_total else null end
           > case when t_s_firstyear.year_total > 0 then t_s_secyear.year_total / t_s_firstyear.year_total else null end
   and case when t_c_firstyear.year_total > 0 then t_c_secyear.year_total / t_c_firstyear.year_total else null end
           > case when t_w_firstyear.year_total > 0 then t_w_secyear.year_total / t_w_firstyear.year_total else null end
 ORDER BY
   t_s_secyear.customer_id,
   t_s_secyear.customer_first_name,
   t_s_secyear.customer_last_name,
   t_s_secyear.customer_preferred_cust_flag
 LIMIT 100
            

/hadoop/mkumar/gazelle_source/gazelle_plugin/arrow-data-source/script/build/arrow_ep/cpp/src/gandiva/cache.cc:42: Creating gandiva cache with capacity: 500
/hadoop/mkumar/gazelle_source/gazelle_plugin/arrow-data-source/script/build/arrow_ep/cpp/src/gandiva/engine.cc:109: Detected CPU Name : skylake-avx512
/hadoop/mkumar/gazelle_source/gazelle_plugin/arrow-data-source/script/build/arrow_ep/cpp/src/gandiva/engine.cc:110: Detected CPU Features: +sse2 +cx16 +sahf -tbm -avx512ifma -sha -gfni -fma4 -vpclmulqdq +prfchw +bmi2 -cldemote +fsgsbase -ptwrite +xsavec +popcnt +aes -avx512bitalg -movdiri +xsaves -avx512er -avx512vnni -avx512vpopcntdq -pconfig +clwb +avx512f -clzero +pku +mmx -lwp -rdpid -xop +rdseed -waitpkg -movdir64b -sse4a +avx512bw +clflushopt +xsave -avx512vbmi2 +avx512vl +invpcid +avx512cd +avx -vaes +rtm +fma +bmi +rdrnd -mwaitx +sse4.1 +sse4.2 +avx2 -wbnoinvd +sse +lzcnt +pclmul -prefetchwt1 +f16c +ssse3 -sgx -shstk +cmov -avx512vbmi +movbe +xsaveopt +avx512dq +adx -avx512pf +sse3
/hadoop/tmp/spark_columnar_plugin_7969435093693454778/nativesql_include/precompile/wscgapi.hpp:1:9: warning: #pragma once in main file
 #pragma once
         ^~~~
2022-09-13 11:41:40,282 ERROR scheduler.TaskSetManager: Task 12 in stage 22.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 22.0 failed 4 times, most recent failure: Lost task 12.3 in stage 22.0 (TID 11324) (localhost executor 1): java.lang.RuntimeException: nativeProcessAndCache: ResultIterator process next failed - ActionBase Evaluate is abstract.
        at com.intel.oap.vectorized.BatchIterator.nativeProcessAndCacheOne(Native Method)
        at com.intel.oap.vectorized.BatchIterator.processAndCacheOne(BatchIterator.java:190)
        at com.intel.oap.vectorized.BatchIterator.processAndCacheOne(BatchIterator.java:157)
        at com.intel.oap.execution.ColumnarWholeStageCodegenExec$$anon$2.process(ColumnarWholeStageCodegenExec.scala:510)
        at com.intel.oap.execution.ColumnarWholeStageCodegenExec$$anon$2.hasNext(ColumnarWholeStageCodegenExec.scala:518)
        at com.intel.oap.vectorized.CloseableColumnBatchIterator.hasNext(CloseableColumnBatchIterator.scala:47)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.shuffle.ColumnarShuffleWriter.write(ColumnarShuffleWriter.scala:96)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.RuntimeException: nativeProcessAndCache: ResultIterator process next failed - ActionBase Evaluate is abstract.
        at com.intel.oap.vectorized.BatchIterator.nativeProcessAndCacheOne(Native Method)
        at com.intel.oap.vectorized.BatchIterator.processAndCacheOne(BatchIterator.java:190)
        at com.intel.oap.vectorized.BatchIterator.processAndCacheOne(BatchIterator.java:157)
        at com.intel.oap.execution.ColumnarWholeStageCodegenExec$$anon$2.process(ColumnarWholeStageCodegenExec.scala:510)
        at com.intel.oap.execution.ColumnarWholeStageCodegenExec$$anon$2.hasNext(ColumnarWholeStageCodegenExec.scala:518)
        at com.intel.oap.vectorized.CloseableColumnBatchIterator.hasNext(CloseableColumnBatchIterator.scala:47)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.shuffle.ColumnarShuffleWriter.write(ColumnarShuffleWriter.scala:96)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Time taken: 62.793 seconds
spark-sql> 

Manoj-red-hat avatar Sep 13 '22 11:09 Manoj-red-hat

@zhouyuan Any idea why I am not able to run the latest main branch on my system? I am getting the above error.

Manoj-red-hat avatar Sep 16 '22 14:09 Manoj-red-hat

@PHILO-HE @zhouyuan The whole-stage codegen (wscg) file is empty: image

Manoj-red-hat avatar Sep 16 '22 14:09 Manoj-red-hat

Hi @PHILO-HE , @zhouyuan,

Strangely, the solution is to clean all tmp folders and restart Spark; the problem is then magically resolved 👍

Manoj-red-hat avatar Sep 16 '22 14:09 Manoj-red-hat

Thanks for the feedback! It looks like an inconsistent code issue.

PHILO-HE avatar Sep 20 '22 03:09 PHILO-HE