datafusion icon indicating copy to clipboard operation
datafusion copied to clipboard

CrossJoin Implementation on (M x N) Partitions

Open berkaysynnada opened this issue 1 year ago • 1 comments

Is your feature request related to a problem or challenge?

There is a TODO item in CrossJoin: https://github.com/apache/arrow-datafusion/blob/2f550032140d42d1ee6d8ed86f7790766fa7302e/datafusion/physical-plan/src/joins/cross_join.rs#L122

Currently CrossJoin partition count is the partition count of the right child. We can increase parallelism here if allowed.

Describe the solution you'd like

Let's say left has M and right has N partitions, and the target partition is T. We can increase the parallelism by getting the left partitions count to floor[T/N] (assuming T is not smaller than N). If (M x N) is smaller or equal than T, there would be no need to coalesce left partitions also.

Describe alternatives you've considered

Additional context

Theoretically, for example, 1x8 partitions of joins does the same amount of unit work with 2x4, but in practice, 2x4 parallelism may be more preferable (I have no solid evidence). So, without changing the target partitions, such kind of parallelism adjustment also be done if it is proved that it works better.

berkaysynnada avatar Apr 04 '24 06:04 berkaysynnada

How would this work? At this moment the entire left side is loaded in memory, basically performing a https://en.m.wikipedia.org/wiki/Block_nested_loop with all data loaded into memory. The left child plan can be executed in parallel. Not loading all of the left side seems to require scanning one side more than once?

Dandandan avatar Oct 18 '24 17:10 Dandandan

How would this work? At this moment the entire left side is loaded in memory, basically performing a https://en.m.wikipedia.org/wiki/Block_nested_loop with all data loaded into memory. The left child plan can be executed in parallel. Not loading all of the left side seems to require scanning one side more than once?

Let's say left has 2 partitions (L1, L2), and right has 3(R1, R2, R3), and the target partition count is 6(T1, T2, T3, T4, T5, T6). execute() of CrossJoin is called for those 6 partitions, and each of them calls corresponding input partitions, like T1: L1-R1 -- T2: L1-R2 -- T3: L1-R3 -- T4: L2-R1 -- T5: L2-R2 -- T6: L2-R3

I don't think we need multiple scan for the same source. The first one can scan and share the data with other users. That said, I am not sure this approach would bring significant performance improvements, but I saw the TODO item and thought it would be interesting to discuss with you.

berkaysynnada avatar Oct 21 '24 07:10 berkaysynnada

No worries, just trying to understand the benefit of implementing this.

So thinking more about it I think there might be two (I think smaller) improvements from implementing this compared to what we have now:

  • Right side can start executing earlier as soon as data on left side is available instead of waiting on all partitions to be loaded, can be helpful if data is not balanced and might utilize resources a bit more, e.g. if one side is IO bound and the other more CPU bound.
  • There is a bit more parallelism available, will be helpful if the inner plans can not be parallelized well (i.e. 1 < partitions < target_partitions)

Dandandan avatar Oct 21 '24 07:10 Dandandan