kafka-connect-redis

Avro de-serialization failure - TopicNameStrategy

Open bluedog13 opened this issue 3 years ago • 2 comments

The record value in the Kafka topic is Avro serialized. I am using the Redis sink connector settings below to copy data from the Kafka topic to Redis:

{
  "name": "redis-sink-connector",
  "config": {
    "connector.class"                                     : "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
    "redis.hosts"                                         : "redis:6379",
    "tasks.max"                                           : 1,
    "topics"                                              : "[redacted]",
    "key.converter.schemas.enable"                        : "false",
    "key.converter"                                       : "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schema.registry.url"                   : "https://[redacted]azure.confluent.cloud",
    "key.converter.schema.registry.basic.auth.user.info"  : "[redacted]:[redacted]",
    "key.converter.basic.auth.credentials.source"         : "USER_INFO",
    "value.converter.schemas.enable"                      : "true",
    "value.converter"                                     : "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url"                 : "https://[redacted].azure.confluent.cloud",
    "value.converter.schema.registry.basic.auth.user.info": "[redacted]:[redacted]",
    "value.converter.basic.auth.credentials.source"       : "USER_INFO",
    "value.converter.value.subject.name.strategy"         : "io.confluent.kafka.serializers.subject.TopicNameStrategy"
  }
}

I see the below error

Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: The value for the record must be String or Bytes. Consider using the ByteArrayConverter or StringConverter if the data is stored in Kafka in the format needed in Redis. Another option is to use a single message transformation to transform the data before it is written to Redis. (org.apache.kafka.connect.runtime.WorkerSinkTask)

In the samples for this repo, I see that "TopicRecordNameStrategy" is used for Avro deserialization, so I wanted to know whether the connector works only with the "TopicRecordNameStrategy" strategy or with other strategies as well, such as the one in my case (TopicNameStrategy).

Thanks.

bluedog13 · Apr 03 '22

I haven't played with any other naming strategies, so at this time they're not supported. I'm not sure what the problem is just from looking at the error message; I'd have to experiment with it to figure out what's going on.

jaredpetersen · Apr 11 '22

Thank you for the reply. I was able to find an alternative solution that works for TopicNameStrategy.

I used the "ByteArrayConverter" to move the data into Redis as-is from Kafka. Then, on the reader side, I used the raw bytes from Redis together with the Schema Registry to deserialize the message.

On the connector side

"value.converter" : "org.apache.kafka.connect.converters.ByteArrayConverter",

On the Redis C# consumer side

// Assumes the Confluent.SchemaRegistry and Confluent.SchemaRegistry.Serdes packages.
// rawBytes holds the value read from Redis, still in the Confluent Avro wire format.
AvroDeserializer<T> deserializer = new AvroDeserializer<T>(schemaRegistryClient);
T result = await deserializer.DeserializeAsync(rawBytes, isNull: false, SerializationContext.Empty);
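The bytes copied into Redis still carry the Confluent Schema Registry wire format: a zero "magic" byte, a 4-byte big-endian schema ID, and then the Avro-encoded body. That framing is what lets the deserializer above look up the correct schema on its own. A minimal sketch of the framing, shown in Python for illustration (the function name is mine, not part of any library):

```python
import struct

def parse_confluent_header(raw: bytes):
    """Split a Confluent-framed payload into (schema_id, avro_body).

    Byte 0 is the magic byte (0x00); bytes 1-4 are a big-endian
    schema ID; everything after that is the Avro-encoded record.
    """
    if len(raw) < 5 or raw[0] != 0:
        raise ValueError("not a Confluent Schema Registry framed payload")
    schema_id = struct.unpack(">I", raw[1:5])[0]
    return schema_id, raw[5:]
```

Resolving the schema ID against the registry and then decoding the body is roughly what `AvroDeserializer` does internally.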

bluedog13 · Apr 11 '22