Integrate with Celeborn to support distributed push-based shuffle

Open SteNicholas opened this issue 5 months ago • 7 comments

Is your feature request related to a problem?

A distributed push-based shuffler would enable Daft to process data an order of magnitude larger than is currently possible.

Describe the solution you'd like

Celeborn (/ˈkeləbɔ:n/) is dedicated to improving the efficiency and elasticity of different map-reduce engines and provides an elastic, highly efficient management service for intermediate data, including shuffle data, spilled data, and result data. Currently, Celeborn focuses on shuffle data. Integrating with Celeborn could add distributed push-based shuffle support to Daft.

Describe alternatives you've considered

No response

Additional Context

No response

Would you like to implement a fix?

No

SteNicholas avatar Jul 17 '25 07:07 SteNicholas

Ping @colin-ho.

SteNicholas avatar Jul 17 '25 07:07 SteNicholas

Hey there, curious as to how this should be integrated? What API does Celeborn expose that might be convenient for a Rust-based engine such as Daft?

jaychia avatar Jul 19 '25 08:07 jaychia

@jaychia, thanks for replying to this ticket. The design for the integration is to build a Celeborn Rust client that gives Daft a Rust SDK. At present, Celeborn provides a REST API and a Java client. The public interfaces include PushData, CommitFiles, OpenStream, FetchChunk, etc. With a Celeborn Rust client, the integration should be easy to implement, and push-based shuffle could be handled by Celeborn instead of Daft. WDYT?
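
To make the call sequence named above concrete, here is a minimal in-memory sketch of a push-based shuffle lifecycle. It is only an illustration under stated assumptions: the class `InMemoryShuffleService`, the helper `run_shuffle`, and every method signature are hypothetical and are not Celeborn's actual API. The only thing borrowed from the thread is the order of operations (push, commit, open stream, fetch chunk).

```python
import zlib
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class InMemoryShuffleService:
    """Toy stand-in for a remote shuffle service (hypothetical, not Celeborn's API)."""

    _pending: dict = field(default_factory=lambda: defaultdict(list))
    _committed: dict = field(default_factory=dict)

    def push_data(self, shuffle_id: int, partition_id: int, payload: bytes) -> None:
        # Map side: push a serialized block to the partition it belongs to.
        self._pending[(shuffle_id, partition_id)].append(payload)

    def commit_files(self, shuffle_id: int) -> None:
        # End of the map stage: freeze pushed blocks so readers see a stable view.
        for key in [k for k in self._pending if k[0] == shuffle_id]:
            self._committed[key] = tuple(self._pending.pop(key))

    def open_stream(self, shuffle_id: int, partition_id: int) -> int:
        # Reduce side: report how many chunks exist for one partition.
        return len(self._committed.get((shuffle_id, partition_id), ()))

    def fetch_chunk(self, shuffle_id: int, partition_id: int, chunk_index: int) -> bytes:
        # Reduce side: read one committed chunk.
        return self._committed[(shuffle_id, partition_id)][chunk_index]


def run_shuffle(rows, num_partitions, service, shuffle_id=0):
    """Push every (key, value) row to its partition, commit, then read back."""
    for key, value in rows:
        # crc32 gives a partition assignment that is stable across processes.
        partition_id = zlib.crc32(key.encode()) % num_partitions
        service.push_data(shuffle_id, partition_id, f"{key}={value}".encode())
    service.commit_files(shuffle_id)

    result = {}
    for partition_id in range(num_partitions):
        num_chunks = service.open_stream(shuffle_id, partition_id)
        result[partition_id] = [
            service.fetch_chunk(shuffle_id, partition_id, i).decode()
            for i in range(num_chunks)
        ]
    return result
```

Calling `run_shuffle([("a", 1), ("b", 2), ("a", 3)], 2, InMemoryShuffleService())` returns a dict keyed by partition id in which both `"a"` records land in the same partition. A real client would replace the in-memory dicts with network calls to shuffle workers.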

SteNicholas avatar Jul 20 '25 00:07 SteNicholas

Thanks, REST would be way too slow for any high performance tasks we're trying to do. We have a pluggable shuffle system and would love to collaborate when the interfaces are ready if that makes sense. Also open to comment on any interfaces as you folks are building them out.

TLDR for us -- as long as you stick to something common like Arrow as an interchange, it should be pretty easy. We actually already have an Arrow Flight integration, so maybe explore that as a potential API?
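
As a rough illustration of what "pluggable" could mean here, the sketch below defines one backend interface and a name-based registry, with batches passed around as opaque bytes (assumed to be Arrow IPC-encoded by the caller). Everything in it is hypothetical: `ShuffleBackend`, `LocalBackend`, `register_backend`, and `make_backend` are illustrative names, not Daft's actual shuffle interfaces.

```python
from typing import Callable, Dict, Iterator, List, Protocol


class ShuffleBackend(Protocol):
    """Hypothetical interface that any shuffle backend would implement."""

    def write(self, partition_id: int, batch: bytes) -> None: ...

    def read(self, partition_id: int) -> Iterator[bytes]: ...


class LocalBackend:
    """In-process backend; batches are assumed to be Arrow IPC bytes."""

    def __init__(self) -> None:
        self._store: Dict[int, List[bytes]] = {}

    def write(self, partition_id: int, batch: bytes) -> None:
        self._store.setdefault(partition_id, []).append(batch)

    def read(self, partition_id: int) -> Iterator[bytes]:
        return iter(self._store.get(partition_id, []))


# Registry mapping a backend name to a zero-argument factory.
_BACKENDS: Dict[str, Callable[[], ShuffleBackend]] = {"local": LocalBackend}


def register_backend(name: str, factory: Callable[[], ShuffleBackend]) -> None:
    """Make a new backend selectable by name."""
    _BACKENDS[name] = factory


def make_backend(name: str) -> ShuffleBackend:
    """Instantiate the backend registered under `name`."""
    try:
        return _BACKENDS[name]()
    except KeyError:
        raise ValueError(f"unknown shuffle backend: {name!r}") from None
```

Under this shape, a Flight-based or Celeborn-based backend would be one more `register_backend(...)` call, and the engine code that calls `write` and `read` would not change.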

jaychia avatar Jul 22 '25 20:07 jaychia

@jaychia, I have looked at the FlightService interfaces for flight shuffle. Does "pluggable shuffle system" mean that the shuffle system is based on FlightService? IMO, the interfaces of distributed push-based shuffle are similar, which makes integration with Celeborn relatively easy. Celeborn also supports columnar shuffle; see "Introduction to Celeborn's Java Columnar Shuffle". The difference is the communication protocol, which requires additional adaptation.

Note: for the interfaces of the Celeborn shuffle client, see ShuffleClient.

BTW, is there a performance benchmark report for flight shuffle? It would help measure the performance of the Celeborn integration.

SteNicholas avatar Jul 23 '25 09:07 SteNicholas

We are working on benchmarks! Will be ready in about 3 weeks or so with the prototype + initial benchmarks.

In other words, if Celeborn were to support this protocol (including the underlying communication protocols), we think this is the best way to integrate without requiring a JVM while still maintaining good performance (REST will be incredibly performance-limiting).

jaychia avatar Jul 23 '25 18:07 jaychia

Proposal for integrating Flight Shuffle with Celeborn: https://docs.google.com/document/d/1Yg8P4GDqrYlZePfaltFSaYAqvcPgZXOrAC4d4OI8Qi4/edit?usp=sharing.

SteNicholas avatar Aug 14 '25 02:08 SteNicholas