ARROW-16695: [R][Python][C++] Extension types are not supported in joins
This is to resolve ARROW-16695.
https://issues.apache.org/jira/browse/ARROW-16695
@michalursa I'd like to enable joining tables where some of the non-key columns are ExtensionTypes. Am I right to assume this should be possible? The naive approach in this PR segfaults.
Trying a similar thing in C++ fails in RadixRecordBatchSorter (vector_sort.cc), which could be fixed by disabling further checks (VISIT_SORTABLE_PHYSICAL_TYPES).
With the current changes in this PR, it fails in RowEncoder::Init, which would also need to be updated to handle ExtensionTypes (delegating to the storage type).
What I don't fully understand here is why the non-key fields need to be included when creating the Encoder. The non-key fields are not used for building up the join values, so why is it important that we support their data types there? (Naively, I would assume that for such non-key fields we only need to be able to "take" the resulting values needed for the result.)
I'm not sure who best to ask for review here. I'm approaching this very naively and am afraid I'm missing something.
> With the current changes in this PR, it fails in RowEncoder::Init, which also would need to be updated to handle ExtensionTypes (delegating to the storage type). [...] The non-key fields are not used for building up the join values, so why is it important that we support that data type?
The data on the build side of hash join is stored in a row oriented way, that is why there is a storage transformation that is not simply a Take. Storing hash table rows in a row oriented way is important for performance, especially when the entire hash table does not fit into CPU cache, because there is only an access to contiguous block of memory (potentially just one or a few cache lines next to each other) instead of multiple loads from separate addresses.
Thanks for the explanation @michalursa! That makes a lot of sense!
@westonpace could I ask you, or someone in the vicinity of this code, for review? :)
Thanks for the review @westonpace! I've pushed proposed changes. Shall I merge if CI passes?
Yes, thanks.
Huh, Python and C++ on s390x get the physical type instead of the extension type.
Probably because we fall back to the basic hash join implementation on big endian machines:
```cpp
#if ARROW_LITTLE_ENDIAN
  use_swiss_join = (filter == literal(true)) && !schema_mgr->HasDictionaries() &&
                   !schema_mgr->HasLargeBinary();
#else
  use_swiss_join = false;
#endif
```
Got it! I'll adapt HashJoin as well.
@westonpace I had to introduce some changes to RowEncoder to get the HashJoin to work. Could you please confirm that I'm not doing something wrong, and I'll merge afterwards?
That seems reasonable. Feel free to merge. You're basically working on the physical type and saving off the extension type to reattach later?
I think, longer term, we might want to consider doing this at the plan level, so nodes never have to know about the existence of extension types and we simply attach them, as needed, in the sink. We might want to wait until we have a few compute functions for extension types first, though, so there are more examples to work from.
> You're basically working on the physical type and saving off the extension type to reattach later?
Yeah exactly.
> I think, longer term, we might want to consider doing this at the plan level. So nodes never have to know about the existence of extension types, and we simply attach them, as needed, in the sink.
Huh, wouldn't there be cases where the extension type is relevant to the kernels? E.g. geographical coordinates in a certain projection (where the extension type would carry the projection) and computing haversine distances.
Benchmark runs are scheduled for baseline = 672431b5146e078406c9295e873705db553fa115 and contender = 50a7d15dfb4cbc4dd449ff2bb3ba2b1cde62a3ab. 50a7d15dfb4cbc4dd449ff2bb3ba2b1cde62a3ab is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished :arrow_down:0.0% :arrow_up:0.0%] ec2-t3-xlarge-us-east-2
[Failed] test-mac-arm
[Failed :arrow_down:0.0% :arrow_up:0.0%] ursa-i9-9960x
[Finished :arrow_down:0.53% :arrow_up:0.11%] ursa-thinkcentre-m75q
Buildkite builds:
[Finished] 50a7d15d ec2-t3-xlarge-us-east-2
[Failed] 50a7d15d test-mac-arm
[Failed] 50a7d15d ursa-i9-9960x
[Finished] 50a7d15d ursa-thinkcentre-m75q
[Finished] 672431b5 ec2-t3-xlarge-us-east-2
[Failed] 672431b5 test-mac-arm
[Failed] 672431b5 ursa-i9-9960x
[Finished] 672431b5 ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java