
Consumer is not consuming from a few partitions

GnanasoundariSoundarajan opened this issue 7 years ago · 12 comments

Infrastructure:

Kafka is running in cluster mode with 3 brokers and 3 ZooKeeper instances. The Kafka brokers run on t2.xlarge AWS instances; the 3 ZooKeeper instances run on a single t2.xlarge AWS instance. Kafka version: 1.1.0.

Issue: The Kafka consumer is not consuming messages from a few partitions, even though the consumer is still active and subscribed to those partitions in Kafka.

How to reproduce: Send a message to a topic about once a day; keep the message flow at an unpredictable rate.

Observation: There are no error/warning messages and no rebalance during this period, either on the Kafka side or on the consumer side.

Is the consumer failing to consume messages from Kafka even though the messages reach the partition? We do not see this issue on other topics on the same server where a steady stream of messages is flowing.

Kafka Consumer settings:

enable.auto.commit = true
session.timeout.ms = 90000
heartbeat.interval.ms = 30000
max.poll.records = 1000

poll-interval = 60ms
poll-timeout = 60ms
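
For reference, here is a minimal sketch of how these settings could be expressed through the Alpakka Kafka ConsumerSettings API. The actor system name and the String deserializers are assumptions, not taken from the report:

import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

// Sketch only: mirrors the settings listed above. A real instance would also
// set the bootstrap servers and a group id.
implicit val system: ActorSystem = ActorSystem("consumer-sample") // assumed name

val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    .withProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "90000")
    .withProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30000")
    .withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000")
    .withPollInterval(60.millis) // alpakka-kafka poll-interval
    .withPollTimeout(60.millis)  // alpakka-kafka poll-timeout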

Kafka Subscription status:

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
asset-action2 0 0 0 0 consumer-1-3c56c5a4-b629-4044-9570-3fddaf3aff84 /CN4 consumer-1
asset-action2 1 1 1 0 consumer-1-3c56c5a4-b629-4044-9570-3fddaf3aff84 /CN4 consumer-1
asset-action2 2 9 9 0 consumer-1-3c56c5a4-b629-4044-9570-3fddaf3aff84 /CN4 consumer-1
asset-action2 3 20 20 0 consumer-1-3c56c5a4-b629-4044-9570-3fddaf3aff84 /CN4 consumer-1
asset-action3 0 6 6 0 consumer-2-4baad662-d0cd-4d1b-af8d-9e7f9c3f5028 /CN4 consumer-2
asset-action3 1 0 0 0 consumer-2-4baad662-d0cd-4d1b-af8d-9e7f9c3f5028 /CN4 consumer-2
asset-action-request4 0 0 0 0 consumer-5-d9dbeb5b-57d8-43b5-9999-8ee4688c51b8 /CN1 consumer-5
asset-action-request4 1 0 0 0 consumer-5-d9dbeb5b-57d8-43b5-9999-8ee4688c51b8 /CN1 consumer-5
asset-action-request0 0 9 14 5 consumer-10-5b16308f-2386-4bcc-a8e9-819e921a8100 /CN4 consumer-10
asset-action-request0 1 4 6 2 consumer-10-5b16308f-2386-4bcc-a8e9-819e921a8100 /CN4 consumer-10
asset-action-request3 0 0 0 0 consumer-5-32893946-4835-479d-9cd2-96a6b1ae5185 /CN4 consumer-5
asset-action-request3 1 0 0 0 consumer-5-32893946-4835-479d-9cd2-96a6b1ae5185 /CN4 consumer-5
asset-action-request2 0 0 0 0 consumer-6-62ac1703-de40-46f9-9a45-484eff411752 /CN2 consumer-6
asset-action-request2 1 0 0 0 consumer-6-62ac1703-de40-46f9-9a45-484eff411752 /CN2 consumer-6
asset-action4 2 14 14 0 consumer-6-39d83700-9ca4-44db-a8b4-6e294b8112a5 /CN3 consumer-6
asset-action4 3 0 0 0 consumer-6-39d83700-9ca4-44db-a8b4-6e294b8112a5 /CN3 consumer-6
asset-action-request1 0 0 0 0 consumer-10-70f886b3-e1d9-413c-bb67-10af165c45e5 /CN2 consumer-10
asset-action-request1 1 0 0 0 consumer-10-70f886b3-e1d9-413c-bb67-10af165c45e5 /CN2 consumer-10
asset-action3 2 2 2 0 consumer-3-c3421e00-45e7-45c2-ab4e-dde96f655a77 /CN1 consumer-3
asset-action3 3 6 6 0 consumer-3-c3421e00-45e7-45c2-ab4e-dde96f655a77 /CN1 consumer-3
asset-action-request3 2 0 0 0 consumer-8-ccae17d1-a48b-4242-bb91-130ef0e54eac /CN2 consumer-8
asset-action-request3 3 0 0 0 consumer-8-ccae17d1-a48b-4242-bb91-130ef0e54eac /CN2 consumer-8
asset-action4 0 0 0 0 consumer-6-2803e5b5-ba05-43a2-b057-b02d49a33a67 /CN4 consumer-6
asset-action4 1 5 5 0 consumer-6-2803e5b5-ba05-43a2-b057-b02d49a33a67 /CN4 consumer-6
asset-action1 2 0 0 0 consumer-7-2d807ea1-ee8a-40b2-8a02-4f25ad5354af /CN1 consumer-7
asset-action-request2 2 0 0 0 consumer-7-ea0b61be-4187-4c93-abe5-0130726d6655 /CN4 consumer-7
asset-action-request1 3 0 0 0 consumer-8-f39ebd37-dde8-45df-b7f9-49a3821ba584 /CN1 consumer-8
asset-action0 0 14 14 0 consumer-3-8e90b176-af02-47f7-8e06-a03b37839fcf /CN4 consumer-3
asset-action1 1 4 4 0 consumer-2-13961583-6545-4e00-9ce8-8a6f039ee699 /CN2 consumer-2
asset-action-request0 2 14 14 0 consumer-10-b2016d59-6698-4bf5-9888-829f808b6856 /CN3 consumer-10
asset-action-request1 2 0 0 0 consumer-4-623975e3-8c8b-48ae-b49c-fe5b6d18eea5 /CN4 consumer-4
asset-action0 3 0 0 0 consumer-9-cd0f8733-811f-42b5-96c3-b96763f1ed0b /CN1 consumer-9
asset-action-request0 3 4 4 0 consumer-5-90ed68b6-4f51-4363-9f87-86a2c3d779a6 /CN2 consumer-5
asset-action-request4 2 0 0 0 consumer-7-92539217-2ecc-43c4-8b41-36a17cbe76fa /CN3 consumer-7
asset-action0 2 19 19 0 consumer-7-f6a53366-2c53-48b4-8d42-42367f794420 /CN2 consumer-7
asset-action0 1 4 6 2 consumer-5-2b29b017-56bb-4c84-8da0-1b8a9af98a3c /CN3 consumer-5
asset-action-request4 3 0 0 0 consumer-9-d61ca364-37dd-4299-ad9b-85c3dbbe0bd5 /CN4 consumer-9
asset-action1 0 0 0 0 consumer-1-6660179f-d5e6-4f38-b637-29415e255d6f /CN3 consumer-1
asset-action-request2 3 0 0 0 consumer-9-143cd443-c4c3-45c8-8c19-40974bbf4fa9 /CN3 consumer-9
asset-action1 3 0 0 0 consumer-8-b437a0f3-2afd-4f5e-90ed-9204127387f1 /CN4 consumer-8

Please help us understand why this behavior is happening.

I have seen a similar problem using Lagom. I have a consumer that stops without errors or warnings. I have seen it occur about once a week and could not reproduce it.

asauray · Mar 25 '19 14:03

Currently I am not facing the issue. I have upgraded the Alpakka Kafka client to "com.typesafe.akka" %% "akka-stream-kafka" % "1.0-M1" and Kafka to kafka_2.12-2.0.0.

I have increased poll-interval to 1s and max.poll.records to 50K-80K; for tuning Kafka you can refer to https://community.hortonworks.com/articles/80813/kafka-best-practices-1.html.
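
A rough sketch of that tuning in code, assuming a consumerSettings value like the one sketched earlier (the values are the ones from this comment, not recommendations):

import scala.concurrent.duration._
import org.apache.kafka.clients.consumer.ConsumerConfig

// Sketch only: longer poll interval and larger poll batches, as described above.
val tunedConsumerSettings = consumerSettings
  .withPollInterval(1.second)
  .withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50000") // anywhere in the 50K-80K range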

Thank you, but have you managed to reproduce it consistently at some point? It's really obscure for us. We see our consumers stopping about once a week and we have to restart them manually. We use the default Lagom configuration for Kafka. It would be great to have an explanation for this.

asauray · Mar 27 '19 09:03

I couldn't reproduce it consistently. I have opened this ticket to understand why it happened.

I'm having the same issue: it looks like there is a lag, but the consumer actually consumes all messages, it just does not commit them. I counted all messages going into and out of Committer.flow and they all go through, but Committer.flow does not seem to commit them.

My code looks like this:

Consumer
  .committableSource(consumerSettings, Subscriptions.topics(topics.toSet.asJava))
  .groupBy(topics.size, _.record.topic())
  .groupedWeightedWithin(config.batchWeight, config.batchWindowSec.seconds)(_.record.value().length)
  .map { x => logger.info(s"Got batch size ${x.size}"); x }
  .map()  // do something
  ...
  .mergeSubstreams
  .map { offsets =>
    logger.info(s"About to commit ${offsets.size} offset(s), sample: ${offsets.slice(0, 3)}")
    offsets
  }
  .mapConcat(_.toList)
  .via(Committer.flow(committerDefaults))
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(Sink.ignore)(Keep.both)
  .run()

So all messages go through the stream up to Committer.flow, but even after an hour the lag does not go down.

Also, it seems to stop right before the end. In one topic I had millions of messages, but I ended up with a lag of 10 or so for some partitions.

I'm using:

val AkkaVersion = "2.6.5"
val AkkaKafkaVersion = "2.0.3"
val KafkaVersion = "2.5.0"

Cheers

silles79 · May 22 '20 16:05

Assuming the CommitterSettings are the defaults and your config.batchWindowSec is less than an hour, the only thing I can think of is that a stage downstream of Committer.flow is backpressuring.
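
For reference, a minimal sketch of the committer knobs referred to here, using the Alpakka Kafka CommitterSettings API (the values shown are illustrative only):

import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.kafka.CommitterSettings

val system: ActorSystem = ActorSystem("committer-sample") // assumed name

val committerDefaults = CommitterSettings(system) // defaults from the akka.kafka.committer config
val tunedCommitter = committerDefaults
  .withMaxBatch(1000L)         // commit after at most this many offsets
  .withMaxInterval(10.seconds) // or after this much time, whichever comes first
  .withParallelism(1)          // number of commit batches in flight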

How are you measuring lag?

seglo · May 22 '20 17:05

Yes, default settings, and config.batchWindowSec is 30 seconds.

I'm using kafka-consumer-groups --bootstrap-server localhost:9092 --describe --offsets --group GROUPNAME to check the lag.

My Kafka cluster has 6 brokers and it's idle, only running this app, nothing else. Why would Committer.flow backpressure? And why for hours?

silles79 · May 22 '20 17:05

My Kafka cluster has 6 brokers and it's idle, only running this app, nothing else. Why would Committer.flow backpressure? And why for hours?

I mean a stage or sink downstream of Committer.flow that's not included in your snippet. It's possible for Committer.flow to backpressure too, but that's probably not the case here.

seglo · May 22 '20 18:05

My Kafka cluster has 6 brokers and it's idle, only running this app, nothing else. Why would Committer.flow backpressure? And why for hours?

I mean a stage or sink downstream of Committer.flow that's not included in your snippet. It's possible for Committer.flow to backpressure too, but that's probably not the case here.

I updated my snippet, but there is nothing after Committer.flow.

I can reproduce this 100 times: I tried bumping the consumer group and reprocessing, and I always end up with lag, even though all data is processed. I also tried other batch sizes and windows, and processing just 1 topic with 1 consumer. Nothing works. Maybe it's something with the new cluster?

The weird thing is this code worked before; only the cluster was rebuilt and upgraded.

silles79 · May 22 '20 19:05

Figured it out, it was my fault: one of the batch-processing functions shuffled the messages, so all of them get processed but the offsets were not committed in order. :facepalm:

For example, messages with offsets 1,4,5,7,8 might have been committed as 4,1,8,7,5. So in the end all were processed, but the lag was still not 0 because the committed offset was sitting at 5.
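
For anyone hitting the same thing, here is a minimal sketch of one way to restore commit order, assuming the batches reaching the committer are sequences of CommittableOffset (inCommitOrder is a made-up helper name):

import akka.kafka.ConsumerMessage.CommittableOffset

// Hypothetical helper: sort a processed batch by partition and offset so that
// Committer.flow sees the offsets in commit order.
def inCommitOrder(batch: Seq[CommittableOffset]): Seq[CommittableOffset] =
  batch.sortBy(o => (o.partitionOffset.key.partition, o.partitionOffset.offset))

// In the stream above, this could replace the plain `.mapConcat(_.toList)`:
//   .mapConcat(batch => inCommitOrder(batch).toList)
//   .via(Committer.flow(committerDefaults))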

silles79 · Jun 11 '20 07:06

Actually I have a second problem: now I don't mess up the order, but it looks like somehow 2 consumers get the same topic+partition assigned.

1st consumer:

hdfs-sink-85bb75c6f8-kjgkk entellect-hdfs-sink 09:01:42.826 [hdfs-sink--131415277-akka.actor.default-dispatcher-5]
INFO  HdfsSink - About to commit 1468 offset(s), sample: List(CommittableOffsetImpl(PartitionOffset(key=GroupTopicPartition(hdfs-sink-0,proxy-chembl-domain,0),offset=0),), CommittableOffsetImpl(PartitionOffset(key=GroupTopicPartition(hdfs-sink-0,-proxy-chembl-domain,0),offset=1),), CommittableOffsetImpl(PartitionOffset(key=GroupTopicPartition(hdfs-sink-0,-proxy-chembl-domain,0),offset=2),))

2nd consumer:

hdfs-sink-85bb75c6f8-8cdsp entellect-hdfs-sink 09:04:35.698 [hdfs-sink--799353276-akka.actor.default-dispatcher-4]
INFO  HdfsSink - About to commit 182 offset(s), sample: List(CommittableOffsetImpl(PartitionOffset(key=GroupTopicPartition(hdfs-sink-0,proxy-chembl-domain,0),offset=2367),), CommittableOffsetImpl(PartitionOffset(key=GroupTopicPartition(hdfs-sink-0,-proxy-chembl-domain,0),offset=2368),), CommittableOffsetImpl(PartitionOffset(key=GroupTopicPartition(hdfs-sink-0,proxy-chembl-domain,0),offset=2370),))

And it looks like the first one is slower and commits last:

GROUP           TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                 HOST            CLIENT-ID
hdfs-sink-0     proxy-chembl-domain                       0          1717            2713           996             consumer-hdfs-sink-0-1-9fb0bda1-637b-4cc8-85d4-5c9343993586 /x.x.xx.x  consumer-hdfs-sink-0-1

So all data is consumed (twice) and I still end up with a current offset of 1717, where you can clearly see that consumer 2 committed at least offset 2370.

silles79 · Jun 16 '20 07:06

If I had to guess, I would assume the 1st consumer was assigned topic partition proxy-chembl-domain-0 and then it was reassigned to the 2nd consumer.

There are situations where two consumers can commit the same offset: for example, when a message has already been consumed and is in flight within the stream, a rebalance occurs and the in-flight message is committed, but the 2nd consumer starts from whatever the last committed offset was, so it consumes duplicates. There's a recent PR addressing this: #821.

However, your system should be able to tolerate duplicates, because they're possible with all non-transactional sources and sinks.
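
As an illustration of tolerating duplicates (a sketch, not part of this thread), here is a flow that drops records at or below the highest offset already processed for their partition; dedupFlow is a made-up helper name:

import akka.NotUsed
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.stream.scaladsl.Flow
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: skips duplicates that can appear after a rebalance by
// remembering the highest offset processed per partition.
def dedupFlow[K, V]: Flow[CommittableMessage[K, V], CommittableMessage[K, V], NotUsed] =
  Flow[CommittableMessage[K, V]].statefulMapConcat { () =>
    var lastSeen = Map.empty[TopicPartition, Long]
    msg => {
      val tp = new TopicPartition(msg.record.topic(), msg.record.partition())
      val offset = msg.record.offset()
      if (lastSeen.get(tp).exists(_ >= offset)) Nil // already processed, drop
      else {
        lastSeen += tp -> offset
        msg :: Nil
      }
    }
  }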

seglo · Jun 16 '20 19:06