
Consumer Source reads any message after controller dead and stops consuming

Open rolandjohann opened this issue 6 years ago • 6 comments

We are experiencing strange consumer behavior that causes the consumer to re-read an old message and then consume no messages at all anymore.

Had to copy-paste parts of the log because of ELK-only output, so the lines below are in reverse chronological order (newest first)...

end at June 22nd 2018, 12:49:25.858 => no further messages are consumed, even if new ones are produced to the topic

received message with offset 29
[Consumer clientId=consumer-1, groupId=smrwr-e8bb0778] Setting newly assigned partitions [some-topic-0]
[Consumer clientId=consumer-1, groupId=smrwr-e8bb0778] Successfully joined group with generation 3
[Consumer clientId=consumer-1, groupId=smrwr-e8bb0778] (Re-)joining group
[Consumer clientId=consumer-1, groupId=smrwr-e8bb0778] Revoking previously assigned partitions [some-topic-0]
[Consumer clientId=consumer-1, groupId=smrwr-e8bb0778] Discovered group coordinator kafka-1:9093 (id: 2147483646 rack: null)
[Consumer clientId=consumer-1, groupId=smrwr-e8bb0778] Marking the coordinator kafka-1:9093 (id: 2147483646 rack: null) dead
received message with offset 42

start at June 22nd 2018, 12:17:15.857

The topic is single partitioned and replicated.
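For context, the consumer wiring looks roughly like this (a minimal sketch, not our actual code; object and variable names are placeholders, the bootstrap server, topic, and group id are taken from the log above, and it assumes alpakka-kafka 0.2x on Akka 2.5):

```scala
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import org.apache.kafka.common.serialization.StringDeserializer

object ConsumerSketch extends App {
  implicit val system = ActorSystem("consumer-sketch")
  implicit val mat = ActorMaterializer()

  val settings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("kafka-1:9092")
    .withGroupId("smrwr-e8bb0778")

  // Plain source on the single-partition topic; this is where consumption
  // silently stops after the coordinator is marked dead and rediscovered.
  Consumer
    .plainSource(settings, Subscriptions.topics("some-topic"))
    .runForeach(record => println(s"received message with offset ${record.offset}"))
}
```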

rolandjohann avatar Jun 22 '18 16:06 rolandjohann

What version of the Alpakka Kafka connector do you use?

ennru avatar Jun 26 '18 09:06 ennru

latest version, 0.21.1

rolandjohann avatar Jun 26 '18 09:06 rolandjohann

I'm not sure I understand what exactly you are experiencing. What do you mean with "read any message, or no messages anymore"?

Can you give release 0.22 a spin, please?

ennru avatar Jun 27 '18 13:06 ennru

The consumer reads messages until offset 42. Then the coordinator is marked dead; after a few seconds the consumer rediscovers the coordinator and consumes a single message with offset 29. After that, no further messages are consumed until the app is restarted. I'm not even sure whether it's Alpakka or Kafka consumer related.

I'll try 0.22 and report the results.


rolandjohann avatar Jun 27 '18 13:06 rolandjohann

Was there any progress made on this issue? I ran into something similar on 1.0-RC1.

I'm getting the same symptoms and messages after a long GC pause, like so:

First: a long GC pause (>10 seconds)

Then these messages appear in the log:

[Consumer clientId=consumer-1, groupId=smrwr-e8bb0778] Setting newly assigned partitions [some-topic-0]
[Consumer clientId=consumer-1, groupId=smrwr-e8bb0778] Successfully joined group with generation 3
[Consumer clientId=consumer-1, groupId=smrwr-e8bb0778] (Re-)joining group
[Consumer clientId=consumer-1, groupId=smrwr-e8bb0778] Revoking previously assigned partitions [some-topic-0]
[Consumer clientId=consumer-1, groupId=smrwr-e8bb0778] Discovered group coordinator kafka-1:9093 (id: 2147483646 rack: null)
[Consumer clientId=consumer-1, groupId=smrwr-e8bb0778] Marking the coordinator kafka-1:9093 (id: 2147483646 rack: null) dead

After this the consumer resets its offset to earliest/latest and consumption stops, but the stream is not killed.

Interestingly, if I put an akka-http async() somewhere in my stream, consumption continues.
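Roughly what I mean is this (a sketch, not my actual stream; I'm referring to akka-stream's `.async` boundary, and `settings` and the processing stage are placeholders):

```scala
Consumer
  .plainSource(settings, Subscriptions.topics("some-topic"))
  .map(handle)   // placeholder processing stage
  .async         // with this async boundary in place, consumption keeps going
  .runWith(Sink.ignore)
```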

craffit avatar Feb 13 '19 14:02 craffit

The log messages both of you sent are from the Apache Kafka client library.

I'm not sure, but I believe the client doesn't signal "coordinator dead" upwards, so there is no way to react to it. The long GC pause would explain why it happens in the first place, though.

Do you have timestamps for these events? It could probably be avoided with different timeouts on the Kafka client or broker.
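For example, these are the client-side timeouts I have in mind; with Alpakka Kafka they can be set through the `kafka-clients` section of the consumer config (values below are illustrative only, not recommendations):

```hocon
# application.conf — Alpakka Kafka consumer section (illustrative values)
akka.kafka.consumer {
  kafka-clients {
    # How long the broker waits without heartbeats before declaring the consumer dead.
    session.timeout.ms = 30000
    # Heartbeat interval; must be well below session.timeout.ms.
    heartbeat.interval.ms = 10000
    # Maximum time between poll() calls before the consumer is kicked from the group.
    max.poll.interval.ms = 300000
  }
}
```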

What do you mean by putting an akka-http async in the stream?

ennru avatar Feb 15 '19 10:02 ennru