
[SPARK-49038][SQL] Fix regression in Spark UI SQL operator metrics calculation to filter out invalid accumulator values correctly

Open virrrat opened this issue 1 year ago • 15 comments

What changes were proposed in this pull request?

This patch fixes an issue in the driver-hosted Spark UI SQL tab DAG view where invalid SQL metric values are not filtered out correctly, resulting in incorrect minimum and median metric values in the UI. This regression was introduced in #39311.

Why are the changes needed?

SIZE, TIMING and NS_TIMING metrics are created with an initial value of -1 (since 0 is a valid metric value for them). The SQLMetrics.stringValue method filters out the invalid values using the condition value >= 0 before calculating the min, med and max values. But #39311, introduced in Spark 3.4.0, caused a regression where SQLMetric.value is always >= 0. This means that invalid accumulators with value -1 no longer look invalid and are not filtered out correctly. This needs to be fixed.
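To make the effect concrete, here is a minimal sketch that can be pasted into a spark-shell or Scala REPL. SimpleMetric is a hypothetical stand-in for SQLMetric, and the value clamping shown is only an approximation of the change described above, not the actual Spark implementation:

// Hypothetical stand-in for SQLMetric to illustrate the regression; NOT the actual Spark class.
class SimpleMetric(initValue: Long = -1L) {
  private var _value: Long = initValue
  // Pre-#39311 behaviour: the raw value (possibly the -1 sentinel) is exposed.
  def rawValue: Long = _value
  // Post-#39311 behaviour (approximated): the exposed value is never negative,
  // so an untouched metric reports 0 instead of -1.
  def value: Long = if (_value < 0) 0L else _value
  def add(v: Long): Unit = { if (_value < 0) _value = 0; _value += v }
}

val untouched = new SimpleMetric()   // never updated by any task
val updated = new SimpleMetric()
updated.add(42)

// The driver-side filter `_ >= 0` can no longer tell the untouched metric apart from a genuine zero.
Seq(untouched.rawValue, updated.rawValue).filter(_ >= 0)  // List(42)
Seq(untouched.value, updated.value).filter(_ >= 0)        // List(0, 42)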

Does this PR introduce any user-facing change?

Yes, since end users can access accumulator values directly. Users accessing the values in the physical plan programmatically should use SQLMetric.isZero before consuming the value.
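For example, a minimal sketch of such guarded access, assuming a spark-shell session with an already-executed DataFrame named df:

val plan = df.queryExecution.executedPlan
plan.metrics.foreach { case (name, metric) =>
  // Skip metrics that were never updated instead of reading a sentinel value.
  if (!metric.isZero) println(s"$name = ${metric.value}")
}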

How was this patch tested?

Existing tests; also built a new jar against Spark 3.5.1 and confirmed that the previously incorrect data is now shown correctly in the Spark UI.

Old UI view: old_spark_ui_view_3_5_1.pdf

Fixed UI view: new_spark_ui_view_3_5_1.pdf

Was this patch authored or co-authored using generative AI tooling?

No

virrrat avatar Jul 29 '24 10:07 virrrat

@dongjoon-hyun could you please review - thanks!

abmodi avatar Jul 31 '24 06:07 abmodi

Sure, @abmodi and @virrrat .

dongjoon-hyun avatar Jul 31 '24 06:07 dongjoon-hyun

Do we have a test case to demonstrate the issue? AFAIK Spark filters out 0-value accumulators at the executor side.

cloud-fan avatar Jul 31 '24 08:07 cloud-fan

Do we have a test case to demonstrate the issue? AFAIK Spark filters out 0-value accumulators at the executor side.

SQLMetrics.stringValue has different logic for different metric types to filter out invalid accumulator values.

  1. For AVERAGE_METRIC, zero and negative values are filtered out here.
  2. For SIZE, TIMING and NS_TIMING, negative values are filtered out here. Since zero is a valid metric value for them, zeros are not filtered out.

So the change made in #39311 basically converts an invalid -1 accumulator value into a valid 0 value, which is no longer filtered out, resulting in wrong min and median values for size and time metrics, while the max and cumulative values still match.
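With made-up per-task values, the skew looks roughly like this:

// Illustrative numbers only: the extra zeros (former -1 sentinels) skew
// min and median, while max and sum still match.
val perTask = Seq(-1L, -1L, 4L, 8L, 12L)   // -1 marks "never updated"
def median(xs: Seq[Long]): Long = xs.sorted.apply(xs.size / 2)

val before = perTask.filter(_ >= 0)                           // Seq(4, 8, 12)
val after = perTask.map(v => math.max(v, 0L)).filter(_ >= 0)  // Seq(0, 0, 4, 8, 12)

(before.min, median(before), before.max, before.sum)  // (4, 8, 12, 24)
(after.min, median(after), after.max, after.sum)       // (0, 4, 12, 24)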

I think there is no existing test for this, which is why it was never caught. Let me see if I can add one to demonstrate the issue.

virrrat avatar Jul 31 '24 11:07 virrrat

Just to note, SIZE, TIMING and NS_TIMING metrics are created with initial value -1.

virrrat avatar Jul 31 '24 11:07 virrrat

Do we have a test case to demonstrate the issue?

Added the test @cloud-fan

virrrat avatar Jul 31 '24 17:07 virrrat

I'm not convinced by the added test, as it calls SQLMetrics.stringValue directly. In reality, the values parameter of this method comes from accumulator values in Spark events. Are you sure it can contain -1?

cloud-fan avatar Aug 01 '24 02:08 cloud-fan

I don't see any problem with the current SQLMetric framework. Let me give a summary here:

  1. SQLMetric has an initial value. When a SQL operator executes on the worker side and a metric is not updated at all, the metric value remains the initial value, gets filtered out, and is not sent back to the driver.
  2. The initial value is not always 0, as 0 can be valid as well, for max/min stats.
  3. The UI is not the only way to access the metrics. People can access SQLMetric instances in the physical plan programmatically. We need to hide -1, as it's an internal value used for filtering metric values on the worker side.

cloud-fan avatar Aug 01 '24 02:08 cloud-fan

Are you sure it may contain -1?

You can run TPC-DS benchmark query q1 and check the SQL tab DAG views in the driver-hosted Spark UI. The minimum and median values are incorrect for multiple operator metrics, and they don't match the history server values either (as pointed out by @abmodi in the JIRA ticket). In the incorrect metric cases, the minimum value is always zero while the median value is always less than the actual median. The reason is that there are extra zeros in the values parameter of the SQLMetrics.stringValue method which were actually -1 and should have been filtered out.

I'm not a Spark expert, but the fact that the SQLMetrics.stringValue method has this logic to fetch validValues itself indicates that we expect -1 in the values parameter.

People can access SQLMetric instances in physical plan programmatically.

Agreed. If users are consuming values directly from a listener, they can use SQLMetric.isZero before consuming them. We can't compromise the integrity of SQLMetric.value just for this, given it is causing a regression in the data shown in the Spark UI. Alternatively, we could expose two different variables: one used for the Spark UI metric calculation and another to be used by users programmatically.

virrrat avatar Aug 01 '24 07:08 virrrat

For incorrect metric cases, the minimum value is always zero while the median value is always less than the actual median value.

As I explained earlier, this should not happen as we will filter out -1 values at the executor side. So the 0 values in the UI may be valid values from certain tasks. Do you have a simple repro (end-to-end query) to trigger this bug?

cloud-fan avatar Aug 01 '24 14:08 cloud-fan

Do you have a simple repro (end-to-end query) to trigger this bug?

Can you please use the reproducer below? It is a join between two tables that shuffles data, and it can be run in a spark-shell.

import scala.util._

// Random 30-character string for the value column.
def randString() = Random.alphanumeric take 30 mkString

// Two RDDs with 100 partitions each, so the join shuffles data across many tasks.
val x = sc.parallelize(0 until 100000, 100)
val y = sc.parallelize(100000 until 2000000, 100)

val a = x.map(x => (x, randString()))
val b = y.map(y => (y, randString()))

val df1 = spark.createDataFrame(a).toDF("col1", "col2")
val df2 = spark.createDataFrame(b).toDF("col3", "col4")

df1.createOrReplaceTempView("t1")
df2.createOrReplaceTempView("t2")

// The key ranges don't overlap, so many tasks produce no output; check the SQL tab metrics afterwards.
spark.sql("select * from t1, t2 where t1.col1 = t2.col3").collect

Attaching screenshots: the data in the Spark UI is incorrect and doesn't match between the Spark UI and the history server for Spark 3.5.0. The data in the Spark UI for Spark 3.3.2 is correct.

3.5.0 Spark UI: spark_ui_350.pdf 3.5.0 History Server: history_server_350.pdf 3.3.2 Spark UI: spark_ui_332.pdf

virrrat avatar Aug 02 '24 16:08 virrrat

@cloud-fan waiting for your response to unblock the review - thanks!

virrrat avatar Aug 07 '24 09:08 virrrat

Can you please use the below reproducer? This is join between two tables that shuffles data. This can be run in a spark-shell.

@cloud-fan were you able to reproduce the issue? It is a very simple scenario that reproduces the issue on Spark 3.4.0 onwards. The issue wasn't there in previous versions.

cc @abmodi and @dongjoon-hyun too.

virrrat avatar Aug 12 '24 07:08 virrrat

I confirmed that the bug exists. I was wrong about executor-side accumulator update filtering: we only filter out zero values for task metrics, not for SQL metrics.

But I don't think this PR is the right fix, as it makes SQLMetric#value return the wrong result. I've proposed a different fix: https://github.com/apache/spark/pull/47721

cloud-fan avatar Aug 12 '24 14:08 cloud-fan

Thanks for checking and confirming! I was actually able to track down that the bug started only after #39311, hence I proposed this fix. But if there is a better way to fix this bug, I am fine with it as long as we fix it.

virrrat avatar Aug 12 '24 16:08 virrrat

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions[bot] avatar Dec 04 '24 00:12 github-actions[bot]