datafusion-ballista

Support broadcast exchange

Open Dandandan opened this issue 3 years ago • 6 comments

Is your feature request related to a problem or challenge? Please describe what you are trying to do. Broadcasting partitions helps when the build side of a join is small. In that case we can transform partitioned joins into broadcast joins.

Describe the solution you'd like We should support broadcasts in the physical plan.

Broadcasting means copying the entire dataset to each worker.

This could be used in broadcast joins, i.e. by broadcasting smaller dataframes to every worker, which can provide big speedups as the other (big) side doesn't have to be shuffled.
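To make the idea concrete, here is a minimal single-process sketch of a broadcast hash join. It is only an illustration under simplifying assumptions: rows are plain `(key, payload)` tuples rather than Arrow record batches, "workers" are simulated by iterating over partitions, and the names `Row`, `build_hash_table`, `probe_partition` and `broadcast_join` are hypothetical, not Ballista or DataFusion APIs.

```rust
use std::collections::HashMap;

/// A row is a (join key, payload) pair. This is an assumption for the sketch;
/// a real engine would operate on Arrow record batches.
type Row = (u64, String);

/// Build a hash table from the small (broadcast) side.
/// In a broadcast join every worker holds a full copy of `build_side`
/// and runs this step locally.
fn build_hash_table(build_side: &[Row]) -> HashMap<u64, Vec<&str>> {
    let mut table: HashMap<u64, Vec<&str>> = HashMap::new();
    for (key, payload) in build_side {
        table.entry(*key).or_default().push(payload.as_str());
    }
    table
}

/// Probe one local partition of the big side against the broadcast table.
/// The big side is never shuffled: each worker only reads its own partition.
fn probe_partition(
    table: &HashMap<u64, Vec<&str>>,
    partition: &[Row],
) -> Vec<(u64, String, String)> {
    let mut out = Vec::new();
    for (key, right) in partition {
        if let Some(lefts) = table.get(key) {
            for left in lefts {
                out.push((*key, left.to_string(), right.clone()));
            }
        }
    }
    out
}

/// Simulate the whole join: one probe partition per worker, and each worker
/// builds its own hash table from its copy of the build side.
fn broadcast_join(
    build_side: &[Row],
    probe_partitions: &[Vec<Row>],
) -> Vec<(u64, String, String)> {
    probe_partitions
        .iter()
        .flat_map(|partition| {
            let table = build_hash_table(build_side);
            probe_partition(&table, partition)
        })
        .collect()
}
```

Note that the hash table is rebuilt once per partition: that repeated build work is the price paid for leaving the big side where it is.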

Describe alternatives you've considered

Additional context

Probably we can reuse some heuristics from Spark for conditions when to perform broadcasting for joins.
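As a rough illustration of such a heuristic: Spark broadcasts a join side when its estimated size is at most `spark.sql.autoBroadcastJoinThreshold` (10 MB by default). A planner rule along the same lines might look like the sketch below. The `JoinStrategy` enum and `choose_join_strategy` function are hypothetical names for this example, not existing DataFusion or Ballista types.

```rust
/// Join strategies a planner could choose between.
/// Hypothetical enum for this sketch, not a DataFusion type.
#[derive(Debug, PartialEq)]
enum JoinStrategy {
    Broadcast,
    Partitioned,
}

/// Hypothetical planner rule modeled on a size-threshold heuristic.
/// `build_side_bytes` is the estimated size of the smaller input,
/// or `None` when statistics are missing.
fn choose_join_strategy(build_side_bytes: Option<u64>, threshold_bytes: u64) -> JoinStrategy {
    match build_side_bytes {
        // Small enough to copy to every worker.
        Some(size) if size <= threshold_bytes => JoinStrategy::Broadcast,
        // Unknown or large: fall back to shuffling both sides.
        _ => JoinStrategy::Partitioned,
    }
}
```

Falling back to a partitioned join when statistics are missing is the conservative choice, since broadcasting an unexpectedly large input to every worker is much more expensive than an unnecessary shuffle.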

Dandandan avatar Oct 11 '22 17:10 Dandandan

One quick question regarding this: after those datasets are copied to each executor, should they be kept in memory or spilled to disk? If we keep them in memory for a while, memory usage might be a concern.

mingmwang avatar Oct 12 '22 16:10 mingmwang

@mingmwang I think the same applies to broadcast exchanges as to normal exchanges: they are spilled to disk by default and might be kept in memory if the memory budget allows. I believe Ballista doesn't support the latter yet(?). A limit may be chosen, like 100MB, so the data will fit in memory most often, but even when written to disk it might give impressive speedups for joins, as the other side of the join could be way larger.
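A minimal sketch of that placement decision, assuming a simple per-executor byte budget. `Placement` and `MemoryBudget` are hypothetical names for illustration only; as noted, Ballista may not have an equivalent yet.

```rust
/// Where a received broadcast ends up on an executor (hypothetical type).
#[derive(Debug, PartialEq)]
enum Placement {
    InMemory,
    OnDisk,
}

/// Hypothetical per-executor memory budget for cached exchange data.
struct MemoryBudget {
    limit_bytes: u64,
    used_bytes: u64,
}

impl MemoryBudget {
    fn new(limit_bytes: u64) -> Self {
        Self { limit_bytes, used_bytes: 0 }
    }

    /// Keep the broadcast in memory if it fits in the remaining budget;
    /// otherwise leave it on disk. `used_bytes` never exceeds `limit_bytes`,
    /// so the subtraction cannot underflow.
    fn place(&mut self, broadcast_bytes: u64) -> Placement {
        if broadcast_bytes <= self.limit_bytes - self.used_bytes {
            self.used_bytes += broadcast_bytes;
            Placement::InMemory
        } else {
            Placement::OnDisk
        }
    }
}
```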

Dandandan avatar Oct 12 '22 16:10 Dandandan

@Dandandan Sounds nice. There are a couple of things we need to do to support the broadcast exchange.

  1. RPC protocols to do broadcasting efficiently, something similar to Spark's TorrentBroadcast.
  2. Executor memory management, as you just mentioned.
  3. Broadcast lifecycle management: after a SQL query finishes, its broadcasts should be cleaned up.
  4. The planner needs more accurate stats to choose between broadcast join, partitioned hash join, or even SortMergeJoin, and for build side selection etc. We need to enhance the stats collection.
  5. Broadcast exchange reuse within a SQL query?
  6. Broadcast reuse between SQL queries?

mingmwang avatar Oct 12 '22 16:10 mingmwang

Today, for partitioned hash join, DataFusion already supports the CollectLeft mode, which I think is similar to a broadcast hash join. I haven't had a chance to test it on Ballista yet, but I think it should work in the distributed model. The downside is that the left side might cause lots of duplicate re-computations.


    fn required_input_distribution(&self) -> Vec<Distribution> {
        match self.mode {
            PartitionMode::CollectLeft => vec![
                Distribution::SinglePartition,
                Distribution::UnspecifiedDistribution,
            ],
            PartitionMode::Partitioned => {
                let (left_expr, right_expr) = self
                    .on
                    .iter()
                    .map(|(l, r)| {
                        (
                            Arc::new(l.clone()) as Arc<dyn PhysicalExpr>,
                            Arc::new(r.clone()) as Arc<dyn PhysicalExpr>,
                        )
                    })
                    .unzip();
                vec![
                    Distribution::HashPartitioned(left_expr),
                    Distribution::HashPartitioned(right_expr),
                ]
            }
        }
    }

mingmwang avatar Oct 12 '22 16:10 mingmwang

That's a good observation @mingmwang ! The difference with CollectLeft is that that mode collects the left side to one partition, whereas with broadcast we would broadcast the output of the left side to each worker.

Indeed, I think the trade-off is that by doing a bit more work on the left side (i.e. building the hash table in each worker) we save the work on the right side (the shuffle).
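A back-of-the-envelope way to compare the two plans is to count the bytes sent over the network. The sketch below assumes, as a simplification, that a shuffle moves all of both inputs and that a broadcast sends one full copy of the left side to every worker; the function names are hypothetical.

```rust
/// Bytes moved by a partitioned join: both sides are shuffled by join key.
/// Simplifying assumption: every byte of both inputs crosses the network.
fn partitioned_shuffle_bytes(left_bytes: u64, right_bytes: u64) -> u64 {
    left_bytes + right_bytes
}

/// Bytes moved by a broadcast join: one full copy of the left side per worker,
/// while the right side stays where it is.
fn broadcast_bytes(left_bytes: u64, workers: u64) -> u64 {
    left_bytes * workers
}

/// Broadcasting moves less data when left * workers < left + right.
fn broadcast_moves_less(left_bytes: u64, right_bytes: u64, workers: u64) -> bool {
    broadcast_bytes(left_bytes, workers) < partitioned_shuffle_bytes(left_bytes, right_bytes)
}
```

For example, with a 100 MB left side, a 10 GB (10,000 MB) right side and 8 workers, broadcasting moves 800 MB while the partitioned plan moves 10,100 MB. With two equally sized 10 GB inputs the broadcast would move 80 GB against 20 GB, so the partitioned plan wins.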

Dandandan avatar Oct 12 '22 17:10 Dandandan

I added some details for implementing a broadcast join optimization rule here: https://github.com/apache/arrow-ballista/issues/348 .

Dandandan avatar Oct 14 '22 11:10 Dandandan