[SPARK-52024][SQL] Support cancelling ShuffleQueryStage when propagating empty relations
What changes were proposed in this pull request?
a. This PR introduces a mechanism for cancelling query stages.
- Cancel a shuffle query stage only when it is neither reused by other stages nor reusing another stage.
- Mark a query stage as cancellable in the reOptimize step if it has not completed yet.
- Once the costEvaluator decides to cancel a cancellable query stage (a cancellable query stage in `EmptyPropagate` should always be cancelled), the query stage no longer has any relation to the current logical or physical plan.
- Cancelling the query stage after its job has completed has no effect.
- Cancelling the query stage while its job is running cancels the job.
- Cancelling the query stage before its job has started does not prevent the job from being submitted. This case should be optimized in a future commit, but it is no worse than the behavior before this PR.
b. Apply this mechanism to AQEPropagateEmptyRelation: cancel running query stages that become unnecessary once empty relations are propagated.
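The rules in item a. can be summarized as a small decision model. This is a hypothetical sketch: `JobState`, `StageModel` and `CancelRules` are illustrative names, not Spark classes, and the PR itself operates on AQE's shuffle query stages rather than on this simplified model.

```scala
// Hypothetical model of the cancellation rules above; these are NOT Spark classes.
sealed trait JobState
object JobState {
  case object NotStarted extends JobState
  case object Running extends JobState
  case object Completed extends JobState
}

// isReused: another stage reuses this one; reusesOther: this stage reuses another one.
final case class StageModel(id: Int, isReused: Boolean, reusesOther: Boolean, job: JobState)

object CancelRules {
  import JobState._

  // Only a stage that is neither reused nor reusing another stage, and whose job
  // has not completed at re-optimization time, is marked cancellable.
  def isCancellable(s: StageModel): Boolean =
    !s.isReused && !s.reusesOther && s.job != Completed

  // What cancelling a stage achieves depends on the state of its job at that moment.
  def cancelOutcome(job: JobState): String = job match {
    case Completed  => "no effect"
    case Running    => "job cancelled"
    case NotStarted => "job still submitted" // the case left for a future commit
  }
}
```

Because the cancellable mark is set at re-optimization time while cancellation may happen later, `cancelOutcome` still has to handle a job that completed in between.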
Why are the changes needed?
- The query stage cancellation mechanism makes AQE more flexible; more CBO features can be built on top of it once this PR is merged.
- Tasks belonging to unnecessary running query stages occupy executor cores and waste compute resources.
Does this PR introduce any user-facing change?
Yes. Users will see stage failures caused by the optimized stage cancellation, but these failures have no effect on the query result.
How was this patch tested?
Manual test. Since the completion order of query stages cannot be guaranteed, a unit test would not be reliable.
```
./bin/spark-shell --master local[4]
scala> case class TestData(key: Int, value: String)
defined class TestData
scala> case class TestData2(a: Int, b: Int)
defined class TestData2
scala> spark.sparkContext.parallelize(Seq.empty[Int].map(i => TestData(i, i.toString))).toDF().createOrReplaceTempView("emptyTestData")
scala> spark.sparkContext.parallelize((1 to 100).map(i => TestData(i, i.toString))).toDF().createOrReplaceTempView("testData")
scala> spark.sparkContext.parallelize(TestData2(1, 1) ::TestData2(1, 2) ::TestData2(2, 1) ::TestData2(2, 2) ::TestData2(3, 1) ::TestData2(3, 2) :: Nil,2).toDF().createOrReplaceTempView("testData2")
scala> spark.udf.register("fake_udf", (input: Int) => {
| Thread.sleep(100)
| input
| })
scala> spark.sql("SELECT t.key1 FROM emptyTestData join (SELECT testData.key as key1 FROM testData join testData2 ON fake_udf(testData.key)=fake_udf(testData2.a) ) t on t.key1 = emptyTestData.key union SELECT testData.key FROM testData join testData2 ON testData.key=testData2.a ").collect
```
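In the test query, the first UNION branch inner-joins emptyTestData with a subquery whose join keys go through the slow fake_udf. Once the emptyTestData side is known to be empty, that inner join is empty, so the stages feeding the slow join are no longer needed. The toy model below illustrates the idea in the spirit of AQEPropagateEmptyRelation; `Plan`, `Stage`, `InnerJoin`, `Union`, `EmptyRelation` and `EmptyPropagation` are hypothetical, and Spark's actual rule covers more operators.

```scala
// Hypothetical, simplified plan model; NOT Spark's LogicalPlan or query stage classes.
sealed trait Plan
// emptyResult: None while the stage is still running, Some(isEmpty) once it has finished.
final case class Stage(id: Int, emptyResult: Option[Boolean]) extends Plan
final case class InnerJoin(left: Plan, right: Plan) extends Plan
final case class Union(children: Seq[Plan]) extends Plan
case object EmptyRelation extends Plan

object EmptyPropagation {
  // Replace finished, empty stages with EmptyRelation and fold emptiness upwards:
  // an inner join with an empty side is empty; a union drops its empty children.
  def propagate(p: Plan): Plan = p match {
    case Stage(_, Some(true)) => EmptyRelation
    case InnerJoin(l, r) =>
      val (pl, pr) = (propagate(l), propagate(r))
      if (pl == EmptyRelation || pr == EmptyRelation) EmptyRelation else InnerJoin(pl, pr)
    case Union(cs) =>
      val kept = cs.map(propagate).filterNot(_ == EmptyRelation)
      if (kept.isEmpty) EmptyRelation else Union(kept)
    case other => other
  }

  private def stages(p: Plan): Seq[Stage] = p match {
    case s: Stage        => Seq(s)
    case InnerJoin(l, r) => stages(l) ++ stages(r)
    case Union(cs)       => cs.flatMap(stages)
    case EmptyRelation   => Seq.empty
  }

  // Running stages that no longer appear in the plan after propagation are unnecessary.
  def stagesToCancel(plan: Plan): Set[Int] = {
    val running = stages(plan).collect { case Stage(id, None) => id }.toSet
    running -- stages(propagate(plan)).map(_.id)
  }
}
```

For a plan shaped like `Union(Seq(InnerJoin(Stage(1, Some(true)), InnerJoin(Stage(2, None), Stage(3, None))), InnerJoin(Stage(4, Some(false)), Stage(5, None))))`, only stages 2 and 3 are reported: stage 5 is still running but remains part of the plan.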
Before this PR:
After this PR:
Was this patch authored or co-authored using generative AI tooling?
No.
@cloud-fan @LuciferYang @panbingkun PTAL
We should make GA green first
what if the shuffle stage is being reused by other places? I don't think we can cancel it without a ref counting to prove it's not used anywhere.
Thanks for taking the time to review. Could you explain in more detail what you mean by 'reused by other places'? Currently I use AdaptiveExecutionContext#stageReuse to record the reuse cases, which is similar to ref counting.
I think it's more complicated than what you have implemented now: what if the reuse stage completes before we want to cancel the original stage? What if the reuse stage has not started yet? It's really a complicated algorithm once shuffle reuse is considered. Can we explain it in detail in the PR description?
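A minimal sketch of the reference counting mentioned in this discussion, assuming every plan node that consumes a stage (the original or a reuse) acquires a reference and releases it when it is pruned from the plan. `StageRefCounts` is hypothetical, not an existing Spark API, and it deliberately ignores the timing questions raised above (a reuse stage completing first, or not having started yet).

```scala
import scala.collection.mutable

// Hypothetical ref-counting registry for query stages; NOT an existing Spark API.
final class StageRefCounts {
  private val refs = mutable.Map.empty[Int, Int]

  // Called whenever a plan node starts depending on the stage.
  def acquire(stageId: Int): Unit =
    refs.update(stageId, refs.getOrElse(stageId, 0) + 1)

  // Called when a consumer no longer needs the stage. Returns true only when
  // no consumer is left, i.e. when cancelling the stage would be safe.
  def release(stageId: Int): Boolean = {
    val remaining = refs.getOrElse(stageId, 0) - 1
    require(remaining >= 0, s"stage $stageId released more times than acquired")
    if (remaining == 0) refs.remove(stageId) else refs.update(stageId, remaining)
    remaining == 0
  }

  def count(stageId: Int): Int = refs.getOrElse(stageId, 0)
}
```

With this shape, a stage shared by two consumers survives the first `release` and becomes cancellable only after the second.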
Fine, I've updated the PR description:
1. Cancel a shuffle query stage only when it is neither reused by other stages nor reusing another stage.
2. Mark a query stage as cancellable in the reOptimize step if it has not completed yet.
3. Once the costEvaluator decides to cancel a cancellable query stage (a cancellable query stage in `EmptyPropagate` should always be cancelled), the query stage no longer has any relation to the current logical or physical plan.
4. Cancelling the query stage after its job has completed has no effect.
5. Cancelling the query stage while its job is running cancels the job.
6. Cancelling the query stage before its job has started does not prevent the job from being submitted. This case should be optimized in a future commit, but it is no worse than the behavior before this PR.
Is there a possibility for this PR to progress further?
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!