
KAFKA-17218: kafka-consumer-groups should handle describe groups with deleted topics

Open OmniaGM opened this issue 1 year ago • 5 comments

When describing groups, the script now returns "-" as a placeholder for offsets and lag if the topic partition is unknown.
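As a rough illustration of the placeholder behaviour described above, here is a minimal sketch. The class `DescribeRowSketch` and its methods are hypothetical names invented for this example and are not taken from `ConsumerGroupCommand`; the column widths are arbitrary.

```java
import java.util.Optional;

// Hypothetical helper for illustration only; not the actual ConsumerGroupCommand code.
final class DescribeRowSketch {
    static final String MISSING_VALUE = "-";

    // Renders a numeric column, falling back to "-" when the value is unknown.
    static String column(Optional<Long> value) {
        return value.map(v -> Long.toString(v)).orElse(MISSING_VALUE);
    }

    // Lag is only defined when both the committed offset and the log end offset are known.
    static Optional<Long> lag(Optional<Long> committed, Optional<Long> logEnd) {
        if (committed.isPresent() && logEnd.isPresent()) {
            return Optional.of(Math.max(0L, logEnd.get() - committed.get()));
        }
        return Optional.empty();
    }

    // Formats one output row: topic, partition, current offset, log end offset, lag.
    static String row(String topic, int partition, Optional<Long> committed, Optional<Long> logEnd) {
        return String.format("%-10s %-9d %-14s %-14s %s",
            topic, partition, column(committed), column(logEnd), column(lag(committed, logEnd)));
    }

    public static void main(String[] args) {
        // Partition of an existing topic: all columns are numeric.
        System.out.println(row("T2", 0, Optional.of(5L), Optional.of(8L)));
        // Partition of a deleted topic: the unknown columns are shown as "-".
        System.out.println(row("T1", 0, Optional.empty(), Optional.empty()));
    }
}
```

Modelling the unknown values as `Optional.empty()` keeps the "-" decision in one place instead of scattering null checks across the formatting code.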

Note:

  • I added only an integration test, as there is no unit test for this command.
  • I moved produceRecord from DeleteOffsetsConsumerGroupCommandIntegrationTest to ConsumerGroupCommandTestUtils so that it can be used in DescribeConsumerGroupTest.
  • I also changed the generator method to allow passing server config so that topic auto-creation can be disabled; unless auto.create.topics.enable is disabled, the topic keeps getting recreated.
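For reference, the broker override mentioned in the last note could look roughly like the sketch below. `auto.create.topics.enable` is the real broker setting named above; the class `TestBrokerConfigSketch` and the method `brokerOverrides` are hypothetical, and how the map is handed to the test cluster generator is not shown because it depends on the test framework.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for server overrides used by a test cluster.
final class TestBrokerConfigSketch {
    // Returns broker properties that stop a deleted topic from being
    // recreated automatically when a client asks for its metadata.
    static Map<String, String> brokerOverrides() {
        Map<String, String> overrides = new HashMap<>();
        overrides.put("auto.create.topics.enable", "false");
        return overrides;
    }

    public static void main(String[] args) {
        System.out.println(brokerOverrides());
    }
}
```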

Committer Checklist (excluded from commit message)

  • [ ] Verify design and implementation
  • [ ] Verify test coverage and CI build status
  • [ ] Verify documentation (including upgrade notes)

OmniaGM avatar Aug 01 '24 13:08 OmniaGM

I tried this code, and I feel it didn't behave quite right. Here is what I did:

  • Create 2 topics T1 and T2
  • Reset offsets of consumer group G1 consuming from earliest offset of T1 with bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --topic T1 --group G1 --to-earliest --execute
  • Reset offsets of consumer group G1 consuming from earliest offset of T2 with bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --topic T2 --group G1 --to-earliest --execute
  • Start bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --topic T1 --group G1 --to-earliest --execute under a debugger and run up to a breakpoint in ConsumerGroupCommand.getLogStartOffsets
  • Delete topic T1 so that the new exception handling code is executed.
  • Resume the debugger.

The output is as follows:

[2024-08-02 12:22:13,881] ERROR [AdminClient clientId=adminclient-1] Received unknown topic error for topic T1 (org.apache.kafka.clients.admin.internals.PartitionLeaderStrategy)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
Error getting starting offset of topic partition: T1-0
Option                                  Description                            
------                                  -----------                            
--all-groups                            Apply to all consumer groups.          
--all-topics                            Consider all topics assigned to a      
                                          group in the `reset-offsets` process.
--bootstrap-server <String: server to   REQUIRED: The server(s) to connect to. 
  connect to>                                                                  
--by-duration <String: duration>        Reset offsets to offset by duration    
                                          from current timestamp. Format:      
                                          'PnDTnHnMnS'                         
--command-config <String: command       Property file containing configs to be 
  config property file>                   passed to Admin Client and Consumer. 
--delete                                Pass in groups to delete topic         
                                          partition offsets and ownership      
                                          information over the entire consumer 
                                          group. For instance --group g1 --    
                                          group g2                             
--delete-offsets                        Delete offsets of consumer group.      
                                          Supports one consumer group at the   
                                          time, and multiple topics.           
--describe                              Describe consumer group and list       
                                          offset lag (number of messages not   
                                          yet processed) related to given      
                                          group.                               
--dry-run                               Only show results without executing    
                                          changes on Consumer Groups.          
                                          Supported operations: reset-offsets. 
--execute                               Execute operation. Supported           
                                          operations: reset-offsets.           
--export                                Export operation execution to a CSV    
                                          file. Supported operations: reset-   
                                          offsets.                             
--from-file <String: path to CSV file>  Reset offsets to values defined in CSV 
                                          file.                                
--group <String: consumer group>        The consumer group we wish to act on.  
--help                                  Print usage information.               
--list                                  List all consumer groups.              
--members                               Describe members of the group. This    
                                          option may be used with '--describe' 
                                          and '--bootstrap-server' options     
                                          only.                                
                                        Example: --bootstrap-server localhost: 
                                          9092 --describe --group group1 --    
                                          members                              
--offsets                               Describe the group and list all topic  
                                          partitions in the group along with   
                                          their offset lag. This is the        
                                          default sub-action of and may be     
                                          used with '--describe' and '--       
                                          bootstrap-server' options only.      
                                        Example: --bootstrap-server localhost: 
                                          9092 --describe --group group1 --    
                                          offsets                              
--reset-offsets                         Reset offsets of consumer group.       
                                          Supports one consumer group at the   
                                          time, and instances should be        
                                          inactive                             
                                        Has 2 execution options: --dry-run     
                                          (the default) to plan which offsets  
                                          to reset, and --execute to update    
                                          the offsets. Additionally, the --    
                                          export option is used to export the  
                                          results to a CSV format.             
                                        You must choose one of the following   
                                          reset specifications: --to-datetime, 
                                          --by-duration, --to-earliest, --to-  
                                          latest, --shift-by, --from-file, --  
                                          to-current, --to-offset.             
                                        To define the scope use --all-topics   
                                          or --topic. One scope must be        
                                          specified unless you use '--from-    
                                          file'.                               
--shift-by <Long: number-of-offsets>    Reset offsets shifting current offset  
                                          by 'n', where 'n' can be positive or 
                                          negative.                            
--state [String]                        When specified with '--describe',      
                                          includes the state of the group.     
                                        Example: --bootstrap-server localhost: 
                                          9092 --describe --group group1 --    
                                          state                                
                                        When specified with '--list', it       
                                          displays the state of all groups. It 
                                          can also be used to list groups with 
                                          specific states.                     
                                        Example: --bootstrap-server localhost: 
                                          9092 --list --state stable,empty     
                                        This option may be used with '--       
                                          describe', '--list' and '--bootstrap-
                                          server' options only.                
--timeout <Long: timeout (ms)>          The timeout that can be set for some   
                                          use cases. For example, it can be    
                                          used when describing the group to    
                                          specify the maximum amount of time   
                                          in milliseconds to wait before the   
                                          group stabilizes (when the group is  
                                          just created, or is going through    
                                          some changes). (default: 5000)       
--to-current                            Reset offsets to current offset.       
--to-datetime <String: datetime>        Reset offsets to offset from datetime. 
                                          Format: 'YYYY-MM-DDTHH:mm:SS.sss'    
--to-earliest                           Reset offsets to earliest offset.      
--to-latest                             Reset offsets to latest offset.        
--to-offset <Long: offset>              Reset offsets to a specific offset.    
--topic <String: topic>                 The topic whose consumer group         
                                          information should be deleted or     
                                          topic whose should be included in    
                                          the reset offset process. In `reset- 
                                          offsets` case, partitions can be     
                                          specified using this format: `topic1:
                                          0,1,2`, where 0,1,2 are the          
                                          partition to be included in the      
                                          process. Reset-offsets also supports 
                                          multiple topic inputs.               
--type [String]                         When specified with '--list', it       
                                          displays the types of all the        
                                          groups. It can also be used to list  
                                          groups with specific types.          
                                        Example: --bootstrap-server localhost: 
                                          9092 --list --type classic,consumer  
                                        This option may be used with the '--   
                                          list' option only.                   
--verbose                               Provide additional information, if     
                                          any, when describing the group. This 
                                          option may be used with '--          
                                          offsets'/'--members'/'--state' and   
                                          '--bootstrap-server' options only.   
                                        Example: --bootstrap-server localhost: 
                                          9092 --describe --group group1 --    
                                          members --verbose                    
--version                               Display Kafka version.   

The command reports the error neatly, but then it seems to print out the command usage message.

AndrewJSchofield avatar Aug 02 '24 11:08 AndrewJSchofield

I didn't test reset-offsets, as the Jira is about describe --all-groups, where a deleted topic prevents showing metadata for the whole cluster. I think reset-offsets for an individual group should keep behaving as it does today.

The usage message is most likely printed because we no longer throw the exception. I'll fix this and throw it only in this case.
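One way to read the proposed fix is sketched below: swallow the unknown-topic error only for describe, and keep throwing it for reset-offsets. Everything here is an assumption made for illustration: `UnknownTopicHandlingSketch`, `Action`, and `logStartOffset` are hypothetical names, and `UnknownTopicException` is a local stand-in for `org.apache.kafka.common.errors.UnknownTopicOrPartitionException` so that the sketch compiles without the Kafka client on the classpath.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical sketch; not the actual ConsumerGroupCommand implementation.
final class UnknownTopicHandlingSketch {
    // Local stand-in for Kafka's UnknownTopicOrPartitionException.
    static final class UnknownTopicException extends RuntimeException {
        UnknownTopicException(String message) {
            super(message);
        }
    }

    enum Action { DESCRIBE, RESET_OFFSETS }

    // Looks up an offset. For DESCRIBE, an unknown topic yields an empty result
    // (to be rendered as "-"); for RESET_OFFSETS, the exception is rethrown so
    // the command still fails as it did before.
    static Optional<Long> logStartOffset(Action action, Supplier<Long> lookup) {
        try {
            return Optional.of(lookup.get());
        } catch (UnknownTopicException e) {
            if (action == Action.DESCRIBE) {
                return Optional.empty();
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        Supplier<Long> deletedTopic = () -> {
            throw new UnknownTopicException("T1-0");
        };
        // Describe tolerates the deleted topic and reports an unknown offset.
        System.out.println(logStartOffset(Action.DESCRIBE, deletedTopic).isPresent());
    }
}
```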

OmniaGM avatar Aug 06 '24 09:08 OmniaGM

I tried this code and it didn't really behave quite right I feel. I did:

  • Create 2 topics T1 and T2
  • Reset offsets of consumer group G1 consuming from earliest offset of T1 with bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --topic T1 --group G1 --to-earliest --execute
  • Reset offsets of consumer group G1 consuming from earliest offset of T2 with bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --topic T2 --group G1 --to-earliest --execute
  • Start bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --topic T1 --group G1 --to-earliest --execute under a debugger and run up to a breakpoint in ConsumerGroupCommand.getLogStartOffsets
  • Delete topic T1 so that the new exception handling code is executed.
  • Resume the debugger.

The output is as follows:

[2024-08-02 12:22:13,881] ERROR [AdminClient clientId=adminclient-1] Received unknown topic error for topic T1 (org.apache.kafka.clients.admin.internals.PartitionLeaderStrategy)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
Error getting starting offset of topic partition: T1-0
Option                                  Description                            
------                                  -----------                            
--all-groups                            Apply to all consumer groups.          
--all-topics                            Consider all topics assigned to a      
                                          group in the `reset-offsets` process.
--bootstrap-server <String: server to   REQUIRED: The server(s) to connect to. 
  connect to>                                                                  
--by-duration <String: duration>        Reset offsets to offset by duration    
                                          from current timestamp. Format:      
                                          'PnDTnHnMnS'                         
--command-config <String: command       Property file containing configs to be 
  config property file>                   passed to Admin Client and Consumer. 
--delete                                Pass in groups to delete topic         
                                          partition offsets and ownership      
                                          information over the entire consumer 
                                          group. For instance --group g1 --    
                                          group g2                             
--delete-offsets                        Delete offsets of consumer group.      
                                          Supports one consumer group at the   
                                          time, and multiple topics.           
--describe                              Describe consumer group and list       
                                          offset lag (number of messages not   
                                          yet processed) related to given      
                                          group.                               
--dry-run                               Only show results without executing    
                                          changes on Consumer Groups.          
                                          Supported operations: reset-offsets. 
--execute                               Execute operation. Supported           
                                          operations: reset-offsets.           
--export                                Export operation execution to a CSV    
                                          file. Supported operations: reset-   
                                          offsets.                             
--from-file <String: path to CSV file>  Reset offsets to values defined in CSV 
                                          file.                                
--group <String: consumer group>        The consumer group we wish to act on.  
--help                                  Print usage information.               
--list                                  List all consumer groups.              
--members                               Describe members of the group. This    
                                          option may be used with '--describe' 
                                          and '--bootstrap-server' options     
                                          only.                                
                                        Example: --bootstrap-server localhost: 
                                          9092 --describe --group group1 --    
                                          members                              
--offsets                               Describe the group and list all topic  
                                          partitions in the group along with   
                                          their offset lag. This is the        
                                          default sub-action of and may be     
                                          used with '--describe' and '--       
                                          bootstrap-server' options only.      
                                        Example: --bootstrap-server localhost: 
                                          9092 --describe --group group1 --    
                                          offsets                              
--reset-offsets                         Reset offsets of consumer group.       
                                          Supports one consumer group at the   
                                          time, and instances should be        
                                          inactive                             
                                        Has 2 execution options: --dry-run     
                                          (the default) to plan which offsets  
                                          to reset, and --execute to update    
                                          the offsets. Additionally, the --    
                                          export option is used to export the  
                                          results to a CSV format.             
                                        You must choose one of the following   
                                          reset specifications: --to-datetime, 
                                          --by-duration, --to-earliest, --to-  
                                          latest, --shift-by, --from-file, --  
                                          to-current, --to-offset.             
                                        To define the scope use --all-topics   
                                          or --topic. One scope must be        
                                          specified unless you use '--from-    
                                          file'.                               
--shift-by <Long: number-of-offsets>    Reset offsets shifting current offset  
                                          by 'n', where 'n' can be positive or 
                                          negative.                            
--state [String]                        When specified with '--describe',      
                                          includes the state of the group.     
                                        Example: --bootstrap-server localhost: 
                                          9092 --describe --group group1 --    
                                          state                                
                                        When specified with '--list', it       
                                          displays the state of all groups. It 
                                          can also be used to list groups with 
                                          specific states.                     
                                        Example: --bootstrap-server localhost: 
                                          9092 --list --state stable,empty     
                                        This option may be used with '--       
                                          describe', '--list' and '--bootstrap-
                                          server' options only.                
--timeout <Long: timeout (ms)>          The timeout that can be set for some   
                                          use cases. For example, it can be    
                                          used when describing the group to    
                                          specify the maximum amount of time   
                                          in milliseconds to wait before the   
                                          group stabilizes (when the group is  
                                          just created, or is going through    
                                          some changes). (default: 5000)       
--to-current                            Reset offsets to current offset.       
--to-datetime <String: datetime>        Reset offsets to offset from datetime. 
                                          Format: 'YYYY-MM-DDTHH:mm:SS.sss'    
--to-earliest                           Reset offsets to earliest offset.      
--to-latest                             Reset offsets to latest offset.        
--to-offset <Long: offset>              Reset offsets to a specific offset.    
--topic <String: topic>                 The topic whose consumer group         
                                          information should be deleted or     
                                          topic whose should be included in    
                                          the reset offset process. In `reset- 
                                          offsets` case, partitions can be     
                                          specified using this format: `topic1:
                                          0,1,2`, where 0,1,2 are the          
                                          partition to be included in the      
                                          process. Reset-offsets also supports 
                                          multiple topic inputs.               
--type [String]                         When specified with '--list', it       
                                          displays the types of all the        
                                          groups. It can also be used to list  
                                          groups with specific types.          
                                        Example: --bootstrap-server localhost: 
                                          9092 --list --type classic,consumer  
                                        This option may be used with the '--   
                                          list' option only.                   
--verbose                               Provide additional information, if     
                                          any, when describing the group. This 
                                          option may be used with '--          
                                          offsets'/'--members'/'--state' and   
                                          '--bootstrap-server' options only.   
                                        Example: --bootstrap-server localhost: 
                                          9092 --describe --group group1 --    
                                          members --verbose                    
--version                               Display Kafka version.   

The command reports the error neatly, but then it seems to print out the command usage message.

Hi @AndrewJSchofield, the code behaves this way because prepareOffsetsToReset has the following condition: https://github.com/apache/kafka/blob/3ddd8d0a0ec02eab8d9083d341ece14961fc0d1c/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java#L1001

Previously we crashed before reaching this point, because getLogStartOffsets would fail if the topic partition was not found. I can add back the logic to crash in this case when the topic is not found, but I think the behaviour you are seeing now is the original intent, based on the code.
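
To make the two options in this comment concrete, here is a minimal sketch of a tolerant lookup (unknown partition rendered as "-") next to a fail-fast lookup (unknown partition throws). It is not the code from ConsumerGroupCommand: the class, the method names and the plain `Map<String, Long>` standing in for the admin client's offset results are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration only; none of these names exist in Kafka.
class MissingPartitionSketch {

    // Tolerant variant: an unknown topic partition yields an empty Optional
    // instead of an exception, so the caller can keep going.
    static Optional<Long> lookupTolerant(Map<String, Long> startOffsets, String topicPartition) {
        return Optional.ofNullable(startOffsets.get(topicPartition));
    }

    // Fail-fast variant: an unknown topic partition aborts the command.
    static long lookupStrict(Map<String, Long> startOffsets, String topicPartition) {
        Long offset = startOffsets.get(topicPartition);
        if (offset == null) {
            throw new IllegalStateException("Unknown topic partition: " + topicPartition);
        }
        return offset;
    }

    // Renders an offset for the output table, using "-" as the placeholder
    // when the offset is not known.
    static String render(Optional<Long> offset) {
        return offset.map(Object::toString).orElse("-");
    }

    public static void main(String[] args) {
        // Assume T1 was deleted, so only T2's partition is still known.
        Map<String, Long> startOffsets = Map.of("T2-0", 0L);

        System.out.println("T2-0 -> " + render(lookupTolerant(startOffsets, "T2-0")));
        System.out.println("T1-0 -> " + render(lookupTolerant(startOffsets, "T1-0")));

        try {
            lookupStrict(startOffsets, "T1-0");
        } catch (IllegalStateException e) {
            System.out.println("strict lookup failed: " + e.getMessage());
        }
    }
}
```

With the tolerant variant, execution continues into whatever validation comes next, which is consistent with the usage message appearing after the error; the fail-fast variant would stop before that point.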

OmniaGM avatar Aug 06 '24 13:08 OmniaGM

Code looks good to me, but I got a failure of the new test:

    org.opentest4j.AssertionFailedError: Condition not met within timeout 4000. Expected a data row and no error in describe results after deleting one of the consumed topics. ==> expected: <true> but was: <false>
        at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
        at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
        at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
        at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
        at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
        at app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:397)
        at app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:445)
        at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:394)
        at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378)
        at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368)
        at app//org.apache.kafka.tools.consumer.group.DescribeConsumerGroupTest.testDescribeAllExistingGroupsWithSomeDeletedTopics(DescribeConsumerGroupTest.java:317)
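
For context on the failure mode in this trace: the test polls a condition until a deadline and fails with an assertion error if the condition never becomes true. The sketch below shows that general polling pattern only. It is not Kafka's TestUtils.waitForCondition; the class name, the signature and the 10 ms poll interval are assumptions.

```java
import java.util.function.BooleanSupplier;

// Hypothetical illustration of a poll-until-timeout test helper.
class WaitForConditionSketch {

    // Polls the condition until it returns true or timeoutMs has elapsed.
    // On timeout it throws an AssertionError carrying the given details.
    static void waitForCondition(BooleanSupplier condition, long timeoutMs, String details)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            if (condition.getAsBoolean()) {
                return;
            }
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError(
                    "Condition not met within timeout " + timeoutMs + ". " + details);
            }
            Thread.sleep(10); // assumed poll interval
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // A condition that becomes true on the third poll.
        int[] polls = {0};
        waitForCondition(() -> ++polls[0] >= 3, 1000, "third poll should succeed");
        System.out.println("condition met after " + polls[0] + " polls");

        // A condition that never holds, to show the timeout path.
        try {
            waitForCondition(() -> false, 50, "this condition never holds");
        } catch (AssertionError e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A failure of this kind means the describe output never matched the expected shape within the timeout. That can indicate a real bug or simply a timeout that is too short for the environment, and the trace alone does not distinguish the two.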

AndrewJSchofield avatar Aug 06 '24 14:08 AndrewJSchofield

@dajac can you have a look when you have time?

OmniaGM avatar Aug 29 '24 13:08 OmniaGM

@chia7712 can you have a look when you have time?

OmniaGM avatar Sep 03 '24 12:09 OmniaGM

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch.

If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact).

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

github-actions[bot] avatar Jan 06 '25 03:01 github-actions[bot]

This PR has been closed since it has not had any activity in 120 days. If you feel like this was a mistake, or you would like to continue working on it, please feel free to re-open the PR and ask for a review.

github-actions[bot] avatar Feb 06 '25 03:02 github-actions[bot]

@OmniaGM sorry for the late review. Could you please rebase this PR?

chia7712 avatar Feb 06 '25 10:02 chia7712