[SPARK-47050][SQL] Collect and publish partition level metrics
We currently capture metrics that include the number of files, bytes, and rows for a task, along with the updated partitions. This change captures metrics for each updated partition, reporting the partition sub-paths along with the number of files, bytes, and rows per partition for each task.
What changes were proposed in this pull request?
For Spark V1:
- Update the `WriteTaskStatsTracker` implementation to associate a partition with each file during writing and to track the number of rows written to each file. The final stats now include a map of partitions to their associated partition stats (a simplified sketch of this bookkeeping follows the list).
- Update the `WriteJobStatsTracker` implementation to capture the partition sub-paths and to publish a new event to the listener bus. The processed stats aggregate the statistics for each partition.
- Posting the `SparkListenerSQLPartitionMetrics` event to the listener bus is handled outside the writer.
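To make the per-task bookkeeping concrete, here is a simplified, standalone sketch of how a task-level tracker could map each open file to its partition sub-path and accumulate files, bytes, and rows per partition. The class and method names below are illustrative and are not the actual `WriteTaskStatsTracker` API.

```scala
import scala.collection.mutable

// Illustrative only: per-partition counters accumulated by one write task.
final case class PartitionStats(var numFiles: Int = 0, var numBytes: Long = 0L, var numRows: Long = 0L)

// Simplified sketch of the per-task bookkeeping; not the actual Spark trait.
class PerPartitionTaskStats {
  // file path -> partition sub-path (e.g. "dt=2024-01-01/country=US")
  private val fileToPartition = mutable.Map.empty[String, String]
  // partition sub-path -> accumulated stats
  private val partitionStats = mutable.Map.empty[String, PartitionStats]

  def newFile(filePath: String, partitionSubPath: String): Unit = {
    fileToPartition(filePath) = partitionSubPath
    partitionStats.getOrElseUpdate(partitionSubPath, PartitionStats()).numFiles += 1
  }

  def newRow(filePath: String): Unit =
    fileToPartition.get(filePath).foreach(p => partitionStats(p).numRows += 1)

  def closeFile(filePath: String, fileBytes: Long): Unit =
    fileToPartition.get(filePath).foreach(p => partitionStats(p).numBytes += fileBytes)

  // Final stats returned by the task: partition sub-path -> accumulated stats.
  def finalStats: Map[String, PartitionStats] = partitionStats.toMap
}
```

A driver-side counterpart (the `WriteJobStatsTracker` side) would then merge these per-task maps and post a single event for the query.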
For Spark V2:
- The writer needs to implement `PartitionMetricsCollector` (see the sketch after this list).
- Implementing writers are provided with a `PartitionMetricsWriteInfo` to aggregate the information.
- Posting the `SparkListenerSQLPartitionMetrics` event to the listener bus is handled outside the writer.
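A minimal sketch of the proposed V2 hand-off is below; `PartitionMetricsCollector` and `PartitionMetricsWriteInfo` are introduced by this PR, but the method signatures shown here are assumptions for illustration, not the actual API.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

// Assumed shape of the aggregation object handed to implementing writers;
// the method names are illustrative, not the PR's actual API.
class PartitionMetricsWriteInfo {
  // partition sub-path -> (files, bytes, rows)
  private val metrics = new ConcurrentHashMap[String, (Long, Long, Long)]()

  def update(partition: String, files: Long, bytes: Long, rows: Long): Unit =
    metrics.merge(partition, (files, bytes, rows),
      (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3))

  def snapshot(): Map[String, (Long, Long, Long)] = metrics.asScala.toMap
}

// Assumed contract a V2 writer implements so the framework can collect
// per-partition metrics after the write and post them to the listener bus.
trait PartitionMetricsCollector {
  def collectPartitionMetrics(info: PartitionMetricsWriteInfo): Unit
}
```

The write framework, rather than the writer itself, would then turn the aggregated `PartitionMetricsWriteInfo` into the `SparkListenerSQLPartitionMetrics` event.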
Why are the changes needed?
This increases our understanding of written data by tracking each task's impact on the individual partitions of our datasets.
Does this PR introduce any user-facing change?
Yes. This makes partition-level write metrics accessible through a new listener event.
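As a hedged sketch of how that event could be consumed, a user can register a `SparkListener` and pick the new event out of `onOtherEvent`. Since the event class and its fields are defined by this PR, the example below matches only on the class name rather than assuming its schema.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.SparkSession

// Sketch: watch the listener bus for the new SQL partition-metrics event.
class PartitionMetricsListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    // Match on the simple class name to avoid assuming the event's fields.
    case e if e.getClass.getSimpleName == "SparkListenerSQLPartitionMetrics" =>
      println(s"Received partition metrics event: $e")
    case _ => // ignore other events
  }
}

object PartitionMetricsListenerExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("partition-metrics-demo").getOrCreate()
    spark.sparkContext.addSparkListener(new PartitionMetricsListener)
    // ... run partitioned writes here; the listener prints any published events.
  }
}
```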
How was this patch tested?
In addition to the new unit tests, this was run in a Kubernetes environment writing tables with differing partitioning strategies and validating the reported stats. Unit tests using both InsertIntoHadoopFsRelationCommand and InsertIntoHiveTable now verify partition stats when dynamic partitioning is enabled. We also verified that the aggregated partition metrics matched the existing metrics for number of files, bytes, and rows.
This was also tested with updates to Iceberg to verify that Iceberg batch writes could also capture and publish these metrics.
Was this patch authored or co-authored using generative AI tooling?
No
cc @sunchao @dongjoon-hyun @huaxingao
Thank you for pinging me, @viirya . Ya, the last commit seems to pass CI.
Also, cc @cloud-fan , @MaxGekk , @Ngone51 , @HyukjinKwon , too.
@cloud-fan Any thoughts after the changes? I've tried to move data source specific changes into the implementation classes. I understand that the set of covered cases is entirely append-related at the moment, but the overall goal is to expand this to more complex scenarios in follow-up PRs.
Gently pinging @cloud-fan on this. Thanks.
Can we split this PR into two? IIUC the DS v1 change can benefit file source tables immediately if `spark.sql.statistics.size.autoUpdate.enabled` is enabled. For the DS v2 part, do we support table level statistics collecting during writing today?
That setting reports on updates to the table, not at the granular level of individual partitions. This is referenced in https://issues.apache.org/jira/browse/SPARK-33825
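For context, the existing flag only refreshes table-level size statistics after a write; it can be enabled as below (assuming a `SparkSession` named `spark`), but it does not expose the per-partition file, byte, and row counts that this PR adds.

```scala
// Existing table-level behavior: update the table's size statistics after writes.
spark.conf.set("spark.sql.statistics.size.autoUpdate.enabled", "true")
```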