li-apache-kafka-clients
Example with Akka stream Kafka for consumer
I'm wondering how this might work with Akka Streams. Is there an example that shows how to consume large messages using Akka Streams Kafka?
I don't have an example of using this with Akka Streams. What do you mean by Akka Streams Kafka?
OK, unfortunately I couldn't consume the large message using Akka Streams. It needs the consumer to deserialize the message correctly.
Yeah, I mean Reactive Kafka: https://doc.akka.io/docs/akka-stream-kafka/current/home.html
It works fine with the code below.
The most important line is: .withConsumerFactory(e -> factory.createConsumer(e.getProperties()));
It replaces the default consumer factory with a factory that creates the LinkedIn Kafka client consumer.
    ConsumerSettings<String, byte[]> settings =
        ConsumerSettings.create(
                config, new StringDeserializer(), new ByteArrayDeserializer())
            .withBootstrapServers(mqAddress)
            .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
            .withProperty(ConsumerConfig.GROUP_ID_CONFIG, group)
            .withProperties(this.properties)
            // Deserializers are also set as properties so the custom consumer can read them.
            .withProperty(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName())
            .withProperty(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class.getName())
            // Build the consumer with the LinkedIn client factory instead of the default one.
            .withConsumerFactory(e -> factory.createConsumer(e.getProperties()));
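
For readers unfamiliar with the factory hook, here is a minimal stdlib-only sketch of the pattern the snippet above relies on. It is not the real API: Settings, Consumer, PlainConsumer, LargeMessageConsumer and LargeMessageConsumerFactory are all hypothetical stand-ins for ConsumerSettings, the Kafka Consumer interface, the default consumer, the LinkedIn consumer, and the factory object from the snippet. The sketch assumes the settings object stores the factory function and only calls it when the consumer is actually created, so the factory receives whatever properties are set by then.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Stdlib-only sketch. Every type here is a hypothetical stand-in,
// not a real Kafka, Akka, or LinkedIn class.
final class ConsumerFactoryHookSketch {

    /** Stand-in for the Kafka Consumer interface. */
    interface Consumer {
        String describe();
    }

    /** Stand-in for the plain consumer that is built when no factory is set. */
    static final class PlainConsumer implements Consumer {
        private final Map<String, String> props;
        PlainConsumer(Map<String, String> props) { this.props = props; }
        public String describe() { return "plain:" + props.get("group.id"); }
    }

    /** Stand-in for a large-message-aware consumer. */
    static final class LargeMessageConsumer implements Consumer {
        private final Map<String, String> props;
        LargeMessageConsumer(Map<String, String> props) { this.props = props; }
        public String describe() { return "large-message:" + props.get("group.id"); }
    }

    /** Stand-in for the factory object used in the snippet above. */
    static final class LargeMessageConsumerFactory {
        Consumer createConsumer(Map<String, String> props) {
            return new LargeMessageConsumer(props);
        }
    }

    /** Stand-in for ConsumerSettings: immutable, each with* call returns a copy. */
    static final class Settings {
        private final Map<String, String> properties;
        private final Function<Settings, Consumer> consumerFactory;

        private Settings(Map<String, String> properties,
                         Function<Settings, Consumer> consumerFactory) {
            this.properties = properties;
            this.consumerFactory = consumerFactory;
        }

        static Settings create() {
            // Default factory builds the plain consumer.
            return new Settings(new HashMap<>(), s -> new PlainConsumer(s.getProperties()));
        }

        Settings withProperty(String key, String value) {
            Map<String, String> copy = new HashMap<>(properties);
            copy.put(key, value);
            return new Settings(copy, consumerFactory);
        }

        Settings withConsumerFactory(Function<Settings, Consumer> factory) {
            return new Settings(properties, factory);
        }

        Map<String, String> getProperties() {
            return Collections.unmodifiableMap(properties);
        }

        /** The factory runs here, so it sees the final property set. */
        Consumer createConsumer() {
            return consumerFactory.apply(this);
        }
    }

    public static void main(String[] args) {
        LargeMessageConsumerFactory factory = new LargeMessageConsumerFactory();
        Settings settings = Settings.create()
            .withConsumerFactory(e -> factory.createConsumer(e.getProperties()))
            .withProperty("group.id", "demo-group");
        // prints "large-message:demo-group"
        System.out.println(settings.createConsumer().describe());
    }
}
```

In this sketch the property set after withConsumerFactory is still visible to the factory, because the lambda is only invoked inside createConsumer(). In real code the stand-in LargeMessageConsumer would be replaced by the consumer class from li-apache-kafka-clients.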