alpakka-kafka
KafkaConsumerActor only pauses a partition and doesn't resume it for a long time
The KafkaConsumerActor#poll() method should resume the underlying Kafka consumer if there are partitions to fetch (see https://github.com/akka/reactive-kafka/blob/master/core/src/main/scala/akka/kafka/KafkaConsumerActor.scala#L255 ). However, I ran into a situation where it was unable to resume it for a long time, even though the partitions were alive. I enabled debug logging; here is part of the log:
[2017-10-10 18:05:23,882] DEBUG [my-app-akka.kafka.default-dispatcher-43] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:23,992] DEBUG [my-app-akka.kafka.default-dispatcher-45] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:24,102] DEBUG [my-app-akka.kafka.default-dispatcher-46] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:24,213] DEBUG [my-app-akka.kafka.default-dispatcher-49] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:24,333] DEBUG [my-app-akka.kafka.default-dispatcher-50] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:24,453] DEBUG [my-app-akka.kafka.default-dispatcher-51] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:24,573] DEBUG [my-app-akka.kafka.default-dispatcher-41] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:24,692] DEBUG [my-app-akka.kafka.default-dispatcher-42] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:24,802] DEBUG [my-app-akka.kafka.default-dispatcher-44] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:24,912] DEBUG [my-app-akka.kafka.default-dispatcher-48] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:25,022] DEBUG [my-app-akka.kafka.default-dispatcher-47] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:25,132] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:25,242] DEBUG [my-app-akka.kafka.default-dispatcher-37] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:25,353] DEBUG [my-app-akka.kafka.default-dispatcher-38] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:25,473] DEBUG [my-app-akka.kafka.default-dispatcher-40] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:25,592] DEBUG [my-app-akka.kafka.default-dispatcher-39] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:25,702] DEBUG [my-app-akka.kafka.default-dispatcher-43] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:25,812] DEBUG [my-app-akka.kafka.default-dispatcher-45] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:25,922] DEBUG [my-app-akka.kafka.default-dispatcher-46] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:26,033] DEBUG [my-app-akka.kafka.default-dispatcher-49] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:26,153] DEBUG [my-app-akka.kafka.default-dispatcher-50] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:26,273] DEBUG [my-app-akka.kafka.default-dispatcher-51] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:26,382] DEBUG [my-app-akka.kafka.default-dispatcher-41] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:26,492] DEBUG [my-app-akka.kafka.default-dispatcher-42] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:26,603] DEBUG [my-app-akka.kafka.default-dispatcher-44] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:26,723] DEBUG [my-app-akka.kafka.default-dispatcher-48] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:26,843] DEBUG [my-app-akka.kafka.default-dispatcher-47] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
The consumers were not resumed for a couple of hours, until I restarted the application. My application is a cluster of four nodes, with one consumer on each node. All consumers belong to the same consumer group.
Only when the app on one node began to restart did the consumers on the other nodes start receiving messages:
[2017-10-10 18:05:45,112] DEBUG [my-app-akka.kafka.default-dispatcher-47] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:45,223] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:45,262] DEBUG [kafka-coordinator-heartbeat-thread | my-app] o.a.k.c.c.i.AbstractCoordinator - Sending Heartbeat request for group my-consumer-group to coordinator my-kafka-host-1:9092 (id: 2147483646 rack: null)
[2017-10-10 18:05:45,263] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.c.i.AbstractCoordinator - Attempt to heartbeat failed for group my-consumer-group since it is rebalancing.
[2017-10-10 18:05:45,263] INFO [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [my-topic-0] for group my-consumer-group
[2017-10-10 18:05:45,263] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:45,263] INFO [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.c.i.ConsumerCoordinator - Revoking previously assigned partitions [my-topic-0] for group my-consumer-group
[2017-10-10 18:05:45,263] INFO [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.c.i.AbstractCoordinator - (Re-)joining group my-consumer-group
[2017-10-10 18:05:45,263] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.c.i.AbstractCoordinator - Sending JoinGroup ((type: JoinGroupRequest, groupId=my-consumer-group, sessionTimeout=10000, rebalanceTimeout=300000, memberId=consumer-1-4ccc4a57-7b00-498d-8a75-070257633afc, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@39ba9197)) to coordinator my-kafka-host-1:9092 (id: 2147483646 rack: null)
[2017-10-10 18:05:47,397] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.c.i.AbstractCoordinator - Received successful JoinGroup response for group my-consumer-group: {error_code=0,generation_id=5510,group_protocol=range,leader_id=consumer-1-7121559e-a3c4-49dc-be09-cc3cb38e6544,member_id=consumer-1-4ccc4a57-7b00-498d-8a75-070257633afc,members=[]}
[2017-10-10 18:05:47,397] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.c.i.AbstractCoordinator - Sending follower SyncGroup for group my-consumer-group to coordinator my-kafka-host-1:9092 (id: 2147483646 rack: null): (type=SyncGroupRequest, groupId=my-consumer-group, generationId=5510, memberId=consumer-1-4ccc4a57-7b00-498d-8a75-070257633afc, groupAssignment=)
[2017-10-10 18:05:47,415] INFO [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.c.i.AbstractCoordinator - Successfully joined group my-consumer-group with generation 5510
[2017-10-10 18:05:47,415] INFO [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [my-topic-0] for group my-consumer-group
[2017-10-10 18:05:47,415] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:47,415] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.c.i.ConsumerCoordinator - Group my-consumer-group fetching committed offsets for partitions: [my-topic-0]
[2017-10-10 18:05:47,416] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition my-topic-0 to the committed offset 39435637
[2017-10-10 18:05:47,416] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.clients.consumer.KafkaConsumer - Resuming partition my-topic-0
[2017-10-10 18:05:47,416] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [my-topic-0] to broker my-kafka-host-2:9092 (id: 3 rack: null)
[2017-10-10 18:05:47,466] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.clients.consumer.KafkaConsumer - Pausing partition my-topic-0
[2017-10-10 18:05:47,516] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.clients.consumer.KafkaConsumer - Resuming partition my-topic-0
[2017-10-10 18:05:47,566] DEBUG [my-app-akka.kafka.default-dispatcher-36] o.a.k.clients.consumer.KafkaConsumer - Resuming partition my-topic-0
[2017-10-10 18:05:47,682] DEBUG [my-app-akka.kafka.default-dispatcher-50] o.a.k.clients.consumer.KafkaConsumer - Resuming partition my-topic-0
[2017-10-10 18:05:47,854] DEBUG [my-app-akka.kafka.default-dispatcher-50] o.a.kafka.common.metrics.Metrics - Added sensor with name my-topic-0.records-lag
What could be the reason for this KafkaConsumerActor behaviour?
P.S. Sadly, I turned on Kafka debug logging only after the problem had occurred :(
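The rule described in the report (resume the partitions that some stage has requested, pause the rest) can be modelled as a pure function. This is a minimal sketch under stated assumptions: `PauseResumeRule`, `TopicPartition` and `decide` are hypothetical names, not the Alpakka or Kafka client API, and the real poll() may differ in detail. It shows why a partition with no outstanding request is paused again on every poll, which is what the repeated "Pausing partition my-topic-0" lines look like.

```scala
// Hypothetical model of the pause/resume rule; names are illustrative,
// not the actual KafkaConsumerActor or Kafka client API.
object PauseResumeRule {
  final case class TopicPartition(topic: String, partition: Int)

  /** For one poll cycle, returns (toPause, toResume).
    * Assigned partitions that some stage has requested are resumed;
    * every other assigned partition is paused.
    */
  def decide(
      assigned: Set[TopicPartition],
      requested: Set[TopicPartition]
  ): (Set[TopicPartition], Set[TopicPartition]) = {
    val toResume = assigned.intersect(requested)
    val toPause  = assigned.diff(requested)
    (toPause, toResume)
  }
}
```

With `my-topic-0` assigned and no outstanding request, `decide` puts it in `toPause` on every cycle; as soon as a request for it arrives, it moves to `toResume`.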
This looks like #302 to me. I'm guessing that there was a WakeupException at some point during a previous rebalance, before the consumer got into this state of pausing forever. The rebalance then reshuffled the partitions, and there wasn't another WakeupException, so everything was fine again.
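The hypothesis above can be illustrated with a toy simulation. It assumes (this is not confirmed in the thread) that a source stage sends one request and then waits for a reply before asking again; `LostRequestModel` and `simulate` are hypothetical names, not Alpakka code. If the pending request is dropped, for example while handling a WakeupException, the stage keeps waiting while the actor sees no demand on every later poll, so the partition stays paused.

```scala
// Toy model (hypothetical, not Alpakka source) of a lost request.
object LostRequestModel {
  /** Runs `polls` poll cycles for one partition and returns, per cycle,
    * whether the partition was resumed (true) or paused (false).
    * `dropRequestAt` is the cycle at which the pending request is lost.
    */
  def simulate(polls: Int, dropRequestAt: Option[Int]): Vector[Boolean] = {
    var pendingRequest = false // actor side: is a request for the partition held?
    var stageWaiting   = false // stage side: asked and not yet answered?
    val out = Vector.newBuilder[Boolean]
    for (i <- 0 until polls) {
      // Assumption: the stage only asks again once its previous request was answered.
      if (!stageWaiting) { pendingRequest = true; stageWaiting = true }
      // Suspected bug: the actor loses the request, but the stage still waits.
      if (dropRequestAt.contains(i)) pendingRequest = false
      // Actor poll: resume when there is demand, otherwise pause.
      val resumed = pendingRequest
      if (resumed) { pendingRequest = false; stageWaiting = false } // reply delivered
      out += resumed
    }
    out.result()
  }
}
```

Without a dropped request every cycle resumes; with the request dropped at cycle 1, every later cycle pauses, matching a consumer that only recovers when something external (such as a rebalance) resets the state.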
We have a similar bug. With trace logging enabled, we see these messages during the hang:
[2017-10-16 06:32:46,905] DEBUG [kafka-coordinator-heartbeat-thread | nmarket-notifier-client] o.a.k.c.c.i.AbstractCoordinator - Sending Heartbeat request for group nmarket-notifier-client to coordinator kafka-01-myt.test.vertis.yandex.net:9092 (id: 2147483646 rack: null)
[2017-10-16 06:32:46,905] TRACE [kafka-coordinator-heartbeat-thread | nmarket-notifier-client] o.apache.kafka.clients.NetworkClient - Sending {group_id=nmarket-notifier-client,group_generation_id=165,member_id=nmarket-notifier-client-c59e229a-27d0-4c1f-990a-45e5e7a5aca8} to node 2147483646.
[2017-10-16 06:32:46,906] TRACE [notifier-akka.kafka.default-dispatcher-33] o.apache.kafka.clients.NetworkClient - Completed receive from node 2147483646, for key 12, received {error_code=0}
[2017-10-16 06:32:46,906] DEBUG [notifier-akka.kafka.default-dispatcher-33] o.a.k.c.c.i.AbstractCoordinator - Received successful Heartbeat response for group nmarket-notifier-client
[2017-10-16 06:32:46,984] DEBUG [notifier-akka.kafka.default-dispatcher-22] o.a.k.clients.consumer.KafkaConsumer - Pausing partition telepony-mts-event-log-0
[2017-10-16 06:32:47,095] DEBUG [notifier-akka.kafka.default-dispatcher-21] o.a.k.clients.consumer.KafkaConsumer - Pausing partition telepony-mts-event-log-0
...
And so on. When the hang resolved itself, the logs changed:
[2017-10-16 12:32:24,815] DEBUG [notifier-akka.kafka.default-dispatcher-26] o.a.k.clients.consumer.KafkaConsumer - Pausing partition telepony-mts-event-log-0
[2017-10-16 12:32:24,855] DEBUG [kafka-coordinator-heartbeat-thread | nmarket-notifier-client] o.a.k.c.c.i.AbstractCoordinator - Sending Heartbeat request for group nmarket-notifier-client to coordinator kafka-01-myt.test.vertis.yandex.net:9092 (id: 2147483646 rack: null)
[2017-10-16 12:32:24,855] TRACE [kafka-coordinator-heartbeat-thread | nmarket-notifier-client] o.apache.kafka.clients.NetworkClient - Sending {group_id=nmarket-notifier-client,group_generation_id=165,member_id=nmarket-notifier-client-c59e229a-27d0-4c1f-990a-45e5e7a5aca8} to node 2147483646.
[2017-10-16 12:32:24,856] TRACE [notifier-akka.kafka.default-dispatcher-26] o.apache.kafka.clients.NetworkClient - Completed receive from node 2147483646, for key 12, received {error_code=27}
[2017-10-16 12:32:24,856] DEBUG [notifier-akka.kafka.default-dispatcher-26] o.a.k.c.c.i.AbstractCoordinator - Attempt to heartbeat failed for group nmarket-notifier-client since it is rebalancing.
[2017-10-16 12:32:24,857] DEBUG [notifier-akka.kafka.default-dispatcher-26] o.a.kafka.common.metrics.Metrics - Removed sensor with name telepony-mts-event-log-0.records-lag
[2017-10-16 12:32:24,858] INFO [notifier-akka.kafka.default-dispatcher-26] o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [] for group nmarket-notifier-client
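In these TRACE lines, `key 12` is the Heartbeat API and `error_code=27` is Kafka's REBALANCE_IN_PROGRESS error, which is why the next line reports that the group is rebalancing. A small lookup makes the codes in such logs readable; it is only a sketch, covers just a few group-coordination codes, and the helper name `HeartbeatCodes` is hypothetical.

```scala
// Hypothetical helper: names a few Kafka protocol error codes that a
// heartbeat response can carry (a subset, not the full table).
object HeartbeatCodes {
  private val names: Map[Int, String] = Map(
    0  -> "NONE",
    22 -> "ILLEGAL_GENERATION",
    25 -> "UNKNOWN_MEMBER_ID",
    27 -> "REBALANCE_IN_PROGRESS"
  )

  def describe(code: Int): String =
    names.getOrElse(code, s"unrecognised code $code")

  // Matches NetworkClient TRACE lines such as "... received {error_code=27}".
  private val ErrorCode = """.*received \{error_code=(\d+)\}.*""".r

  def fromLogLine(line: String): Option[String] = line match {
    case ErrorCode(code) => Some(describe(code.toInt))
    case _               => None
  }
}
```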
Same for me: my app always produces the following DEBUG log when I start my project, although all the Kafka processing works well. But I don't know why these log lines are there. Is it a bug?
00:39:13.989 [obp-api-akka.kafka.default-dispatcher-15] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.060 [obp-api-akka.kafka.default-dispatcher-17] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.130 [obp-api-akka.kafka.default-dispatcher-19] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.199 [obp-api-akka.kafka.default-dispatcher-20] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.270 [obp-api-akka.kafka.default-dispatcher-5] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.340 [obp-api-akka.kafka.default-dispatcher-9] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.411 [obp-api-akka.kafka.default-dispatcher-11] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.481 [obp-api-akka.kafka.default-dispatcher-12] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.549 [obp-api-akka.kafka.default-dispatcher-14] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.620 [obp-api-akka.kafka.default-dispatcher-16] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.691 [obp-api-akka.kafka.default-dispatcher-18] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.759 [obp-api-akka.kafka.default-dispatcher-6] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.831 [obp-api-akka.kafka.default-dispatcher-7] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
00:39:14.901 [obp-api-akka.kafka.default-dispatcher-8] DEBUG o.a.k.c.c.KafkaConsumer - Pausing partition obp.JuneYellow2017.N.OutboundGetBank-0
@evis Any updates? I've run into the same situation with one of my consumers. I'll try to investigate.
The KafkaConsumerActor pausing partitions indicates that it has no demand for them.
We suspect something could have been lost when the actor state was updated during WakeupException handling. Yesterday's release, 1.0-RC1, uses the new Kafka API without WakeupException.
I am facing the same issue. Any update? Version: 2.0.7