
[SPARK-52024][SQL] Support cancel ShuffleQueryStage when propagate empty relations

Open summaryzb opened this issue 7 months ago • 6 comments

What changes were proposed in this pull request?

a. This PR introduces a mechanism for cancelling query stages.

  1. Cancel a shuffle queryStage only when it is neither reused nor reusing another stage.
  2. Mark a queryStage as cancellable if it has not completed yet at the reOptimize stage.
  3. When the costEvaluator decides that a cancellable queryStage should be cancelled (a cancellable queryStage in EmptyPropagate should always be cancelled), the queryStage no longer has any relation to the current logical or physical plan.
  4. Cancelling a queryStage after its corresponding job has completed has no effect.
  5. Cancelling a queryStage while its corresponding job is running cancels the job.
  6. Cancelling a queryStage before its corresponding job has started does not prevent the job from being submitted. This case should be optimized in a future commit, but it is no worse than the behavior before this PR.
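
The six rules above can be read as a small decision table. The following is a minimal sketch of that table, assuming a simplified model of a stage; every name in it (`JobState`, `CancelOutcome`, `StageInfo`, `CancelRules`) is hypothetical and is not taken from Spark or from this PR's diff.

```scala
// Hypothetical model of the cancellation rules; none of these names are Spark APIs.
sealed trait JobState
case object NotStarted extends JobState
case object Running extends JobState
case object Completed extends JobState

sealed trait CancelOutcome
case object NotCancellable extends CancelOutcome // rule 1: stage takes part in reuse
case object NoEffect extends CancelOutcome       // rule 4: job already finished
case object JobCancelled extends CancelOutcome   // rule 5: running job is cancelled
case object StillSubmitted extends CancelOutcome // rule 6: job is submitted anyway

final case class StageInfo(
    id: Int,
    isReused: Boolean,    // another stage reuses this one
    reusesOther: Boolean, // this stage reuses another one
    jobState: JobState)

object CancelRules {
  // Rules 1 and 2: only an unfinished stage with no reuse relationship is marked.
  def isCancellable(s: StageInfo): Boolean =
    !s.isReused && !s.reusesOther && s.jobState != Completed

  // Rules 4 to 6: the effect of a cancel request depends on the job state
  // at the moment the request arrives.
  def cancel(s: StageInfo): CancelOutcome =
    if (s.isReused || s.reusesOther) NotCancellable
    else s.jobState match {
      case Completed  => NoEffect
      case Running    => JobCancelled
      case NotStarted => StillSubmitted
    }
}
```

A stage can be marked cancellable while running and still finish before the cancel request arrives, which is why `cancel` handles `Completed` even though `isCancellable` rejects it.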

b. Apply this mechanism to AQEPropagateEmptyRelation: cancel running queryStages that become unnecessary once empty relations are propagated.
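
To illustrate which stages become unnecessary, here is a toy sketch of empty-relation propagation over a simplified plan tree. It assumes only inner joins and unions, and its types (`Plan`, `Stage`, `InnerJoin`, `Union`, `EmptyPropagation`) are illustrative stand-ins rather than Spark classes. It also ignores the reuse check described in item a.

```scala
// Toy logical plan; these types are illustrative and are not Spark classes.
sealed trait Plan
case object EmptyRelation extends Plan
final case class Stage(id: Int, running: Boolean) extends Plan
final case class InnerJoin(left: Plan, right: Plan) extends Plan
final case class Union(children: List[Plan]) extends Plan

object EmptyPropagation {
  // All stage leaves below a plan node.
  def stages(p: Plan): List[Stage] = p match {
    case s: Stage        => List(s)
    case InnerJoin(l, r) => stages(l) ++ stages(r)
    case Union(cs)       => cs.flatMap(stages)
    case EmptyRelation   => Nil
  }

  // Returns the simplified plan plus the running stages that the rewrite dropped.
  def propagate(p: Plan): (Plan, List[Stage]) = p match {
    case InnerJoin(l, r) =>
      val (pl, dl) = propagate(l)
      val (pr, dr) = propagate(r)
      if (pl == EmptyRelation || pr == EmptyRelation) {
        // An inner join with an empty side is empty, so the running stages
        // still left under either side are candidates for cancellation.
        val orphaned = (stages(pl) ++ stages(pr)).filter(_.running)
        (EmptyRelation, dl ++ dr ++ orphaned)
      } else (InnerJoin(pl, pr), dl ++ dr)
    case Union(cs) =>
      val results = cs.map(propagate)
      val kept = results.map(_._1).filter(_ != EmptyRelation)
      (if (kept.isEmpty) EmptyRelation else Union(kept), results.flatMap(_._2))
    case other => (other, Nil)
  }
}
```

For a plan shaped like the manual test query in this description, `Union(List(InnerJoin(EmptyRelation, InnerJoin(Stage(1, true), Stage(2, true))), InnerJoin(Stage(3, true), Stage(4, false))))`, the first union branch collapses to empty and stages 1 and 2 are reported as no longer needed, while the second branch is kept.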

Why are the changes needed?

  1. A queryStage cancellation mechanism makes AQE more flexible; more CBO features can be built on it once this PR is merged.
  2. Tasks belonging to unnecessary running queryStages occupy executor cores and waste compute resources.

Does this PR introduce any user-facing change?

Yes. Users will see stage failures caused by the cancellation of optimized-away stages, but these failures have no effect on the query result.

How was this patch tested?

Manual test. Because the completion order of query stages cannot be guaranteed, a unit test would not be reliable.

./bin/spark-shell --master local[4]

scala> case class TestData(key: Int, value: String)
defined class TestData

scala> case class TestData2(a: Int, b: Int)
defined class TestData2

scala> spark.sparkContext.parallelize(Seq.empty[Int].map(i => TestData(i, i.toString))).toDF().createOrReplaceTempView("emptyTestData")

scala> spark.sparkContext.parallelize((1 to 100).map(i => TestData(i, i.toString))).toDF().createOrReplaceTempView("testData")

scala> spark.sparkContext.parallelize(TestData2(1, 1) :: TestData2(1, 2) :: TestData2(2, 1) :: TestData2(2, 2) :: TestData2(3, 1) :: TestData2(3, 2) :: Nil, 2).toDF().createOrReplaceTempView("testData2")

scala>     spark.udf.register("fake_udf", (input: Int) => {
     |       Thread.sleep(100)
     |       input
     |     })

scala> spark.sql("SELECT t.key1 FROM emptyTestData join (SELECT testData.key as key1 FROM testData join testData2 ON fake_udf(testData.key)=fake_udf(testData2.a) ) t on t.key1 = emptyTestData.key union SELECT testData.key FROM testData join testData2 ON testData.key=testData2.a ").collect

Before this PR: [image]

After this PR: [image]

Was this patch authored or co-authored using generative AI tooling?

No.

summaryzb avatar May 07 '25 17:05 summaryzb

@cloud-fan @LuciferYang @panbingkun PTAL

summaryzb avatar May 09 '25 09:05 summaryzb

We should make GA green first

LuciferYang avatar May 12 '25 19:05 LuciferYang

what if the shuffle stage is being reused by other places? I don't think we can cancel it without a ref counting to prove it's not used anywhere.

cloud-fan avatar May 30 '25 14:05 cloud-fan

> what if the shuffle stage is being reused by other places? I don't think we can cancel it without a ref counting to prove it's not used anywhere.

Thanks for taking the time to review. Could you explain in more detail what you mean by 'reused by other places'? Currently I use AdaptiveExecutionContext#stageReuse to record the reuse cases, which is similar to ref counting.
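
For readers following the thread, the ref-counting idea under discussion could look roughly like the sketch below. It is a hedged illustration only: `StageRefCounter` and its string key are hypothetical, and it does not reproduce how `AdaptiveExecutionContext#stageReuse` is implemented.

```scala
import scala.collection.mutable

// Hypothetical reference counter keyed by a stage's canonical plan string.
// This is not AdaptiveExecutionContext#stageReuse, only a model of the idea.
final class StageRefCounter {
  private val refs = mutable.Map.empty[String, Int].withDefaultValue(0)

  // Called when a plan node starts referring to the stage (original or reuse).
  def acquire(key: String): Int = synchronized {
    refs(key) += 1
    refs(key)
  }

  // Called when a reference is removed from the plan; never drops below zero.
  def release(key: String): Int = synchronized {
    val next = math.max(0, refs(key) - 1)
    if (next == 0) refs -= key else refs.update(key, next)
    next
  }

  // A stage may be cancelled only when nothing refers to it any more.
  def canCancel(key: String): Boolean = synchronized { refs(key) == 0 }
}
```

In this model a stage acquired twice (once by the original node, once by a reuse node) stays uncancellable until both references are released. A counter alone says nothing about when each reference is registered relative to job submission, so timing still has to be handled separately.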

summaryzb avatar Jun 01 '25 02:06 summaryzb

I think it's more complicated than what you have implemented: what if the reuse stage is completed before we want to cancel the original stage? What if the reuse stage has not started yet? It's a really complicated algorithm once shuffle reuse is considered. Can we explain it in detail in the PR description?

cloud-fan avatar Jun 02 '25 05:06 cloud-fan

Fine, I have updated the PR description.

 1. Cancel a shuffle queryStage only when it is neither reused nor reusing another stage.
 2. Mark a queryStage as cancellable if it has not completed yet at the reOptimize stage.
 3. When the costEvaluator decides that a cancellable queryStage should be cancelled (a cancellable queryStage in `EmptyPropagate` should always be cancelled), the queryStage no longer has any relation to the current logical or physical plan.
 4. Cancelling a queryStage after its corresponding job has completed has no effect.
 5. Cancelling a queryStage while its corresponding job is running cancels the job.
 6. Cancelling a queryStage before its corresponding job has started does not prevent the job from being submitted. This case should be optimized in a future commit, but it is no worse than the behavior before this PR.

summaryzb avatar Jun 02 '25 14:06 summaryzb

Is there a possibility for this PR to progress further?

LuciferYang avatar Jun 24 '25 04:06 LuciferYang

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions[bot] avatar Oct 03 '25 00:10 github-actions[bot]