confluent-kafka-dotnet
Unhandled rebalance exception using StickyCooperative partitioner
Description
The following exception was thrown during a rebalance and caused a consumer service to crash:
Unhandled exception. Confluent.Kafka.KafkaException: Local: Erroneous state
   at Confluent.Kafka.Impl.SafeKafkaHandle.AssignImpl(IEnumerable`1 partitions, Func`3 assignMethodErr, Func`3 assignMethodError)
   at Confluent.Kafka.Impl.SafeKafkaHandle.Assign(IEnumerable`1 partitions)
   at Confluent.Kafka.Consumer`2.Unassign()
   at Confluent.Kafka.Consumer`2.RebalanceCallback(IntPtr rk, ErrorCode err, IntPtr partitionsPtr, IntPtr opaque)
How to reproduce
One cluster of 3 brokers, and 5 services in one consumer group, each consuming 150 partitions. The error seemed to happen when partitions were assigned to a consumer after another consumer lost partitions because of a disconnection.
Side note: the clients consume messages in batches, hand them to workers, and store message offsets once processing is done (offsets are not stored synchronously). This pattern seems to produce many "Specified group generation id is not valid" commit errors, which at times cause partition losses and trigger further rebalances.
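The offset-handling pattern from the side note can be sketched as follows. This is an illustrative reconstruction, not the actual service code: the broker address, topic, batch size, and `ProcessBatch` hand-off are placeholders.

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

// Sketch of the pattern described above: auto offset store is disabled and
// offsets are stored only after worker processing completes. Offsets stored
// this way can belong to a group generation that a rebalance has already
// superseded, which would surface as "Specified group generation id is not
// valid" on the next periodic commit.
var config = new ConsumerConfig
{
    BootstrapServers = "kafka-brokers:9092",  // placeholder
    GroupId = "CGroup1",
    PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
    EnableAutoCommit = true,          // periodic commit of *stored* offsets
    EnableAutoOffsetStore = false,    // the application stores offsets itself
};

// Hypothetical worker hand-off; the real processing is elided.
void ProcessBatch(List<ConsumeResult<Ignore, byte[]>> items) { /* ... */ }

using var consumer = new ConsumerBuilder<Ignore, byte[]>(config).Build();
consumer.Subscribe("events");         // placeholder topic

var batch = new List<ConsumeResult<Ignore, byte[]>>();
while (true)
{
    var result = consumer.Consume(TimeSpan.FromMilliseconds(100));
    if (result != null) batch.Add(result);
    if (batch.Count < 100) continue;  // illustrative batch size

    ProcessBatch(batch);
    foreach (var r in batch)
        consumer.StoreOffset(r);      // deferred, not synchronous with consume
    batch.Clear();
}
```

The key point is the gap between `Consume` and `StoreOffset`: anything that completes a rebalance inside that gap leaves the stored offsets tagged with a stale generation.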
Checklist
- Confluent.Kafka 1.8.2.
- Kafka 2.8.0
- The configuration of the consumers is the following:
{ "TopicReplicationFactor": 3, "TopicMetadataPropagationTimeoutMs": 3000, "MetadataMaxAgeMs": 300000, "ConsumerSessionTimeoutMs": 90000, "ConsumerHeartbeatIntervalMs": 15000, "CommitOffsetsIntervalMs": 2000, "PartitionAssignmentStrategy": "CooperativeSticky" }
- Operating system: CentOS Linux
- Client logs:
2021-12-16 14:58:38,195 [INFO] [] Kafka service-q6prq-CGroup1#consumer-4: [REQTMOUT-service-q6prq-CGroup1#consumer-4] [thrd:GroupCoordinator]: GroupCoordinator/0: Timed out OffsetCommitRequest in flight (after 60059ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 241386ms
2021-12-16 14:58:38,195 [WARN] [] Kafka service-q6prq-CGroup1#consumer-4: [REQTMOUT-service-q6prq-CGroup1#consumer-4] [thrd:GroupCoordinator]: GroupCoordinator/0: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
2021-12-16 14:58:38,196 [ERROR] [] Kafka service-q6prq-CGroup1#consumer-4: [FAIL-service-q6prq-CGroup1#consumer-4] [thrd:GroupCoordinator]: GroupCoordinator: backend-kafka-kafka-0.backend-kafka-kafka-brokers.kafka.svc:9092: 1 request(s) timed out: disconnect (after 2417266ms in state UP)
2021-12-16 14:58:38,196 [WARN] [] Kafka service-q6prq-CGroup1#consumer-4: Local Error [Local_TimedOut] GroupCoordinator: backend-kafka-kafka-0.backend-kafka-kafka-brokers.kafka.svc:9092: 1 request(s) timed out: disconnect (after 2417266ms in state UP)
2021-12-16 14:58:51,652 [WARN] [] Confluent.Kafka.KafkaException: Broker: Specified group generation id is not valid
   at Confluent.Kafka.Impl.SafeKafkaHandle.Commit(IEnumerable`1 offsets)
   at Confluent.Kafka.Consumer`2.Commit(ConsumeResult`2 result)
2021-12-16 15:00:02,185 [INFO] [] Kafka service-q6prq-CGroup1#consumer-4: [REQTMOUT-service-q6prq-CGroup1#consumer-4] [thrd:GroupCoordinator]: GroupCoordinator/0: Timed out OffsetCommitRequest in flight (after 60059ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 242839ms
2021-12-16 15:00:02,186 [WARN] [] Kafka service-q6prq-CGroup1#consumer-4: [REQTMOUT-service-q6prq-CGroup1#consumer-4] [thrd:GroupCoordinator]: GroupCoordinator/0: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
2021-12-16 15:00:02,186 [ERROR] [] Kafka service-q6prq-CGroup1#consumer-4: [FAIL-service-q6prq-CGroup1#consumer-4] [thrd:GroupCoordinator]: GroupCoordinator: kafka-brokers: 1 request(s) timed out: disconnect (after 83986ms in state UP, 1 identical error(s) suppressed)
2021-12-16 15:00:02,186 [WARN] [] Kafka service-q6prq-CGroup1#consumer-4: Local Error [Local_TimedOut] GroupCoordinator: kafka-brokers: 1 request(s) timed out: disconnect (after 83986ms in state UP, 1 identical error(s) suppressed)
2021-12-16 15:00:02,318 [WARN] [] Kafka service-q6prq-CGroup1#consumer-4: [ASSIGN-service-q6prq-CGroup1#consumer-4] [thrd:main]: Group "CGroup1": application *assign() call failed: Changes to the current assignment must be made using incremental_assign() or incremental_unassign() when rebalance protocol type is COOPERATIVE
Unhandled exception. Confluent.Kafka.KafkaException: Local: Erroneous state
   at Confluent.Kafka.Impl.SafeKafkaHandle.AssignImpl(IEnumerable`1 partitions, Func`3 assignMethodErr, Func`3 assignMethodError)
   at Confluent.Kafka.Impl.SafeKafkaHandle.Assign(IEnumerable`1 partitions)
   at Confluent.Kafka.Consumer`2.Unassign()
   at Confluent.Kafka.Consumer`2.RebalanceCallback(IntPtr rk, ErrorCode err, IntPtr partitionsPtr, IntPtr opaque)
- Broker log excerpts
2021-12-16 14:58:51,649 INFO [GroupCoordinator 0]: Member service-7hwzp-CGroup1-7de10950-493d-4e53-a089-4a79a8a8d3f1 in group CGroup1 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator) [executor-Heartbeat]
2021-12-16 14:58:51,649 INFO [GroupCoordinator 0]: Stabilized group CGroup1 generation 402 (__consumer_offsets-45) with 4 members (kafka.coordinator.group.GroupCoordinator) [executor-Heartbeat]
2021-12-16 14:59:02,024 INFO [GroupCoordinator 0]: Preparing to rebalance group CGroup1 in state PreparingRebalance with old generation 402 (__consumer_offsets-45) (reason: Adding new member service-9jbqc-CGroup1-bacb5f0e-96a5-4acb-8b75-391134b2ac1e with group instance id None) (kafka.coordinator.group.GroupCoordinator) [data-plane-kafka-request-handler-2]
- Critical issue: process crash
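For reference, the checklist configuration above maps roughly onto Confluent.Kafka's `ConsumerConfig` as follows. The JSON keys look like application-level setting names rather than client property names, so this mapping is an assumption; `TopicReplicationFactor` has no consumer-side equivalent and is omitted.

```csharp
using Confluent.Kafka;

// Assumed ConsumerConfig equivalent of the settings listed in the checklist.
var config = new ConsumerConfig
{
    TopicMetadataPropagationMaxMs = 3000,   // "TopicMetadataPropagationTimeoutMs"
    MetadataMaxAgeMs = 300000,              // "MetadataMaxAgeMs"
    SessionTimeoutMs = 90000,               // "ConsumerSessionTimeoutMs"
    HeartbeatIntervalMs = 15000,            // "ConsumerHeartbeatIntervalMs"
    AutoCommitIntervalMs = 2000,            // "CommitOffsetsIntervalMs"
    PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
};
```

Note the long 90s session timeout relative to the 60s request timeout visible in the `REQTMOUT` log lines above; that ratio may be relevant to how long the coordinator waits before ejecting a member.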
This report warrants further testing on our side, I think.
Seeing something similar when using CooperativeSticky:
%4|1656678616.102|ASSIGN|event-consumer#consumer-1| [thrd:main]: Group "backend-tester_ConcurrencyTest": application *assign() call failed: Changes to the current assignment must be made using incremental_assign() or incremental_unassign() when rebalance protocol type is COOPERATIVE
Unhandled exception. Confluent.Kafka.KafkaException: Local: Erroneous state
at Confluent.Kafka.Impl.SafeKafkaHandle.AssignImpl(IEnumerable`1 partitions, Func`3 assignMethodErr, Func`3 assignMethodError)
at Confluent.Kafka.Impl.SafeKafkaHandle.Assign(IEnumerable`1 partitions)
at Confluent.Kafka.Consumer`2.Unassign()
at Confluent.Kafka.Consumer`2.RebalanceCallback(IntPtr rk, ErrorCode err, IntPtr partitionsPtr, IntPtr opaque)
I started two consumer instances one after another, expecting some partitions to be revoked from the first instance and assigned to the second instance once that came up. Instead the first instance threw this error and died.
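The `ASSIGN` log line says a full `*assign()` call was made while the rebalance protocol was COOPERATIVE, and the stack trace shows it coming from `Consumer`2.Unassign()` inside the rebalance callback. As I understand the Confluent.Kafka API, the rebalance-handler overloads that return `void` let the client apply incremental (un)assignment itself; the sketch below (broker, group, and topic names are placeholders) shows that wiring.

```csharp
using System;
using Confluent.Kafka;

// Sketch of rebalance-handler wiring for the CooperativeSticky strategy.
// The handler names are real Confluent.Kafka APIs; the bodies are illustrative.
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",   // placeholder
    GroupId = "ConcurrencyTest",           // placeholder
    PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
};

using var consumer = new ConsumerBuilder<Ignore, byte[]>(config)
    // Void-returning handlers: the client performs the incremental
    // assign/unassign internally after the handler returns.
    .SetPartitionsAssignedHandler((c, added) =>
        Console.WriteLine($"Incrementally assigned: {added.Count} partition(s)"))
    .SetPartitionsRevokedHandler((c, revoked) =>
        Console.WriteLine($"Incrementally revoked: {revoked.Count} partition(s)"))
    .SetPartitionsLostHandler((c, lost) =>
        Console.WriteLine($"Lost (e.g. session timeout): {lost.Count} partition(s)"))
    .Build();

consumer.Subscribe("events");              // placeholder topic
```

By contrast, the handler overloads that return a partition list route through the full `Assign()`/`Unassign()` path, which the COOPERATIVE protocol rejects with exactly the "must be made using incremental_assign() or incremental_unassign()" error logged above. Whether the crash here comes from such a handler or from the library's own internal `Unassign()` call is what the stack trace leaves open.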
@Khazuar - is this with v1.9.0?
Yes.
@ZakChar @Khazuar I tried reproducing this with a sample consumer using the scenarios you described above, but wasn't able to reproduce this error with the latest version (1.9.3).
Do you have any sample application that I can run locally to verify this issue?
I have recently run into this same behavior for CooperativeSticky assignment and came across this active issue. I do not have a sample application for verification, but can provide the following details:
- Kafka cluster: v2.8.0
- Confluent.Kafka: v1.9.3 (with BrokerVersionFallback set to 2.8.0 as well)
%4|1668113437.046|ASSIGN|rdkafka#consumer-3| [thrd:main]: Group "OMITTED": application *assign() call failed: Changes to the current assignment must be made using incremental_assign() or incremental_unassign() when rebalance protocol type is COOPERATIVE
Unhandled exception. Confluent.Kafka.KafkaException: Local: Erroneous state
at Confluent.Kafka.Impl.SafeKafkaHandle.AssignImpl(IEnumerable`1 partitions, Func`3 assignMethodErr, Func`3 assignMethodError)
at Confluent.Kafka.Impl.SafeKafkaHandle.Assign(IEnumerable`1 partitions)
at Confluent.Kafka.Consumer`2.Unassign()
at Confluent.Kafka.Consumer`2.RebalanceCallback(IntPtr rk, ErrorCode err, IntPtr partitionsPtr, IntPtr opaque)
at Confluent.Kafka.Impl.NativeMethods.NativeMethods_Centos6.rd_kafka_consumer_poll(IntPtr rk, IntPtr timeout_ms)
at Confluent.Kafka.Impl.Librdkafka.consumer_poll(IntPtr rk, IntPtr timeout_ms)
at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
Fatal error. Internal CLR error. (0x80131506)
at Confluent.Kafka.Impl.NativeMethods.NativeMethods_Centos6.rd_kafka_consumer_poll(IntPtr, IntPtr)
at Confluent.Kafka.Impl.NativeMethods.NativeMethods_Centos6.rd_kafka_consumer_poll(IntPtr, IntPtr)
at Confluent.Kafka.Impl.Librdkafka.consumer_poll(IntPtr, IntPtr)
Hoping for further contributions from those mentioned above, but I wanted to bump this issue to confirm it is still ongoing.