alpakka-kafka

KafkaConsumerActor keeps pausing a partition and does not resume it for a long time

Open evis opened this issue 8 years ago • 6 comments

The KafkaConsumerActor#poll() method should resume the underlying Kafka consumer if there are partitions to fetch: https://github.com/akka/reactive-kafka/blob/master/core/src/main/scala/akka/kafka/KafkaConsumerActor.scala#L255 . However, I ran into a situation where it did not resume the consumer for a long time, even though the partitions were alive. I enabled debug logging; here is part of the log:

```
[2017-10-10 18:05:23,882] DEBUG [my-app-akka.kafka.default-dispatcher-43] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:23,992] DEBUG [my-app-akka.kafka.default-dispatcher-45] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:24,102] DEBUG [my-app-akka.kafka.default-dispatcher-46] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:24,213] DEBUG [my-app-akka.kafka.default-dispatcher-49] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:24,333] DEBUG [my-app-akka.kafka.default-dispatcher-50] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:24,453] DEBUG [my-app-akka.kafka.default-dispatcher-51] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:24,573] DEBUG [my-app-akka.kafka.default-dispatcher-41] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:24,692] DEBUG [my-app-akka.kafka.default-dispatcher-42] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:24,802] DEBUG [my-app-akka.kafka.default-dispatcher-44] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:24,912] DEBUG [my-app-akka.kafka.default-dispatcher-48] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:25,022] DEBUG [my-app-akka.kafka.default-dispatcher-47] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:25,132] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:25,242] DEBUG [my-app-akka.kafka.default-dispatcher-37] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:25,353] DEBUG [my-app-akka.kafka.default-dispatcher-38] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:25,473] DEBUG [my-app-akka.kafka.default-dispatcher-40] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:25,592] DEBUG [my-app-akka.kafka.default-dispatcher-39] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:25,702] DEBUG [my-app-akka.kafka.default-dispatcher-43] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:25,812] DEBUG [my-app-akka.kafka.default-dispatcher-45] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:25,922] DEBUG [my-app-akka.kafka.default-dispatcher-46] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:26,033] DEBUG [my-app-akka.kafka.default-dispatcher-49] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:26,153] DEBUG [my-app-akka.kafka.default-dispatcher-50] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:26,273] DEBUG [my-app-akka.kafka.default-dispatcher-51] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:26,382] DEBUG [my-app-akka.kafka.default-dispatcher-41] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:26,492] DEBUG [my-app-akka.kafka.default-dispatcher-42] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:26,603] DEBUG [my-app-akka.kafka.default-dispatcher-44] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:26,723] DEBUG [my-app-akka.kafka.default-dispatcher-48] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:26,843] DEBUG [my-app-akka.kafka.default-dispatcher-47] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
```

The consumers were not resumed for a couple of hours, until I restarted the application. My application runs as a cluster of four nodes with one consumer on each node, and all consumers belong to the same consumer group.

Only when the app on one node began restarting did the consumers on the other nodes start receiving messages:

```
[2017-10-10 18:05:45,112] DEBUG [my-app-akka.kafka.default-dispatcher-47] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:45,223] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:45,262] DEBUG [kafka-coordinator-heartbeat-thread | my-app] o.a.k.c.c.i.AbstractCoordinator - Sending Heartbeat request for group my-consumer-group to coordinator my-kafka-host-1:9092 (id: 2147483646 rack: null)
[2017-10-10 18:05:45,263] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.c.i.AbstractCoordinator - Attempt to heartbeat failed for group my-consumer-group since it is rebalancing.
[2017-10-10 18:05:45,263] INFO [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [my-topic-0] for group my-consumer-group
[2017-10-10 18:05:45,263] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:45,263] INFO [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.c.i.ConsumerCoordinator - Revoking previously assigned partitions [my-topic-0] for group my-consumer-group
[2017-10-10 18:05:45,263] INFO [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.c.i.AbstractCoordinator - (Re-)joining group my-consumer-group
[2017-10-10 18:05:45,263] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.c.i.AbstractCoordinator - Sending JoinGroup ((type: JoinGroupRequest, groupId=my-consumer-group, sessionTimeout=10000, rebalanceTimeout=300000, memberId=consumer-1-4ccc4a57-7b00-498d-8a75-070257633afc, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@39ba9197)) to coordinator my-kafka-host-1:9092 (id: 2147483646 rack: null)
[2017-10-10 18:05:47,397] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.c.i.AbstractCoordinator - Received successful JoinGroup response for group my-consumer-group: {error_code=0,generation_id=5510,group_protocol=range,leader_id=consumer-1-7121559e-a3c4-49dc-be09-cc3cb38e6544,member_id=consumer-1-4ccc4a57-7b00-498d-8a75-070257633afc,members=[]}
[2017-10-10 18:05:47,397] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.c.i.AbstractCoordinator - Sending follower SyncGroup for group my-consumer-group to coordinator my-kafka-host-1:9092 (id: 2147483646 rack: null): (type=SyncGroupRequest, groupId=my-consumer-group, generationId=5510, memberId=consumer-1-4ccc4a57-7b00-498d-8a75-070257633afc, groupAssignment=)
[2017-10-10 18:05:47,415] INFO [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.c.i.AbstractCoordinator - Successfully joined group my-consumer-group with generation 5510
[2017-10-10 18:05:47,415] INFO [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [my-topic-0] for group my-consumer-group
[2017-10-10 18:05:47,415] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:47,415] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.c.i.ConsumerCoordinator - Group my-consumer-group fetching committed offsets for partitions: [my-topic-0]
[2017-10-10 18:05:47,416] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition my-topic-0 to the committed offset 39435637
[2017-10-10 18:05:47,416] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.clients.consumer.KafkaConsumer - Resuming partition my-topic-0
[2017-10-10 18:05:47,416] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [my-topic-0] to broker my-kafka-host-2:9092 (id: 3 rack: null)
[2017-10-10 18:05:47,466] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:47,516] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.clients.consumer.KafkaConsumer - Resuming partition my-topic-0
[2017-10-10 18:05:47,566] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.clients.consumer.KafkaConsumer - Resuming partition my-topic-0
[2017-10-10 18:05:47,682] DEBUG [my-app-akka.kafka.default-dispatcher-50] o.a.k.clients.consumer.KafkaConsumer - Resuming partition my-topic-0
[2017-10-10 18:05:47,854] DEBUG [my-app-akka.kafka.default-dispatcher-50] o.a.kafka.common.metrics.Metrics - Added sensor with name my-topic-0.records-lag
```

What could be causing this behaviour in KafkaConsumerActor?

P.S. Unfortunately, I only turned on Kafka debug logging after the problem had occurred :(

evis, Oct 10 '17 15:10

This looks like #302 to me. My guess is that a WakeupException was thrown at some point during an earlier rebalance, before the consumer got into this state of pausing forever. The later rebalance changed the partition assignment, and since there was no further WakeupException, everything went back to normal.

jpeel, Oct 11 '17 15:10
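A minimal, hypothetical sketch of the failure mode jpeel guesses at: if the consumer's bookkeeping drops a stage's request before a call that can throw (`SimulatedWakeup` here stands in for Kafka's `WakeupException`), the stage keeps waiting for records while the state no longer records its demand. `DemandState`, `completeRequestUnsafe` and `completeRequestSafe` are illustrative names, not Alpakka Kafka's implementation, and the sketch does not model the later rebalance that, in jpeel's reading, reset the state.

```scala
// Hypothetical sketch: demand lost when an exception interrupts a state update.
// SimulatedWakeup stands in for Kafka's WakeupException; DemandState and its
// methods are illustrative names, not Alpakka Kafka's implementation.
final class SimulatedWakeup extends RuntimeException("simulated wakeup")

final class DemandState(initial: Map[String, Set[Int]]) {
  // Outstanding demand: stage id -> partitions that stage wants records from.
  var requests: Map[String, Set[Int]] = initial

  // Fragile ordering: the request is removed before the call that may throw.
  // If `action` throws, the stage still waits for records, but the state no
  // longer records its demand, so its partitions would be paused from now on.
  def completeRequestUnsafe(stage: String, action: () => Unit): Unit = {
    requests = requests - stage
    action()
  }

  // Safer ordering: the request is removed only after `action` has succeeded,
  // so an exception leaves the demand in place for the next attempt.
  def completeRequestSafe(stage: String, action: () => Unit): Unit = {
    action()
    requests = requests - stage
  }
}

object DemandStateDemo {
  def main(args: Array[String]): Unit = {
    val wakeup: () => Unit = () => throw new SimulatedWakeup

    val unsafe = new DemandState(Map("stage-1" -> Set(0)))
    try unsafe.completeRequestUnsafe("stage-1", wakeup) catch { case _: SimulatedWakeup => () }
    println(s"unsafe: ${unsafe.requests}")

    val safe = new DemandState(Map("stage-1" -> Set(0)))
    try safe.completeRequestSafe("stage-1", wakeup) catch { case _: SimulatedWakeup => () }
    println(s"safe: ${safe.requests}")
  }
}
```

With the unsafe ordering the demo ends with an empty request map; with the safe ordering the original request for `stage-1` survives the exception. The general point is to mutate bookkeeping only after the operation that can fail has completed.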

We have a similar bug. With trace logging enabled, we see these messages during the hang:

```
[2017-10-16 06:32:46,905] DEBUG [kafka-coordinator-heartbeat-thread | nmarket-notifier-client] o.a.k.c.c.i.AbstractCoordinator - Sending Heartbeat request for group nmarket-notifier-client to coordinator kafka-01-myt.test.vertis.yandex.net:9092 (id: 2147483646 rack: null)
[2017-10-16 06:32:46,905] TRACE [kafka-coordinator-heartbeat-thread | nmarket-notifier-client] o.apache.kafka.clients.NetworkClient - Sending {group_id=nmarket-notifier-client,group_generation_id=165,member_id=nmarket-notifier-client-c59e229a-27d0-4c1f-990a-45e5e7a5aca8} to node 2147483646.
[2017-10-16 06:32:46,906] TRACE [notifier-akka.kafka.default-dispatcher-33] o.apache.kafka.clients.NetworkClient - Completed receive from node 2147483646, for key 12, received {error_code=0}
[2017-10-16 06:32:46,906] DEBUG [notifier-akka.kafka.default-dispatcher-33] o.a.k.c.c.i.AbstractCoordinator - Received successful Heartbeat response for group nmarket-notifier-client
[2017-10-16 06:32:46,984] DEBUG [notifier-akka.kafka.default-dispatcher-22] o.a.k.clients.consumer.KafkaConsumer - Pausing partition telepony-mts-event-log-0
[2017-10-16 06:32:47,095] DEBUG [notifier-akka.kafka.default-dispatcher-21] o.a.k.clients.consumer.KafkaConsumer - Pausing partition telepony-mts-event-log-0
...
```

and so on. When the hang resolved itself, the logs changed to:

```
[2017-10-16 12:32:24,815] DEBUG [notifier-akka.kafka.default-dispatcher-26] o.a.k.clients.consumer.KafkaConsumer - Pausing partition telepony-mts-event-log-0
[2017-10-16 12:32:24,855] DEBUG [kafka-coordinator-heartbeat-thread | nmarket-notifier-client] o.a.k.c.c.i.AbstractCoordinator - Sending Heartbeat request for group nmarket-notifier-client to coordinator kafka-01-myt.test.vertis.yandex.net:9092 (id: 2147483646 rack: null)
[2017-10-16 12:32:24,855] TRACE [kafka-coordinator-heartbeat-thread | nmarket-notifier-client] o.apache.kafka.clients.NetworkClient - Sending {group_id=nmarket-notifier-client,group_generation_id=165,member_id=nmarket-notifier-client-c59e229a-27d0-4c1f-990a-45e5e7a5aca8} to node 2147483646.
[2017-10-16 12:32:24,856] TRACE [notifier-akka.kafka.default-dispatcher-26] o.apache.kafka.clients.NetworkClient - Completed receive from node 2147483646, for key 12, received {error_code=27}
[2017-10-16 12:32:24,856] DEBUG [notifier-akka.kafka.default-dispatcher-26] o.a.k.c.c.i.AbstractCoordinator - Attempt to heartbeat failed for group nmarket-notifier-client since it is rebalancing.
[2017-10-16 12:32:24,857] DEBUG [notifier-akka.kafka.default-dispatcher-26] o.a.kafka.common.metrics.Metrics - Removed sensor with name telepony-mts-event-log-0.records-lag
[2017-10-16 12:32:24,858] INFO [notifier-akka.kafka.default-dispatcher-26] o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [] for group nmarket-notifier-client
```

ai81, Oct 16 '17 10:10

Same for me: my app always produces the following DEBUG log when I start my project, although all the Kafka processing works well.
I do not know why these log lines are there. Is it a bug?

```
00:39:13.989 [obp-api-akka.kafka.default-dispatcher-15] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.060 [obp-api-akka.kafka.default-dispatcher-17] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.130 [obp-api-akka.kafka.default-dispatcher-19] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.199 [obp-api-akka.kafka.default-dispatcher-20] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.270 [obp-api-akka.kafka.default-dispatcher-5] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.340 [obp-api-akka.kafka.default-dispatcher-9] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.411 [obp-api-akka.kafka.default-dispatcher-11] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.481 [obp-api-akka.kafka.default-dispatcher-12] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.549 [obp-api-akka.kafka.default-dispatcher-14] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.620 [obp-api-akka.kafka.default-dispatcher-16] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.691 [obp-api-akka.kafka.default-dispatcher-18] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.759 [obp-api-akka.kafka.default-dispatcher-6] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.831 [obp-api-akka.kafka.default-dispatcher-7] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.901 [obp-api-akka.kafka.default-dispatcher-8] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
```

hongwei1, Jun 05 '18 06:06

@evis Any updates? I've run into the same situation with one of my consumers. I'll try to investigate.

dmi3zkm, Dec 12 '18 09:12

The KafkaConsumerActor pausing partitions indicates that it has no demand for them. We suspect that something may have been lost when the actor state was updated during WakeupException handling. Yesterday's release, 1.0-RC1, uses the new Kafka API, which does not rely on WakeupException.

ennru, Dec 12 '18 09:12
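ennru's explanation can be illustrated with a minimal, hypothetical model in which every poll splits the assigned partitions into those to resume (some stage still has demand for them) and those to pause (no demand). `TopicPart`, `DemandModel`, `pauseResume` and the stage-id keys are illustrative names, not Alpakka Kafka's API; `TopicPart` merely stands in for Kafka's `TopicPartition` so the sketch has no dependencies.

```scala
// Hypothetical model of demand-driven pause/resume; not Alpakka Kafka's code.
// TopicPart is a stand-in for org.apache.kafka.common.TopicPartition.
final case class TopicPart(topic: String, partition: Int)

object DemandModel {
  /** Splits the assigned partitions into (toResume, toPause).
    *
    * @param assigned partitions currently assigned to this consumer
    * @param requests outstanding demand, keyed by a hypothetical stage id
    */
  def pauseResume(
      assigned: Set[TopicPart],
      requests: Map[String, Set[TopicPart]]
  ): (Set[TopicPart], Set[TopicPart]) = {
    // Every partition that some stage still wants records from.
    val wanted = requests.values.flatten.toSet
    // Assigned partitions with demand are resumed; all others are paused.
    (assigned.intersect(wanted), assigned.diff(wanted))
  }

  def main(args: Array[String]): Unit = {
    val p0 = TopicPart("my-topic", 0)
    // Demand registered: p0 is resumed and fetched on the next poll.
    println(pauseResume(Set(p0), Map("stage-1" -> Set(p0))))
    // Demand lost: every poll pauses p0 again, and nothing resumes it.
    println(pauseResume(Set(p0), Map.empty))
  }
}
```

If a stage's request disappears from the state, the second call shows the outcome: the partition lands in the pause set on every poll and nothing moves it back, which would be consistent with the repeated "Pausing partition" lines in the logs above.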

I am facing the same issue. Any update? Version: "2.0.7"

khouloudsa, Nov 08 '21 17:11