Optimization: Avoid sort for already sorted Parquet files that do not overlap values on condition
Describe the bug
I'm testing performance of querying a number of Parquet files, where I can make some assumptions about the Parquet files.
- Each Parquet file is already sorted on the column "timestamp".
- No two Parquet files overlap in their values for the column "timestamp". For instance, file A holds timestamps from 2022, and file B holds timestamps from 2023.
The schema of the files is:
- "timestamp": TimestampMillisecond
- "value": Float64
Consider the following query and its query plan:
SELECT timestamp, value
FROM samples
ORDER BY timestamp ASC
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | SortPreservingMergeExec: [timestamp@0 ASC], metrics=[output_rows=1000000, elapsed_compute=572.526968ms] |
| | ParquetExec: file_groups={20 groups: [[0.parquet], [1.parquet], [2.parquet], [3.parquet], [4.parquet], ...]}, projection=[timestamp, value], output_ordering=[timestamp@0 ASC], metrics=[output_rows=1000000, elapsed_compute=20ns, num_predicate_creation_errors=0, predicate_evaluation_errors=0, bytes_scanned=57972, page_index_rows_filtered=0, row_groups_pruned=0, pushdown_rows_filtered=0, time_elapsed_processing=51.918935ms, page_index_eval_time=40ns, time_elapsed_scanning_total=48.94925ms, time_elapsed_opening=2.996325ms, time_elapsed_scanning_until_data=48.311008ms, pushdown_eval_time=40ns] |
| | |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
The 572 milliseconds on the SortPreservingMergeExec seems to be the bottleneck in the query, so I would like to optimize it.
Given the assumptions I can make about the Parquet files, I think that the SortPreservingMergeExec can be replaced by what is essentially a concatenation of each of the Parquet files.
What would be the best approach to remove the SortPreservingMergeExec?
My ideas:
- Manually re-partition the Parquet files into a single Parquet file using this new API: https://docs.rs/parquet/latest/parquet/file/writer/struct.SerializedRowGroupWriter.html#method.append_column
- I have an idea of implementing a custom PhysicalOptimizerRule that looks for the SortPreservingMergeExec ParquetExec pattern, and replaces it with a concatenation instead.
But I would like to hear if there are any better ways.
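The check that such a rule would need can be sketched independently of DataFusion. The following is a minimal sketch, assuming the per-file minimum and maximum of "timestamp" are already available (for example from the Parquet footer statistics). `FileStats` and `concat_order` are hypothetical names for illustration and are not part of the DataFusion API.

```rust
/// Min/max of the sort column for one file (hypothetical struct for this sketch).
#[derive(Debug, Clone, PartialEq)]
pub struct FileStats {
    pub path: String,
    pub min_ts: i64, // smallest timestamp in the file, in milliseconds
    pub max_ts: i64, // largest timestamp in the file, in milliseconds
}

/// Returns the files in an order whose plain concatenation is sorted ascending,
/// or `None` if any two files overlap and a merge is still required.
/// Assumes every file is itself sorted and that `min_ts <= max_ts`.
pub fn concat_order(mut files: Vec<FileStats>) -> Option<Vec<FileStats>> {
    // Order the files by their smallest timestamp.
    files.sort_by_key(|f| (f.min_ts, f.max_ts));
    // Conservative check: each file must end strictly before the next begins.
    let disjoint = files.windows(2).all(|w| w[0].max_ts < w[1].min_ts);
    if disjoint {
        Some(files)
    } else {
        None
    }
}
```

If `concat_order` returns `Some`, reading the files sequentially in that order yields sorted output without a merge; if it returns `None`, the plan would keep the SortPreservingMergeExec.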
Given the assumptions I can make about the Parquet files, I think that the SortPreservingMergeExec can be replaced by what is essentially a concatenation of each of the Parquet files.
I agree
I have an idea of implementing a custom PhysicalOptimizerRule that looks for the SortPreservingMergeExec ParquetExec pattern, and replaces it with a concatenation instead.
Yes, I think this would work. We do some similar things in IOx (interestingly, also for the time series use case with non-overlapping time ranges).
It was implemented by @crepererum; you can see it in https://github.com/influxdata/influxdb_iox/tree/main/iox_query/src/physical_optimizer
Manually re-partition the Parquet files into a single Parquet file using this new API: https://docs.rs/parquet/latest/parquet/file/writer/struct.SerializedRowGroupWriter.html#method.append_column
I think this is likely the solution that would be the fastest for querying because then time predicates could be used to prune out entire row groups and you would have lower file opening overhead
The downside, of course, is that you would need to rewrite the Parquet files
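To illustrate the row group pruning mentioned above, here is a minimal sketch, assuming each row group carries min/max statistics for "timestamp". `RowGroupStats` and `row_groups_to_scan` are hypothetical names, and this is not how DataFusion implements pruning internally.

```rust
/// Min/max timestamp of one row group (hypothetical struct for this sketch).
#[derive(Debug, Clone, Copy)]
pub struct RowGroupStats {
    pub min_ts: i64,
    pub max_ts: i64,
}

/// Returns the indices of row groups that may contain rows satisfying
/// `lo <= timestamp < hi`. All other row groups can be skipped unread.
pub fn row_groups_to_scan(groups: &[RowGroupStats], lo: i64, hi: i64) -> Vec<usize> {
    groups
        .iter()
        .enumerate()
        // Keep a row group only if its [min_ts, max_ts] range intersects [lo, hi).
        .filter(|(_, g)| g.max_ts >= lo && g.min_ts < hi)
        .map(|(i, _)| i)
        .collect()
}
```

When the data in a file is sorted on "timestamp", the row group ranges do not overlap, so a narrow time predicate tends to select only a few adjacent row groups.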
I am marking this as a question as I am not sure it is really a bug -- though please let me know if you disagree
I think this is likely the solution that would be the fastest for querying because then time predicates could be used to prune out entire row groups and you would have lower file opening overhead
Thanks, I'll try this.
I am marking this as a question as I am not sure it is really a bug -- though please let me know if you disagree
My bad, it was a question.
Although one could argue it is also a feature request for an inbuilt optimization that removes sorts if it can detect non-overlaps using either hints or directly looking at min/max statistics on inputs. Do you think that is reasonable, or is it too specific for just my use case?
I think it is a reasonable request as having data sorted by date is so common, though the trick would be making the API reasonable and general purpose 🤔
I have had a somewhat overlapping (no pun intended) issue where DataFusion abandons the SortPreservingMergeStream and does a global sort if there are multiple files in any file groups. It should be possible for DataFusion to realize that, if the files are non-overlapping, the file groups can be re-ordered to satisfy the required output ordering. We would be partitioning a poset of files into a series of chains, where A < B if they are non-overlapping, and every row in A goes before every row in B. Then each chain becomes one file group in the physical plan, which would be read sequentially. Using statistics and partition columns it should be possible to generate a reasonable execution plan without reading any rows.
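The chain partitioning described above could look roughly like the following. This is a minimal sketch using a greedy first-fit assignment over files sorted by their minimum value, assuming min/max statistics for the sort column are known for every file. `FileRange` and `partition_into_chains` are hypothetical names, not DataFusion APIs.

```rust
/// Value range of the sort column in one file (hypothetical struct for this sketch).
#[derive(Debug, Clone, PartialEq)]
pub struct FileRange {
    pub path: String,
    pub min_ts: i64,
    pub max_ts: i64,
}

/// Splits files into chains. Within a chain, every file ends strictly before
/// the next one begins, so reading a chain sequentially yields sorted output.
/// Each chain would correspond to one file group in the physical plan.
pub fn partition_into_chains(mut files: Vec<FileRange>) -> Vec<Vec<FileRange>> {
    // Visit files in order of their smallest value.
    files.sort_by_key(|f| (f.min_ts, f.max_ts));
    let mut chains: Vec<Vec<FileRange>> = Vec::new();
    for file in files {
        // Find the first chain whose last file ends before this file begins.
        let slot = chains.iter().position(|chain| {
            chain.last().map_or(false, |last| last.max_ts < file.min_ts)
        });
        match slot {
            Some(i) => chains[i].push(file),
            // No existing chain can take the file without overlap: start a new one.
            None => chains.push(vec![file]),
        }
    }
    chains
}
```

With fully non-overlapping files this produces a single chain, which corresponds to the plain concatenation case from the original question. Overlapping files end up in separate chains, which would still need a sort-preserving merge across them.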
I have had a somewhat overlapping (no pun intended) issue where DataFusion abandons the SortPreservingMergeStream and does a global sort if there are multiple files in any file groups. It should be possible for DataFusion to realize that, if the files are non-overlapping, the file groups can be re-ordered to satisfy the required output ordering.
Yes, that is correct -- within each partition, the Parquet reader produces the files back to back, so if a partition contains multiple files, the resulting stream is not necessarily ordered even if every input file is
We would be partitioning a poset of files into a series of chains, where A < B if they are non-overlapping, and every row in A goes before every row in B.
Indeed, as long as the files in each output group are ordered and non-overlapping in time, the Parquet reader would not need to be changed at all
FWIW @NGA-TRAN is working on something similar downstream in InfluxDB IOx
Follow up: https://github.com/apache/datafusion/issues/10316