Switch from Java to Scala Spark API
We currently bind the Java API for Spark. @alpmestan likely remembers the rationale better than me. I assume it was mostly a choice by default, and because the Java API was more straightforward to bind: it doesn't refer to "non-standard", language-specific types such as scala.Tuple2 and scala.Function1, and more importantly it doesn't expose Scala-specific implicit arguments in function signatures, whereas these implicits become entirely explicit, like any other argument, when calling said functions from other JVM languages.
However, there are downsides to the Java API:
- The Java API features a proliferation of interfaces for function-like objects. Counter-intuitively, FlatMapFunction is not a subtype of Function. There is also ForeachFunction, PairFunction, etc. Since these are unrelated, we can't straightforwardly write a uniform Reflect (Closure (a -> b)) JFun1 instance. The Scala API OTOH only has Function1, Function2, etc.
- The Scala semantics fit Haskell better:
  - there are no void methods: all methods return a value.
  - pairs are packed into tuple objects, the way one would expect in Haskell.
- The inheritance hierarchy is messy. Whereas in Scala we have HadoopRDD < RDD < Object, in Java we have JavaHadoopRDD < JavaPairRDD < Object. Both JavaHadoopRDD and JavaPairRDD reimplement wrappers that are implementations of the Java-specific JavaRDDLike interface.
If we move to the Scala API, we could tighten the codomain of Uncurry: the Proc code for void methods would no longer be needed. But the main goal is that all function objects would be treated uniformly according to their arities, allowing us to bind RDD.mapPartition, RDD.foreach, etc. without requiring overlapping instances or newtype wrappers around the closures.
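As a deliberately schematic illustration of the "uniform by arity" point (this is not sparkle code; the ScalaFun family is hypothetical, and JType/'Class are assumed to be importable from the jni package as shown):

{-# LANGUAGE DataKinds #-}
{-# LANGUAGE TypeFamilies #-}
import GHC.TypeLits (Nat)
import Foreign.JNI.Types (JType(..))

-- With the Scala API, the JVM-side target of a reflected closure is
-- determined by its arity alone:
type family ScalaFun (n :: Nat) :: JType where
  ScalaFun 1 = 'Class "scala.Function1"
  ScalaFun 2 = 'Class "scala.Function2"
  ScalaFun 3 = 'Class "scala.Function3"

-- With the Java API there is no such single mapping: map wants Function,
-- flatMap wants FlatMapFunction, foreach wants ForeachFunction and
-- mapToPair wants PairFunction, even though all of them are unary.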
As a side-effect, we'd have slightly less overhead, since we'd be calling into the Scala methods directly, rather than trampolining through their respective Java wrappers first.
So how do we deal with implicits? The only implicits in the API are evidence for Ordering constraints and ClassTag. Our bindings do know, at the call site, the ground type they're instantiated at, so we can generate ClassTag evidence on demand, inside the binding. Same goes for Ordering evidence. For ClassTag, creating evidence goes something like this:
do klass <- classOf x
   [java| scala.reflect.ClassTag$.MODULE$.apply($klass) |]
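Ordering evidence can be conjured up in the same spirit. A minimal sketch, assuming the binding can pick a suitable java.util.Comparator for the element type (here simply instantiated at String via Java 8's Comparator.naturalOrder for illustration); comparatorToOrdering is a real method on scala.math.Ordering's companion object:

[java| scala.math.Ordering$.MODULE$.comparatorToOrdering(
         java.util.Comparator.<java.lang.String>naturalOrder()) |]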
Besides, this isn't a huge change: pretty much only the RDD.hs and Closure.hs source files would need to change. The other bindings would stay largely intact.
cc @alpmestan @robinbb @dcoutts
Would this change result in new dependencies?
New Cabal or Java packages? We already depend on Scala libs indirectly via Spark (Scala is the implementation language for Spark), so I don't think we would have any new dependencies.
Indeed, we would probably just make the dependency on Scala (and its standard library, I guess?) direct as opposed to transitive.
Only the standard library. Nothing else.
@mboes Just a little note on your initial post here before I forget, but it's really a detail more than anything else: it would be nice to avoid using inline-java in sparkle for now for the sake of supporting ghc 7.10.
Didn't mean to imply that inline-java would be required. It was merely a more convenient notation for the sake of discussion than using call and friends.
@mboes How much effort is required for this change? Would many more tests be required to verify that this is done correctly?
@mboes @facundominguez Can you give an estimate about how much effort is required for this change?
@robinbb 1-2 days I'd say.
@mboes Thank you for the estimate. Next question: would this create a build-time dependency on Scala that is not now there?
@robinbb it would turn an indirect dependency on the Scala standard library (via Spark) into a direct one. So no new dependencies overall.
@mboes Understood. Seems like a worthy change to make.
Per @mboes' request, here's a summary of my attempt to add a binding to mapPartitions for dataframes. The Java function has the signature
<U> Dataset<U> mapPartitions(MapPartitionsFunction<T,U> f, Encoder<U> encoder)
Since we work with dataframes, we need this instantiated at U = Row. Right off the bat this introduced two challenges: we need to extend the Sparkle Closure module to support this MapPartitionsFunction, and we need to somehow get that Encoder. The former is not really related to this particular ticket, although @facundominguez, @alpmestan and I had some ideas on how to generalize the Closure stuff, and that should probably be described elsewhere; but the latter pertains to the Java/Scala question.
It is my understanding that in Scala this encoder would have been implicit, and nobody needs to worry about it. But in the Java world we need to provide it explicitly, and nobody seems to say how. There are some mysterious references to RowEncoders on the internet, but the Spark API docs don't say anything about it; the only thing we found was its Scala source code in the Spark GitHub repo. Eventually I managed to come up with a standalone Java program that works:
Encoder<Row> enc = RowEncoder.apply(ds.schema());
Dataset<Row> split = ds.mapPartitions(iter -> new Split(iter), enc);
(where ds :: Dataset<Row>). Binding to this from the Haskell side was a separate challenge. Initially I tried
newtype RowEncoder = RowEncoder (J ('Class "org.apache.spark.sql.Encoder"))
instance Coercible RowEncoder ('Class "org.apache.spark.sql.Encoder")
getRowEncoder :: StructType -> IO RowEncoder
getRowEncoder st =
  callStatic (sing :: Sing "org.apache.spark.sql.catalyst.encoders.RowEncoder") "apply" [coerce st]
but that yielded the dreaded NoSuchMethodError. After looking at the source code for apply, I realized that it actually yields an ExpressionEncoder, yet another undocumented class. So changing this to
newtype RowEncoder = RowEncoder (J ('Class "org.apache.spark.sql.catalyst.encoders.ExpressionEncoder"))
instance Coercible RowEncoder ('Class "org.apache.spark.sql.catalyst.encoders.ExpressionEncoder")
now makes getRowEncoder above work. However, the call to mapPartitions then threw a NoSuchMethodError. It turns out the solution was to manually upcast the ExpressionEncoder to an Encoder:
mapPartitionsDF :: Closure (Iterator Row -> Iterator Row)
                -> DataFrame -> IO DataFrame
mapPartitionsDF fun df = do
  RowEncoder enc <- getRowEncoder =<< schema df
  let enc' :: J ('Class "org.apache.spark.sql.Encoder")
      enc' = unsafeCast enc
  jfun <- reflect (HaskellMapPartitionsFunction fun)
  call df "mapPartitions" [coerce jfun, coerce enc']
This now finally works. Well, actually, I'm still getting an exception, but I think it's at least finding and invoking the method.
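For reference, the intended usage from the Haskell side would then be something like this, where splitRows is a made-up top-level partition transformation of type Iterator Row -> Iterator Row, and closure/static come from distributed-closure and the StaticPointers extension:

df' <- mapPartitionsDF (closure (static splitRows)) df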
> So how do we deal with implicits? The only implicits in the API are evidence for Ordering constraints and ClassTag. Our bindings do know, at the call site, the ground type they're instantiated at, so we can generate ClassTag evidence on demand, inside the binding. Same goes for Ordering evidence.
I just tested a proof of concept of this idea: https://gist.github.com/mboes/1f31da7e1859371ce5ab74b51397c492
It seems to work!
Note that once we move to the Dataset API we'll have a potentially more challenging implicit to deal with: Encoding.
Do you mean Encoder? It looks like those can be created explicitly using static methods in the Encoders class.
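For example, a hedged sketch in the same inline-java notation used earlier in this thread (Encoders.STRING, Encoders.INT and Encoders.tuple are real static methods of org.apache.spark.sql.Encoders; generics are erased in the Haskell-side types, and threading these through sparkle's bindings is not worked out here):

stringEnc :: IO (J ('Class "org.apache.spark.sql.Encoder"))
stringEnc = [java| org.apache.spark.sql.Encoders.STRING() |]

pairEnc :: IO (J ('Class "org.apache.spark.sql.Encoder"))
pairEnc =
  [java| org.apache.spark.sql.Encoders.tuple(
           org.apache.spark.sql.Encoders.STRING(),
           org.apache.spark.sql.Encoders.INT()) |]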