[SPARK-49038][SQL] Fix regression in Spark UI SQL operator metrics calculation to filter out invalid accumulator values correctly
What changes were proposed in this pull request?
This patch fixes an issue in the driver-hosted Spark UI SQL tab DAG view where invalid SQL metric values are not filtered out correctly, so the UI shows incorrect minimum and median metric values. This regression was introduced in #39311.
Why are the changes needed?
SIZE, TIMING and NS_TIMING metrics are created with initial value -1 (since 0 is a valid metric value for them). The SQLMetrics.stringValue method filters out invalid values using the condition value >= 0 before calculating the min, med and max values. But #39311, released in Spark 3.4.0, introduced a regression where SQLMetric.value is always >= 0. This means that invalid accumulators with value -1 are reported as 0 and are no longer filtered out. This needs to be fixed.
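The effect described above can be illustrated with a small standalone sketch. It does not use Spark: `stats`, `clamp` and `Unset` are hypothetical names, and taking the median as the middle element of the sorted values is an assumption of this sketch rather than a statement about the exact Spark implementation.

```scala
object MetricStatsSketch {
  // Sentinel assumed for "never updated" SIZE/TIMING/NS_TIMING accumulators.
  val Unset: Long = -1L

  /** Returns (sum, min, median, max) over valid values, or None if none are valid. */
  def stats(values: Seq[Long]): Option[(Long, Long, Long, Long)] = {
    // Same validity condition as described in the PR text: value >= 0.
    val valid = values.filter(_ >= 0).sorted
    if (valid.isEmpty) None
    else Some((valid.sum, valid.head, valid(valid.length / 2), valid.last))
  }

  // Models the regression: unset (negative) values are reported as 0.
  def clamp(values: Seq[Long]): Seq[Long] =
    values.map(v => if (v < 0) 0L else v)

  def main(args: Array[String]): Unit = {
    val raw = Seq(Unset, 40L, Unset, 10L, 30L)
    println(stats(raw))        // sentinels are dropped before computing stats
    println(stats(clamp(raw))) // clamped zeros survive the filter
  }
}
```

With the sentinel intact the statistics are computed over 10, 30 and 40 only. After clamping, the two extra zeros pull the minimum to 0 and lower the median, while the sum and maximum are unchanged.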
Does this PR introduce any user-facing change?
Yes, as end users can access accumulator values directly. Users accessing the values in the physical plan programmatically should use SQLMetric.isZero before consuming its value.
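As a rough illustration of the isZero guard suggested above, the sketch below uses a hypothetical stand-in class, `ModelMetric`, rather than Spark's SQLMetric. Its clamped `value` mirrors the behavior described in this PR (never negative), and `readIfSet` is an assumed helper name.

```scala
object IsZeroGuardSketch {
  // Hypothetical stand-in for an accumulator-style metric; not Spark's SQLMetric.
  final class ModelMetric(initValue: Long) {
    private var current: Long = initValue

    def add(v: Long): Unit = {
      // The first update replaces a negative sentinel with a real count.
      if (current < 0) current = 0L
      current += v
    }

    // True while the metric still holds its initial (never updated) value.
    def isZero: Boolean = current == initValue

    // Clamped accessor: callers never see the negative sentinel.
    def value: Long = if (current < 0) 0L else current
  }

  // Consumer-side guard: only trust `value` when the metric was updated.
  def readIfSet(m: ModelMetric): Option[Long] =
    if (m.isZero) None else Some(m.value)

  def main(args: Array[String]): Unit = {
    val untouched = new ModelMetric(-1L)
    val updated = new ModelMetric(-1L)
    updated.add(0L)
    println(readIfSet(untouched)) // no usable value
    println(readIfSet(updated))   // a genuine zero
  }
}
```

Both metrics report 0 through `value`, so only the `isZero` check lets a consumer tell an untouched metric from a genuine zero.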
How was this patch tested?
Existing tests. Also built a new jar for Spark 3.5.1 and confirmed that the data is now shown correctly in the Spark UI.
Old UI view: old_spark_ui_view_3_5_1.pdf
Fixed UI view: new_spark_ui_view_3_5_1.pdf
Was this patch authored or co-authored using generative AI tooling?
No
@dongjoon-hyun could you please review - thanks!
Sure, @abmodi and @virrrat .
Do we have a test case to demonstrate the issue? AFAIK Spark filters out 0-value accumulators at the executor side.
SQLMetrics.stringValue has different logic for different metric types to filter out invalid accumulator values.
- For AVERAGE_METRIC, zero and negative values are filtered out here.
- For SIZE, TIMING and NS_TIMING, negative values are filtered out here. Given zero is a valid metric value for them, those are not filtered out.
So the change made in #39311 effectively converts an invalid -1 accumulator value into a valid 0 value, which is no longer filtered out. This results in wrong min and median values for size and time metrics, while the max and cumulative values still match.
I think there is no existing test for this, that's why it was never caught. Let me see if I can add one to demonstrate the issue.
Just to note, SIZE, TIMING and NS_TIMING metrics are created with initial value -1.
Do we have a test case to demonstrate the issue?
Added the test @cloud-fan
I'm not convinced by the added test, as it calls SQLMetrics.stringValue directly. In reality, the values parameter of this method comes from accumulator values from spark events. Are you sure it may contain -1?
I don't see any problem with the current SQLMetric framework. Let me give a summary here:
- SQLMetric has an initial value. When executing the SQL operator on the worker side, and the metrics are not updated at all, the metrics value will remain as the initial value and be filtered and not sent back to the driver.
- The initial value is not always 0, as 0 can be valid as well, for max/min stats.
- UI is not the only way to access the metrics. People can access SQLMetric instances in physical plan programmatically. We need to hide -1 as it's an internal value for filtering metrics values at the worker side.
Are you sure it may contain -1?
You can run the TPC-DS benchmark q1 and check the SQL tab DAG views in the driver-hosted Spark UI. The minimum and median values are not correct for multiple operator metrics, and they don't match the history server values either (as pointed out by @abmodi in the JIRA ticket). In the incorrect cases, the minimum value is always zero and the median value is always less than the actual median. The reason is that the values parameter of the SQLMetrics.stringValue method contains extra zeros which were actually -1 and should have been filtered out.
I'm not a Spark expert, but the fact that the SQLMetrics.stringValue method has this logic to compute validValues itself indicates that we expect -1 in the values parameter.
People can access SQLMetric instances in physical plan programmatically.
Agree. If users are consuming values directly from a listener, they can check SQLMetric.isZero before consuming the value. We can't compromise the integrity of SQLMetric.value just for this, given that it is causing a regression in the data shown in the Spark UI. Otherwise we would need to expose two different variables: one used for the Spark UI metric calculation and another for users accessing the value programmatically.
For incorrect metric cases, the minimum value is always zero while the median value is always less than the actual median value.
As I explained earlier, this should not happen as we will filter out -1 values at the executor side. So the 0 values in the UI may be valid values from certain tasks. Do you have a simple repro (end-to-end query) to trigger this bug?
Do you have a simple repro (end-to-end query) to trigger this bug?
Can you please use the below reproducer? This is a join between two tables that shuffles data. This can be run in a spark-shell.
import scala.util._
def randString() = Random.alphanumeric.take(30).mkString
val x = sc.parallelize(0 until 100000, 100)
val y = sc.parallelize(100000 until 2000000, 100)
val a = x.map(x => (x,randString()))
val b = y.map(y => (y,randString()))
val df1 = spark.createDataFrame(a).toDF("col1", "col2")
val df2 = spark.createDataFrame(b).toDF("col3", "col4")
df1.createOrReplaceTempView("t1")
df2.createOrReplaceTempView("t2")
spark.sql("select * from t1, t2 where t1.col1 = t2.col3").collect
Attaching screenshots. For Spark 3.5.0, the data in the Spark UI is not correct and does not match the history server. For Spark 3.3.2, the data in the Spark UI is correct.
3.5.0 Spark UI: spark_ui_350.pdf
3.5.0 History Server: history_server_350.pdf
3.3.2 Spark UI: spark_ui_332.pdf
@cloud-fan waiting for your response to unblock the review - thanks!
Can you please use the below reproducer? This is a join between two tables that shuffles data. This can be run in a spark-shell.
@cloud-fan were you able to reproduce the issue? It is a very simple scenario that reproduces the issue for spark 3.4.0 onwards. The issue wasn't there in the previous versions.
cc @abmodi and @dongjoon-hyun too.
I confirmed that the bug exists. I was wrong about executor side accumulator updates filtering. We only filter out zero values for task metrics, but not SQL metrics.
But I don't think this PR is the right fix, as it makes SQLMetric#value return the wrong result. I've proposed a different fix: https://github.com/apache/spark/pull/47721
Thanks for checking and confirming! I was able to track down that the bug started with #39311, which is why I proposed this fix. But if there is a better way to fix this bug, I am fine with it as long as we are fixing it.
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!