CloudFlow can't convert avro logicalType "timestamp-millis" to catalyst type
Can't convert Avro schema to catalyst type because schema at path dateTime isn't compatible (avroType = TimestampType, sqlType = LONG)
This error occurs in runLocal when an Avro schema has a field with logical type "timestamp-millis". In tests there is no problem. Avro example:
{
  "namespace": "some.org.pkg",
  "type": "record",
  "name": "SomeAvro",
  "doc": "SomeAvro",
  "fields": [
    {
      "name": "fieldOne",
      "doc": "FieldOne",
      "type": "string"
    },
    {
      "name": "dateTime",
      "doc": "dateTime",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}
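Assuming avrohugger maps timestamp-millis to java.time.Instant (as the discussion below suggests), the generated case class would look roughly like this sketch:

package some.org.pkg

import java.time.Instant

// Hypothetical sketch of the avrohugger-generated case class for the schema above,
// assuming timestamp-millis is mapped to java.time.Instant.
case class SomeAvro(fieldOne: String, dateTime: Instant)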
Expected Result: Cloudflow finds a suitable encoder for logicalType timestamp-millis and converts the Avro schema type to a Catalyst type for Spark.
@matveevval
Check out the sensor-data-scala example; it adds an implicit to handle the conversion.
import java.time.Instant

package object sensordata {
  // Implicit view from epoch milliseconds to java.time.Instant.
  implicit def toInstant(millis: Long): Instant = Instant.ofEpochMilli(millis)
}
https://github.com/lightbend/cloudflow/tree/master/examples/snippets/modules/ROOT/examples/sensor-data-scala
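For illustration, with that implicit in scope a raw epoch-millisecond Long converts to an Instant wherever one is expected (the literal is just a sample value):

import java.time.Instant
import sensordata._ // brings the implicit Long => Instant view into scope

// The Long literal is converted via toInstant at the assignment:
val measured: Instant = 1589459496000L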
@olofwalker Your example doesn't have any Spark streamlets, only Akka streamlets.
Also, please help me understand how that implicit conversion helps Spark deserialize the Avro-generated case class with a field of type Instant.
I think the problem is that cloudflow-spark doesn't natively support the Instant type in SQLImplicits.scala.
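As a point of comparison, stock Spark 3.x ships an Instant encoder, so a possible stopgap (assuming a Spark 3.x runtime; this is a sketch, not Cloudflow's actual fix) is to bring one into scope yourself:

import java.time.Instant
import org.apache.spark.sql.{Encoder, Encoders}

object InstantEncoders {
  // Spark 3.x provides this encoder; it maps Instant to Catalyst TimestampType.
  implicit val instantEncoder: Encoder[Instant] = Encoders.INSTANT

  // On Spark 2.4 there is no Encoder[Instant]; an opaque Kryo encoder is a
  // possible fallback, at the cost of a binary column instead of a timestamp:
  // implicit val instantEncoder: Encoder[Instant] = Encoders.kryo[Instant]
}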