isarn-sketches-spark

Serialize Dataframes from TDigest UDAFs
I had a request to serialize the DataFrames resulting from TDigest UDAFs directly to Parquet or other formats.
Here is a workaround that goes through an RDD and uses saveAsObjectFile:
scala> import org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._
import org.isarnproject.sketches.udaf._
import org.apache.spark.isarnproject.sketches.udt._
scala> val udaf = tdigestUDAF[Double]
udaf: org.isarnproject.sketches.udaf.TDigestUDAF[Double] = TDigestUDAF(0.5,0)
scala> val df = sc.parallelize(Seq((1,2),(3,4),(5,6))).toDF("x1","x2")
df: org.apache.spark.sql.DataFrame = [x1: int, x2: int]
scala> val agg = df.agg(udaf($"x1"))
agg: org.apache.spark.sql.DataFrame = [tdigestudaf(x1): tdigest]
scala> agg.as[TDigestSQL].rdd.saveAsObjectFile("/tmp/tdigest")
scala> val reload = sc.objectFile[TDigestSQL]("/tmp/tdigest").toDF()
reload: org.apache.spark.sql.DataFrame = [value: tdigest]
scala>
As an FYI on top of this, you can use the same approach to save and restore a DataFrame too:
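import org.apache.spark.sql.Row
// minuteDF is the DataFrame that was saved earlier (e.g. via minuteDF.rdd.saveAsObjectFile); the object file holds only Rows, so its schema must be supplied again
val restore = sc.objectFile[Row]("maprfs:///work/dev/streaming-test/test2.obj")
val restoredDF = spark.sqlContext.createDataFrame(restore, minuteDF.schema)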
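A self-contained sketch of the full round trip, assuming Spark 2.x with a local SparkSession; the object name ObjectFileRoundTrip and the temporary path are hypothetical. Because an object file stores only the serialized Row values, the sketch also returns the schema as JSON (StructType.json) so it can be kept next to the data and rebuilt with DataType.fromJson, instead of relying on the original DataFrame still being in scope.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DataType, StructType}

// Hypothetical helper object illustrating the object-file round trip.
object ObjectFileRoundTrip {
  // Write the rows as a Java-serialized object file and return the schema as JSON,
  // since the object file itself carries no schema.
  def save(df: DataFrame, path: String): String = {
    df.rdd.saveAsObjectFile(path)
    df.schema.json
  }

  // Rebuild the DataFrame from the object file plus the saved schema JSON.
  def restore(spark: SparkSession, path: String, schemaJson: String): DataFrame = {
    val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]
    val rows = spark.sparkContext.objectFile[Row](path)
    spark.createDataFrame(rows, schema)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("roundtrip").getOrCreate()
    import spark.implicits._

    val df = Seq((1, 2), (3, 4), (5, 6)).toDF("x1", "x2")
    // saveAsObjectFile fails if the target exists, so use a fresh (hypothetical) temp location.
    val path = Files.createTempDirectory("objectfile-roundtrip").resolve("data").toString

    val schemaJson = save(df, path)
    val restored = restore(spark, path, schemaJson)
    restored.show()
    spark.stop()
  }
}
```

Note that saveAsObjectFile relies on Java serialization, so the files are tied to the JVM classes that wrote them; it is a workaround, not a portable format like Parquet.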
Thanks! I did not know that!
That reminds me: I haven't experimented with this yet, but there is one additional method I did not override on my UDTs:
private[spark] override def asNullable: TDigestUDT = this
Parquet I/O in Spark requires that struct fields be "cast" to nullable as part of the process, so it is possible that this override will enable Parquet I/O, but I have not tried it yet.
(Replying by email to John Humphreys, Thu, Jul 27, 2017 at 11:47 AM.)
FYI, adding the asNullable method did not help with writing to Parquet when I tried it. I'm still working on figuring this out.
Ah okay; that's unfortunate. I'll let you know if I find anything helpful when I get to dig into that problem :).
(Replying by email to Erik Erlandson, Jul 30, 2017 at 2:59 PM.)
Out of curiosity, how are you managing your UDTs? It seems that in Spark 2.1.0 they're private and there is no public replacement (yet).
I need to write a UDAF that aggregates BitSets for efficiency in my app, and this is killing me.
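The trick is to define your own UDT classes inside the org.apache.spark package hierarchy: members declared private[spark] are visible to code in org.apache.spark and all of its subpackages. Notice where my UDT files live: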
isarn-sketches-spark/src/main/scala/org/apache/spark/isarnproject/sketches/udt/TDigestUDT.scala
and the package is: org.apache.spark.isarnproject.sketches.udt
I'm a bit irritated that they privatized this before actually providing a replacement; however, at the end of the day it's not too hard to get around it with the above hackery.
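A minimal sketch of the trick for the BitSet use case, assuming Spark 2.x (where UserDefinedType is private[spark]). The package org.apache.spark.myapp.udt and the names BitSetSQL and BitSetUDT are hypothetical; the wrapper case class annotated with @SQLUserDefinedType mirrors how TDigestSQL appears to be used in the REPL session. The class compiles only because its package is nested under org.apache.spark.

```scala
// Hypothetical package: anything nested under org.apache.spark can see private[spark] members.
package org.apache.spark.myapp.udt

import scala.collection.immutable.BitSet
import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
import org.apache.spark.sql.types._

// Hypothetical wrapper type that Spark maps to the UDT via the annotation.
@SQLUserDefinedType(udt = classOf[BitSetUDT])
case class BitSetSQL(bits: BitSet)

// UserDefinedType is private[spark] in Spark 2.x; extending it works here only
// because of the package this file lives in.
class BitSetUDT extends UserDefinedType[BitSetSQL] {
  // Store the bit set as its 64-bit words (BitSet.toBitMask).
  override def sqlType: DataType = ArrayType(LongType, containsNull = false)

  override def serialize(obj: BitSetSQL): Any =
    new GenericArrayData(obj.bits.toBitMask.map(w => w: Any))

  override def deserialize(datum: Any): BitSetSQL = datum match {
    case a: ArrayData => BitSetSQL(BitSet.fromBitMaskNoCopy(a.toLongArray()))
    case other => throw new IllegalArgumentException(s"Unexpected datum: $other")
  }

  override def userClass: Class[BitSetSQL] = classOf[BitSetSQL]

  // The extra override mentioned earlier in the thread; it reportedly did not fix Parquet writes.
  private[spark] override def asNullable: BitSetUDT = this
}
```

Storing the words from toBitMask keeps the encoding compact and lets Spark treat the column internally as a plain array of longs; a StructType-based sqlType is another option.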
Ooh, nice trick :).
I managed to hack around it another way, but this will be a better fix. Thank you!
(Replying by email to Erik Erlandson, Aug 3, 2017 at 1:28 PM.)