CloudFlow can't convert avro logicalType "timestamp-millis" to catalyst type

Open matveevval opened this issue 4 years ago • 2 comments

Can't convert Avro schema to catalyst type because schema at path dateTime isn't compatible (avroType = TimestampType, sqlType = LONG)

This error occurs in runLocal if you have an Avro schema with the logical type "timestamp-millis". In tests there is no problem. Avro example:

{
  "namespace": "some.org.pkg",
  "type": "record",
  "name": "SomeAvro",
  "doc": "SomeAvro",
  "fields":[
    {
      "name": "fieldOne",
      "doc": "FieldOne",
      "type": "string"
    },
    {
      "name": "dateTime",
      "doc": "dateTime",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}

Expected Result: Cloudflow will find a suitable encoder for logicalType timestamp-millis and convert the Avro schema type to a Catalyst type for Spark.
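For reference, a minimal sketch of the expected mapping (my own check, not Cloudflow code; it assumes the spark-avro module that ships with Spark 2.4+ is on the classpath):

import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters

object TimestampMillisCheck extends App {
  // The record schema from above, inlined so the check is self-contained.
  val schemaJson =
    """{"namespace": "some.org.pkg", "type": "record", "name": "SomeAvro",
      | "fields": [
      |   {"name": "fieldOne", "type": "string"},
      |   {"name": "dateTime", "type": {"type": "long", "logicalType": "timestamp-millis"}}
      | ]}""".stripMargin

  val avroSchema = new Schema.Parser().parse(schemaJson)

  // spark-avro itself maps "timestamp-millis" to TimestampType, so the
  // dateTime field should print as TimestampType, not LongType.
  println(SchemaConverters.toSqlType(avroSchema).dataType)
}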

CloudFlowEx.log

matveevval · Aug 04 '20

@matveevval

Check out the sensor-data-scala example; it adds an implicit to handle the conversion.

import java.time.Instant

package object sensordata {
  // Allows an Avro Long (epoch millis) to be used where an Instant is expected.
  implicit def toInstant(millis: Long): Instant = Instant.ofEpochMilli(millis)
}

https://github.com/lightbend/cloudflow/tree/master/examples/snippets/modules/ROOT/examples/sensor-data-scala
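For illustration, a hedged sketch of how that implicit is used; the SomeAvro case class shape below is my assumption of what the Avro codegen produces and is not copied from the example:

import java.time.Instant

// Assumed shape of the generated case class (hypothetical, for illustration).
case class SomeAvro(fieldOne: String, dateTime: Instant)

object ImplicitUsage extends App {
  import sensordata._ // brings toInstant into scope

  // The Long epoch millis from the Avro record is converted implicitly.
  val sample = SomeAvro("abc", 1596571200000L)
  println(sample.dateTime) // 2020-08-04T20:00:00Z
}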

olofwalker · Aug 06 '20

@olofwalker Your example doesn't have any Spark streamlets; it only has Akka streamlets.

Also, please help me understand: how does that implicit conversion help Spark deserialize an Avro-generated case class with a field of type Instant?

I think the problem is that cloudflow-spark doesn't support the Instant type natively in SQLImplicits.scala.
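A possible workaround sketch (my assumption, not an official Cloudflow or Spark API for this case): fall back to a Kryo encoder for the generated case class, so Spark doesn't need an ExpressionEncoder for Instant at all. It assumes the hypothetical SomeAvro case class shape from above:

import java.time.Instant
import org.apache.spark.sql.{Encoder, Encoders}

// Assumed shape of the generated case class (hypothetical).
case class SomeAvro(fieldOne: String, dateTime: Instant)

object SomeAvroEncoders {
  // Kryo serializes the whole case class into a single binary column,
  // sidestepping the missing built-in encoder for java.time.Instant
  // in Spark 2.x. (Spark 3.x adds java.time support natively.)
  implicit val someAvroEncoder: Encoder[SomeAvro] = Encoders.kryo[SomeAvro]
}

The trade-off is that a Kryo-encoded Dataset exposes one opaque binary column, so you lose the ability to run SQL directly against the individual fields.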

sd1ver · Aug 10 '20