Brighter
[Bug] Error when the number of partitions changes under the Cooperative rebalance protocol
Describe the bug
After we increase the number of partitions of a topic (e.g. from 2 to 5), a consumer using the cooperative rebalance protocol hits a rebalance-related error.
To Reproduce
Increase the number of partitions of any existing topic and the consumer will hit this error.
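One way to perform the step above from code is the Confluent.Kafka admin client. The following is a minimal sketch, not part of the original report: the broker address `localhost:9092`, the topic name `my-topic`, and the class and helper names are all hypothetical. The consumer only notices the new partitions on a later metadata refresh, so the error may not appear immediately.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

public static class IncreasePartitionsSketch
{
    // Builds the request. IncreaseTo is the new TOTAL partition count,
    // not the number of partitions to add.
    public static PartitionsSpecification BuildSpec(string topic, int newTotal) =>
        new PartitionsSpecification { Topic = topic, IncreaseTo = newTotal };

    public static async Task Main()
    {
        // Hypothetical broker address; replace with your own.
        var config = new AdminClientConfig { BootstrapServers = "localhost:9092" };

        using var admin = new AdminClientBuilder(config).Build();

        // Grow the hypothetical topic "my-topic" to 5 partitions,
        // matching the "from 2 to 5" case in the description.
        await admin.CreatePartitionsAsync(
            new List<PartitionsSpecification> { BuildSpec("my-topic", 5) });

        Console.WriteLine("Partition count increase requested");
    }
}
```

Run this while a consumer configured with the cooperative-sticky assignor is subscribed to the topic.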
Exceptions (if any)
%4|1713152471.109|ASSIGN|icsa.notification#consumer-6| [thrd:main]: Group "ICSA.Notification.Services.NotificationProcessor": application *assign() call failed: Changes to the current assignment must be made using incremental_assign() or incremental_unassign() when rebalance protocol type is COOPERATIVE
Unhandled exception. Confluent.Kafka.KafkaException: Local: Erroneous state
   at Confluent.Kafka.Impl.SafeKafkaHandle.AssignImpl(IEnumerable`1 partitions, Func`3 assignMethodErr, Func`3 assignMethodError)
   at Confluent.Kafka.Impl.SafeKafkaHandle.Assign(IEnumerable`1 partitions)
   at Confluent.Kafka.Consumer`2.Unassign()
   at Confluent.Kafka.Consumer`2.RebalanceCallback(IntPtr rk, ErrorCode err, IntPtr partitionsPtr, IntPtr opaque)
Further technical details
- Brighter version - 9.7.6
- .NET SDK:
    Version: 8.0.202
    Commit: 25674bb2f4
    Workload version: 8.0.200-manifests.8cf8de6d
  Runtime Environment:
    OS Name: Windows
    OS Version: 10.0.22621
    OS Platform: Windows
    RID: win-x64
    Base Path: C:\Program Files\dotnet\sdk\8.0.202\
  .NET workloads installed:
    There are no installed workloads to display.
  Host:
    Version: 8.0.3
    Architecture: x64
    Commit: 9f4b1f5d66
  .NET SDKs installed:
    2.1.818 [C:\Program Files\dotnet\sdk]
    3.1.426 [C:\Program Files\dotnet\sdk]
    5.0.408 [C:\Program Files\dotnet\sdk]
    6.0.203 [C:\Program Files\dotnet\sdk]
    8.0.103 [C:\Program Files\dotnet\sdk]
    8.0.202 [C:\Program Files\dotnet\sdk]
  .NET runtimes installed:
    Microsoft.AspNetCore.All 2.1.30 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.All]
    Microsoft.AspNetCore.App 2.1.30 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
    Microsoft.AspNetCore.App 3.1.32 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
    Microsoft.AspNetCore.App 5.0.17 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
    Microsoft.AspNetCore.App 6.0.5 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
    Microsoft.AspNetCore.App 6.0.28 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
    Microsoft.AspNetCore.App 7.0.17 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
    Microsoft.AspNetCore.App 7.0.18 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
    Microsoft.AspNetCore.App 8.0.3 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
    Microsoft.NETCore.App 2.0.9 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
    Microsoft.NETCore.App 2.1.30 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
    Microsoft.NETCore.App 3.1.32 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
    Microsoft.NETCore.App 5.0.17 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
    Microsoft.NETCore.App 6.0.28 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
    Microsoft.NETCore.App 6.0.29 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
    Microsoft.NETCore.App 7.0.17 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
    Microsoft.NETCore.App 7.0.18 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
    Microsoft.NETCore.App 8.0.3 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
    Microsoft.WindowsDesktop.App 3.1.32 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
    Microsoft.WindowsDesktop.App 5.0.17 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
    Microsoft.WindowsDesktop.App 6.0.5 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
    Microsoft.WindowsDesktop.App 6.0.15 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
    Microsoft.WindowsDesktop.App 6.0.28 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
    Microsoft.WindowsDesktop.App 6.0.29 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
    Microsoft.WindowsDesktop.App 7.0.17 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
    Microsoft.WindowsDesktop.App 7.0.18 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
    Microsoft.WindowsDesktop.App 8.0.3 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
  Other architectures found:
    x86 [C:\Program Files (x86)\dotnet] registered at [HKLM\SOFTWARE\dotnet\Setup\InstalledVersions\x86\InstallLocation]
  Environment variables: Not set
  global.json file: Not found
- The OS - Windows
- Potential fix in Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs
    // Fragment from the consumer's constructor; assumes using System.Linq,
    // Confluent.Kafka and Microsoft.Extensions.Logging are in scope.
    _consumer = new ConsumerBuilder<string, byte[]>(_consumerConfig)
        .SetPartitionsAssignedHandler((consumer, partitions) =>
        {
            // Log information about newly assigned partitions
            var partitionInfo = partitions.Select(p => $"{p.Topic} : {p.Partition.Value}");
            s_logger.LogInformation("Partition Assigned {Channels}", string.Join(",", partitionInfo));

            // Determine strategy and act accordingly
            if (_consumerConfig.PartitionAssignmentStrategy == PartitionAssignmentStrategy.CooperativeSticky)
            {
                consumer.IncrementalAssign(partitions);
            }
            else
            {
                consumer.Assign(partitions);
            }
        })
        .SetPartitionsRevokedHandler((consumer, partitions) =>
        {
            // Log information about revoked partitions
            var revokedPartitions = partitions.Select(tpo => $"{tpo.Topic} : {tpo.Partition}");
            s_logger.LogInformation("Partitions Revoked {Channels}", string.Join(", ", revokedPartitions));

            // Determine strategy and act accordingly
            if (_consumerConfig.PartitionAssignmentStrategy == PartitionAssignmentStrategy.CooperativeSticky)
            {
                // The handler receives TopicPartitionOffset values; IncrementalUnassign expects TopicPartition
                consumer.IncrementalUnassign(partitions.Select(tpo => tpo.TopicPartition));
            }
            else
            {
                consumer.Unassign();
            }
        })
        .SetPartitionsLostHandler((consumer, partitions) =>
        {
            // Log information about lost partitions
            var lostPartitions = partitions.Select(tpo => $"{tpo.Topic} : {tpo.Partition}");
            s_logger.LogInformation("Partitions Lost {Channels}", string.Join(", ", lostPartitions));

            // This is typically treated the same as revocation, so branch on the strategy here too:
            // an incremental call is only valid under the cooperative protocol
            if (_consumerConfig.PartitionAssignmentStrategy == PartitionAssignmentStrategy.CooperativeSticky)
            {
                consumer.IncrementalUnassign(partitions.Select(tpo => tpo.TopicPartition));
            }
            else
            {
                consumer.Unassign();
            }
        })
        .SetErrorHandler((consumer, error) =>
        {
            s_logger.LogError("Error in Kafka Consumer: {ErrorCode}, Reason: {ErrorMessage}, Fatal: {FatalError}",
                error.Code, error.Reason, error.IsFatal);
        })
        .Build();
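The strategy check in the potential fix depends entirely on how the consumer is configured. As an illustration only, the sketch below shows a `ConsumerConfig` that opts into the cooperative protocol and pulls the check into one helper. The class name, helper name, broker address and group id are hypothetical and not part of Brighter. Treating an unset strategy as eager is an assumption of this sketch.

```csharp
using System;
using Confluent.Kafka;

public static class RebalanceProtocolSketch
{
    // True when the configured assignor uses the COOPERATIVE protocol.
    // Assumption: an unset (null) strategy is treated as eager.
    public static bool UsesCooperativeRebalancing(ConsumerConfig config) =>
        config.PartitionAssignmentStrategy == PartitionAssignmentStrategy.CooperativeSticky;

    public static void Main()
    {
        var cooperative = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // hypothetical broker address
            GroupId = "example-group",           // hypothetical group id
            PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky
        };

        var eager = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "example-group",
            PartitionAssignmentStrategy = PartitionAssignmentStrategy.Range
        };

        // Only the first config should take the IncrementalAssign / IncrementalUnassign path.
        Console.WriteLine(UsesCooperativeRebalancing(cooperative));
        Console.WriteLine(UsesCooperativeRebalancing(eager));
    }
}
```

Keeping the decision in one place means the assigned, revoked and lost handlers cannot disagree about which protocol is in use.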