
[SPARK-47050][SQL] Collect and publish partition level metrics

Open snmvaughan opened this pull request 1 year ago • 7 comments

We currently capture metrics that include the number of files, bytes, and rows for a task, along with the set of updated partitions. This change captures metrics for each updated partition, reporting the partition sub-paths along with the number of files, bytes, and rows per partition for each task.

What changes were proposed in this pull request?

For Spark V1

  1. Update the WriteTaskStatsTracker implementation to associate a partition with each file during writing and to track the number of rows written to each file. The final stats now include a map from each partition to its associated partition stats (see the sketch after this list).
  2. Update the WriteJobStatsTracker implementation to capture the partition sub-paths and to publish a new event to the listener bus. The processed stats aggregate the statistics for each partition.
  3. Posting a SparkListenerSQLPartitionMetrics event to the listener bus is handled outside the writer.
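
Below is a minimal sketch of the V1 bookkeeping described above. The class and member names are invented for illustration; only the role of WriteTaskStatsTracker (attributing files and rows to partition sub-paths within a task) is taken from this description.

```scala
import scala.collection.mutable

// Hypothetical container for per-partition counters; the names are
// illustrative, not the exact classes introduced by the PR.
case class PartitionStats(var numFiles: Int = 0, var numBytes: Long = 0L, var numRows: Long = 0L)

// Sketch of the per-task bookkeeping: each new file is attributed to the
// most recently started partition sub-path, rows are counted per file, and
// everything rolls up into a partition -> stats map at the end of the task.
class PartitionTrackingSketch {
  private val partitionStats = mutable.Map.empty[String, PartitionStats]
  private val fileToPartition = mutable.Map.empty[String, String]
  private val rowsPerFile = mutable.Map.empty[String, Long]
  private var currentPartition: Option[String] = None

  def newPartition(partitionSubPath: String): Unit = {
    currentPartition = Some(partitionSubPath)
    partitionStats.getOrElseUpdate(partitionSubPath, PartitionStats())
  }

  def newFile(filePath: String): Unit = {
    currentPartition.foreach { p =>
      fileToPartition(filePath) = p
      partitionStats(p).numFiles += 1
    }
    rowsPerFile(filePath) = 0L
  }

  def newRow(filePath: String): Unit =
    rowsPerFile(filePath) += 1

  def closeFile(filePath: String, fileBytes: Long): Unit =
    fileToPartition.get(filePath).foreach { p =>
      val stats = partitionStats(p)
      stats.numBytes += fileBytes
      stats.numRows += rowsPerFile.getOrElse(filePath, 0L)
    }

  // The final task stats now expose the per-partition map alongside totals.
  def finalStats: Map[String, PartitionStats] = partitionStats.toMap
}
```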

For Spark V2

  1. Writers need to implement PartitionMetricsCollector (see the sketch after this list).
  2. Implementing writers are provided with a PartitionMetricsWriteInfo to aggregate the information.
  3. Posting a SparkListenerSQLPartitionMetrics event to the listener bus is handled outside the writer.
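
The V2 side could look roughly like the following. The names PartitionMetricsCollector and PartitionMetricsWriteInfo come from this description, but the method signatures below are assumptions, not the PR's actual API.

```scala
import java.util.concurrent.ConcurrentHashMap

// Assumed shape of the shared aggregator: partition sub-path -> counters.
// The real PartitionMetricsWriteInfo in the PR may differ.
class PartitionMetricsWriteInfo {
  // (numFiles, numBytes, numRows) per partition sub-path
  private val metrics = new ConcurrentHashMap[String, (Long, Long, Long)]()

  def update(partitionPath: String, files: Long, bytes: Long, rows: Long): Unit =
    metrics.merge(partitionPath, (files, bytes, rows),
      (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3))

  def snapshot(): Map[String, (Long, Long, Long)] = {
    import scala.jdk.CollectionConverters._
    metrics.asScala.toMap
  }
}

// A V2 writer opts in by implementing this trait and pushing its
// partition-level numbers into the aggregator; posting the resulting
// SparkListenerSQLPartitionMetrics event happens outside the writer.
trait PartitionMetricsCollector {
  def collect(info: PartitionMetricsWriteInfo): Unit
}
```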

Why are the changes needed?

This increases our understanding of written data by tracking the impact of each task on our datasets.

Does this PR introduce any user-facing change?

This makes partition-level write metrics accessible through a new listener-bus event.
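
As an illustration, a consumer could pick up the new event via a standard SparkListener. onOtherEvent is the existing extension point; the event's payload fields are defined by the PR and therefore not shown here.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}

// Hedged sketch: match on the event's class name rather than its fields,
// since the exact payload is defined by the PR.
class PartitionMetricsListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e if e.getClass.getSimpleName == "SparkListenerSQLPartitionMetrics" =>
      println(s"partition metrics: $e") // inspect/export per-partition stats here
    case _ => // not interested in other events
  }
}

// Registration uses the standard mechanism:
// spark.sparkContext.addSparkListener(new PartitionMetricsListener)
```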

How was this patch tested?

In addition to the new unit tests, this was run in a Kubernetes environment writing tables with differing partitioning strategies and validating the reported stats. Unit tests using both InsertIntoHadoopFsRelationCommand and InsertIntoHiveTable now verify partition stats when dynamic partitioning is enabled. We also verified that the aggregated partition metrics matched the existing metrics for number of files, bytes, and rows.
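
A hypothetical repro of that dynamic-partitioning scenario (table name and schema invented for illustration): the aggregated per-partition numbers should sum to the existing task-level totals.

```scala
// Needed for the InsertIntoHiveTable path when writing dynamic partitions.
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

// Write a small dynamically partitioned table, then compare per-partition
// stats from the new event against the existing file/byte/row totals.
spark.range(100)
  .selectExpr("id", "id % 4 AS part")
  .write
  .partitionBy("part")
  .mode("append")
  .saveAsTable("partition_metrics_test")
```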

This was also tested with updates to Iceberg to verify that Iceberg batch writes could also capture and publish these metrics.

Was this patch authored or co-authored using generative AI tooling?

No

snmvaughan avatar Feb 15 '24 15:02 snmvaughan

cc @sunchao @dongjoon-hyun @huaxingao

viirya avatar Mar 18 '24 19:03 viirya

Thank you for pinging me, @viirya. Ya, the last commit seems to pass CI.

dongjoon-hyun avatar Mar 18 '24 19:03 dongjoon-hyun

Also, cc @cloud-fan, @MaxGekk, @Ngone51, @HyukjinKwon, too.

dongjoon-hyun avatar Mar 18 '24 19:03 dongjoon-hyun

@cloud-fan Any thoughts after the changes? I've tried to move data-source-specific changes into the implementation classes. I understand that the set of covered cases is entirely append-related at the moment, but the overall goal is to expand this to more complex scenarios in follow-up PRs.

snmvaughan avatar Apr 03 '24 20:04 snmvaughan

Gently pinging @cloud-fan on this. Thanks.

dbtsai avatar Apr 04 '24 18:04 dbtsai

Can we split this PR into two? IIUC the DS v1 change can immediately benefit file-source tables if spark.sql.statistics.size.autoUpdate.enabled is enabled. For the DS v2 part, do we support table-level statistics collection during writing today?

cloud-fan avatar Apr 08 '24 04:04 cloud-fan

> Can we split this PR into two? IIUC the DS v1 change can immediately benefit file-source tables if spark.sql.statistics.size.autoUpdate.enabled is enabled. For the DS v2 part, do we support table-level statistics collection during writing today?

That setting reports updates at the table level, not at the granularity of individual partitions. This is referenced in https://issues.apache.org/jira/browse/SPARK-33825
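
For context, a minimal illustration of what that existing setting covers (table names invented): it refreshes table-level size statistics after a write, with no per-partition breakdown, which is the gap this PR addresses.

```scala
// Existing behavior: with auto-update enabled, Spark refreshes the
// table-level sizeInBytes after an insert command.
spark.conf.set("spark.sql.statistics.size.autoUpdate.enabled", "true")
spark.sql("INSERT INTO sales PARTITION (dt) SELECT id, dt FROM staging")

// DESCRIBE shows only the updated table-level statistics.
spark.sql("DESCRIBE TABLE EXTENDED sales").show(truncate = false)
```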

snmvaughan avatar Jun 06 '24 15:06 snmvaughan