pulsar
[fix][client] Fix pattern consumer create crash if a part of partitions of a topic have been deleted
Motivation
Background
1. Regarding the API for getting topics by regexp pattern, there are two implementations:
- `List<String> HttpLookupService.getPartitionedTopicMetadata(...)`
- `CommandGetTopicsOfNamespace BinaryProtoLookupService.getPartitionedTopicMetadata(...)`

Pulsar converts both response types, `List<String>` and `CommandGetTopicsOfNamespace`, into a `GetTopicsResult` object, and discards the partition information during the conversion. For example:
- The lookup returns the list `topic-1-partition-0`, `topic-1-partition-1`.
- The conversion groups them into `topic-1`.
2. The behavior of Pattern consumers
- Before https://github.com/apache/pulsar/pull/5230, Pattern consumers tried to create the missing partitions at startup.
- After https://github.com/apache/pulsar/pull/5230, Pattern consumers only subscribe to the partitions that exist, so that topics can be deleted automatically. In other words, it sets the variable `createTopicIfDoesNotExist` of Multi Topics Consumer to `false`.
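The grouping step described in point 1 can be illustrated with a minimal, standalone sketch. It is not the Pulsar implementation: the class `PartitionGroupingSketch` and its methods are hypothetical names introduced here, and the only detail taken from Pulsar is the `-partition-` naming suffix. The point is that once names are collapsed to the base topic, the information about which partitions actually exist is gone.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; not part of the Pulsar code base.
public class PartitionGroupingSketch {
    private static final String PARTITION_SUFFIX = "-partition-";

    // Strips a trailing "-partition-<digits>" from a topic name, if present.
    static String baseTopic(String name) {
        int idx = name.lastIndexOf(PARTITION_SUFFIX);
        if (idx < 0) {
            return name;
        }
        String index = name.substring(idx + PARTITION_SUFFIX.length());
        if (index.isEmpty() || !index.chars().allMatch(Character::isDigit)) {
            return name; // the suffix is not followed by a partition index
        }
        return name.substring(0, idx);
    }

    // Collapses partition names into their base topics, dropping partition info.
    static Set<String> groupToTopics(List<String> names) {
        Set<String> topics = new LinkedHashSet<>();
        for (String name : names) {
            topics.add(baseTopic(name));
        }
        return topics;
    }

    public static void main(String[] args) {
        // Prints [topic-1]: the caller can no longer tell which partitions exist.
        System.out.println(groupToTopics(
                List.of("topic-1-partition-0", "topic-1-partition-1")));
    }
}
```

If `topic-1-partition-0` were deleted, the grouped result would still be `topic-1`, which is why a consumer working from the grouped names cannot tell that a partition is missing.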
Issue
- When users start a Pattern consumer, the consumer tries to subscribe to all partitions even if some of them have already been deleted, and the client then crashes with a Topic Not Exists Exception. You can reproduce the issue with the test `testConsumerAfterOnePartDeleted`.
- A Pattern consumer that is already running removes all partitions of a topic when one partition is deleted, even if half of the partitions still exist.
Modifications
- Multi Topics Consumer only subscribes to the existing partitions if the config `createTopicIfDoesNotExist` is `false`.
- A Pattern consumer that is already running only removes the partitions that were deleted.
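The intended behavior can be sketched as a set difference over partition names. This is a simplified illustration under stated assumptions, not the code in this PR: `PatternUpdateSketch`, `partitionsToRemove`, and `partitionsToAdd` are hypothetical names, and the real consumer works asynchronously on lookup results rather than on plain sets.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration only; not the logic shipped in this PR.
public class PatternUpdateSketch {

    // Partitions to unsubscribe: subscribed earlier, but no longer listed as existing.
    static Set<String> partitionsToRemove(Set<String> subscribed, Set<String> existing) {
        Set<String> removed = new TreeSet<>(subscribed);
        removed.removeAll(existing);
        return removed;
    }

    // Partitions to subscribe: listed as existing, but not subscribed yet.
    static Set<String> partitionsToAdd(Set<String> subscribed, Set<String> existing) {
        Set<String> added = new TreeSet<>(existing);
        added.removeAll(subscribed);
        return added;
    }

    public static void main(String[] args) {
        Set<String> subscribed = Set.of("topic-1-partition-0", "topic-1-partition-1");
        // Assume partition 0 was deleted and partition 1 still exists.
        Set<String> existing = Set.of("topic-1-partition-1");

        System.out.println(partitionsToRemove(subscribed, existing)); // [topic-1-partition-0]
        System.out.println(partitionsToAdd(subscribed, existing));    // []
    }
}
```

Working at partition granularity is what keeps `topic-1-partition-1` subscribed here, instead of dropping the whole of `topic-1` when one partition disappears.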
Additional things
flink-connector-pulsar also has the same issue
Documentation
- [ ] doc
- [ ] doc-required
- [x] doc-not-needed
- [ ] doc-complete
Matching PR in forked repository
PR in forked repository: x
/pulsarbot rerun-failure-checks
Rebased master
Closing and reopening to trigger CI with latest master branch changes.
Codecov Report
Attention: Patch coverage is 73.77049% with 80 lines in your changes missing coverage. Please review.
Project coverage is 73.44%. Comparing base (bbc6224) to head (8a7bafa). Report is 443 commits behind head on master.
Additional details and impacted files
@@ Coverage Diff @@
## master #22854 +/- ##
============================================
- Coverage 73.57% 73.44% -0.13%
- Complexity 32624 33337 +713
============================================
Files 1877 1913 +36
Lines 139502 143383 +3881
Branches 15299 15637 +338
============================================
+ Hits 102638 105308 +2670
- Misses 28908 30038 +1130
- Partials 7956 8037 +81
| Flag | Coverage Δ | |
|---|---|---|
| inttests | 27.77% <6.88%> (+3.19%) | :arrow_up: |
| systests | 24.69% <15.73%> (+0.36%) | :arrow_up: |
| unittests | 72.50% <73.77%> (-0.35%) | :arrow_down: |
Flags with carried forward coverage won't be shown.
| Files | Coverage Δ | |
|---|---|---|
| ...va/org/apache/pulsar/client/impl/ConsumerBase.java | 74.21% <100.00%> (+0.09%) | :arrow_up: |
| ...a/org/apache/pulsar/client/impl/LookupService.java | 0.00% <ø> (ø) | |
| ...rg/apache/pulsar/client/impl/TopicListWatcher.java | 66.91% <100.00%> (-0.95%) | :arrow_down: |
| ...g/apache/pulsar/common/lookup/GetTopicsResult.java | 92.50% <100.00%> (+30.00%) | :arrow_up: |
| ...ava/org/apache/pulsar/common/naming/TopicName.java | 96.09% <100.00%> (+0.03%) | :arrow_up: |
| ...java/org/apache/pulsar/common/util/FutureUtil.java | 76.61% <50.00%> (+2.06%) | :arrow_up: |
| ...rg/apache/pulsar/client/impl/PulsarClientImpl.java | 73.94% <12.50%> (-0.37%) | :arrow_down: |
| ...he/pulsar/client/impl/MultiTopicsConsumerImpl.java | 77.58% <71.66%> (-0.15%) | :arrow_down: |
| ...pulsar/client/impl/PatternConsumerUpdateQueue.java | 79.61% <79.61%> (ø) | |
| ...ar/client/impl/PatternMultiTopicsConsumerImpl.java | 76.88% <69.64%> (-6.58%) | :arrow_down: |