
[WIP] A new parquet reader that supports filter push down, improving total ClickBench time by 40+% compared to the arrow parquet reader

Open liuneng1994 opened this issue 1 year ago • 1 comments

I have seen many PRs optimizing parquet. Here I share a redesigned parquet reader that currently achieves a performance improvement of more than 40% on ClickBench.

Some features still need to be completed before it can be used in practice, including support for more types (complex types among them) and more complete expression pushdown.
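A minimal usage sketch, for context: the setting name is taken from the reproduction command later in this thread (input_format_parquet_use_native_reader_with_filter_push_down); the file name and the ClickBench-style predicate are illustrative only, and the exact setting name and default may change while this is in draft.

SELECT count()
FROM file('hits.parquet')
WHERE URL LIKE '%google%'
SETTINGS input_format_parquet_use_native_reader_with_filter_push_down = 1;
-- With the setting on, the WHERE condition is pushed into the parquet reader, so
-- non-matching rows are discarded before the remaining columns are materialized
-- (assumed behaviour of the push-down).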

Below are the performance test results on my development machine (12900K, 128 GB memory).

cold run

query no | tries | arrow (s) | new reader (s) | new reader / arrow
1 1 0.003 0.005 1.666666667
2 1 0.091 0.091 1
3 1 0.125 0.122 0.976
4 1 0.174 0.195 1.120689655
5 1 0.857 0.809 0.943990665
6 1 0.917 0.756 0.824427481
7 1 0.084 0.099 1.178571429
8 1 0.081 0.082 1.012345679
9 1 1.2 1.083 0.9025
10 1 1.377 1.117 0.811183733
11 1 0.545 0.317 0.581651376
12 1 0.536 0.303 0.565298507
13 1 1.513 1.278 0.844679445
14 1 2.174 1.701 0.782428703
15 1 1.531 1.2 0.783801437
16 1 1.416 1.289 0.910310734
17 1 3.617 3.553 0.982305778
18 1 2.081 1.846 0.887073522
19 1 8.503 8.071 0.949194402
20 1 0.192 0.194 1.010416667
21 1 5.349 1.801 0.336698448
22 1 5.731 1.294 0.225789566
23 1 10.694 2.954 0.276229661
24 1 27.219 9.326 0.342628311
25 1 1.14 0.686 0.601754386
26 1 0.697 0.455 0.652797704
27 1 1.131 0.62 0.548187445
28 1 5.745 1.967 0.342384682
29 1 8.296 6.542 0.788572806
30 1 0.101 0.128 1.267326733
31 1 1.368 1.13 0.826023392
32 1 2.336 1.841 0.788099315
33 1 8.479 8.962 1.056964265
34 1 9.468 7.005 0.739860583
35 1 9.692 7.399 0.763413124
36 1 1.052 1.073 1.019961977
37 1 0.185 0.156 0.843243243
38 1 0.132 0.117 0.886363636
39 1 0.162 0.118 0.728395062
40 1 0.325 0.185 0.569230769
41 1 0.068 0.083 1.220588235
42 1 0.061 0.085 1.393442623
43 1 0.058 0.078 1.344827586
sum: arrow 126.5 s, new reader 78.1 s, ratio 0.617

hot run

query no | tries | arrow (s) | new reader (s) | new reader / arrow
1 2 0.003 0.005 1.666666667
2 2 0.08 0.085 1.0625
3 2 0.113 0.107 0.946902655
4 2 0.161 0.182 1.130434783
5 2 0.828 0.777 0.938405797
6 2 0.918 0.73 0.795206972
7 2 0.077 0.096 1.246753247
8 2 0.078 0.084 1.076923077
9 2 1.154 0.992 0.859618718
10 2 1.318 1.07 0.811836115
11 2 0.44 0.287 0.652272727
12 2 0.501 0.268 0.53493014
13 2 1.474 1.088 0.738127544
14 2 2.133 1.653 0.774964838
15 2 1.508 1.096 0.726790451
16 2 1.162 1.085 0.93373494
17 2 3.515 3.219 0.915789474
18 2 2.033 1.75 0.860796852
19 2 6.809 6.214 0.912615656
20 2 0.16 0.13 0.8125
21 2 4.76 1.526 0.320588235
22 2 5.304 0.998 0.188159879
23 2 10.077 2.499 0.247990473
24 2 26.25 8.131 0.309752381
25 2 1.092 0.508 0.465201465
26 2 0.658 0.361 0.548632219
27 2 1.034 0.498 0.481624758
28 2 5.533 1.666 0.301102476
29 2 7.815 6.055 0.774792067
30 2 0.1 0.109 1.09
31 2 1.246 0.983 0.788924559
32 2 2 1.501 0.7505
33 2 8.58 8.328 0.970629371
34 2 9.314 6.719 0.721387159
35 2 9.431 7.093 0.752094158
36 2 0.873 0.856 0.980526919
37 2 0.163 0.14 0.858895706
38 2 0.114 0.102 0.894736842
39 2 0.155 0.103 0.664516129
40 2 0.28 0.185 0.660714286
41 2 0.062 0.087 1.403225806
42 2 0.053 0.081 1.528301887
43 2 0.052 0.074 1.423076923
sum: arrow 119.4 s, new reader 69.5 s, ratio 0.582

Changelog category (leave one):

  • Experimental Feature

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

...

Documentation entry for user-facing changes

  • [ ] Documentation is written (mandatory for new features)

Information about CI checks: https://clickhouse.com/docs/en/development/continuous-integration/

CI Settings (Only check the boxes if you know what you are doing):

  • [ ] Allow: All Required Checks
  • [ ] Allow: Stateless tests
  • [ ] Allow: Stateful tests
  • [ ] Allow: Integration Tests
  • [ ] Allow: Performance tests
  • [ ] Allow: All Builds
  • [ ] Allow: batch 1, 2 for multi-batch jobs
  • [ ] Allow: batch 3, 4, 5, 6 for multi-batch jobs

  • [ ] Exclude: Style check
  • [ ] Exclude: Fast test
  • [ ] Exclude: All with ASAN
  • [ ] Exclude: All with TSAN, MSAN, UBSAN, Coverage
  • [ ] Exclude: All with aarch64, release, debug

  • [ ] Run only fuzzers related jobs (libFuzzer fuzzers, AST fuzzers, etc.)
  • [ ] Exclude: AST fuzzers

  • [ ] Do not test
  • [ ] Woolen Wolfdog
  • [ ] Upload binaries for special builds
  • [ ] Disable merge-commit
  • [ ] Disable CI cache

liuneng1994 avatar Oct 14 '24 05:10 liuneng1994

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you all sign our Contributor License Agreement before we can accept your contribution.
1 out of 2 committers have signed the CLA.

:white_check_mark: yxheartipp
:x: liuneng1994


liuneng1994 seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you already have a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.

CLAassistant avatar Oct 14 '24 05:10 CLAassistant

@liuneng1994 I assume this is supposed to replace the existing "native reader implementation". If so, do you plan to incorporate all of its functionality, including features introduced while your PR is in draft / under review? For example: https://github.com/ClickHouse/ClickHouse/pull/70807.

I want to help achieve feature completeness on native reader, but I am afraid all the work will be thrown away once this PR gets merged. Do you see a way we could collaborate?

arthurpassos avatar Oct 23 '24 12:10 arthurpassos

I started this work in July, and I didn't realize there were so many PRs related to parquet; that was beyond my expectations. Of course, I hope that when this PR is ready to merge it can include the existing features, but that will add a lot of work.

I am very happy to have someone to work with on implementing a native parquet reader. After looking at the parquet readers of other engines (StarRocks, Velox, DuckDB, etc.), I realized that building an industrial-strength parquet reader is indeed a lot of work.

If you don't mind, we can work together in my fork repository. We can exchange ideas at https://github.com/ClickHouse/ClickHouse/issues/68641, or use other communication tools (Telegram or others). The current code is synchronized in real time in this PR. If you think my implementation is worth collaborating on, I am willing to take the time to write a detailed document explaining my design ideas and the directions I plan to take next.

liuneng1994 avatar Oct 23 '24 12:10 liuneng1994

Working together towards better parquet support sounds like a good idea. Do you mind sharing your telegram handle? Mine is @arthurpassoscs

arthurpassos avatar Oct 30 '24 16:10 arthurpassos

My Telegram handle is @liuneng1994; I have sent you a message. I will take the time to describe the current progress and future plans.

liuneng1994 avatar Nov 04 '24 02:11 liuneng1994

I attempted to test this implementation of the native reader on some of my Parquet files but encountered an issue where the ClickHouse server crashes at some point while reading from the Parquet file. Below is the complete ClickHouse log.

clickhouse-server.log

Selfeer avatar Nov 27 '24 14:11 Selfeer

Thank you for testing. If convenient, could you provide a way to reproduce it? My previous work was mainly about verifying performance feasibility, so the test cases were limited and focused on the correctness of ClickBench. Later, I will cover more of the test cases from the arrow parquet tests.

liuneng1994 avatar Nov 30 '24 09:11 liuneng1994

Have you compared the performance of your parquet reader vs. ClickHouse's existing native reader?

arthurpassos avatar Dec 13 '24 14:12 arthurpassos

I haven't compared them; I compared this reader with MergeTree. Because the existing native reader does not support filter push down, the two are not very comparable. Filter push down is crucial for benchmarks such as ClickBench.

Off topic: I am still working on supporting nested types and unit test coverage. The complexity is a bit beyond my expectations. If everything goes well, I should have a version of reasonable quality in a week or two.

liuneng1994 avatar Dec 18 '24 09:12 liuneng1994

Performance comparison with arrow and MergeTree, version 20241223. The MergeTree table uses unsorted data.

Hot Run

query no | tries | mergetree (s) | arrow (s) | PAR241223 (s) | 241223 / arrow | 241223 / mergetree
1 2 0.003 0.003 0.021   7 7
2 2 0.009 0.08 0.084   1.05 9.333333333
3 2 0.017 0.113 0.102   0.902654867 6
4 2 0.025 0.161 0.15   0.931677019 6
5 2 0.594 0.828 0.661   0.798309179 1.112794613
6 2 0.497 0.918 0.645   0.702614379 1.29778672
7 2 0.012 0.077 0.108   1.402597403 9
8 2 0.011 0.078 0.083   1.064102564 7.545454545
9 2 0.749 1.154 0.852   0.73830156 1.137516689
10 2 0.828 1.318 0.913   0.692716237 1.102657005
11 2 0.139 0.44 0.251   0.570454545 1.805755396
12 2 0.173 0.501 0.246   0.491017964 1.421965318
13 2 0.945 1.474 0.884   0.59972863 0.935449735
14 2 1.367 2.133 1.281   0.600562588 0.937088515
15 2 0.948 1.508 0.882   0.584880637 0.930379747
16 2 0.908 1.162 0.873   0.751290878 0.961453744
17 2 2.528 3.515 2.499   0.710953058 0.988528481
18 2 1.286 2.033 1.425   0.700934579 1.108087092
19 2 4.878 6.809 5.067   0.744162138 1.038745387
20 2 0.024 0.16 0.093   0.58125 3.875
21 2 0.959 4.76 2.285   0.480042017 2.382690302
22 2 1.236 5.304 0.884   0.166666667 0.715210356
23 2 1.875 10.077 2.088   0.207204525 1.1136
24 2 3.409 26.25 8.919   0.339771429 2.616309768
25 2 0.24 1.092 0.403   0.369047619 1.679166667
26 2 0.234 0.658 0.307   0.46656535 1.311965812
27 2 0.244 1.034 0.427   0.412959381 1.75
28 2 1.327 5.533 1.422   0.257003434 1.071590053
29 2 4.396 7.815 4.845   0.619961612 1.102138308
30 2 0.024 0.1 0.113   1.13 4.708333333
31 2 0.707 1.246 0.743   0.596308186 1.050919378
32 2 1.088 2 1.155   0.5775 1.061580882
33 2 6.285 8.58 6.527   0.760722611 1.038504375
34 2 5.146 9.314 5.326   0.571827357 1.034978624
35 2 5.161 9.431 5.453   0.578199555 1.056578183
36 2 0.694 0.873 0.686   0.785796105 0.988472622
37 2 0.074 0.163 0.109   0.668711656 1.472972973
38 2 0.049 0.114 0.082   0.719298246 1.673469388
39 2 0.058 0.155 0.08   0.516129032 1.379310345
40 2 0.126 0.28 0.145   0.517857143 1.150793651
41 2 0.025 0.062 0.058   0.935483871 2.32
42 2 0.023 0.053 0.059   1.113207547 2.565217391
43 2 0.016 0.052 0.048   0.923076923 3

liuneng1994 avatar Dec 23 '24 08:12 liuneng1994

After fixing some bugs, performance improved a little; Parquet is now faster than MergeTree on low-cardinality string columns.

Attached plots: total time, per-query time plot, and relative ratio plot.

liuneng1994 avatar Dec 26 '24 10:12 liuneng1994

@nickitat Hi, the functionality of the parquet reader is now mostly complete: it supports most common types, and on my test data its latest performance is far ahead of arrow. There is still a lot of work to be done, including page index support and other detail optimizations, which is long-term work. I hope to merge it into master: that would give me more complete CI to ensure quality, and it would make cooperating with other developers in the community more convenient. What else do I need to do? Since there is already a native reader, my goal is to replace the existing parquet reader, but considering that many people have worked on the existing reader (data page v2, BFloat16, etc.), I need help judging this and coming up with a suitable merging plan.

Supported types

Primitive types:

  1. Boolean
  2. Int8
  3. Int16
  4. Int32
  5. Int64
  6. UInt8
  7. UInt16
  8. UInt32
  9. UInt64
  10. Float32
  11. Float64
  12. Date32
  13. Date
  14. DateTime
  15. DateTime64
  16. FixedString
  17. Decimal32
  18. Decimal64
  19. Decimal128
  20. Decimal256

Nested types:

  1. ArrayType
  2. StructType
  3. TupleType

There are still some filter push down use cases to be added; I expect to complete them in one to two weeks.
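As an informal illustration (not taken from this PR's own tests), a round trip over several of the listed types with clickhouse-local might look like the following; the file name and values are made up, and it assumes types_example.parquet does not already exist.

INSERT INTO FUNCTION file('types_example.parquet')
SELECT
    toInt32(1)                             AS i32,
    toUInt64(2)                            AS u64,
    toFloat64(3.5)                         AS f64,
    toDate('2024-10-14')                   AS d,
    toDateTime64('2024-10-14 05:00:00', 3) AS dt64,
    toDecimal64(1.23, 2)                   AS dec64,
    [1, 2, 3]                              AS arr,
    tuple('a', 1)                          AS tup;

-- Read it back through the new reader.
SELECT * FROM file('types_example.parquet')
SETTINGS input_format_parquet_use_native_reader_with_filter_push_down = 1;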

liuneng1994 avatar Dec 30 '24 07:12 liuneng1994

@nickitat Hi, I have fixed most of the issues, and the reader now fully passes the test cases of the existing native reader. I think this is a basically usable version; there are still more details to optimize for different scenarios. I have also introduced new settings, for example to allow multiple parquet readers to coexist. I hope you can help review this PR when you have time.
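(A hedged sketch of what selecting between coexisting readers could look like from the query side: input_format_parquet_use_native_reader is the setting for the pre-existing native reader, and the push-down setting name comes from the reproduction command later in the thread. Whether these are exactly the settings this PR ends up exposing is not confirmed here; the file name is illustrative.)

-- Pre-existing native reader (v1).
SELECT count() FROM file('hits.parquet')
SETTINGS input_format_parquet_use_native_reader = 1;

-- Reader from this PR, with filter push down.
SELECT count() FROM file('hits.parquet')
SETTINGS input_format_parquet_use_native_reader_with_filter_push_down = 1;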

liuneng1994 avatar Jan 16 '25 06:01 liuneng1994

@al13n321 Hi, are you interested in reviewing this PR? It seems no one is willing to review it.

liuneng1994 avatar Feb 18 '25 09:02 liuneng1994

@alexey-milovidov @rschu1ze Can you add the "can be tested" label to this PR? Thanks.

zhanglistar avatar Feb 26 '25 06:02 zhanglistar

Workflow [PR], commit [d4d7f66b]

clickhouse-gh[bot] avatar Feb 26 '25 08:02 clickhouse-gh[bot]

It breaks with indexHint:

% rm t.parquet; ~/ClickHouse/build/programs/clickhouse local --stacktrace -q "insert into function file('t.parquet') select 1 as x; select * from file('t.parquet') where indexHint(x = 1) settings input_format_parquet_use_native_reader_with_filter_push_down=1"
rm: cannot remove 't.parquet': No such file or directory
Code: 59. DB::Exception: Illegal type Const(UInt8) of column for filter. Must be UInt8 or Nullable(UInt8) or Const variants of them.: (in file/uri /home/ubuntu/ClickHouse/t.parquet): While executing ParquetBlockInputFormat: While executing File. (ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER), Stack trace (when copying this message, always include the lines below):
                                                                     
0. ./contrib/llvm-project/libcxx/include/__exception/exception.h:106: Poco::Exception::Exception(String const&, int) @ 0x00000000171baa52
1. ./build/./src/Common/Exception.cpp:108: DB::Exception::Exception(DB::Exception::MessageMasked&&, int, bool) @ 0x000000000d1beb79                                                                                                                                                   
2. DB::Exception::Exception(PreformattedMessage&&, int) @ 0x0000000007fbe3ec                                                                                                                                                                                                          
3. DB::Exception::Exception<String>(int, FormatStringHelperImpl<std::type_identity<String>::type>, String&&) @ 0x0000000007fbe0eb
4. ./build/./src/Columns/FilterDescription.cpp:98: DB::FilterDescription::FilterDescription(DB::IColumn const&) @ 0x0000000012f73802                                                                                                                                                  
5. ./build/./src/Processors/Formats/Impl/Parquet/ColumnFilter.cpp:1107: DB::ExpressionFilter::execute(std::vector<DB::ColumnWithTypeAndName, std::allocator<DB::ColumnWithTypeAndName>> const&) @ 0x000000001420b9b3
6. ./build/./src/Processors/Formats/Impl/Parquet/RowGroupChunkReader.cpp:473: DB::SelectConditions::selectRows(unsigned long) @ 0x000000001427f6b1
7. ./build/./src/Processors/Formats/Impl/Parquet/RowGroupChunkReader.cpp:161: DB::RowGroupChunkReader::readChunk(unsigned long) @ 0x000000001427d4a0              
8. ./build/./src/Processors/Formats/Impl/Parquet/ParquetReader.cpp:188: DB::SubRowGroupRangeReader::read(unsigned long) @ 0x000000001421d107
9. ./build/./src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp:1045: DB::ParquetBlockInputFormat::decodeOneChunk(unsigned long, std::unique_lock<std::mutex>&) @ 0x0000000013e6aef3
10. ./build/./src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp:949: void std::__function::__policy_invoker<void ()>::__call_impl[abi:ne190107]<std::__function::__default_alloc_func<DB::ParquetBlockInputFormat::scheduleRowGroup(unsigned long)::$_0, void ()>>(std::__function::__policy_storage const*) @ 0x0000000013e6e505
11. ./contrib/llvm-project/libcxx/include/__functional/function.h:716: ? @ 0x000000000d2990ec                                                                                                                                                                                         
12. ./contrib/llvm-project/libcxx/include/__type_traits/invoke.h:117: ThreadFromGlobalPoolImpl<false, true>::ThreadFromGlobalPoolImpl<void (ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool::*)(), ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool*>(void (ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool::*&&)(), ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool*&&)::'lambda'()::operator()() @ 0x000000000d29dba3
13. ./contrib/llvm-project/libcxx/include/__functional/function.h:716: ? @ 0x000000000d296fad         
14. ./contrib/llvm-project/libcxx/include/__type_traits/invoke.h:117: void* std::__thread_proxy[abi:ne190107]<std::tuple<std::unique_ptr<std::__thread_struct, std::default_delete<std::__thread_struct>>, void (ThreadPoolImpl<std::thread>::ThreadFromThreadPool::*)(), ThreadPoolImpl<std::thread>::ThreadFromThreadPool*>>(void*) @ 0x000000000d29bf9b
15. ? @ 0x00007ffff7c94ac3                                                                                                                 
16. ? @ 0x00007ffff7d26850

al13n321 avatar Mar 01 '25 01:03 al13n321

Thank you for your review; I will solve the above problems one by one.

liuneng1994 avatar Mar 04 '25 02:03 liuneng1994

Going forward, @yxheartipp will work with me to improve this PR.

liuneng1994 avatar Mar 05 '25 08:03 liuneng1994

This is probably outside the scope of this PR, but I think there's a better way to do PREWHERE-like prefiltering and input_format_parquet_bloom_filter_push_down. Especially when reading from network (rather than local file), especially with high latency. It seems that the best read strategy would be to read in 4 stages, pipelined as 4 sliding windows of tasks running in parallel (with some dependencies between them):

  1. Filter prefetch: read file byte ranges corresponding to bloom filters and/or the smallest/most-promising filter columns (similar to PREWHERE). Runs on IO threads with very high parallelism (important when reading from network), for lots of row groups at once, because these filters/columns are small and don't take a lot of memory per row group.
  2. Decompress and decode those bloom filters and filter columns. Evaluate filter expressions and produce filter masks. On non-IO threads, with parallelism matching the number of cores. One task per row group is ok, no need to parallelize by individual columns. The resulting filter masks are small (especially if we bit-pack them), so we can afford these tasks to run far ahead of the main data reading, usually all the way to the end of the file (for good read size estimate for the progress bar).
  3. Main data prefetch: read file byte ranges corresponding to the remaining columns. Use filter masks from previous step and page offset index to determine byte ranges to read. That's why this is a separate step from filtering - we want to know tight read ranges before we start reading. Runs on IO threads, with medium parallelism. Can't have very high parallelism because this takes a lot of memory. But higher than decompression+decoding because data is still compressed at this stage. Can read byte ranges within the same row group in parallel, can merge multiple small row groups into one byte range.
  4. Decompress and decode the remaining columns. Runs on non-IO threads, with parallelism matching the number of cores, or limited by memory. Parallelized at the level of primitive columns, not row groups (this is a problem for the current reader that can use at most one thread per row group, but we often don't have enough memory for many row groups) (this is another reason to build filter masks separately - it allows reading columns independently from each other). Composite columns like tuples can be reassembled at the very end before returning the block from ParquetBlockInputFormat.

Each of these would be pipelined across row groups, with a low and high watermark. The dependencies are simple: next stage for a row group can begin when the previous stage completed for that row group (but we won't necessarily begin one row group at a time; for small row groups we'll wait for a few row groups to be ready and schedule them as one task, based a target byte size).

(As a slight generalization, there can be multiple filtering stages: first by the smallest and most promising filter columns, then by other filter columns, then read main data. This hopefully won't be needed.)

(Another consideration is that sometimes when reading filter byte ranges we'll also read some data along the way, if gaps between ranges are small enough that we decided to read instead of seeking. Would be nice to preserve and reuse that read data in later stages instead of reading again. But other times we would rather deallocate it to free up more memory for later stages. Not sure how exactly to handle that.)

Maybe I'll make a native reader v3 at some point :) . This is speculative though, please finish this PR too.

(I previously assumed that this PR would have close to optimal performance and would at most need incremental optimizations/simplifications. But now that I wrote down the above idea, it sounds very different from this PR, and potentially significantly faster and simpler (though it's likely that I missed something and it won't work). Probably faster to prototype it from scratch than modifying reader v2.)
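(For reference, the kind of filter and bloom-filter pruning described in stages 1-2 is already exposed to queries through existing settings: input_format_parquet_bloom_filter_push_down is the setting named above, and input_format_parquet_filter_push_down controls min/max-statistics based row-group pruning. The query below is only an illustration with a ClickBench-style point predicate; the file name is assumed.)

SELECT count()
FROM file('hits.parquet')
WHERE UserID = 435090932899640449
SETTINGS
    input_format_parquet_filter_push_down = 1,        -- prune row groups using min/max statistics
    input_format_parquet_bloom_filter_push_down = 1;  -- additionally consult bloom filters where present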

al13n321 avatar Mar 11 '25 03:03 al13n321

This is a very professional suggestion. I originally built this reader hoping for a more efficient single-threaded reader; I did not give much consideration to parallelism and only did some parallel processing on IO.

liuneng1994 avatar Mar 19 '25 12:03 liuneng1994

Dear @al13n321, this PR hasn't been updated for a while. You will be unassigned. Will you continue working on it? If so, please feel free to reassign yourself.

clickhouse-gh[bot] avatar Apr 22 '25 13:04 clickhouse-gh[bot]

I'm working on another parquet reader: https://github.com/ClickHouse/ClickHouse/pull/78380 (only the last 3 commits in that PR are relevant). The current plan is to merge both this PR ("v2") and that PR ("v3"), then see if one is Pareto-better than the other and maybe remove one (and also remove v1).

The number of parquet readers in the codebase is getting kind of ridiculous; we should do something to separate them more cleanly and make them easy to remove. I'm thinking:

  • Move the writer, native readers v1, v2 (yours), and v3 (mine) to 4 subdirectories of Formats/Impl/Parquet. Please move the v2 into a directory in this PR, then I'll send a separate PR to move the existing files (writer and v1).
  • Don't add parquet-specific things to KeyCondition. ~Looks like the KeyCondition always comes from a call to SourceWithKeyCondition::setKeyCondition(const std::optional<ActionsDAG>, ContextPtr). So maybe we can just remove the overload setKeyCondition(const std::shared_ptr<const KeyCondition>) and make SourceWithKeyCondition store an ActionsDAG instead of KeyCondition? The format can create KeyCondition itself. Please try this. (Ideally we would also do something to reuse the KeyCondition/ColumnFilterHelper when reading many files in one query; I'm working on it, will try to unify it with https://github.com/ClickHouse/ClickHouse/pull/66253/ as well.)~ Use https://github.com/ClickHouse/ClickHouse/pull/80931 instead.

(I was also hoping that some v3 things would be easy to reuse in v2, but that doesn't seem to be the case. V3 integrates with prewhere, and will probably use KeyCondition for all non-prewhere filtering instead of re-analyzing the ActionsDAG; but that doesn't seem to fit v2 because v2 wants to split all conditions by columns, so it would need to pointlessly reimplement the splitting for KeyCondition and prewhere separately. V3 has some complicated fine-grained task scheduling and prefetching to extract ~all available parallelism and to avoid prefetching data before filters are applied; it doesn't seem feasible to reuse at all.)

Please make these changes, then we can get this merged and start benchmarking (and hopefully v3 will be ready soon so we can benchmark everything at once). You don't have to address all the nitpicks from review comments, up to you.

al13n321 avatar May 22 '25 18:05 al13n321

Don't add parquet-specific things to KeyCondition.

Use https://github.com/ClickHouse/ClickHouse/pull/80931 to get access to ActionsDAG from ParquetBlockInputFormat.

al13n321 avatar May 29 '25 22:05 al13n321

@liuneng1994, thank you so much for the contribution, which provided the basis of the currently merged implementation!

alexey-milovidov avatar Aug 29 '25 06:08 alexey-milovidov