
Proposal for more efficient disk-based shuffle mechanism

Open andygrove opened this issue 2 years ago • 6 comments

Is your feature request related to a problem or challenge? Please describe what you are trying to do. The current shuffle mechanism is too basic and produces too many small shuffle files.

Tasks to implement new version:

  • [ ] https://github.com/apache/arrow-ballista/issues/332
  • [ ] Implement a new version of ShuffleWriterExec that can work with a coalesced input of multiple parallel input partitions
  • [ ] Changes to scheduler tracking of shuffle files

Describe the solution you'd like See https://docs.google.com/document/d/16SIEoniAWKSFt8XKDLsOfRQ0sU--5E9_OIyE42Zj808/edit?usp=sharing

Describe alternatives you've considered

Additional context

andygrove avatar Feb 08 '23 14:02 andygrove

@thinkharderdev @yahoNanJing @mingmwang @avantgardnerio Let me know what you think. If this seems like a good idea, I may have some time later this month to try and implement it. I am also happy for someone else to pick this up.

andygrove avatar Feb 08 '23 14:02 andygrove

Sounds like a great idea to me

Dandandan avatar Feb 08 '23 15:02 Dandandan

I ran into some challenges with the proposed design, which I have documented in the Google doc (near the end). Feedback welcome.

andygrove avatar Feb 09 '23 14:02 andygrove

I read through it; the revised design does indeed sound a bit simpler.

A nice side effect of this optimization, by the way, is that a limit on the shuffle writer also becomes more effective: tasks can be terminated earlier, rather than only applying a local limit to each partition.

Dandandan avatar Feb 09 '23 14:02 Dandandan

Thanks @andygrove for raising the discussion for this topic.

For the second approach, the opportunity for optimization is also limited when tasks of a query stage are assigned to different executors, which is a common case when using the RoundRobin task scheduling policy for load balancing.

Actually, to reduce the number of shuffle write files, I recommend using the sort-based shuffle writer adopted by Spark (https://issues.apache.org/jira/browse/SPARK-2045). Then each original ShuffleWriterExec produces only 2 output files, rather than N files for its downstream stage: one data file containing all of the output partitions' data concatenated together, and one index file recording each partition's offset within the data file.
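To make the two-file layout concrete, here is a minimal sketch (not Ballista or Spark code; the function names `write_shuffle_output` and `read_partition` are hypothetical, and real implementations write to disk rather than in-memory buffers). All partition payloads are concatenated into one data buffer, while the index records the start offset of each partition plus a final end offset, so partition `i` is the slice `data[index[i]..index[i + 1]]`:

```rust
/// Sketch of a sort-based shuffle writer's two-file output:
/// one concatenated data buffer and one offset index.
/// (Hypothetical helper, not part of Ballista's API.)
fn write_shuffle_output(partitions: &[Vec<u8>]) -> (Vec<u8>, Vec<u64>) {
    let mut data = Vec::new();
    // index has one entry per partition, plus the final end offset.
    let mut index = Vec::with_capacity(partitions.len() + 1);
    for part in partitions {
        index.push(data.len() as u64); // start offset of this partition
        data.extend_from_slice(part);
    }
    index.push(data.len() as u64); // end offset of the last partition
    (data, index)
}

/// A downstream reader fetches only its partition's byte range.
fn read_partition<'a>(data: &'a [u8], index: &[u64], i: usize) -> &'a [u8] {
    &data[index[i] as usize..index[i + 1] as usize]
}
```

With this layout a reducer needs only one seek into the data file per upstream task, instead of opening N small per-partition files, which is the main win over the basic hash-style shuffle writer.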

An intuitive diagram can be found here: https://github.com/blaze-init/blaze/blob/master/dev/doc/architectural_overview.md.

Hi @yjshen, could you share your opinions?

yahoNanJing avatar Feb 15 '23 05:02 yahoNanJing

This sounds great @yahoNanJing. I would also like to hear if @yjshen has an opinion on this.

andygrove avatar Feb 19 '23 18:02 andygrove