Switch from Java to Scala Spark API

Open · mboes opened this issue on Nov 13 '16 · 16 comments

We currently bind the Java API for Spark. @alpmestan likely remembers the rationale better than me. I assume it was mostly a choice by default: the Java API was more straightforward to bind, because it doesn't refer to "non-standard", language-specific types such as scala.Tuple2 and scala.Function1, and, more importantly, because it doesn't expose Scala-specific implicit arguments in function signatures. Those implicits become entirely explicit, like any other argument, when calling such a function from another JVM language (for instance, RDD.map's ClassTag context shows up as an extra trailing argument in the compiled method signature).

However, there are downsides to the Java API:

  • The Java API features a proliferation of interfaces for function-like objects. Counter-intuitively, FlatMapFunction is not a subtype of Function. There is also ForeachFunction, PairFunction, etc. Since these are unrelated, it means that we can't straightforwardly write a uniform Reflect (Closure (a -> b)) JFun1 instance. The Scala API OTOH only has Function1, Function2 etc.
  • The Scala semantics fit Haskell better:
      • there are no void methods: all methods return a value;
      • pairs are packed into tuple objects, the way one would expect in Haskell.
  • The inheritance hierarchy is messy: whereas in Scala we have HadoopRDD < RDD < Object, in Java we have JavaHadoopRDD < JavaPairRDD < Object, and JavaHadoopRDD and JavaPairRDD are both wrappers that reimplement the Java-specific JavaRDDLike interface.

If we move to the Scala API, we could tighten the codomain of Uncurry: the Proc code for void methods would no longer be needed. But the main goal is that all function objects would be treated uniformly according to their arities, allowing us to bind RDD.mapPartitions, RDD.foreach etc. without requiring overlapping instances or newtype wrappers around the closures.
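
To make the uniformity argument concrete, here is a rough sketch of the shape such a binding could take. This is not our existing Closure machinery: the class, its name and the imports are placeholders (I'm assuming the J/JType types as exposed by the jni package's Foreign.JNI.Types; adjust to whatever module we actually end up using).

{-# LANGUAGE DataKinds, KindSignatures, TypeFamilies #-}
module UniformArity where

import Control.Distributed.Closure (Closure)
import Foreign.JNI.Types (J, JType (Class))

-- One class with one instance per arity: every unary closure reflects to
-- scala.Function1, whatever method (map, filter, foreach, mapPartitions, ...)
-- it ends up being passed to.
class ReflectFun f where
  type ScalaFun f :: JType
  reflectFun :: Closure f -> IO (J (ScalaFun f))

-- instance ReflectFun (a -> b) where
--   type ScalaFun (a -> b) = 'Class "scala.Function1"
--   reflectFun = ...  -- wrap the closure in a scala.Function1 proxy
--
-- With the Java API the target type depends on the consuming method instead
-- (Function, FlatMapFunction, ForeachFunction, PairFunction, ...), which is
-- what forces overlapping instances or newtype wrappers today.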

As a side-effect, we'd have slightly less overhead, since we'd be calling into the Scala methods directly, rather than trampolining through their respective Java wrappers first.

So how do we deal with implicits? The only implicits in the API are evidence for Ordering constraints and for ClassTag. Our bindings do know at runtime which ground type they are instantiated at for a given call site, so we can generate ClassTag evidence on demand, inside the binding. The same goes for Ordering evidence. For ClassTag, creating the evidence goes something like this:

do klass <- classOf x
   [java| scala.reflect.ClassTag$.MODULE$.apply($klass) |]

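In the same notation, and purely as a sketch, Ordering evidence for a known ground type could be grabbed from the corresponding Scala singleton object. The mangled class name below (scala.math.Ordering$Int$, with its static MODULE$ field) is my assumption about how the Scala compiler emits the Ordering.Int object; I haven't checked that the quasiquote typechecks exactly like this:

-- Sketch only: fetch Ordering[Int] evidence from the Scala object Ordering.Int
-- (assumed mangled class name scala.math.Ordering$Int$).
intOrdering :: IO (J ('Class "scala.math.Ordering"))
intOrdering = [java| (scala.math.Ordering) scala.math.Ordering$Int$.MODULE$ |]
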
Apart from that, this isn't a huge change: pretty much only the RDD.hs and Closure.hs source files would need to change. The other bindings would stay largely intact.

cc @alpmestan @robinbb @dcoutts

mboes · Nov 13 '16

Would this change result in new dependencies?

robinbb · Nov 13 '16

New Cabal or Java packages? We already depend on scala libs indirectly via Spark (scala is the implementation language for Spark), so I don't think we would have any new dependencies.

mboes · Nov 13 '16

Indeed, we would probably just make the dependency on scala (and its standard library I guess?) direct as opposed to transitive.

alpmestan · Nov 13 '16

Only the standard library. Nothing else.

mboes · Nov 14 '16

@mboes Just a little note on your initial post here before I forget, but it's really a detail more than anything else: it would be nice to avoid using inline-java in sparkle for now for the sake of supporting ghc 7.10.

alpmestan · Nov 14 '16

Didn't mean to imply that inline-java would be required. It was merely a more convenient notation for the sake of discussion than using call and friends.

mboes · Nov 14 '16

@mboes How much effort is required for this change? Would many more tests be required to verify that this is done correctly?

robinbb · Dec 07 '16

@mboes @facundominguez Can you give an estimate about how much effort is required for this change?

robinbb · Feb 06 '17

@robinbb 1-2 days I'd say.

mboes · Feb 06 '17

@mboes Thank you for the estimate. Next question: would this create a build-time dependency on Scala that is not now there?

robinbb · Feb 06 '17

@robinbb it would turn an indirect dependency on the scala standard library (via Spark) into a direct one. So no new dependencies overall.

mboes · Feb 06 '17

@mboes Understood. Seems like a worthy change to make.

robinbb · Feb 06 '17

Per @mboes' request, here's a summary of my attempt to add a binding to mapPartitions for dataframes. The Java function has this signature:

<U> Dataset<U> mapPartitions(MapPartitionsFunction<T,U> f, Encoder<U> encoder)

Since we work with dataframes, we need this instantiated at U = Row. Right off the bat this introduced two challenges: we need to extend the Sparkle Closure module to support this MapPartitionsFunction, and we need to somehow get that Encoder. The former is not really related to this particular ticket, although @facundominguez, @alpmestan and I had some ideas on how to generalize the Closure machinery, which should probably be described elsewhere; the latter, however, pertains to the Java/Scala question.

It is my understanding that in Scala this encoder would have been implicit, and nobody would need to worry about it. But in the Java world we need to provide it explicitly, and nobody seems to say how. There are some mysterious references to RowEncoder on the internet, but the Spark API docs don't say anything about it; the only thing we found was its Scala source code in the Spark GitHub repo. I eventually managed to come up with a standalone Java program that works:

    Encoder<Row> enc   = RowEncoder.apply(ds.schema());
    Dataset<Row> split = ds.mapPartitions(iter -> new Split(iter), enc);

(where ds :: Dataset<Row>). Binding to this from the Haskell side was then a separate challenge. Initially I tried

newtype RowEncoder = RowEncoder (J ('Class "org.apache.spark.sql.Encoder"))
instance Coercible RowEncoder ('Class "org.apache.spark.sql.Encoder")

getRowEncoder :: StructType -> IO RowEncoder
getRowEncoder st =
    callStatic (sing :: Sing "org.apache.spark.sql.catalyst.encoders.RowEncoder") "apply" [coerce st]

but that yielded the dreaded NoSuchMethodError. After looking at the source code for apply, I realized that it actually yields an ExpressionEncoder, yet another undocumented class. So changing this to

newtype RowEncoder = RowEncoder (J ('Class "org.apache.spark.sql.catalyst.encoders.ExpressionEncoder"))
instance Coercible RowEncoder ('Class "org.apache.spark.sql.catalyst.encoders.ExpressionEncoder")

now makes getRowEncoder above work. However, the call to mapPartitions then threw a NoSuchMethodError. It turns out the solution was to manually upcast the ExpressionEncoder to an Encoder:

mapPartitionsDF :: Closure (Iterator Row -> Iterator Row)
                -> DataFrame -> IO DataFrame
mapPartitionsDF fun df = do
    RowEncoder enc <- getRowEncoder =<< schema df
    let enc' :: J ('Class "org.apache.spark.sql.Encoder")
        enc' = unsafeCast enc
    jfun <- reflect (HaskellMapPartitionsFunction fun)
    call df "mapPartitions" [coerce jfun, coerce enc']

This now finally works. Well, almost: I'm still getting an exception, but at least the method is being found and invoked.

edsko · Feb 22 '17

So how do we deal with implicits? The only implicits in the API are evidence for Ordering constraints and ClassTag. Our bindings do know at runtime the ground type they're instantiated at the call site, so we can generate ClassTag evidence on demand, inside the binding. Same goes for Ordering evidence.

I just tested a proof of concept of this idea: https://gist.github.com/mboes/1f31da7e1859371ce5ab74b51397c492

It seems to work!

mboes · Apr 06 '17

Note that once we move to the Dataset API we'll have a potentially more challenging implicit to deal with: Encoding.

edsko · Apr 06 '17

Do you mean Encoder? It looks like those can be created explicitly using static methods in the Encoders class.
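
For instance, here's a sketch in the style of the getRowEncoder binding above (I haven't verified that this exact call goes through, but Encoders.STRING() is a real static factory returning an Encoder<String>, so its erased return type should match the declared class):

newtype StringEncoder = StringEncoder (J ('Class "org.apache.spark.sql.Encoder"))
instance Coercible StringEncoder ('Class "org.apache.spark.sql.Encoder")

stringEncoder :: IO StringEncoder
stringEncoder =
    callStatic (sing :: Sing "org.apache.spark.sql.Encoders") "STRING" []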

mboes · Apr 06 '17