# 3.4+
Fixes https://github.com/Kotlin/kotlin-spark-api/issues/195, which is a fun read if you're interested in the process :)
This is a work-in-progress overhaul of the core parts of the library to support Spark 3.4+.
## Why
- Too much has changed in Spark 3.4+: Spark decoupled its encoding/decoding system with Spark Connect in mind.
- Our previous method was hacky and forced us to publish a separate release for each exact Spark version to maintain bytecode-level compatibility.
- We too would like Spark Connect support in the future :)
- We need to keep supporting newer Spark versions
## What has changed
- Removed the `:core` module entirely. No more spark-package-injected code that can break at the bytecode level.
- Instead, we just have a `:scala-helpers` module, which doesn't even depend on Spark atm. We just need the `VarargUnwrapper` class.
- Rewrote encoding from the ground up, in pure Kotlin this time, using the power of Kotlin reflection. I took inspiration from `JavaTypeInference` and `ScalaReflection`, which, since 3.4, build an `AgnosticEncoder` as a sort-of intermediate step towards an `Encoder` for the data. This implementation-agnostic encoder can be turned into an actual encoder by passing it to `ExpressionEncoder()` or into something entirely different (which is what makes Spark Connect possible).
- Our `KotlinTypeInference.encoderFor` implementation is a mix of the Java and Scala versions, supporting Scala/Java lists, primitives, Scala tuples, and, most importantly, Kotlin data classes (a sketch of this flow follows the compiler-plugin example below).
- One downside of having to create an `AgnosticEncoder` is that we are limited to the `AgnosticEncoder`s Spark offers us. We cannot write our own (de)serializers anymore if we want to support Spark Connect. So, in order to support data classes, we need to hijack `ProductEncoder`.
- Deserializing data classes using `ProductEncoder` works fine, but for serializing we hit a snag: in Scala, case classes have a function with the same name as each property. This assumption is used under the hood, so we need to make sure those functions exist in our data classes too. Plus, I later found that this function does an actual instance check to see whether the value is a `scala.Product`... It's compiler plugin time!
- I created a Kotlin compiler plugin which, when applied to your project, converts:
```kotlin
@Sparkify
data class User(
    val name: String,
    @ColumnName("test") val age: Int,
)
```
to
```kotlin
@Sparkify
data class User(
    @get:JvmName("name") val name: String,
    @get:JvmName("test") @ColumnName("test") val age: Int,
) : scala.Product, Serializable {
    override fun canEqual(that: Any?): Boolean = that is User
    override fun productArity(): Int = 2
    override fun productElement(n: Int): Any? = when (n) {
        0 -> this.name
        1 -> this.age
        else -> throw IndexOutOfBoundsException()
    }
}
```
This satisfies both of Spark's requirements. One downside of this approach is that you now need to annotate each data class you want to encode with `@Sparkify` (else the column names will be `getName` and `getAge`), and you cannot annotate external data classes like `Pair` :/ So I recommend working with tuples from now on (or making your own `@Sparkify` Pair).
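For classic (non-Connect) Spark, the path from a Kotlin type to a usable encoder then looks roughly like this. This is a minimal sketch: `encoderOf` is a hypothetical helper and the exact signature of `KotlinTypeInference.encoderFor` is assumed; only `AgnosticEncoder` and `ExpressionEncoder` are Spark's own 3.4+ API:

```kotlin
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Hypothetical helper: walk the reified Kotlin type with kotlin-reflect and pick one
// of Spark's predefined AgnosticEncoders (e.g. ProductEncoder for a @Sparkify data class).
inline fun <reified T : Any> encoderOf(): ExpressionEncoder<T> {
    val agnostic: AgnosticEncoder<T> = KotlinTypeInference.encoderFor() // signature assumed
    // Classic Spark materializes the agnostic description into actual (de)serializer
    // expressions; Spark Connect would consume the AgnosticEncoder directly instead.
    return ExpressionEncoder.apply(agnostic)
}
```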
- The compiler plugin (`:compiler-plugin`) is going to be applicable to your Gradle project via the gradle plugin (`:gradle-plugin`) with `id("org.jetbrains.kotlinx.spark.api") version X`, or in Maven with the `<compilerPlugins>` tag (probably).
- The `:kotlin-spark-api` and `:examples` modules also depend on these two plugins for their tests. This is done with a gradle trick that updates bootstrap jars and adds them to the classpath/repository.
- Updated to Kotlin 2.0 Beta 5. You should still be able to use 1.9.23 with the compiler plugin, since it just uses IR; it does not require K2.
- For Kotlin 2.0, just make sure you set `freeCompilerArgs.add("-Xlambdas=class")`, since Spark cannot serialize lambdas otherwise. If you use the gradle plugin, this is done for you (see the sketch below).
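For reference, wiring this up in a `build.gradle.kts` might look like the sketch below. The plugin id and compiler flag come from the points above; the `version X` placeholder and the Beta coordinates are assumptions:

```kotlin
// build.gradle.kts (sketch)
plugins {
    kotlin("jvm") version "2.0.0-Beta5"
    // Applies the :compiler-plugin and sets -Xlambdas=class for you.
    id("org.jetbrains.kotlinx.spark.api") version "X" // placeholder version, as above
}

// Without the gradle plugin, set the flag manually:
kotlin {
    compilerOptions {
        // Spark cannot serialize invokedynamic lambdas; force anonymous classes.
        freeCompilerArgs.add("-Xlambdas=class")
    }
}
```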
## TODO
- [x] Provide warnings for non-Sparkified classes, especially for `Pair`/`Triple`
- [x] Java bean as fallback encoder
- [x] Jupyter support
- [ ] Finalize Jupyter support
- [x] UDTs for non-generic Kotlin types like `Instant`, `LocalDateTime`, etc.
- [ ] Spark Connect
- [ ] Docs
- [ ] Fix RddTest "Work with any number"
- [ ] Remove streaming in favor of structured streaming, update examples
Added encoding for kotlinx-datetime types: `DatePeriod`, `DateTimePeriod`, `Instant`, `LocalDateTime`, and `LocalDate`. `kotlin.time.Duration` sadly doesn't work, as it's a value class (I think that's the reason).
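As a quick usage sketch, assuming the existing `withSpark`/`dsOf` helpers keep their current shape after this overhaul (`Order` and its fields are made up for illustration):

```kotlin
import kotlinx.datetime.LocalDate
import org.jetbrains.kotlinx.spark.api.*

@Sparkify
data class Order(
    val id: Long,
    val placedOn: LocalDate, // uses the new kotlinx-datetime encoding
)

fun main() = withSpark {
    dsOf(
        Order(1L, LocalDate(2024, 3, 1)),
        Order(2L, LocalDate(2024, 3, 2)),
    ).show()
}
```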