confluent-kafka-dotnet

Unhandled rebalance exception using the CooperativeSticky partition assignment strategy

Status: Open. ZakChar opened this issue 3 years ago (4 comments).

Description

The following exception was thrown during a rebalance and caused a consumer service to crash:

Unhandled exception. Confluent.Kafka.KafkaException: Local: Erroneous state
   at Confluent.Kafka.Impl.SafeKafkaHandle.AssignImpl(IEnumerable`1 partitions, Func`3 assignMethodErr, Func`3 assignMethodError)
   at Confluent.Kafka.Impl.SafeKafkaHandle.Assign(IEnumerable`1 partitions)
   at Confluent.Kafka.Consumer`2.Unassign()
   at Confluent.Kafka.Consumer`2.RebalanceCallback(IntPtr rk, ErrorCode err, IntPtr partitionsPtr, IntPtr opaque)

How to reproduce

A single cluster of 3 brokers, with 5 services in one consumer group, each consuming 150 partitions. The error seemed to occur when partitions were assigned to one consumer after another consumer had lost its partitions because of a disconnection.

Side note: the clients consume messages in batches, pass them to workers, and store the message offsets once processing is done (offsets are stored asynchronously). This process seems to cause many commit errors due to an invalid group generation ID, which at times lead to partition loss and therefore to more rebalances.

Checklist

  • Confluent.Kafka version: 1.8.2
  • Apache Kafka version: 2.8.0
  • Consumer configuration:

{
  "TopicReplicationFactor": 3,
  "TopicMetadataPropagationTimeoutMs": 3000,
  "MetadataMaxAgeMs": 300000,
  "ConsumerSessionTimeoutMs": 90000,
  "ConsumerHeartbeatIntervalMs": 15000,
  "CommitOffsetsIntervalMs": 2000,
  "PartitionAssignmentStrategy": "CooperativeSticky"
}

  • Operating system: CentOS Linux

  • Client logs:

2021-12-16 14:58:38,195 [INFO] [] Kafka service-q6prq-CGroup1#consumer-4: [REQTMOUT-service-q6prq-CGroup1#consumer-4] [thrd:GroupCoordinator]: GroupCoordinator/0: Timed out OffsetCommitRequest in flight (after 60059ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 241386ms
2021-12-16 14:58:38,195 [WARN] [] Kafka service-q6prq-CGroup1#consumer-4: [REQTMOUT-service-q6prq-CGroup1#consumer-4] [thrd:GroupCoordinator]: GroupCoordinator/0: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
2021-12-16 14:58:38,196 [ERROR] [] Kafka service-q6prq-CGroup1#consumer-4: [FAIL-service-q6prq-CGroup1#consumer-4] [thrd:GroupCoordinator]: GroupCoordinator: backend-kafka-kafka-0.backend-kafka-kafka-brokers.kafka.svc:9092: 1 request(s) timed out: disconnect (after 2417266ms in state UP)
2021-12-16 14:58:38,196 [WARN] [] Kafka service-q6prq-CGroup1#consumer-4: Local Error [Local_TimedOut] GroupCoordinator: backend-kafka-kafka-0.backend-kafka-kafka-brokers.kafka.svc:9092: 1 request(s) timed out: disconnect (after 2417266ms in state UP)
2021-12-16 14:58:51,652 [WARN] [] Confluent.Kafka.KafkaException: Broker: Specified group generation id is not valid
   at Confluent.Kafka.Impl.SafeKafkaHandle.Commit(IEnumerable`1 offsets)
   at Confluent.Kafka.Consumer`2.Commit(ConsumeResult`2 result)
2021-12-16 15:00:02,185 [INFO] [] Kafka service-q6prq-CGroup1#consumer-4: [REQTMOUT-service-q6prq-CGroup1#consumer-4] [thrd:GroupCoordinator]: GroupCoordinator/0: Timed out OffsetCommitRequest in flight (after 60059ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 242839ms
2021-12-16 15:00:02,186 [WARN] [] Kafka service-q6prq-CGroup1#consumer-4: [REQTMOUT-service-q6prq-CGroup1#consumer-4] [thrd:GroupCoordinator]: GroupCoordinator/0: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
2021-12-16 15:00:02,186 [ERROR] [] Kafka service-q6prq-CGroup1#consumer-4: [FAIL-service-q6prq-CGroup1#consumer-4] [thrd:GroupCoordinator]: GroupCoordinator: kafka-brokers: 1 request(s) timed out: disconnect (after 83986ms in state UP, 1 identical error(s) suppressed)
2021-12-16 15:00:02,186 [WARN ] [] Kafka service-q6prq-CGroup1#consumer-4: Local Error [Local_TimedOut] GroupCoordinator: kafka-brokers: 1 request(s) timed out: disconnect (after 83986ms in state UP, 1 identical error(s) suppressed)
2021-12-16 15:00:02,318 [WARN ] [] Kafka service-q6prq-CGroup1#consumer-4: [ASSIGN-service-q6prq-CGroup1#consumer-4] [thrd:main]: Group "CGroup1": application *assign() call failed: Changes to the current assignment must be made using incremental_assign() or incremental_unassign() when rebalance protocol type is COOPERATIVE
Unhandled exception. Confluent.Kafka.KafkaException: Local: Erroneous state
   at Confluent.Kafka.Impl.SafeKafkaHandle.AssignImpl(IEnumerable`1 partitions, Func`3 assignMethodErr, Func`3 assignMethodError)
   at Confluent.Kafka.Impl.SafeKafkaHandle.Assign(IEnumerable`1 partitions)
   at Confluent.Kafka.Consumer`2.Unassign()
   at Confluent.Kafka.Consumer`2.RebalanceCallback(IntPtr rk, ErrorCode err, IntPtr partitionsPtr, IntPtr opaque)

  • Broker log excerpts:

2021-12-16 14:58:51,649 INFO [GroupCoordinator 0]: Member service-7hwzp-CGroup1-7de10950-493d-4e53-a089-4a79a8a8d3f1 in group CGroup1 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator) [executor-Heartbeat]
2021-12-16 14:58:51,649 INFO [GroupCoordinator 0]: Stabilized group CGroup1 generation 402 (__consumer_offsets-45) with 4 members (kafka.coordinator.group.GroupCoordinator) [executor-Heartbeat]
2021-12-16 14:59:02,024 INFO [GroupCoordinator 0]: Preparing to rebalance group CGroup1 in state PreparingRebalance with old generation 402 (__consumer_offsets-45) (reason: Adding new member service-9jbqc-CGroup1-bacb5f0e-96a5-4acb-8b75-391134b2ac1e with group instance id None) (kafka.coordinator.group.GroupCoordinator) [data-plane-kafka-request-handler-2]

  • Critical issue: process crash
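For readers trying to reproduce this, the application-level settings listed in the checklist might correspond to Confluent.Kafka's `ConsumerConfig` roughly as below. This is a sketch under stated assumptions: the mapping from the application's own key names (e.g. "ConsumerSessionTimeoutMs") to `ConsumerConfig` properties is inferred from the names, and the broker address is a hypothetical placeholder.

```csharp
using Confluent.Kafka;

// Assumed mapping of the reporter's settings onto ConsumerConfig.
// BootstrapServers is a hypothetical placeholder; GroupId is taken from the logs.
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "CGroup1",
    SessionTimeoutMs = 90000,      // assumed from "ConsumerSessionTimeoutMs"
    HeartbeatIntervalMs = 15000,   // assumed from "ConsumerHeartbeatIntervalMs"
    MetadataMaxAgeMs = 300000,     // assumed from "MetadataMaxAgeMs"
    PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
};

// "TopicReplicationFactor", "TopicMetadataPropagationTimeoutMs" and
// "CommitOffsetsIntervalMs" are left out: how the application uses them
// is not stated in the report.
```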

ZakChar, Dec 17 '21 16:12

This report warrants further testing on our side, I think.

mhowlett, Mar 14 '22 17:03

Seeing something similar when using CooperativeSticky:

%4|1656678616.102|ASSIGN|event-consumer#consumer-1| [thrd:main]: Group "backend-tester_ConcurrencyTest": application *assign() call failed: Changes to the current assignment must be made using incremental_assign() or incremental_unassign() when rebalance protocol type is COOPERATIVE
Unhandled exception. Confluent.Kafka.KafkaException: Local: Erroneous state
   at Confluent.Kafka.Impl.SafeKafkaHandle.AssignImpl(IEnumerable`1 partitions, Func`3 assignMethodErr, Func`3 assignMethodError)
   at Confluent.Kafka.Impl.SafeKafkaHandle.Assign(IEnumerable`1 partitions)
   at Confluent.Kafka.Consumer`2.Unassign()
   at Confluent.Kafka.Consumer`2.RebalanceCallback(IntPtr rk, ErrorCode err, IntPtr partitionsPtr, IntPtr opaque)

I started two consumer instances one after the other, expecting some partitions to be revoked from the first instance and assigned to the second once it came up. Instead, the first instance threw this error and crashed.
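The two-instance scenario can be sketched as a minimal consumer program, which is roughly what a local reproduction attempt would need. This is a hedged sketch, not the reporter's code: the broker address and topic name are hypothetical placeholders, and the group ID is taken from the log line above.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Broker address and topic name are hypothetical placeholders.
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "backend-tester_ConcurrencyTest",
    PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
    AutoOffsetReset = AutoOffsetReset.Earliest,
};

using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true;   // keep the process alive so the consumer can close cleanly
    cts.Cancel();
};

// No rebalance handlers are registered, so the client applies
// assignment changes itself.
using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("concurrency-test");

try
{
    while (!cts.IsCancellationRequested)
    {
        // Rebalance callbacks run inside Consume() (via rd_kafka_consumer_poll),
        // which is where the reported exception surfaces.
        var result = consumer.Consume(cts.Token);
        Console.WriteLine($"{result.TopicPartitionOffset}: {result.Message.Value}");
    }
}
catch (OperationCanceledException)
{
    // Ctrl+C pressed.
}
finally
{
    consumer.Close();   // leave the group cleanly
}
```

To exercise the scenario, start one instance, wait until it is consuming, then start a second copy with the same `GroupId`. The topic needs more than one partition so that the rebalance actually moves partitions to the new member.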

Khazuar, Jul 01 '22 14:07

@Khazuar - is this with v1.9.0?

mhowlett, Jul 01 '22 14:07


Yes.

Khazuar, Jul 01 '22 15:07

@ZakChar @Khazuar I tried reproducing this with a sample consumer using the scenarios you described above, but wasn't able to reproduce the error with the latest version (1.9.3).

Do you have any sample application that I can run locally to verify this issue?

anchitj, Nov 10 '22 12:11

I recently ran into the same behavior with CooperativeSticky assignment and came across this open issue. I do not have a sample application for verification, but can provide the following details:

Kafka cluster: v2.8.0
Confluent.Kafka: v1.9.3 (with BrokerVersionFallback set to 2.8.0 as well)

%4|1668113437.046|ASSIGN|rdkafka#consumer-3| [thrd:main]: Group "OMITTED": application *assign() call failed: Changes to the current assignment must be made using incremental_assign() or incremental_unassign() when rebalance protocol type is COOPERATIVE
Unhandled exception. Confluent.Kafka.KafkaException: Local: Erroneous state
   at Confluent.Kafka.Impl.SafeKafkaHandle.AssignImpl(IEnumerable`1 partitions, Func`3 assignMethodErr, Func`3 assignMethodError)
   at Confluent.Kafka.Impl.SafeKafkaHandle.Assign(IEnumerable`1 partitions)
   at Confluent.Kafka.Consumer`2.Unassign()
   at Confluent.Kafka.Consumer`2.RebalanceCallback(IntPtr rk, ErrorCode err, IntPtr partitionsPtr, IntPtr opaque)
   at Confluent.Kafka.Impl.NativeMethods.NativeMethods_Centos6.rd_kafka_consumer_poll(IntPtr rk, IntPtr timeout_ms)
   at Confluent.Kafka.Impl.Librdkafka.consumer_poll(IntPtr rk, IntPtr timeout_ms)
   at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
Fatal error. Internal CLR error. (0x80131506)
   at Confluent.Kafka.Impl.NativeMethods.NativeMethods_Centos6.rd_kafka_consumer_poll(IntPtr, IntPtr)
   at Confluent.Kafka.Impl.NativeMethods.NativeMethods_Centos6.rd_kafka_consumer_poll(IntPtr, IntPtr)
   at Confluent.Kafka.Impl.Librdkafka.consumer_poll(IntPtr, IntPtr)

I'm hoping for further input from those mentioned above, but wanted to bump this issue as still ongoing.

DaveStonge, Nov 10 '22 21:11