KAFKA-17218: kafka-consumer-groups should handle describe groups with deleted topics
The script will return "-" as a placeholder for offsets and lag if a topic partition is unknown when describing groups.
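The placeholder behaviour can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: `DescribeRowSketch`, `formatOffset` and `formatLag` are hypothetical names, and modelling an unknown value as `Optional.empty()` is an assumption.

```java
import java.util.Optional;

// Hypothetical sketch: render unknown offsets and lag as "-" in a describe row.
class DescribeRowSketch {
    // Placeholder printed when a value cannot be determined (e.g. the topic was deleted).
    static final String MISSING = "-";

    // Render an optional offset, falling back to the placeholder.
    static String formatOffset(Optional<Long> offset) {
        return offset.map(String::valueOf).orElse(MISSING);
    }

    // Lag is only defined when both the committed offset and the log end offset are known.
    static String formatLag(Optional<Long> committed, Optional<Long> logEnd) {
        if (committed.isPresent() && logEnd.isPresent()) {
            return String.valueOf(logEnd.get() - committed.get());
        }
        return MISSING;
    }

    public static void main(String[] args) {
        Optional<Long> committed = Optional.of(5L);
        Optional<Long> logEnd = Optional.of(8L);
        // Partition of an existing topic: every column is known.
        System.out.println(formatOffset(committed) + " " + formatOffset(logEnd) + " " + formatLag(committed, logEnd)); // 5 8 3
        // Partition whose topic was deleted: the log end offset is unknown.
        Optional<Long> unknown = Optional.empty();
        System.out.println(formatOffset(committed) + " " + formatOffset(unknown) + " " + formatLag(committed, unknown)); // 5 - -
    }
}
```

The point of the sketch is that one unknown partition degrades a single row to "-" instead of aborting the whole describe output.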
Note:
- I added an integration test only, as there is no unit test for this command.
- I moved `produceRecord` from `DeleteOffsetsConsumerGroupCommandIntegrationTest` to `ConsumerGroupCommandTestUtils` so that it can be used in `DescribeConsumerGroupTest`.
- I also changed the `generator` method to allow passing a server config that disables auto creation of topics, as the topic kept getting recreated unless `auto.create.topics.enable` was disabled.
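Disabling auto topic creation for the test cluster amounts to a broker override. `auto.create.topics.enable` is a real broker config; how the PR's `generator` method actually hands overrides to the test cluster is not shown here, so `ServerPropsSketch` and `serverProps` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the broker overrides a test generator could pass to the test cluster.
class ServerPropsSketch {
    static Map<String, String> serverProps(boolean autoCreateTopics) {
        Map<String, String> props = new HashMap<>();
        // Real broker config. If left enabled, a client metadata request for a deleted
        // topic can make the broker recreate it, which defeats a "deleted topic" test.
        props.put("auto.create.topics.enable", String.valueOf(autoCreateTopics));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(serverProps(false)); // {auto.create.topics.enable=false}
    }
}
```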
Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
I tried this code and I don't think it behaves quite right. I did the following:
- Create 2 topics, T1 and T2.
- Reset the offsets of consumer group G1 to the earliest offset of T1 with
  `bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --topic T1 --group G1 --to-earliest --execute`
- Reset the offsets of consumer group G1 to the earliest offset of T2 with
  `bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --topic T2 --group G1 --to-earliest --execute`
- Start `bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --topic T1 --group G1 --to-earliest --execute` under a debugger and run up to a breakpoint in `ConsumerGroupCommand.getLogStartOffsets`.
- Delete topic T1 so that the new exception handling code is executed.
- Resume the debugger.
The output is as follows:
```
[2024-08-02 12:22:13,881] ERROR [AdminClient clientId=adminclient-1] Received unknown topic error for topic T1 (org.apache.kafka.clients.admin.internals.PartitionLeaderStrategy)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
Error getting starting offset of topic partition: T1-0
Option                                  Description
------                                  -----------
--all-groups                            Apply to all consumer groups.
--all-topics                            Consider all topics assigned to a
                                          group in the `reset-offsets` process.
--bootstrap-server <String: server to   REQUIRED: The server(s) to connect to.
  connect to>
--by-duration <String: duration>        Reset offsets to offset by duration
                                          from current timestamp. Format:
                                          'PnDTnHnMnS'
--command-config <String: command       Property file containing configs to be
  config property file>                   passed to Admin Client and Consumer.
--delete                                Pass in groups to delete topic
                                          partition offsets and ownership
                                          information over the entire consumer
                                          group. For instance --group g1 --
                                          group g2
--delete-offsets                        Delete offsets of consumer group.
                                          Supports one consumer group at the
                                          time, and multiple topics.
--describe                              Describe consumer group and list
                                          offset lag (number of messages not
                                          yet processed) related to given
                                          group.
--dry-run                               Only show results without executing
                                          changes on Consumer Groups.
                                          Supported operations: reset-offsets.
--execute                               Execute operation. Supported
                                          operations: reset-offsets.
--export                                Export operation execution to a CSV
                                          file. Supported operations: reset-
                                          offsets.
--from-file <String: path to CSV file>  Reset offsets to values defined in CSV
                                          file.
--group <String: consumer group>        The consumer group we wish to act on.
--help                                  Print usage information.
--list                                  List all consumer groups.
--members                               Describe members of the group. This
                                          option may be used with '--describe'
                                          and '--bootstrap-server' options
                                          only.
                                          Example: --bootstrap-server localhost:
                                          9092 --describe --group group1 --
                                          members
--offsets                               Describe the group and list all topic
                                          partitions in the group along with
                                          their offset lag. This is the
                                          default sub-action of and may be
                                          used with '--describe' and '--
                                          bootstrap-server' options only.
                                          Example: --bootstrap-server localhost:
                                          9092 --describe --group group1 --
                                          offsets
--reset-offsets                         Reset offsets of consumer group.
                                          Supports one consumer group at the
                                          time, and instances should be
                                          inactive
                                          Has 2 execution options: --dry-run
                                          (the default) to plan which offsets
                                          to reset, and --execute to update
                                          the offsets. Additionally, the --
                                          export option is used to export the
                                          results to a CSV format.
                                          You must choose one of the following
                                          reset specifications: --to-datetime,
                                          --by-duration, --to-earliest, --to-
                                          latest, --shift-by, --from-file, --
                                          to-current, --to-offset.
                                          To define the scope use --all-topics
                                          or --topic. One scope must be
                                          specified unless you use '--from-
                                          file'.
--shift-by <Long: number-of-offsets>    Reset offsets shifting current offset
                                          by 'n', where 'n' can be positive or
                                          negative.
--state [String]                        When specified with '--describe',
                                          includes the state of the group.
                                          Example: --bootstrap-server localhost:
                                          9092 --describe --group group1 --
                                          state
                                          When specified with '--list', it
                                          displays the state of all groups. It
                                          can also be used to list groups with
                                          specific states.
                                          Example: --bootstrap-server localhost:
                                          9092 --list --state stable,empty
                                          This option may be used with '--
                                          describe', '--list' and '--bootstrap-
                                          server' options only.
--timeout <Long: timeout (ms)>          The timeout that can be set for some
                                          use cases. For example, it can be
                                          used when describing the group to
                                          specify the maximum amount of time
                                          in milliseconds to wait before the
                                          group stabilizes (when the group is
                                          just created, or is going through
                                          some changes). (default: 5000)
--to-current                            Reset offsets to current offset.
--to-datetime <String: datetime>        Reset offsets to offset from datetime.
                                          Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest                           Reset offsets to earliest offset.
--to-latest                             Reset offsets to latest offset.
--to-offset <Long: offset>              Reset offsets to a specific offset.
--topic <String: topic>                 The topic whose consumer group
                                          information should be deleted or
                                          topic whose should be included in
                                          the reset offset process. In `reset-
                                          offsets` case, partitions can be
                                          specified using this format: `topic1:
                                          0,1,2`, where 0,1,2 are the
                                          partition to be included in the
                                          process. Reset-offsets also supports
                                          multiple topic inputs.
--type [String]                         When specified with '--list', it
                                          displays the types of all the
                                          groups. It can also be used to list
                                          groups with specific types.
                                          Example: --bootstrap-server localhost:
                                          9092 --list --type classic,consumer
                                          This option may be used with the '--
                                          list' option only.
--verbose                               Provide additional information, if
                                          any, when describing the group. This
                                          option may be used with '--
                                          offsets'/'--members'/'--state' and
                                          '--bootstrap-server' options only.
                                          Example: --bootstrap-server localhost:
                                          9092 --describe --group group1 --
                                          members --verbose
--version                               Display Kafka version.
```
The command reports the error neatly, but then it seems to print out the command usage message.
I didn't test `--reset-offsets`, as the JIRA is about `--describe --all-groups`, where a deleted topic prevents showing metadata for the whole cluster. I think `--reset-offsets` for an individual group should keep its existing behaviour.
The usage message is most likely printed because we no longer throw the exception. I'll fix this so that the exception is still thrown in this case.
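The proposed split (tolerate an unknown partition when describing, but keep failing for reset-offsets) can be sketched as below. This is an illustration only: the class and method names, the in-memory map standing in for the broker's response, and the choice of `IllegalStateException` are all assumptions; only the error text mirrors the `Error getting starting offset of topic partition` message quoted earlier in this thread.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: describe tolerates an unknown partition, reset-offsets still fails.
class StartOffsetSketch {
    // Stand-in for the broker's answer; a missing key models a deleted topic partition (T1-0).
    static final Map<String, Long> LOG_START_OFFSETS = Map.of("T2-0", 0L);

    static Optional<Long> lookup(String partition) {
        return Optional.ofNullable(LOG_START_OFFSETS.get(partition));
    }

    // Describe path: an unknown partition yields an empty value that is later rendered as "-".
    static Optional<Long> startOffsetForDescribe(String partition) {
        return lookup(partition);
    }

    // Reset path: an unknown partition is still an error and is surfaced to the caller.
    static long startOffsetForReset(String partition) {
        return lookup(partition).orElseThrow(() -> new IllegalStateException(
                "Error getting starting offset of topic partition: " + partition));
    }

    public static void main(String[] args) {
        System.out.println(startOffsetForDescribe("T1-0").isPresent()); // false
        System.out.println(startOffsetForReset("T2-0"));                 // 0
        try {
            startOffsetForReset("T1-0");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Keeping the throw on the reset path means the command fails with the underlying error rather than falling through to logic that ends up printing the usage message.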
Hi @AndrewJSchofield, the code behaves like this because `prepareOffsetsToReset` has the following condition:
https://github.com/apache/kafka/blob/3ddd8d0a0ec02eab8d9083d341ece14961fc0d1c/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java#L1001
Previously, we crashed before reaching this point, because `getLogStartOffsets` would throw if the topic partition was not found. I can add back logic to fail if the topic is not found in this case, but I think the behaviour you are seeing now is what the code originally intended.
The code looks good to me, but the new test failed for me:
```
org.opentest4j.AssertionFailedError: Condition not met within timeout 4000. Expected a data row and no error in describe results after deleting one of the consumed topics. ==> expected: <true> but was: <false>
at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
at app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:397)
at app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:445)
at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:394)
at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378)
at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368)
at app//org.apache.kafka.tools.consumer.group.DescribeConsumerGroupTest.testDescribeAllExistingGroupsWithSomeDeletedTopics(DescribeConsumerGroupTest.java:317)
```
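The failure message comes from a helper that re-runs a check until it passes or a timeout expires; here the check ("a data row and no error in describe results") never became true within 4000 ms. A simplified, hypothetical analogue is sketched below; it is not Kafka's actual `TestUtils.waitForCondition`, and `WaitSketch` is an illustrative name.

```java
import java.util.function.BooleanSupplier;

// Hypothetical, simplified analogue of a "wait for condition" test helper.
class WaitSketch {
    static void waitForCondition(BooleanSupplier condition, long timeoutMs, String message)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            if (condition.getAsBoolean()) {
                return; // condition met: the test proceeds
            }
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError("Condition not met within timeout " + timeoutMs + ". " + message);
            }
            Thread.sleep(50); // back off before re-running the check (e.g. describing the groups again)
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] attempts = {0};
        // Becomes true on the third evaluation, well before the timeout.
        waitForCondition(() -> ++attempts[0] >= 3, 4000, "Expected a data row");
        System.out.println("attempts = " + attempts[0]); // attempts = 3
    }
}
```

A failure of this kind therefore means every retry within the window still saw either no data row or an error, not just the first attempt.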
@dajac can you have a look when you have time?
@chia7712 can you have a look when you have time?
This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch.
If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact).
If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
This PR has been closed since it has not had any activity in 120 days. If you feel like this was a mistake, or you would like to continue working on it, please feel free to re-open the PR and ask for a review.
@OmniaGM sorry for the late review. Could you please rebase this PR?