li-apache-kafka-clients

Example with Akka stream Kafka for consumer

Open mavencode01 opened this issue 6 years ago • 3 comments

I'm wondering how this might work with Akka Streams. Is there an example showing how to consume large messages using Akka Streams Kafka?

mavencode01 avatar Mar 20 '18 01:03 mavencode01

I don't have an example of using this with Akka Streams. What do you mean by Akka Streams Kafka?

becketqin avatar Mar 21 '18 03:03 becketqin

OK, unfortunately I couldn't consume the large message using Akka Streams. The consumer needs to deserialize the message correctly.

Yeah, I mean Reactive Kafka: https://doc.akka.io/docs/akka-stream-kafka/current/home.html

mavencode01 avatar Mar 21 '18 18:03 mavencode01

It works fine with the code below.

The most important line is: .withConsumerFactory(e -> factory.createConsumer(e.getProperties()));

It replaces the default consumer factory with the LinkedIn Kafka client's consumer factory.

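// config, mqAddress, group, this.properties and factory are defined elsewhere.
// The custom factory at the end only receives the properties, which is presumably
// why the deserializer classes are also set as properties below.
ConsumerSettings settings =
        ConsumerSettings.create(
                        config, new StringDeserializer(), new ByteArrayDeserializer())
                .withBootstrapServers(mqAddress)
                .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                .withProperty(ConsumerConfig.GROUP_ID_CONFIG, group)
                .withProperties(this.properties)
                .withProperty(
                        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                        StringDeserializer.class.getName())
                .withProperty(
                        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                        ByteArrayDeserializer.class.getName())
                // Hand consumer creation over to the LinkedIn client's factory.
                .withConsumerFactory(e -> factory.createConsumer(e.getProperties()));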

Roiocam avatar Jun 01 '23 07:06 Roiocam
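
For readers unfamiliar with the factory hook used above, here is a dependency-free sketch of the pattern: the settings object carries a function that builds the consumer, and replacing that function swaps the consumer implementation while the rest of the stream code stays unchanged. Everything in it (SketchSettings, SketchConsumer, the description strings) is a hypothetical stand-in written for illustration, not the real Akka Streams Kafka or LinkedIn client API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class ConsumerFactoryHookSketch {

    /** Hypothetical stand-in for a Kafka consumer; not a real Kafka or Akka type. */
    interface SketchConsumer {
        String describe();
    }

    /** Hypothetical immutable settings holding properties and a replaceable consumer factory. */
    static final class SketchSettings {
        private final Map<String, String> properties;
        private final Function<SketchSettings, SketchConsumer> factory;

        private SketchSettings(Map<String, String> properties,
                               Function<SketchSettings, SketchConsumer> factory) {
            this.properties = properties;
            this.factory = factory;
        }

        /** Starts with a default factory that builds the "stock" consumer. */
        static SketchSettings create() {
            return new SketchSettings(new HashMap<>(), s -> () -> "default consumer");
        }

        /** Returns a copy with one extra property; the factory is carried over. */
        SketchSettings withProperty(String key, String value) {
            Map<String, String> copy = new HashMap<>(properties);
            copy.put(key, value);
            return new SketchSettings(copy, factory);
        }

        /** Returns a copy that builds consumers with the given factory instead. */
        SketchSettings withConsumerFactory(Function<SketchSettings, SketchConsumer> newFactory) {
            return new SketchSettings(properties, newFactory);
        }

        Map<String, String> getProperties() {
            return properties;
        }

        /** Called by the (imaginary) stream source whenever it needs a consumer. */
        SketchConsumer createConsumer() {
            return factory.apply(this);
        }
    }

    /** Builds a consumer through the default factory. */
    public static String defaultDescription() {
        return SketchSettings.create()
                .withProperty("group.id", "ignored-by-default")
                .createConsumer()
                .describe();
    }

    /** Builds a consumer through a custom factory that only reads the properties. */
    public static String customDescription(String group) {
        return SketchSettings.create()
                .withProperty("group.id", group)
                .withConsumerFactory(
                        s -> () -> "custom consumer, group.id=" + s.getProperties().get("group.id"))
                .createConsumer()
                .describe();
    }

    public static void main(String[] args) {
        System.out.println(defaultDescription());
        System.out.println(customDescription("g1"));
    }
}
```

Note that the custom factory only sees what is in the properties map, so anything the replacement consumer needs has to be passed as a property.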