kafkajs icon indicating copy to clipboard operation
kafkajs copied to clipboard

Consumer offset "stuck" on certain partitions

Open Shamshiel opened this issue 3 years ago • 11 comments

Observed behavior I consume messages from a topic and this topic has 24 partitions. I started to consume from beginning and at first everything was fine but after some time the consumer stopped consuming messages from certain partitions.

The issue itself is very similar to this issue (562) but I'm using the current version of KafkaJS (v1.15.0) so I'm at a loss what the problem could be. As far as I'm aware the topic also uses log compaction.

I wrote a simple partition assigner that I programmed to only consume from partitions that were "stuck". After that I added some console.log messages into the KafkaJS code (consumerGroup.js) to debug the problem further. I came to the point that I always got zero messages in the response from broker.fetch.

This was the response:

[
  {
    topicName: "MyTopic",
    partitions: [
      {
        partition: 1,
        errorCode: 0,
        highWatermark: "532672",
        lastStableOffset: "532672",
        lastStartOffset: "0",
        abortedTransactions: [],
        preferredReadReplica: -1,
        messages: [],
      },
    ],
  }
]

The offset that was used to fetch the next messages was like this: { MyTopic: { '1': '484158' } }

There are clearly still messages to consume but it always fetches zero because always the offset 484158 is used. I changed the offset manually via the admin interface to a higher and valid offset and after that the consumer worked again.

Expected behavior I would expect to receive all messages until the latest offset.

Environment:

  • OS: Mac OS 10.15.7
  • KafkaJS version 1.15.0
  • Kafka version 2.6.1
  • NodeJS version 12.18.3

Additional context If further logs are needed I can provide them. I couldn't see any useful debug messages for this problem....

Shamshiel avatar Apr 09 '21 13:04 Shamshiel

Can you check the size of the message at offset 484158? What is the format of the messages, JSON, Avro, etc.? I've seen this issue many times when you configure maxBytes at 1MB and have a message with 2MB. It could be something else but just checking a regular case.

tulios avatar Apr 09 '21 13:04 tulios

Thank you for the very quick answer. The messages are AVRO encoded. Currently I'm not sure how big the message at this offset is but usually they should not be bigger than 20kb.

I already tried the following settings for my consumer: const consumer = kafka.consumer({ partitionAssigners: [SpecificPartitionAssigner], groupId: 'MyGroupId', maxBytes: 2147483647, maxBytesPerPartition: 214748364 }); (Forgot to mention it above because I already tried so many different things)

Shamshiel avatar Apr 09 '21 14:04 Shamshiel

I think there is still maybe a problem with log compaction and offsets?

Here are the offsets of partition 1 in order as they appear: ... offset = 484134, offset = 484135, offset = 484136, offset = 484143, offset = 484156, offset = 484157, <-- this was the last offset that KafkaJS received offset = 484207 <- this is the next available offset ...

I played around with the offsets a little bit and the next offset that would work is "484197". I'm not sure if this is helpful or not. But there isn't really a message at this offset. The next message (like mentioned above) that KafkaJS received is the offset 484207.

Shamshiel avatar Apr 09 '21 14:04 Shamshiel

I tested two different frameworks (C# and Java) and both were able to consume past this offset (484158) with the same configuration.

As I mentioned above KafkaJS receives zero messages from Kafka when it tries to fetch messages with this offset (484158). I'm not sure but maybe the other frameworks also receive zero messages but if they do they then check the highest offset of this partition and if it is higher than the current fetch-offset they just skip this offset that receives zero messages? I'm quite new to Kafka so I'm not sure but if this proposal of Kafka was implemented "Fetch" should always at least return one message. So maybe it is handled as an error in the other frameworks if fetch doesn't return a message if the offset of the partition is higher.

Shamshiel avatar Apr 12 '21 07:04 Shamshiel

This feels incredibly familiar, like we worked on this bug before, but maybe I'm just having dejavu.

Related issues from other clients:

  • https://github.com/zendesk/ruby-kafka/issues/555
  • https://github.com/dpkp/kafka-python/issues/1701

EDIT: I knew I had worked on this! #577 Guess there's some other case that's not covered by that.

Nevon avatar Apr 12 '21 07:04 Nevon

@Nevon Do you need any further details to find the cause of this bug?

Shamshiel avatar Apr 12 '21 08:04 Shamshiel

I'm not sure I will have the time to look into this myself, but a way to reproduce the issue would be great. The relevant topic configuration would be a start, but ideally a script that creates a topic and produces messages in whatever way is needed to trigger the bug would be 💯

Nevon avatar Apr 12 '21 09:04 Nevon

I have a similar problem.

Last offset successfully consumed = 221235053, next offset = 221306740. diference of offsets between 2 messages is ~70k

But the consumer is stuck and does not consume further and constantly tries to fetch 221235053 offset and gets no messages.

I have to define ridiculously high maxBytes so that the consumer could grab the next offset. But this should not be a solution, because it's not optimal to fetch a high number of messages all at once.

I think there should be a check if this batch is empty, but not the last by using offsetApi or by checking if fetchApi returned OFFSET_OUT_OF_RANGE

alfamegaxq avatar Apr 13 '21 19:04 alfamegaxq

Hi,

We have what seems to be a similar issue.

One of our partition is stuck at the same offset for our three consumer groups. It also is a compacted topic and using eachMessage instead of eachBatch does not help.

How could we help resolving the issue ? Do you know any workaround other than moving the offset manually ?

Thanks !

ThomasFerro avatar Jun 10 '21 15:06 ThomasFerro

Hi,

We too have had the same issue twice this week. Each time one or two partitions from a 3-partitions topic were stuck at the same offset for all our consumer groups (This topic is compacted too). Do you know if someone made progress on this issue? Is there a way we can help solve it?

Thanks in advance

anaistournoisAdeo avatar Feb 04 '22 09:02 anaistournoisAdeo

Is there a way we can help solve it?

Like I mentioned a year ago, a way to consistently reproduce the issue is the best way to resolve it. Ideally a fork with a failing test, but even just a script that creates a topic and produces to it with whatever parameters are required to trigger the bug, and then a consumer that gets stuck, would be helpful.

Nevon avatar Feb 08 '22 14:02 Nevon

Is there any consensus that this IS related to compaction? We're seeing something similar, using 2.1.0. But this is not in a compacted topic, it does however also have 3 partitions.

dhdnicodemus avatar Oct 15 '22 13:10 dhdnicodemus

can you try reducing the max.poll.records or increasing the max.poll.interval.ms.

dhruvrathore93 avatar Nov 07 '22 13:11 dhruvrathore93

I will look into it, thanks.

dhdnicodemus avatar Nov 07 '22 13:11 dhdnicodemus

A quick question @dhruvrathore93 if either of these were the problem, wouldn't we be seeing a rebalance of the consumer group by the broker?

dhdnicodemus avatar Nov 07 '22 17:11 dhdnicodemus

One more follow up, does kafakJs support setting max.poll.orecord.size ?

dhdnicodemus avatar Nov 07 '22 18:11 dhdnicodemus

Any updates on this?

NorDroN avatar Jun 05 '23 13:06 NorDroN