
[Bug] Error when the number of partitions changes under the Cooperative rebalance protocol

Open: jsyauideagen opened this issue 2 months ago • 6 comments

Describe the bug

After we increase the number of partitions of a topic (e.g. from 2 to 5), the consumer hits a rebalance-related error.

To Reproduce

Increase the number of partitions of any existing topic that the consumer subscribes to; the next rebalance fails with the error below.

Exceptions (if any)

%4|1713152471.109|ASSIGN|icsa.notification#consumer-6| [thrd:main]: Group "ICSA.Notification.Services.NotificationProcessor": application *assign() call failed: Changes to the current assignment must be made using incremental_assign() or incremental_unassign() when rebalance protocol type is COOPERATIVE

Unhandled exception. Confluent.Kafka.KafkaException: Local: Erroneous state
   at Confluent.Kafka.Impl.SafeKafkaHandle.AssignImpl(IEnumerable`1 partitions, Func`3 assignMethodErr, Func`3 assignMethodError)
   at Confluent.Kafka.Impl.SafeKafkaHandle.Assign(IEnumerable`1 partitions)
   at Confluent.Kafka.Consumer`2.Unassign()
   at Confluent.Kafka.Consumer`2.RebalanceCallback(IntPtr rk, ErrorCode err, IntPtr partitionsPtr, IntPtr opaque)
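The librdkafka log line above states the rule that was broken: with the COOPERATIVE protocol the application may only add or remove partitions relative to its current assignment, while the EAGER protocol replaces the whole assignment on each rebalance. What follows is a toy model of those two styles for the 2 -> 5 partition change in this report, written as C# top-level statements. It uses plain sets of partition numbers and hypothetical helper names (ModelEagerAssign, ModelIncrementalAdd, ModelIncrementalRemove); it does not call Confluent.Kafka or librdkafka, and it assumes a single consumer that owns every partition of the topic.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy model only: sets of partition numbers for a single consumer that owns the
// whole topic. These helpers are hypothetical; they are not the Confluent.Kafka API.

// EAGER style: the rebalance supplies the complete new set, and the application
// replaces its assignment wholesale (what an *assign() call does).
SortedSet<int> ModelEagerAssign(IEnumerable<int> fullNewSet) => new SortedSet<int>(fullNewSet);

// COOPERATIVE style: the rebalance supplies only the delta, and the application
// adds it to the current assignment (what incremental_assign() does).
SortedSet<int> ModelIncrementalAdd(SortedSet<int> current, IEnumerable<int> added)
{
    var next = new SortedSet<int>(current);
    next.UnionWith(added);
    return next;
}

// COOPERATIVE style for revocation: remove only the revoked partitions
// (what incremental_unassign() does).
SortedSet<int> ModelIncrementalRemove(SortedSet<int> current, IEnumerable<int> removed)
{
    var next = new SortedSet<int>(current);
    next.ExceptWith(removed);
    return next;
}

// Formats a set of partitions as "[0,1,...]" in ascending order.
string Show(IEnumerable<int> partitions) => "[" + string.Join(",", partitions) + "]";

var before = new SortedSet<int> { 0, 1 };  // the topic had 2 partitions
int[] added = { 2, 3, 4 };                 // new partitions after growing to 5

// Eager: everything is revoked, then partitions 0..4 are assigned as a full set.
Console.WriteLine("eager:         " + Show(ModelEagerAssign(Enumerable.Range(0, 5))));
// Cooperative: only 2, 3, 4 arrive, and they are added to 0, 1.
Console.WriteLine("cooperative:   " + Show(ModelIncrementalAdd(before, added)));
// Mixing the styles: treating the cooperative delta as a full set drops 0 and 1.
Console.WriteLine("delta as full: " + Show(ModelEagerAssign(added)));
```

Running it prints [0,1,2,3,4] for both the eager and the cooperative lines and [2,3,4] for the last one: applying a cooperative delta as though it were a full assignment silently loses partitions 0 and 1, which is one way to see why the two call styles cannot be mixed.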

Further technical details

  • Brighter version - 9.7.6
  • .NET SDK:
      Version: 8.0.202
      Commit: 25674bb2f4
      Workload version: 8.0.200-manifests.8cf8de6d

    Runtime Environment:
      OS Name: Windows
      OS Version: 10.0.22621
      OS Platform: Windows
      RID: win-x64
      Base Path: C:\Program Files\dotnet\sdk\8.0.202\

    .NET workloads installed:
      There are no installed workloads to display.

    Host:
      Version: 8.0.3
      Architecture: x64
      Commit: 9f4b1f5d66

    .NET SDKs installed:
      2.1.818 [C:\Program Files\dotnet\sdk]
      3.1.426 [C:\Program Files\dotnet\sdk]
      5.0.408 [C:\Program Files\dotnet\sdk]
      6.0.203 [C:\Program Files\dotnet\sdk]
      8.0.103 [C:\Program Files\dotnet\sdk]
      8.0.202 [C:\Program Files\dotnet\sdk]

    .NET runtimes installed:
      Microsoft.AspNetCore.All 2.1.30 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.All]
      Microsoft.AspNetCore.App 2.1.30 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
      Microsoft.AspNetCore.App 3.1.32 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
      Microsoft.AspNetCore.App 5.0.17 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
      Microsoft.AspNetCore.App 6.0.5 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
      Microsoft.AspNetCore.App 6.0.28 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
      Microsoft.AspNetCore.App 7.0.17 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
      Microsoft.AspNetCore.App 7.0.18 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
      Microsoft.AspNetCore.App 8.0.3 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
      Microsoft.NETCore.App 2.0.9 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
      Microsoft.NETCore.App 2.1.30 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
      Microsoft.NETCore.App 3.1.32 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
      Microsoft.NETCore.App 5.0.17 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
      Microsoft.NETCore.App 6.0.28 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
      Microsoft.NETCore.App 6.0.29 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
      Microsoft.NETCore.App 7.0.17 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
      Microsoft.NETCore.App 7.0.18 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
      Microsoft.NETCore.App 8.0.3 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
      Microsoft.WindowsDesktop.App 3.1.32 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
      Microsoft.WindowsDesktop.App 5.0.17 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
      Microsoft.WindowsDesktop.App 6.0.5 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
      Microsoft.WindowsDesktop.App 6.0.15 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
      Microsoft.WindowsDesktop.App 6.0.28 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
      Microsoft.WindowsDesktop.App 6.0.29 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
      Microsoft.WindowsDesktop.App 7.0.17 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
      Microsoft.WindowsDesktop.App 7.0.18 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
      Microsoft.WindowsDesktop.App 8.0.3 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]

    Other architectures found:
      x86 [C:\Program Files (x86)\dotnet]
        registered at [HKLM\SOFTWARE\dotnet\Setup\InstalledVersions\x86\InstallLocation]

    Environment variables:
      Not set

    global.json file:
      Not found

  • The OS - Windows
  • Potential fix in Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs:
      // Requires: using System.Linq; using Confluent.Kafka; using Microsoft.Extensions.Logging;
      // Note: Confluent.Kafka's ConsumerBuilder documentation says (Incremental)Assign/Unassign
      // must not be called from these handlers, because the client applies the assignment itself
      // after the handler returns; check this against the client version Brighter uses.
      _consumer = new ConsumerBuilder<string, byte[]>(_consumerConfig)
          .SetPartitionsAssignedHandler((consumer, partitions) =>
          {
              // Log the newly assigned partitions (a List<TopicPartition>)
              var partitionInfo = partitions.Select(p => $"{p.Topic} : {p.Partition.Value}");
              s_logger.LogInformation("Partition Assigned {Channels}", string.Join(",", partitionInfo));

              // Cooperative: add to the current assignment; eager: replace it
              if (_consumerConfig.PartitionAssignmentStrategy == PartitionAssignmentStrategy.CooperativeSticky)
              {
                  consumer.IncrementalAssign(partitions);
              }
              else
              {
                  consumer.Assign(partitions);
              }
          })
          .SetPartitionsRevokedHandler((consumer, partitions) =>
          {
              // Log the revoked partitions (a List<TopicPartitionOffset>)
              var revokedPartitions = partitions.Select(tpo => $"{tpo.Topic} : {tpo.Partition}");
              s_logger.LogInformation("Partitions Revoked {Channels}", string.Join(", ", revokedPartitions));

              if (_consumerConfig.PartitionAssignmentStrategy == PartitionAssignmentStrategy.CooperativeSticky)
              {
                  // IncrementalUnassign takes TopicPartition values, not TopicPartitionOffset
                  consumer.IncrementalUnassign(partitions.Select(tpo => tpo.TopicPartition));
              }
              else
              {
                  consumer.Unassign();
              }
          })
          .SetPartitionsLostHandler((consumer, partitions) =>
          {
              // Log the lost partitions (a List<TopicPartitionOffset>)
              var lostPartitions = partitions.Select(tpo => $"{tpo.Topic} : {tpo.Partition}");
              s_logger.LogInformation("Partitions Lost {Channels}", string.Join(", ", lostPartitions));

              // Treated the same as revocation, so apply the same strategy check:
              // IncrementalUnassign is only valid under the cooperative protocol
              if (_consumerConfig.PartitionAssignmentStrategy == PartitionAssignmentStrategy.CooperativeSticky)
              {
                  consumer.IncrementalUnassign(partitions.Select(tpo => tpo.TopicPartition));
              }
              else
              {
                  consumer.Unassign();
              }
          })
          .SetErrorHandler((consumer, error) =>
          {
              s_logger.LogError("Error in Kafka Consumer: {ErrorCode}, Reason: {ErrorMessage}, Fatal: {FatalError}",
                  error.Code, error.Reason, error.IsFatal);
          })
          .Build();

jsyauideagen, Apr 16 '24 02:04