isarn-sketches-spark

Serialize Dataframes from TDigest UDAFs

Open erikerlandson opened this issue 8 years ago • 8 comments

I had a request to serialize the DataFrames resulting from TDigest UDAFs directly, to Parquet or a similar format.

erikerlandson avatar Jul 19 '17 02:07 erikerlandson

Here is a workaround that goes through an RDD and uses saveAsObjectFile:

scala> import org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._
import org.isarnproject.sketches.udaf._
import org.apache.spark.isarnproject.sketches.udt._

scala> val udaf = tdigestUDAF[Double]
udaf: org.isarnproject.sketches.udaf.TDigestUDAF[Double] = TDigestUDAF(0.5,0)

scala> val df = sc.parallelize(Seq((1,2),(3,4),(5,6))).toDF("x1","x2")
df: org.apache.spark.sql.DataFrame = [x1: int, x2: int]

scala> val agg = df.agg(udaf($"x1"))
agg: org.apache.spark.sql.DataFrame = [tdigestudaf(x1): tdigest]

scala> agg.as[TDigestSQL].rdd.saveAsObjectFile("/tmp/tdigest")

scala> val reload = sc.objectFile[TDigestSQL]("/tmp/tdigest").toDF()
reload: org.apache.spark.sql.DataFrame = [value: tdigest]

erikerlandson avatar Jul 19 '17 02:07 erikerlandson

As an FYI on top of this, you can use it to save and restore a data frame too:

val restore = sc.objectFile[Row]("maprfs:///work/dev/streaming-test/test2.obj")
val restoredDF = spark.sqlContext.createDataFrame(restore, minuteDF.schema)
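
The snippet above shows only the restore half. A minimal sketch of the full round trip might look like the following, assuming a local SparkSession and a small stand-in for minuteDF (the column names, the data, and the temp path are made up for illustration). Object files store the rows via Java serialization, and createDataFrame still needs a schema passed in, so the original schema has to be available at restore time.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{Row, SparkSession}

// Local session for illustration; in spark-shell, `spark` and `sc` already exist.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("objfile-roundtrip")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for minuteDF from the comment above.
val minuteDF = Seq((1, 2.0), (3, 4.0), (5, 6.0)).toDF("minute", "value")

// saveAsObjectFile will not overwrite an existing path, so use a fresh one.
val path = Files.createTempDirectory("df-obj").resolve("minute.obj").toString

// Save: drop down to the RDD[Row] and write it with Java serialization.
minuteDF.rdd.saveAsObjectFile(path)

// Restore: read the rows back and reapply the original schema.
val restore = spark.sparkContext.objectFile[Row](path)
val restoredDF = spark.createDataFrame(restore, minuteDF.schema)
```

Because the format is Java serialization, the files are only readable from a JVM with compatible classes on the classpath, which is the main trade-off compared with Parquet.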

johnwhumphreys avatar Jul 27 '17 18:07 johnwhumphreys

Thanks! I did not know that!

That reminds me - I haven't experimented with this yet but there is one additional method I did not override on my UDTs:

private[spark] override def asNullable: TDigestUDT = this

Parquet I/O in Spark requires that the struct fields be "cast" to nullable as part of the process, so it is possible that overriding this will allow Parquet I/O, but I have not tried it out so far.

erikerlandson avatar Jul 27 '17 19:07 erikerlandson

FYI, adding the asNullable method did not help with writing to Parquet when I tried it. I'm still working on figuring this out.

erikerlandson avatar Jul 30 '17 18:07 erikerlandson

Ah okay; that's unfortunate. I'll let you know if I find anything helpful when I get to dig into that problem :).

johnwhumphreys avatar Jul 31 '17 00:07 johnwhumphreys

Out of curiosity, how are you managing your UDTs? It seems like in Spark 2.1.0 they're private and there is no replacement (yet).

I need to make a UDAF to aggregate BitSets for efficiency in my app and this is killing me.

johnwhumphreys avatar Aug 03 '17 16:08 johnwhumphreys

The trick is to define your own UDT classes in the org.apache.spark scope. Notice where my UDT files live:

isarn-sketches-spark/src/main/scala/org/apache/spark/isarnproject/sketches/udt/TDigestUDT.scala

and the package is: org.apache.spark.isarnproject.sketches.udt

I'm a bit irritated that they privatized this before they actually came up with a replacement. At the end of the day, though, it's not too hard to get around with the hackery above.
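
As a rough illustration of the package trick for the BitSet case asked about, a UDT might be sketched as below. This is not the project's TDigestUDT: the package name org.apache.spark.example.udt, the class BitSetUDT, and the choice of a binary column as the storage type are all assumptions made for the example, and it targets the Spark 2.x API where UserDefinedType is private[spark].

```scala
// Hypothetical location: src/main/scala/org/apache/spark/example/udt/BitSetUDT.scala
// The package only needs to sit somewhere under org.apache.spark.
package org.apache.spark.example.udt

import java.util.BitSet
import org.apache.spark.sql.types.{BinaryType, DataType, UDTRegistration, UserDefinedType}

// Compiles only because this file lives inside the org.apache.spark package,
// where private[spark] members such as UserDefinedType are visible.
class BitSetUDT extends UserDefinedType[BitSet] {
  // Assumption for this sketch: store the bit set as one binary column.
  override def sqlType: DataType = BinaryType

  // User object -> value of sqlType.
  override def serialize(obj: BitSet): Any = obj.toByteArray

  // Value of sqlType -> user object.
  override def deserialize(datum: Any): BitSet = datum match {
    case bytes: Array[Byte] => BitSet.valueOf(bytes)
  }

  override def userClass: Class[BitSet] = classOf[BitSet]
}

object BitSetUDT {
  // java.util.BitSet cannot be annotated, so the sketch maps the class to
  // its UDT through UDTRegistration, which is also private[spark].
  def register(): Unit =
    UDTRegistration.register(classOf[BitSet].getName, classOf[BitSetUDT].getName)
}
```

Calling BitSetUDT.register() once before building any schema that mentions BitSet should be enough for Spark to pick up the mapping.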

erikerlandson avatar Aug 03 '17 17:08 erikerlandson

Ooh nice trick :).

I managed to hack around it another way but this will be a better fix. Thank you!

johnwhumphreys avatar Aug 03 '17 22:08 johnwhumphreys