Spark.jl

Use Apache Arrow for interprocess communication

Open dfdx opened this issue 6 years ago • 7 comments

According to the Apache Arrow description, it provides "zero-copy streaming messaging", which may help eliminate the cost of transferring data between the JVM and Julia. Julia already has bindings for Arrow, so it shouldn't be too much work (although it may be worth waiting until Julia 0.7).

cc: @xmehaut

dfdx, Mar 21 '18 09:03

I'm toying with the idea of completing the implementation in my Arrow package so that we can use it here. I haven't completely made up my mind on whether I'll take this on yet, but if people here are interested, I'd like to hear about it.

ExpandingMan, Feb 25 '19 22:02

Yes! From my observations, interprocess communication is the main performance killer for the RDD API, so switching to Arrow should be the most important improvement in a while. I haven't looked into the API yet, though, so the change will take time.

dfdx, Feb 27 '19 20:02

Now that Arrow.jl is in a good state, might it be worth revisiting this?

ValdarT, Feb 12 '21 09:02

Although integrating Arrow into the existing API may be easy, I believe we need to drop the RDD API and fully migrate to the Dataset API first. Otherwise we will need to implement the serialization layer twice, once for an interface that is little used nowadays.

To fully support the Dataset API we must implement Julia UDFs similar to PythonUDF. PythonUDF extends Catalyst's Expression, and honestly I don't yet understand all the underlying machinery, so it's quite a huge change. I'll try to gather more information and create a preliminary plan, but I can't commit to any changes in the near future.

dfdx, Feb 15 '21 23:02

Using Arrow seems like a good idea to me. I'd be willing to help implement this, but I'd probably need some help. I think I know how to use Arrow in Julia and create Arrow data in Spark, but I'm really not sure how to send the data from the JVM to Julia without copying :/

I find it limiting that only a few primitive field types are currently supported, with no support for arrays, structs, and maps. Using Arrow instead of the jcall-based conversion mechanism (on Dataset) or the custom format (in RDDs) should also help with this problem (apart from being faster), right?

exyi, Jul 17 '21 17:07

Currently Julia and the JVM communicate in two ways:

  • Julia starts a JVM and calls Java functions via JavaCall. In particular, the Julia driver creates a Spark application and delegates computations to the JVM, including JuliaRDD
  • The JVM, or more specifically JuliaRDD, starts a Julia worker per partition and streams the RDD's data to it via normal OS sockets

The most important parts on the JVM side are JuliaRDD.writeValueToStream() and JuliaRDD.readValueFromStream(). The Julia worker is started via worker_runner.jl.
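
To make the stream-based exchange above more concrete, here is a minimal Java sketch of length-prefixed framing over a byte stream. It is only an illustration: the class name `FramedStreamSketch`, the helpers `writeValue`, `readValue` and `writeEnd`, and the use of a negative length as an end marker are all assumptions made for this example, not the actual JuliaRDD wire format. In-memory streams stand in for the socket.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only; this is NOT the real JuliaRDD code or format.
public class FramedStreamSketch {

    // Write one value as a 4-byte big-endian length followed by the payload bytes.
    static void writeValue(DataOutputStream out, byte[] value) throws IOException {
        out.writeInt(value.length);
        out.write(value);
    }

    // Assumed convention for this sketch: a negative length marks the end of the data.
    static void writeEnd(DataOutputStream out) throws IOException {
        out.writeInt(-1);
    }

    // Read one value, or return null when the end marker is reached.
    static byte[] readValue(DataInputStream in) throws IOException {
        int length = in.readInt();
        if (length < 0) {
            return null;
        }
        byte[] value = new byte[length];
        in.readFully(value); // copies the payload out of the stream
        return value;
    }

    public static void main(String[] args) throws IOException {
        // A byte buffer stands in for the socket between the JVM and a worker.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        writeValue(out, "hello".getBytes(StandardCharsets.UTF_8));
        writeValue(out, "arrow".getBytes(StandardCharsets.UTF_8));
        writeEnd(out);
        out.flush();

        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(buffer.toByteArray()));
        byte[] value;
        while ((value = readValue(in)) != null) {
            System.out.println(new String(value, StandardCharsets.UTF_8));
        }
    }
}
```

In a scheme like this, every value is serialized into the stream on one side and copied back out on the other. That per-value overhead is what an Arrow-based exchange would aim to reduce.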

At the moment I don't see a way to implement Julia UDFs, so apparently we are left with the RDD API. I haven't used Arrow or Arrow.jl myself, but I guess that to migrate to it in Spark.jl we need to create an Arrow data structure in the JVM and then reference it from the Julia worker. Since I haven't done much Spark work lately, I don't have a specific plan for this, but I'll be happy to support it as much as I can!

dfdx, Jul 17 '21 20:07

More broadly, there are several ways to efficiently bring custom Julia functions to Spark clusters, including things like compiling Julia to Java and creating a new distributed computation framework. But the demand for such features is unclear to me. @exyi, do you already have a use case for custom Julia functions on Spark?

dfdx, Jul 17 '21 21:07