
feat: add a config param to avoid grouping partitions

Open NGA-TRAN opened this issue 9 months ago • 3 comments

Which issue does this PR close?

Closes https://github.com/apache/datafusion/issues/10257

Rationale for this change

What changes are included in this PR?

Add an option to avoid converting Union to Interleave

Are these changes tested?

Yes

Are there any user-facing changes?

No, because by default nothing is changed.

NGA-TRAN avatar Apr 26 '24 21:04 NGA-TRAN

@alamb Is this PR good enough? We do not need to set prefer_existing_union in DF, right? It will be set in IOx?

NGA-TRAN avatar Apr 26 '24 21:04 NGA-TRAN

cc @mustafasrepo and @ozankabak for your thoughts

alamb avatar Apr 27 '24 12:04 alamb

Thanks @NGA-TRAN and @alamb, we will take a look and circle back on Monday

ozankabak avatar Apr 27 '24 13:04 ozankabak

I think having a dedicated config setting (as in prefer_existing_union) is more explicit and clear. Using prefer_existing_sort might also work. However, if the condition for replacing UnionExec with InterleaveExec is changed to

plan.as_any().is::<UnionExec>() 
&& !config.optimizer.prefer_existing_sort 
&& can_interleave(children_plans.iter())

this will prefer UnionExec over InterleaveExec whenever the config.optimizer.prefer_existing_sort flag is true, even if the inputs of the UnionExec are unordered. That might be counterintuitive, given that there is no ordering to preserve. config.optimizer.prefer_existing_union, by contrast, does exactly what it says, so it is a bit clearer to me. Hence, I think it is better to proceed with the current approach in this PR.
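To make the comparison concrete, here is a minimal, self-contained sketch of the two candidate conditions. It does not use DataFusion types: `OptimizerOptions`, `interleave_with_union_flag`, and `interleave_with_sort_flag` are hypothetical names invented for illustration, and the `can_interleave` boolean stands in for the result of `can_interleave(children_plans.iter())`. It also assumes both flags default to false.

```rust
/// Hypothetical, trimmed-down stand-in for the optimizer options discussed
/// in this thread. Both flags are assumed to default to `false`.
#[derive(Debug, Clone, Copy, Default)]
pub struct OptimizerOptions {
    pub prefer_existing_sort: bool,
    pub prefer_existing_union: bool,
}

/// Approach taken in this PR: a dedicated flag decides whether a union
/// may be converted to an interleave.
pub fn interleave_with_union_flag(opts: OptimizerOptions, can_interleave: bool) -> bool {
    !opts.prefer_existing_union && can_interleave
}

/// Alternative discussed above: reuse `prefer_existing_sort` instead.
/// Note that the condition never looks at whether the inputs are ordered.
pub fn interleave_with_sort_flag(opts: OptimizerOptions, can_interleave: bool) -> bool {
    !opts.prefer_existing_sort && can_interleave
}
```

With `prefer_existing_sort` set to true and everything else left at the assumed defaults, `interleave_with_sort_flag` returns false even for unordered inputs, which is the counterintuitive case described above, while `interleave_with_union_flag` still returns true.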

In the future, if we add support for OrderPreservingInterleaveExec (this might be accomplished by replacing CombinedRecordBatchStream with streaming_merge in the fn execute method of the InterleaveExec), using the flag config.optimizer.prefer_existing_sort to decide between InterleaveExec and OrderPreservingInterleaveExec might solve the issue. That approach may remove the need for the prefer_existing_union setting. However, until we have this support, the current approach is much clearer.
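As a rough illustration of why a merge would be needed for order preservation, the sketch below contrasts a plain interleave with a k-way merge over in-memory sorted vectors. It is only an analogy: `interleave` and `merge_sorted` are hypothetical helpers, round-robin is used as a stand-in for the arbitrary arrival order of a combined stream, and nothing here reflects the actual signatures of CombinedRecordBatchStream or streaming_merge.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Round-robin interleave: emits one item from each input in turn.
/// The output is generally NOT sorted, even if every input is.
pub fn interleave(inputs: &[Vec<i32>]) -> Vec<i32> {
    let longest = inputs.iter().map(Vec::len).max().unwrap_or(0);
    let mut out = Vec::new();
    for i in 0..longest {
        for input in inputs {
            if let Some(v) = input.get(i) {
                out.push(*v);
            }
        }
    }
    out
}

/// K-way merge of already sorted inputs: always emits the smallest
/// pending head, so the output stays sorted.
pub fn merge_sorted(inputs: &[Vec<i32>]) -> Vec<i32> {
    // Min-heap of (value, input index, position within that input).
    let mut heap = BinaryHeap::new();
    for (idx, input) in inputs.iter().enumerate() {
        if let Some(&v) = input.first() {
            heap.push(Reverse((v, idx, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, idx, pos))) = heap.pop() {
        out.push(v);
        // Refill the heap from the input we just consumed from.
        if let Some(&next) = inputs[idx].get(pos + 1) {
            heap.push(Reverse((next, idx, pos + 1)));
        }
    }
    out
}
```

For the sorted inputs `[1, 4, 7]` and `[2, 3, 9]`, `interleave` yields `[1, 2, 4, 3, 7, 9]`, which loses the ordering, while `merge_sorted` yields `[1, 2, 3, 4, 7, 9]`.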

mustafasrepo avatar Apr 29 '24 09:04 mustafasrepo

Hence, I think it is better to proceed with current approach in this PR. (as in prefer_existing_union)

Sounds good to me.

In the future, if we add support for OrderPreservingInterleaveExec (this might be accomplished by replacing CombinedRecordBatchStream with streaming_merge in the fn execute method of the InterleaveExec.)

This is an excellent idea. I will file a ticket to track it.

@NGA-TRAN can you get the tests passing on this PR and we'll give it another review?

alamb avatar Apr 29 '24 15:04 alamb

@alamb, @phillipleblanc, and @mustafasrepo: all the tests have passed.

NGA-TRAN avatar Apr 29 '24 18:04 NGA-TRAN

Since this PR is good to go, I am merging it in. I am also in the process of filing the follow-on work.

alamb avatar Apr 30 '24 15:04 alamb

Filed https://github.com/apache/datafusion/issues/10314 to track order preserving UnionExec

alamb avatar Apr 30 '24 16:04 alamb

I also started collecting sort-related optimizations in https://github.com/apache/datafusion/issues/10313

alamb avatar Apr 30 '24 16:04 alamb