Optimization: Avoid sort for already sorted Parquet files that do not overlap values on condition

Open simonvandel opened this issue 1 year ago • 8 comments

Describe the bug

I'm testing performance of querying a number of Parquet files, where I can make some assumptions about the Parquet files.

  • Each Parquet file is already sorted on the column "timestamp".
  • The Parquet files do not overlap in their values for the column "timestamp". For instance, file A only has timestamps in 2022, and file B only has timestamps in 2023.

The schema of the files is:

  • "timestamp": TimestampMillisecond
  • "value": Float64

Consider the following query and its query plan:

SELECT timestamp, value 
FROM samples 
ORDER BY timestamp ASC
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | SortPreservingMergeExec: [timestamp@0 ASC], metrics=[output_rows=1000000, elapsed_compute=572.526968ms]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
|                   |   ParquetExec: file_groups={20 groups: [[0.parquet], [1.parquet], [2.parquet], [3.parquet], [4.parquet], ...]}, projection=[timestamp, value], output_ordering=[timestamp@0 ASC], metrics=[output_rows=1000000, elapsed_compute=20ns, num_predicate_creation_errors=0, predicate_evaluation_errors=0, bytes_scanned=57972, page_index_rows_filtered=0, row_groups_pruned=0, pushdown_rows_filtered=0, time_elapsed_processing=51.918935ms, page_index_eval_time=40ns, time_elapsed_scanning_total=48.94925ms, time_elapsed_opening=2.996325ms, time_elapsed_scanning_until_data=48.311008ms, pushdown_eval_time=40ns] |
|                   |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

The 572 milliseconds on the SortPreservingMergeExec seems to be the bottleneck in the query, so I would like to optimize it.

Given the assumptions I can make about the Parquet files, I think that the SortPreservingMergeExec can be replaced by what is essentially a concatenation of each of the Parquet files.
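To make the claim concrete, here is a minimal sketch in plain Rust (standard library only, not DataFusion code). Each `Vec<i64>` stands in for the already-sorted "timestamp" column of one Parquet file, and `concat_non_overlapping` is a hypothetical helper name. It returns the concatenation only when the ranges are strictly disjoint, which is the case where a merge and a concatenation produce the same output.

```rust
/// One already-sorted run of timestamps, standing in for one Parquet file.
type Run = Vec<i64>;

/// Concatenate sorted runs whose value ranges do not overlap.
/// Returns `None` if any two runs overlap, in which case a real merge is needed.
/// Assumes every run is already sorted ascending (not checked here).
fn concat_non_overlapping(mut runs: Vec<Run>) -> Option<Vec<i64>> {
    // Empty runs contribute no rows and have no min/max, so drop them.
    runs.retain(|r| !r.is_empty());
    // Order the runs by their first (smallest) value.
    runs.sort_by_key(|r| r[0]);
    // Every run must end strictly before the next one begins.
    // The strict comparison is a conservative choice for this sketch.
    for pair in runs.windows(2) {
        let prev_max = *pair[0].last().unwrap();
        let next_min = pair[1][0];
        if prev_max >= next_min {
            return None;
        }
    }
    // No comparisons between rows of different runs are needed from here on.
    Some(runs.into_iter().flatten().collect())
}
```

For example, the runs `[20, 21, 25]`, `[1, 2, 3]` and `[10, 11]` would be reordered and concatenated, while `[1, 5]` and `[3, 8]` would be rejected because they overlap.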

What would be the best approach to remove the SortPreservingMergeExec? My ideas:

  • Manually re-partition the Parquet files into a single Parquet file using this new API: https://docs.rs/parquet/latest/parquet/file/writer/struct.SerializedRowGroupWriter.html#method.append_column
  • I have an idea of implementing a custom PhysicalOptimizerRule that looks for the SortPreservingMergeExec ParquetExec pattern, and replaces it with a concatenation instead.

But I would like to hear if there are any better ways.

To Reproduce

No response

Expected behavior

No response

Additional context

No response

simonvandel avatar Jun 14 '23 20:06 simonvandel

Given the assumptions I can make about the Parquet files, I think that the SortPreservingMergeExec can be replaced by what is essentially a concatenation of each of the Parquet files.

I agree

I have an idea of implementing a custom PhysicalOptimizerRule that looks for the SortPreservingMergeExec ParquetExec pattern, and replaces it with a concatenation instead.

Yes, I think this would work. We do some similar things in IOx (interestingly also for the time series use case with non-overlapping time ranges).

It was implemented by @crepererum, and you can see it in https://github.com/influxdata/influxdb_iox/tree/main/iox_query/src/physical_optimizer

Manually re-partition the Parquet files into a single Parquet file using this new API: https://docs.rs/parquet/latest/parquet/file/writer/struct.SerializedRowGroupWriter.html#method.append_column

I think this is likely the solution that would be the fastest for querying because then time predicates could be used to prune out entire row groups and you would have lower file opening overhead

The downside, of course, is that you would need to rewrite the parquet files

alamb avatar Jun 15 '23 14:06 alamb

I am marking this as a question as I am not sure it is really a bug -- though please let me know if you disagree

alamb avatar Jun 15 '23 14:06 alamb

I think this is likely the solution that would be the fastest for querying because then time predicates could be used to prune out entire row groups and you would have lower file opening overhead

Thanks, I'll try this.

I am marking this as a question as I am not sure it is really a bug -- though please let me know if you disagree

My bad, it was a question.

Although one could argue it is also a feature request for an inbuilt optimization that removes sorts if it can detect non-overlaps using either hints or directly looking at min/max statistics on inputs. Do you think that is reasonable, or is it too specific for just my use case?
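As a rough illustration of the min/max idea, the check could look something like the sketch below. This is plain Rust with hypothetical names (`FileStats`, `scan_order_if_disjoint`), not an existing DataFusion API, and it assumes the per-file min and max of the sort column are already available, for example from Parquet metadata.

```rust
/// Min/max statistics for the sort column of one file.
/// Hypothetical struct for illustration; not a DataFusion type.
#[derive(Debug, Clone, PartialEq)]
struct FileStats {
    path: String,
    min_ts: i64,
    max_ts: i64,
}

/// If no two files overlap on the sort column, return the paths in the order
/// they should be scanned so that the output is sorted. Otherwise return `None`
/// and leave the sort (or merge) in place.
fn scan_order_if_disjoint(files: &[FileStats]) -> Option<Vec<String>> {
    let mut sorted: Vec<&FileStats> = files.iter().collect();
    // Order candidate files by their smallest timestamp.
    sorted.sort_by_key(|f| f.min_ts);
    // Adjacent files must not overlap; the strict comparison is conservative.
    for pair in sorted.windows(2) {
        if pair[0].max_ts >= pair[1].min_ts {
            return None;
        }
    }
    Some(sorted.iter().map(|f| f.path.clone()).collect())
}
```

Only statistics are inspected here, so the decision can be made at planning time without reading any rows.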

simonvandel avatar Jun 15 '23 20:06 simonvandel

Although one could argue it is also a feature request for an inbuilt optimization that removes sorts if it can detect non-overlaps using either hints or directly looking at min/max statistics on inputs.

Do you think that is reasonable, or is it too specific for just my use case?

I think it is a reasonable request as having data sorted by date is so common, though the trick would be making the API reasonable and general purpose 🤔

alamb avatar Jun 16 '23 15:06 alamb

I have had a somewhat overlapping (no pun intended) issue where DataFusion abandons the SortPreservingMergeStream and does a global sort if there are multiple files in any file groups. It should be possible for DataFusion to realize that, if the files are non-overlapping, the file groups can be re-ordered to satisfy the required output ordering. We would be partitioning a poset of files into a series of chains, where A < B if they are non-overlapping, and every row in A goes before every row in B. Then each chain becomes one file group in the physical plan, which would be read sequentially. Using statistics and partition columns it should be possible to generate a reasonable execution plan without reading any rows.
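A minimal sketch of the chain idea, in plain Rust with hypothetical names (`FileRange`, `partition_into_chains`) rather than anything from DataFusion: sort the files by their minimum value, then greedily append each file to the first chain whose last file ends strictly before it starts, opening a new chain when none fits. It assumes min/max statistics for the sort column are known per file.

```rust
/// Value range of the sort column for one file.
/// Hypothetical struct for illustration; not a DataFusion type.
#[derive(Debug, Clone, PartialEq)]
struct FileRange {
    name: &'static str,
    min: i64,
    max: i64,
}

/// Partition files into chains. Within a chain, every file ends strictly
/// before the next one starts, so reading a chain's files back to back
/// yields a sorted stream. Each chain would become one file group.
fn partition_into_chains(mut files: Vec<FileRange>) -> Vec<Vec<FileRange>> {
    // Process files in order of their smallest value.
    files.sort_by_key(|f| (f.min, f.max));
    let mut chains: Vec<Vec<FileRange>> = Vec::new();
    for file in files {
        // Find the first chain whose last file ends before this file begins.
        let slot = chains
            .iter()
            .position(|c| c.last().map_or(false, |last| last.max < file.min));
        match slot {
            Some(i) => chains[i].push(file),
            None => chains.push(vec![file]),
        }
    }
    chains
}
```

With files a = [0, 10], b = [5, 15], c = [11, 20] and d = [16, 30], this yields two chains, (a, c) and (b, d). The two chains still overlap each other, so they would still need a merge between them, but no global sort.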

suremarc avatar Jun 26 '23 18:06 suremarc

I have had a somewhat overlapping (no pun intended) issue where DataFusion abandons the SortPreservingMergeStream and does a global sort if there are multiple files in any file groups. It should be possible for DataFusion to realize that, if the files are non-overlapping, the file groups can be re-ordered to satisfy the required output ordering.

Yes, that is correct -- the files in each file group are read back to back, so if a group contains multiple files, the resulting stream is not necessarily ordered even if every input file is

We would be partitioning a poset of files into a series of chains, where A < B if they are non-overlapping, and every row in A goes before every row in B.

Indeed, as long as the files in each output group are ordered and do not overlap in time, the parquet reader would not need to be changed at all

alamb avatar Jun 26 '23 19:06 alamb

FWIW @NGA-TRAN is working on something similar downstream in InfluxDB IOx

alamb avatar Nov 27 '23 20:11 alamb

FWIW @NGA-TRAN is working on something similar downstream in InfluxDB IOx

Follow up: https://github.com/apache/datafusion/issues/10316

alamb avatar Apr 30 '24 17:04 alamb