alpakka-kafka
alpakka throughput is not as claimed
Hi Team, I am trying to use alpakka-kafka, and my consumer throughput is nowhere near what is claimed in this article:
https://www.lightbend.com/blog/alpakka-kafka-flow-control-optimizations
My consumer code is as follows:
ConsumerSettings<Integer, String> kafkaConsumerSettings =
    ConsumerSettings.create(toClassic(actorSystem),
            new IntegerDeserializer(), new StringDeserializer())
        .withBootstrapServers("localhost:9092")
        .withGroupId("docs-group")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        .withPollInterval(Duration.ofMillis(100))
        .withStopTimeout(Duration.ofSeconds(5));
// #kafka-setup
Consumer.DrainingControl<Done> control =
    Consumer.sourceWithOffsetContext(kafkaConsumerSettings, Subscriptions.topics("jobs-topic1")) // (5)
        .map(
            consumerRecord -> { // (6)
                System.out.println("startTime>>>>>>>>>>>>>" + start);
                Movie movie = JsonMappers.movieReader.readValue(consumerRecord.value());
                System.out.println(">>>>>>>>>>>>>" + movie.toString());
                System.out.println(counter++);
                long end = System.currentTimeMillis();
                long seconds = TimeUnit.MILLISECONDS.toSeconds(end - start);
                System.out.println("End time>>>>>" + seconds);
                //insertDataToMongoDB(actorSystem,movie,start);
                //retriveDataFromMongo(actorSystem);
                return WriteMessage.createUpsertMessage(String.valueOf(movie.id), movie);
            })
Could someone please tell me if I am doing something wrong here? Libraries used:
implementation group: 'com.lightbend.akka', name: 'akka-stream-alpakka-elasticsearch_2.13', version: '2.0.2'
implementation group: 'com.typesafe.akka', name: 'akka-stream-kafka_2.13', version: '2.0.7'
Reproducing benchmark results is always a challenge. The numbers in the post were not meant to suggest a baseline performance metric. The point was to show the before and after metrics relative to each other, as a result of the improvement in the Kafka consumer.
The cluster was local to the consumer (running as Docker containers) and the following criteria were used, as listed in the post:
Number of records: 1,000,000
Test Topic Partitions: 100
Test Topic Replication Factor: 3
Message Size: 5KB
Number of Brokers: 3
All other Alpakka Kafka and Consumer configuration used defaults
There were no external network calls or sources of back-pressure (e.g. an Elasticsearch flow or sink, if that's present in your stream).
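To measure the consumer on its own, as in the setup described above, the per-record work inside the stream can be reduced to a counter. The following is only a minimal sketch: `ThroughputMeter` is a hypothetical helper written for illustration, not part of Alpakka Kafka, and it uses nothing beyond the Java standard library.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper (not an Alpakka Kafka API): counts records and reports an average rate. */
final class ThroughputMeter {
    private final long startNanos;
    private final AtomicLong count = new AtomicLong();

    /** startNanos is the caller's start timestamp, e.g. from System.nanoTime(). */
    ThroughputMeter(long startNanos) {
        this.startNanos = startNanos;
    }

    /** Call once per consumed record; returns the running total. */
    long record() {
        return count.incrementAndGet();
    }

    /** Total number of records counted so far. */
    long count() {
        return count.get();
    }

    /** Average records per second between the start timestamp and nowNanos. */
    double recordsPerSecond(long nowNanos) {
        long elapsedNanos = nowNanos - startNanos;
        if (elapsedNanos <= 0) {
            return 0.0; // no time has elapsed yet, so no meaningful rate
        }
        return count.get() * 1_000_000_000.0 / elapsedNanos;
    }
}
```

In a stream stage, one could call `meter.record()` for each element and print `meter.recordsPerSecond(System.nanoTime())` only every few thousand records, so that console output does not become part of what is being measured.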
I am also running this test on a local machine, with 8 cores and 16GB RAM. Could you please suggest whether anything needs to be changed on the code side? Consuming 93473 records takes 8 seconds with 1 broker, 1 topic, and 1 partition.
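For reference, the figures quoted here work out to roughly 11,684 records per second (93473 records / 8 s). A small sketch of that arithmetic, using a hypothetical helper class named `ThroughputEstimate`:

```java
/** Hypothetical helper for illustration: converts a record count and duration into a rate. */
final class ThroughputEstimate {

    /** Average records per second for the given record count and elapsed time in milliseconds. */
    static double recordsPerSecond(long records, long elapsedMillis) {
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("elapsedMillis must be positive");
        }
        return records * 1000.0 / elapsedMillis;
    }

    public static void main(String[] args) {
        // Figures from the message above: 93473 records consumed in 8 seconds.
        double rate = recordsPerSecond(93_473, 8_000);
        System.out.println(rate + " records/s");
    }
}
```

Note that this is a record rate only. The benchmark in the post used 5KB messages across 100 partitions, so the two setups are not directly comparable on this number alone.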
You haven't provided enough information for me to give you any advice.
How large is your local cluster, how many partitions are you using, what is the topic replication factor, what is the message size, what are your consumer properties, etc.?
I have a local cluster with 1 partition and 1 replica, and the message I am using is very small: { "title":"Architect", "id":1235 }
My consumer properties are as follows:
ConsumerSettings<Integer, String> kafkaConsumerSettings =
    ConsumerSettings.create(toClassic(actorSystem),
            new IntegerDeserializer(), new StringDeserializer())
        .withBootstrapServers("localhost:9092")
        .withGroupId("docs-group")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        .withPollInterval(Duration.ofMillis(100))
        .withStopTimeout(Duration.ofSeconds(5));
Please let me know if you need any additional information. I need help fixing this issue: how can I achieve the throughput mentioned in the document?
I used this benchmark test to produce the results for the blog post. It outputs a CSV with rows of values representing metrics from the brokers and the Kafka consumer client at various time intervals during the test. My machine at the time was a Lenovo X1 Carbon 5th generation with 16GB RAM, the maximum CPU available at the time (2 cores, 4 with hyper-threading), and an SSD.
I am using a Mac with 16GB RAM, 8 cores, and an SSD.