
alpakka throughput is not as claimed

Open waytoharish opened this issue 3 years ago • 6 comments

Hi Team, I am trying to use alpakka-kafka, and my consumer throughput is nowhere near what is claimed in this article:

https://www.lightbend.com/blog/alpakka-kafka-flow-control-optimizations

My consumer code is as follows:

ConsumerSettings<Integer, String> kafkaConsumerSettings =
        ConsumerSettings.create(toClassic(actorSystem),
                new IntegerDeserializer(), new StringDeserializer())
                .withBootstrapServers("localhost:9092")
                .withGroupId("docs-group")
                .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                .withPollInterval(Duration.ofMillis(100))
                .withStopTimeout(Duration.ofSeconds(5));

Consumer.DrainingControl<Done> control =
        Consumer.sourceWithOffsetContext(kafkaConsumerSettings, Subscriptions.topics("jobs-topic1"))
                .map(
                        consumerRecord -> {
                            System.out.println("startTime>>>>>>>>>>>>>" + start);
                            Movie movie = JsonMappers.movieReader.readValue(consumerRecord.value());
                            System.out.println(">>>>>>>>>>>>>" + movie.toString());
                            System.out.println(counter++);
                            long end = System.currentTimeMillis();
                            long seconds = TimeUnit.MILLISECONDS.toSeconds(end - start);
                            System.out.println("End time>>>>>" + seconds);
                            //insertDataToMongoDB(actorSystem, movie, start);
                            //retriveDataFromMongo(actorSystem);
                            return WriteMessage.createUpsertMessage(String.valueOf(movie.id), movie);
                        })

Could someone please tell me if I am doing something wrong here? Libraries used:

implementation group: 'com.lightbend.akka', name: 'akka-stream-alpakka-elasticsearch_2.13', version: '2.0.2'
implementation group: 'com.typesafe.akka', name: 'akka-stream-kafka_2.13', version: '2.0.7'

waytoharish avatar Mar 01 '21 10:03 waytoharish

Reproducing benchmark results is always a challenge. The numbers from the post were not intended as a baseline performance metric. The point was to visualize the before and after metrics relative to each other, showing the improvement in the Kafka consumer.

The cluster was local to the consumer (running as Docker containers), and the following criteria were used, as listed in the post:

Number of records: 1,000,000
Test Topic Partitions: 100
Test Topic Replication Factor: 3
Message Size: 5KB
Number of Brokers: 3
All other Alpakka Kafka and Consumer configuration used defaults

There were no external network calls or sources of back-pressure (e.g. an Elasticsearch flow or sink, if one is present in your stream).

seglo avatar Mar 01 '21 15:03 seglo

I am also running this test on a local machine with 8 cores and 16 GB RAM. Could you please suggest whether anything needs to change on the code side? Consuming 93,473 records takes 8 seconds with 1 broker, 1 topic, and 1 partition.
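For context, the figures reported above can be turned into a records-per-second rate with a quick calculation. This is a rough sketch; the record count and duration come from the comment above, and the class and method names are mine, purely for illustration:

```java
// Rough throughput arithmetic for the numbers reported in this thread.
public class ThroughputCheck {

    // Records consumed per second, given a total count and elapsed wall-clock seconds.
    static double recordsPerSecond(long records, long elapsedSeconds) {
        return (double) records / elapsedSeconds;
    }

    public static void main(String[] args) {
        // 93,473 records consumed in 8 seconds, as reported above.
        double rate = recordsPerSecond(93_473, 8);
        System.out.printf("~%.0f records/sec%n", rate); // roughly 11,700 records/sec
    }
}
```

That works out to roughly 11,700 records per second, which is the number to compare (per partition, with a tiny payload) against the blog post's multi-partition, 5 KB-message setup.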

waytoharish avatar Mar 02 '21 04:03 waytoharish

You haven't provided enough information for me to give you any advice.

How large is your local cluster? How many partitions are you using, what is the topic replication factor, what is the message size, what are your consumer properties, etc.?

seglo avatar Mar 03 '21 22:03 seglo

I have a local cluster with 1 partition and a replication factor of 1, and the message I am using is very small: { "title":"Architect", "id":1235 }
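One difference worth noting (my own observation, not something stated in the thread): the payload above is a few dozen bytes, while the benchmark used 5 KB messages, so the two tests measure very different per-record overheads. A quick sketch of the size gap:

```java
import java.nio.charset.StandardCharsets;

// Compares the payload quoted above against the 5 KB messages from the blog benchmark.
public class MessageSize {

    public static void main(String[] args) {
        // The exact payload quoted in this thread.
        String payload = "{ \"title\":\"Architect\", \"id\":1235 }";
        int payloadBytes = payload.getBytes(StandardCharsets.UTF_8).length;

        int benchmarkBytes = 5 * 1024; // 5 KB per message in the blog post's benchmark

        System.out.println("payload bytes: " + payloadBytes);
        System.out.println("benchmark message is ~" + (benchmarkBytes / payloadBytes)
                + "x larger per record");
    }
}
```

With messages this small, fixed per-record costs (deserialization, the per-record printlns in the stream, offset bookkeeping) dominate, so records/sec and MB/sec tell very different stories between the two setups.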

My consumer properties are as below:

ConsumerSettings<Integer, String> kafkaConsumerSettings =
        ConsumerSettings.create(toClassic(actorSystem),
                new IntegerDeserializer(), new StringDeserializer())
                .withBootstrapServers("localhost:9092")
                .withGroupId("docs-group")
                .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                .withPollInterval(Duration.ofMillis(100))
                .withStopTimeout(Duration.ofSeconds(5));

Please let me know if you need any additional information. I need help fixing this issue: how can I achieve the throughput mentioned in the document?

waytoharish avatar Mar 15 '21 12:03 waytoharish

I used this benchmark test to produce the results for the blog post. It outputs a CSV with rows of values representing metrics from the brokers and the Kafka consumer client at various time intervals during the test. My machine at the time was a Lenovo X1 Carbon 5th gen with 16 GB RAM, the maximum CPU available at the time (2 cores, 4 threads with hyper-threading), and an SSD.

seglo avatar Mar 15 '21 15:03 seglo

I am using a Mac with 16 GB RAM, 8 cores, and an SSD.

waytoharish avatar Mar 16 '21 08:03 waytoharish