[TPCDS Q4] Wrong Answer on 300G TPCDS partitioned Data v1.4.0
Describe the bug: [TPCDS Q4] returns a wrong answer on 300G partitioned TPCDS data with v1.4.0.
To Reproduce
spark.serializer org.apache.spark.serializer.KryoSerializer
# gazelle and oap parameters
spark.sql.execution.arrow.maxRecordsPerBatch 64512
spark.sql.parquet.columnarReaderBatchSize 64512
spark.sql.inMemoryColumnarStorage.batchSize 64512
spark.oap.sql.columnar.tmp_dir /hadoop/tmp
# shuffle and joins
spark.shuffle.manager org.apache.spark.shuffle.sort.ColumnarShuffleManager
spark.sql.shuffle.partitions 90
spark.sql.join.preferSortMergeJoin false
spark.sql.dynamicPartitionPruning.enabled true
spark.sql.optimizer.dynamicPartitionPruning.enforceBroadcastReuse true
spark.sql.adaptive.advisoryPartitionSizeInBytes 100M
spark.sql.autoBroadcastJoinThreshold 100M
spark.sql.files.maxPartitionBytes 500M
spark.oap.sql.columnar.shuffle.preferSpill false
spark.oap.sql.columnar.wholestagecodegen.breakdownTime false
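For anyone reproducing from code rather than spark-defaults.conf, the same settings can be applied when building the session. A minimal sketch (values copied from the config above; the app name is just a placeholder):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: apply the reproduction config programmatically.
// Values mirror the spark-defaults settings listed above.
val spark = SparkSession.builder()
  .appName("tpcds-q4-repro") // placeholder name
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.execution.arrow.maxRecordsPerBatch", "64512")
  .config("spark.sql.parquet.columnarReaderBatchSize", "64512")
  .config("spark.sql.inMemoryColumnarStorage.batchSize", "64512")
  .config("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
  .config("spark.sql.shuffle.partitions", "90")
  .config("spark.sql.join.preferSortMergeJoin", "false") // true => correct answer
  .getOrCreate()
```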
If we set spark.sql.join.preferSortMergeJoin=true, we get the correct answer; otherwise the answer is wrong.
spark-sql> set spark.sql.join.preferSortMergeJoin;
set spark.sql.join.preferSortMergeJoin
spark.sql.join.preferSortMergeJoin true
Time taken: 0.019 seconds, Fetched 1 row(s)
Expected behavior: the answer should be correct in both cases.
Additional context
[Correct Plan]
q4_correct.pdf
[Wrong plan]
wrong_q4_plan.pdf
Hi @weiting-chen @PHILO-HE
[Analysis]
spark.sql.join.preferSortMergeJoin
When the above setting is disabled, Spark favours shuffled hash join. Before the join, it tries to coalesce the shuffled batches, and the issue seems to be in the coalesce operator: it does not honour the max batch size limit of 64512.
As a result, the uint16 limit is hit during the join and a wrong result is produced.
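To make the overflow concrete, here is an illustrative sketch (not the plugin's actual code): if a row index inside a batch is stored in an unsigned 16-bit field, any coalesced batch larger than 65535 rows silently wraps around:

```scala
// Illustrative only: simulate a row index stored as uint16.
// 64512 (the configured max batch size) still fits; a coalesced batch
// that overshoots the limit wraps and points at the wrong row.
def asUint16(rowIndex: Int): Int = rowIndex & 0xFFFF

println(asUint16(64511)) // 64511 -> safe, within the 64512-row limit
println(asUint16(65536)) // 0     -> wraps around to row 0
println(asUint16(70000)) // 4464  -> silently references the wrong row
```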
Please validate the above analysis and let me know if anything else is required.
Q6 is also impacted; it looks like multiple queries are affected by this coalesce issue.
@PHILO-HE Could you please validate my analysis?
@Manoj-red-hat, sorry for the late reply. We have noted this bug and have a patch in the main branch that displays a warning message. See link. Could you give the main branch a try? @zhixingheyi-tian, please leave your comment if needed.
Hi @PHILO-HE, @weiting-chen, I get your point about the warning message.
In my opinion, there are only two possible fixes:
- Remove the 64K limit on batch size; but for this we would need to support disk-based sorting, or the memory requirement will increase.
- Put a limit on the coalesce operator so that it always produces batches of fewer than 64K rows (see the sketch after this list).
Only these two give us a proper solution. If you want, we can discuss this; I can fix the issue and raise a PR.
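For the second option, a minimal sketch of the guard, assuming the coalesce operator can slice its output (`slice` is a hypothetical callback here; the real operator works on Arrow batches in native code):

```scala
import org.apache.spark.sql.vectorized.ColumnarBatch

// Sketch of fix #2: never emit a coalesced batch above the configured cap.
// `slice` is a hypothetical helper returning a view of `len` rows starting
// at `start`; gazelle's actual coalesce lives in native code.
val maxRecordsPerBatch = 64512 // must stay below the uint16 limit of 65536

def coalesceWithCap(
    batches: Iterator[ColumnarBatch],
    slice: (ColumnarBatch, Int, Int) => ColumnarBatch): Iterator[ColumnarBatch] =
  batches.flatMap { batch =>
    if (batch.numRows() <= maxRecordsPerBatch) {
      Iterator.single(batch) // already within the cap, pass through
    } else {
      // split an oversized batch into slices of at most maxRecordsPerBatch rows
      (0 until batch.numRows() by maxRecordsPerBatch).iterator.map { start =>
        val len = math.min(maxRecordsPerBatch, batch.numRows() - start)
        slice(batch, start, len)
      }
    }
  }
```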
@Manoj-red-hat, thanks for your feedback! Could you please try again with the main branch? I was told this issue has been fixed there. BTW, code contribution is always welcome! Thanks!
@PHILO-HE I am on it and will update you.
One more point: could you please give an update on the driver-side hash table build?
Because of the driver-side issue, some queries are getting 10X slower compared to vanilla Spark.
@PHILO-HE Q4 works fine using the main branch. I will do complete testing across multiple TPCDS queries; once all are validated, I will close this issue.
Nice to hear that. Thanks for your feedback!
@PHILO-HE
When I compile the recent main branch, I get the jar gazelle-plugin-1.5.0-SNAPSHOT-spark-3.2.1.jar.
When I try to run it, I get the following error.
Could you please help me get this running?
spark-sql> source /shared/tpcds/q04.sql;
WITH year_total AS (
SELECT c_customer_id customer_id,
c_first_name customer_first_name,
c_last_name customer_last_name,
c_preferred_cust_flag customer_preferred_cust_flag,
c_birth_country customer_birth_country,
c_login customer_login,
c_email_address customer_email_address,
d_year dyear,
sum(((ss_ext_list_price-ss_ext_wholesale_cost-ss_ext_discount_amt)+ss_ext_sales_price)/2) year_total,
's' sale_type
FROM customer, store_sales, date_dim
WHERE c_customer_sk = ss_customer_sk AND ss_sold_date_sk = d_date_sk
GROUP BY c_customer_id,
c_first_name,
c_last_name,
c_preferred_cust_flag,
c_birth_country,
c_login,
c_email_address,
d_year
UNION ALL
SELECT c_customer_id customer_id,
c_first_name customer_first_name,
c_last_name customer_last_name,
c_preferred_cust_flag customer_preferred_cust_flag,
c_birth_country customer_birth_country,
c_login customer_login,
c_email_address customer_email_address,
d_year dyear,
sum((((cs_ext_list_price-cs_ext_wholesale_cost-cs_ext_discount_amt)+cs_ext_sales_price)/2) ) year_total,
'c' sale_type
FROM customer, catalog_sales, date_dim
WHERE c_customer_sk = cs_bill_customer_sk AND cs_sold_date_sk = d_date_sk
GROUP BY c_customer_id,
c_first_name,
c_last_name,
c_preferred_cust_flag,
c_birth_country,
c_login,
c_email_address,
d_year
UNION ALL
SELECT c_customer_id customer_id
,c_first_name customer_first_name
,c_last_name customer_last_name
,c_preferred_cust_flag customer_preferred_cust_flag
,c_birth_country customer_birth_country
,c_login customer_login
,c_email_address customer_email_address
,d_year dyear
,sum((((ws_ext_list_price-ws_ext_wholesale_cost-ws_ext_discount_amt)+ws_ext_sales_price)/2) ) year_total
,'w' sale_type
FROM customer, web_sales, date_dim
WHERE c_customer_sk = ws_bill_customer_sk AND ws_sold_date_sk = d_date_sk
GROUP BY c_customer_id,
c_first_name,
c_last_name,
c_preferred_cust_flag,
c_birth_country,
c_login,
c_email_address,
d_year)
SELECT
t_s_secyear.customer_id,
t_s_secyear.customer_first_name,
t_s_secyear.customer_last_name,
t_s_secyear.customer_preferred_cust_flag
FROM year_total t_s_firstyear, year_total t_s_secyear, year_total t_c_firstyear,
year_total t_c_secyear, year_total t_w_firstyear, year_total t_w_secyear
WHERE t_s_secyear.customer_id = t_s_firstyear.customer_id
and t_s_firstyear.customer_id = t_c_secyear.customer_id
and t_s_firstyear.customer_id = t_c_firstyear.customer_id
and t_s_firstyear.customer_id = t_w_firstyear.customer_id
and t_s_firstyear.customer_id = t_w_secyear.customer_id
and t_s_firstyear.sale_type = 's'
and t_c_firstyear.sale_type = 'c'
and t_w_firstyear.sale_type = 'w'
and t_s_secyear.sale_type = 's'
and t_c_secyear.sale_type = 'c'
and t_w_secyear.sale_type = 'w'
and t_s_firstyear.dyear = 2001
and t_s_secyear.dyear = 2001+1
and t_c_firstyear.dyear = 2001
and t_c_secyear.dyear = 2001+1
and t_w_firstyear.dyear = 2001
and t_w_secyear.dyear = 2001+1
and t_s_firstyear.year_total > 0
and t_c_firstyear.year_total > 0
and t_w_firstyear.year_total > 0
and case when t_c_firstyear.year_total > 0 then t_c_secyear.year_total / t_c_firstyear.year_total else null end
> case when t_s_firstyear.year_total > 0 then t_s_secyear.year_total / t_s_firstyear.year_total else null end
and case when t_c_firstyear.year_total > 0 then t_c_secyear.year_total / t_c_firstyear.year_total else null end
> case when t_w_firstyear.year_total > 0 then t_w_secyear.year_total / t_w_firstyear.year_total else null end
ORDER BY
t_s_secyear.customer_id,
t_s_secyear.customer_first_name,
t_s_secyear.customer_last_name,
t_s_secyear.customer_preferred_cust_flag
LIMIT 100
/hadoop/mkumar/gazelle_source/gazelle_plugin/arrow-data-source/script/build/arrow_ep/cpp/src/gandiva/cache.cc:42: Creating gandiva cache with capacity: 500
/hadoop/mkumar/gazelle_source/gazelle_plugin/arrow-data-source/script/build/arrow_ep/cpp/src/gandiva/engine.cc:109: Detected CPU Name : skylake-avx512
/hadoop/mkumar/gazelle_source/gazelle_plugin/arrow-data-source/script/build/arrow_ep/cpp/src/gandiva/engine.cc:110: Detected CPU Features: +sse2 +cx16 +sahf -tbm -avx512ifma -sha -gfni -fma4 -vpclmulqdq +prfchw +bmi2 -cldemote +fsgsbase -ptwrite +xsavec +popcnt +aes -avx512bitalg -movdiri +xsaves -avx512er -avx512vnni -avx512vpopcntdq -pconfig +clwb +avx512f -clzero +pku +mmx -lwp -rdpid -xop +rdseed -waitpkg -movdir64b -sse4a +avx512bw +clflushopt +xsave -avx512vbmi2 +avx512vl +invpcid +avx512cd +avx -vaes +rtm +fma +bmi +rdrnd -mwaitx +sse4.1 +sse4.2 +avx2 -wbnoinvd +sse +lzcnt +pclmul -prefetchwt1 +f16c +ssse3 -sgx -shstk +cmov -avx512vbmi +movbe +xsaveopt +avx512dq +adx -avx512pf +sse3
/hadoop/tmp/spark_columnar_plugin_7969435093693454778/nativesql_include/precompile/wscgapi.hpp:1:9: warning: #pragma once in main file
#pragma once
^~~~
2022-09-13 11:41:40,282 ERROR scheduler.TaskSetManager: Task 12 in stage 22.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 22.0 failed 4 times, most recent failure: Lost task 12.3 in stage 22.0 (TID 11324) (localhost executor 1): java.lang.RuntimeException: nativeProcessAndCache: ResultIterator process next failed - ActionBase Evaluate is abstract.
at com.intel.oap.vectorized.BatchIterator.nativeProcessAndCacheOne(Native Method)
at com.intel.oap.vectorized.BatchIterator.processAndCacheOne(BatchIterator.java:190)
at com.intel.oap.vectorized.BatchIterator.processAndCacheOne(BatchIterator.java:157)
at com.intel.oap.execution.ColumnarWholeStageCodegenExec$$anon$2.process(ColumnarWholeStageCodegenExec.scala:510)
at com.intel.oap.execution.ColumnarWholeStageCodegenExec$$anon$2.hasNext(ColumnarWholeStageCodegenExec.scala:518)
at com.intel.oap.vectorized.CloseableColumnBatchIterator.hasNext(CloseableColumnBatchIterator.scala:47)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.ColumnarShuffleWriter.write(ColumnarShuffleWriter.scala:96)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.RuntimeException: nativeProcessAndCache: ResultIterator process next failed - ActionBase Evaluate is abstract.
at com.intel.oap.vectorized.BatchIterator.nativeProcessAndCacheOne(Native Method)
at com.intel.oap.vectorized.BatchIterator.processAndCacheOne(BatchIterator.java:190)
at com.intel.oap.vectorized.BatchIterator.processAndCacheOne(BatchIterator.java:157)
at com.intel.oap.execution.ColumnarWholeStageCodegenExec$$anon$2.process(ColumnarWholeStageCodegenExec.scala:510)
at com.intel.oap.execution.ColumnarWholeStageCodegenExec$$anon$2.hasNext(ColumnarWholeStageCodegenExec.scala:518)
at com.intel.oap.vectorized.CloseableColumnBatchIterator.hasNext(CloseableColumnBatchIterator.scala:47)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.ColumnarShuffleWriter.write(ColumnarShuffleWriter.scala:96)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Time taken: 62.793 seconds
spark-sql>
@zhouyuan any idea why I am not able to run the latest main branch on my system? I am getting the above error.
@PHILO-HE @zhouyuan
The wscg codegen file is empty.
Hi @PHILO-HE, @zhouyuan,
Strangely, the solution is to clean the whole tmp folder and restart Spark; magically, the problem is resolved 👍
Thanks for the feedback! It looks like an inconsistent-code issue.