jackdaw
jackdaw copied to clipboard
How does one create kafka streams setup to read AVRO encoded messages?
Hi,
I've been trying to use streams to be able to read messages from kafka topic that has AVRO encoded messages, AVRO schemas are in AWS Glue.
I've tried many variations of configuration below but failed to get any progress
(defn build-topology [builder input-topics]
(let [topic-maps (map topic-config input-topics)]
(-> (jstreams/kstreams builder topic-maps)
(jstreams/peek forward ONLY PRN MSG)))
builder)
(mount/defstate kafka
:start (let [builder (jstreams/streams-builder)
topology (build-topology builder ["test"])
application-config {"bootstrap.servers" (:kafka-servers config/config)
"consumer.auto.offset.reset" "latest"
"application.id" (str (:profile (mount/args)) "-match-status-" (random-uuid))
"key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
"value.deserializer" "com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer"
"avroRecordType" "GENERIC_RECORD"
"registry.name" "test"
"schemaName" "test"}
application (jstreams/kafka-streams
topology
application-config)]
(timbre/info "Starting Kafka Streams application..." {:application-config application-config})
(jstreams/start application)
{:application application
:application-config application-config
:builder builder
:topology topology})
:stop (do (jstreams/close (:application kafka))
{}))
I've setup basic consumer to make sure i am not losing my sanity and this works.
(def consumer (jc/subscribed-consumer {"bootstrap.servers" (:kafka-servers config/config)
"consumer.auto.offset.reset" "latest"
"key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
"value.deserializer" "com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer"
"avroRecordType" "GENERIC_RECORD"
"registry.name" "test123"
"group.id" "test123"
"max.poll.records" (.intValue 1)
"schemaName" "test123"}
[{:topic-name "test123"}]))
(.close consumer)
(jc/poll consumer 1000)
I think using streams for avro encoded glue is possible but my lack of skill/knowledge is showing. Any guidance would be appreciated.
Just wanna mention that i tried asking for help in slack and didn't get response for few days now that's why i am opening an issue