Automatic Conversion of Snowflake Data Types to Spark Types
Is your feature request related to a problem? Please describe.
Snowflake's data types don't always map to a useful equivalent type when fetching data from Snowflake using the Snowflake Spark Connector. For example, integers stored in Snowflake are mapped to BigInt in Spark rather than Int, even if the stored numbers are always small. Moreover, complex data types (structs and lists of structs) are stored in Snowflake as the Variant type; when read through the Snowflake Spark Connector, they are returned as a JSON string instead of a complex data type.
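To illustrate the Variant problem: a nested field that is modelled as a case class in Scala comes back from the connector as a plain JSON string. A minimal sketch (the case classes and field names are made up for illustration):

```scala
// Hypothetical case classes modelling a row with a nested structure.
case class Address(street: String, city: String)
case class Customer(id: Int, addresses: Seq[Address])

// Stored as a Variant in Snowflake, the `addresses` column is returned
// by the connector as a JSON string rather than an ArrayType column:
val addressesJson = """[{"street":"Main St 1","city":"Bern"}]"""
```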
Describe the solution you'd like
When users know the data types they want to read (and, for example, have a case class describing them), SDLB could use that information to convert the DataFrame read from Snowflake into a Dataset of that type.
Additional context
Code similar to the following could be used for this task:
```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{ArrayType, StructType}
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.{TypeTag, typeTag}

def snowflakeToSparkTypes[A](df: DataFrame)(implicit spark: SparkSession, typeTagArg: TypeTag[A]): Dataset[A] = {
  // build an ExpressionEncoder for A via runtime reflection
  val theType = typeTag[A].tpe
  val serializer = ScalaReflection.serializerForType(theType)
  val deserializer = ScalaReflection.deserializerForType(theType)
  val classTag = ClassTag[A](typeTag[A].mirror.runtimeClass(theType))
  implicit val encoder: ExpressionEncoder[A] = new ExpressionEncoder(serializer, deserializer, classTag)
  // the schema we expect, derived from A
  val expectedSchema: StructType = spark.emptyDataset(encoder).schema
  val resultDF = expectedSchema.foldLeft(df.toDF()) {
    case (dataFrame, schemaOfField) =>
      schemaOfField.dataType match {
        // complex types are read from Snowflake as JSON strings -> parse them
        case _: ArrayType | _: StructType =>
          dataFrame.withColumn(schemaOfField.name, from_json(col(schemaOfField.name), schemaOfField.dataType))
        // simple types: cast to the expected type (e.g. BigInt -> Int)
        case _ =>
          dataFrame.withColumn(schemaOfField.name, col(schemaOfField.name).cast(schemaOfField.dataType))
      }
  }
  resultDF.as[A]
}
```
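As a side note, the ClassTag needed by the ExpressionEncoder can be derived from the TypeTag alone. A minimal, Spark-free sketch of that reflection step (the Customer case class is purely illustrative):

```scala
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

// Hypothetical case class standing in for a target row type.
case class Customer(id: Int, name: String)

// Derives a ClassTag from a TypeTag, as done inside snowflakeToSparkTypes
// before constructing the ExpressionEncoder.
def classTagOf[A](implicit tt: TypeTag[A]): ClassTag[A] =
  ClassTag[A](tt.mirror.runtimeClass(tt.tpe))

val customerTag = classTagOf[Customer]
// customerTag.runtimeClass is classOf[Customer]
```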