[Perf][Bench] Join is slow on big tables.
At table sizes of ~10^8 rows, join performance is very low compared with DuckDB. A join of two 10^8-row tables on a single int column takes about 12 s.
```
12888ms total duration for executeRelAlgQuery
12886ms start(1ms) executeWorkUnit RelAlgExecutor.cpp:1389
569ms start(1ms) compileWorkUnit NativeCodegen.cpp:1403
5394ms start(571ms) fetchChunks Execute.cpp:3090
67ms start(5966ms) executePlan Execute.cpp:3462
9ms start(6487ms) collectAllDeviceResults Execute.cpp:2550
70ms start(6497ms) compileWorkUnit NativeCodegen.cpp:1403
3071ms start(6568ms) fetchChunks Execute.cpp:3090
527ms start(9640ms) executePlan Execute.cpp:3462
```
The biggest part of the time is spent in `fetchChunks`. There are also 2 subqueries; the first is a COUNT(*) used to collect metadata.
Possible points/hints for increasing performance:
- There is too much copying of data in `getAllTableColumnFragments`:
```
585ms start(597ms) getAllTableColumnFragments ColumnFetcher.cpp:243
68ms start(597ms) getOneTableColumnFragment ColumnFetcher.cpp:176
58ms start(666ms) ColumnarResults ColumnarResults.cpp:124
69ms start(724ms) getOneTableColumnFragment ColumnFetcher.cpp:176
57ms start(793ms) ColumnarResults ColumnarResults.cpp:124
69ms start(851ms) getOneTableColumnFragment ColumnFetcher.cpp:176
57ms start(921ms) ColumnarResults ColumnarResults.cpp:124
8ms start(979ms) getOneTableColumnFragment ColumnFetcher.cpp:176
7ms start(987ms) ColumnarResults ColumnarResults.cpp:124
187ms start(995ms) mergeResults ColumnarResults.cpp:139
```
Looks like there are 3 chunk copies.
- `getChunkBuffer` - should be a copy from Arrow and is aimed at creating chunks.
- `createZeroCopyBuffer` - is fast and fine.
- If not, we go to the persistent DataMgr, then to `ArrowStorage::fetchBuffer` <- this is slow.
- `mergeResults` is too slow.

1st copy: from a specific fragment into the required (Arrow) format. 2nd copy: all fragments into a united, commonly structured format (`getAllTableColumnFragments` in ColumnFetcher). 3rd copy: unknown <- a possible issue.
- Why did `getAllTableColumnFragments` take ~3 s? In some cases it is waiting to acquire a lock:
```
2070ms start(2ms) getAllTableColumnFragments ColumnFetcher.cpp:248
0ms start(2072ms) lock taken, execution started ColumnFetcher.cpp:262
```
- Columns in `getAllTableColumnFragments` are fetched not in parallel, even though they access different fragments (connected with the previous issue). This is a possible point for improvement. Already tried: it reduced the total time by about 2 s.
- The second call of `fetchChunks` also does some data transfers. Didn't we already save the columns during the previous `fetchChunks` run? Most of the cache is stored in the Cpu/Gpu BufferMgr.
- At the end of the execution, we spend about 3.5 seconds on unknown activity, which looks suspicious.
- Fragment data is sometimes available without any copying, as it is already in the correct format. Do we keep copying data even when the format is fine? Currently there is no interface to check DataLayout, but maybe we should add something like the sketch below.
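A minimal sketch of what such a layout-check interface could look like; `DataLayout`, `columnLayout`, and the enum values are hypothetical, not existing HDK API:

```cpp
// Hypothetical layout descriptor; none of these names exist in HDK today.
enum class DataLayout {
  kContiguousColumnar,  // one contiguous buffer per column, usable as-is
  kFragmented,          // split across fragments, needs linearization
  kUnknown
};

// Hypothetical extension of the data provider: report the physical layout
// of a column so callers can skip copying when the data is already in the
// format they need and only linearize the kFragmented case.
class LayoutAwareDataProvider {
 public:
  virtual ~LayoutAwareDataProvider() = default;
  virtual DataLayout columnLayout(int table_id, int column_id) const = 0;
};
```

With something like this, `getAllTableColumnFragments` could hand out the underlying buffer directly for `kContiguousColumnar` columns and pay the copy only for `kFragmented` ones.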
Support for multi-fragment joins has always been poor. There is multi-fragment join hash table construction, but the references from the hash table to the actual data are 0-indexed and therefore do not support multiple fragments.
count(*) first is a known pattern; it is used to size output buffers.
Hash tables should be cached, so I suspect the extra copies are part of the query. It might be worthwhile to check that the hash table cache is working between the count(*) and the actual query.
The point about ~3.5 seconds of unknown activity at the end is interesting - do you know what method this is happening in?
I am curious - what is the fragment size when you run these queries? Can you try running where the inner join table fits into a single fragment?
Discussed this issue with Alex.
A possible solution here is to remove the call to `getAllTableColumnFragments` together with the `needFetchAllFragments` function.
This function moves and linearizes data, which is generally not required and can be avoided by changing the mechanism for accessing hash table data by row id: convert the row id to fragment_idx + offset instead of copying (see the sketch below).
So it should be investigated together with fragment_size manipulation and hash table initialization at the compileWorkUnit stage.
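A minimal sketch of the row-id conversion described above, assuming a uniform fragment_size for all fragments except possibly the last; the names are illustrative, not HDK API:

```cpp
#include <cstddef>

// Fragment-local address that replaces a global row id into one big
// linearized buffer.
struct FragmentAddr {
  std::size_t fragment_idx;  // which fragment the row lives in
  std::size_t offset;        // row index within that fragment
};

// Convert a global row id into fragment_idx + offset, so the hash table
// can reference fragment-local data instead of requiring linearization.
inline FragmentAddr toFragmentAddr(std::size_t row_id,
                                   std::size_t fragment_size) {
  return {row_id / fragment_size, row_id % fragment_size};
}
```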
I found how to reduce the generated fragment_size via the `number_fo_fragments` variable.
In this case `number_fo_fragments = 100_000_000` reduces the number of fragments per kernel to 1: with 10^8 rows, fragment_size = 4_000_000 gives 10^8 / (4 * 10^6) = 25 fragments, while fragment_size = 100_000_000 fits the whole table into a single fragment.
E.g. with needFetchAllFragments, number of fragments: 25, table_id: 2, fragment_size = 4_000_000:
```
10782ms total duration for executeRelAlgQuery
10781ms start(0ms) executeRelAlgQueryNoRetry RelAlgExecutor.cpp:216
1ms start(0ms) Query pre-execution steps RelAlgExecutor.cpp:217
10779ms start(2ms) execute RelAlgExecutor.cpp:411
10779ms start(2ms) executeStep RelAlgExecutor.cpp:895
10779ms start(2ms) executeWorkUnit RelAlgExecutor.cpp:1389
547ms start(3ms) compileWorkUnit NativeCodegen.cpp:1403
0ms start(551ms) ExecutionKernel::run ExecutionKernel.cpp:12
4678ms start(551ms) fetchChunks Execute.cpp:3092
0ms start(5229ms) create QueryExecutionContext.cpp:94
108ms start(5229ms) executePlan Execute.cpp:3464
10ms start(5340ms) collectAllDeviceResults Execute.cpp:2550
71ms start(5351ms) compileWorkUnit NativeCodegen.cpp:1403
0ms start(5423ms) ExecutionKernel::run ExecutionKernel.cpp:126
3542ms start(5423ms) fetchChunks Execute.cpp:3092
0ms start(8965ms) create QueryExecutionContext.cpp:94
1023ms start(8965ms) executePlan Execute.cpp:3464
```
With needFetchAllFragments, number of fragments: 1, table_id: 4, fragment_size = 100_000_000:
```
8097ms total duration for executeRelAlgQuery
8097ms start(0ms) executeRelAlgQueryNoRetry RelAlgExecutor.cpp:216
0ms start(0ms) Query pre-execution steps RelAlgExecutor.cpp:217
8097ms start(0ms) execute RelAlgExecutor.cpp:411
8097ms start(0ms) executeStep RelAlgExecutor.cpp:895
8096ms start(1ms) executeWorkUnit RelAlgExecutor.cpp:1389
13ms start(1ms) compileWorkUnit NativeCodegen.cpp:1403
0ms start(14ms) ExecutionKernel::run ExecutionKernel.cpp:126
1ms start(14ms) fetchChunks Execute.cpp:3092
0ms start(16ms) create QueryExecutionContext.cpp:94
1787ms start(16ms) executePlan Execute.cpp:3464
0ms start(1803ms) collectAllDeviceResults Execute.cpp:255
54ms start(1804ms) compileWorkUnit NativeCodegen.cpp:1403
0ms start(1858ms) ExecutionKernel::run ExecutionKernel.cpp:126
1ms start(1858ms) fetchChunks Execute.cpp:3092
0ms start(1860ms) create QueryExecutionContext.cpp:94
6236ms start(1860ms) executePlan Execute.cpp:3464
```
So the fetchChunks time decreased, but executePlan increased.
I noticed that count(*) fetches too many columns. This is done to estimate the size of the query's output buffer. Currently all columns that participate in the query are fetched during count(*), so in most cases this is redundant: only the columns that actually affect the row count are needed.
A possible fix is at https://github.com/intel-ai/hdk/pull/579/commits/25d78bcd9a39eacfe25373142ea95a29e1e9b6bf and the following commit.
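The idea behind the fix, as a rough sketch with hypothetical types (not the actual PR code): during the preflight count(*), fetch only the columns that can affect the row count, i.e. join keys and filter columns, instead of every column referenced by the query.

```cpp
#include <unordered_set>
#include <vector>

// Hypothetical descriptors, for illustration only.
struct ColumnRef {
  int table_id;
  int column_id;
};

struct QueryColumns {
  std::vector<ColumnRef> all_referenced;        // every column in the query
  std::unordered_set<int> count_relevant_cols;  // join keys + filter columns
};

// Keep only the columns whose values can change the count(*) result;
// projected-only columns are skipped during the preflight query.
std::vector<ColumnRef> columnsForCountStar(const QueryColumns& q) {
  std::vector<ColumnRef> needed;
  for (const auto& col : q.all_referenced) {
    if (q.count_relevant_cols.count(col.column_id)) {
      needed.push_back(col);
    }
  }
  return needed;
}
```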
Original main on the current join:
```
5742 ms start(514ms) fetchChunks Execute.cpp:3087 (count*)
4237 ms start(6891ms) fetchChunks Execute.cpp:3087 (join)
```
Updated with https://github.com/intel-ai/hdk/commit/25d78bcd9a39eacfe25373142ea95a29e1e9b6bf:
```
428 ms start(455ms) fetchChunks Execute.cpp:3093 (count*)
4558 ms start(1653ms) fetchChunks Execute.cpp:3093 (join)
```
Current plan on this issue:
- [x] Misuse of the generated `count(*)` for collecting metadata. PR #590
- [x] Useless copying (memcpy) in `getAllTableColumnFragments`: reduce copies, zero-copy approach. PR #623
- [x] Useless copying (memcpy) in `getAllTableColumnFragments`: change the whole function to allow fetching to a determined address; more parallel, 1-copy. PR #616
- [x] `initHashTable` execution time increases with the `enable-non-lazy-data-import` flag enabled:
  ```
  118ms start(64ms) CPU One-To-One Perfect-Hash: fill_hash_join_buff_bucketized   (without the flag)
  8874ms start(42ms) CPU One-To-One Perfect-Hash: fill_hash_join_buff_bucketized  (with the flag enabled)
  ```
  Covered in PR #663. The root cause was incorrect use of the multithreading mechanism, corrected with the help of TBB (see the sketch after the crash log below).
- [ ] Hashing mechanism: change the hash function to take fragment_id into account. (Conditioned by the join algorithm, hash join.)
- [x] Segfault with `non-lazy-data-import` on 1e9 size. #705
```
./_launcher/solution.R --solution=pyhdk --task=join --nrow=1e9
[1] "./pyhdk/join-pyhdk.py"
# join-pyhdk.py
pyhdk data_name: J1_1e9_NA_0_0
loading datasets J1_1e9_NA_0_0, J1_1e9_1e3_0_0, J1_1e9_1e6_0_0, J1_1e9_1e9_0_0
Using fragment size 4000000
1000000000
1000
1000000
1000000000
joining...
(899999033, 9)
[thread 851878 also had an error][thread 854019 also had an error][thread 851800 also had an error][thread 851224 also had an error][thread 854989 also had an error][thread 853424 also had an error]
[thread 851529 also had an error]2023-09-13T16:57:41.940055 F 726860 0 194 ColumnarResults.cpp:371 Check failed: type->isString()
[thread 852976 also had an error]
[thread 852381 also had an error]
[thread 853851 also had an error]
#
# A fatal error has been detected by the Java Runtime Environment:
#
# SIGSEGV (0xb) at pc=0x00007f509cc352d6, pid=726860, tid=853223
#
# JRE version: OpenJDK Runtime Environment (20.0) (build 20-internal-adhoc..src)
# Java VM: OpenJDK 64-Bit Server VM (20-internal-adhoc..src, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, g1 gc, linux-amd64)
# Problematic frame:
# 2023-09-13T16:57:41.939437 F 726860 0 195 ColumnarResults.cpp:371 Check failed: type->isString()
Aborted (core dumped)
```
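Returning to the `initHashTable` item in the plan above: the root cause there was incorrect use of the multithreading mechanism when filling the hash join buffer. A generic TBB sketch of the parallel-fill pattern (a simplification, not the actual PR #663 code):

```cpp
#include <tbb/blocked_range.h>
#include <tbb/parallel_for.h>

#include <cstddef>
#include <cstdint>
#include <vector>

// Fill a perfect-hash join buffer in parallel over row ranges. The real
// fill_hash_join_buff_bucketized logic is more involved; this only shows
// the TBB work-partitioning pattern.
void fillHashJoinBuffParallel(std::vector<int32_t>& buff,
                              const std::vector<int32_t>& keys) {
  tbb::parallel_for(
      tbb::blocked_range<std::size_t>(0, keys.size()),
      [&](const tbb::blocked_range<std::size_t>& r) {
        for (std::size_t i = r.begin(); i != r.end(); ++i) {
          // Perfect hash: each key maps to a unique slot (illustrative).
          buff[static_cast<std::size_t>(keys[i])] = static_cast<int32_t>(i);
        }
      });
}
```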
~~Irrelevant Issues~~
- [x] Missing 3 seconds at the end of the benchmark.
```
486ms start(2904ms) executePlan Execute.cpp:3464
0ms start(6189ms) resultsUnion Execute.cpp:1134
```
I was interpreting the timings of new threads in the wrong way; this time is spent on kernel execution:
```
New thread(6)
1561ms start(0ms) fetchChunks Execute.cpp:3095
3136ms start(1562ms) executePlan Execute.cpp:3464
3136ms start(1562ms) launchCpuCode QueryExecutionContext.cpp:564
0ms start(4699ms) getRowSet QueryExecutionContext.cpp:192
End thread(6)
New thread(7)
1562ms start(0ms) fetchChunks Execute.cpp:3095
3284ms start(1562ms) executePlan Execute.cpp:3464
3284ms start(1562ms) launchCpuCode QueryExecutionContext.cpp:564
0ms start(4846ms) getRowSet QueryExecutionContext.cpp:192
End thread(7)
New thread(8)
1562ms start(0ms) fetchChunks Execute.cpp:3095
3129ms start(1562ms) executePlan Execute.cpp:3464
3129ms start(1562ms) launchCpuCode QueryExecutionContext.cpp:564
0ms start(4692ms) getRowSet QueryExecutionContext.cpp:192
End thread(8)
```
Issues with copies and hashing are also present here, so this should stay open.