pulsar
[improve][broker] Improve checking topic exists
Motivation
org.apache.pulsar.broker.admin.AdminResource#checkTopicExistsAsync does not correctly check whether a topic exists.
To reproduce, create a non-persistent partitioned topic with 3 partitions twice:
admin.topics().createPartitionedTopic(partitionedTopicName, 3); // 1
admin.topics().createPartitionedTopic(partitionedTopicName, 3); // 2
The second call fails with the following warning:
2023-12-26T15:33:21,419 - WARN - [AsyncHttpClient-83-1:BaseResource$1@135] - [http://localhost:57405/admin/v2/non-persistent/my-property/my-ns/tp_-5d732a7b-6ace-4a71-b2f9-35d103215fc1/partitions?createLocalTopicOnly=false] Failed to perform http put request: javax.ws.rs.ClientErrorException: HTTP 409 {"reason":"Partitioned topic already exists"}
The "Partitioned topic already exists" error is thrown by org.apache.pulsar.broker.admin.AdminResource#provisionPartitionedTopicPath, but the expected error is "This topic already exists", which internalCreatePartitionedTopic throws earlier when checkTopicExistsAsync returns true:
protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions,
                                              boolean createLocalTopicOnly, Map<String, String> properties) {
    // ....
        .thenCompose(__ -> checkTopicExistsAsync(topicName))
        .thenAccept(exists -> {
            if (exists) {
                log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
                throw new RestException(Status.CONFLICT, "This topic already exists");
            }
        })
        .thenCompose(__ -> provisionPartitionedTopicPath(numPartitions, createLocalTopicOnly, properties))
        .thenCompose(__ -> tryCreatePartitionsAsync(numPartitions))
    // ....
}
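The second request got past this check and failed in provisionPartitionedTopicPath instead, so checkTopicExistsAsync returned false for a topic that already exists.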
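The ordering issue can be illustrated with a small standalone model. This is only a sketch and not Pulsar code: the class CreateFlowSketch, its in-memory set, and the checkSeesPartitionedTopics flag are hypothetical stand-ins, and only the two error messages and the shape of the future chain are taken from the snippet above.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of the create flow; none of this is Pulsar's real implementation.
class CreateFlowSketch {
    // Stands in for the partitioned-topic metadata store.
    private final Set<String> partitionedTopics = ConcurrentHashMap.newKeySet();
    // Assumption for illustration: when false, the existence check misses partitioned topics.
    private final boolean checkSeesPartitionedTopics;

    CreateFlowSketch(boolean checkSeesPartitionedTopics) {
        this.checkSeesPartitionedTopics = checkSeesPartitionedTopics;
    }

    // Model of the existence check that runs first in the chain.
    CompletableFuture<Boolean> checkTopicExistsAsync(String topic) {
        return CompletableFuture.completedFuture(
                checkSeesPartitionedTopics && partitionedTopics.contains(topic));
    }

    // Model of the provisioning step: fails if the metadata is already present.
    CompletableFuture<Void> provisionPartitionedTopicPath(String topic) {
        if (!partitionedTopics.add(topic)) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("Partitioned topic already exists"));
        }
        return CompletableFuture.completedFuture(null);
    }

    // Runs the chain and returns "created" or the message of the conflict that stopped it.
    String create(String topic) {
        try {
            checkTopicExistsAsync(topic)
                    .thenAccept(exists -> {
                        if (exists) {
                            throw new IllegalStateException("This topic already exists");
                        }
                    })
                    .thenCompose(__ -> provisionPartitionedTopicPath(topic))
                    .join();
            return "created";
        } catch (CompletionException e) {
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        CreateFlowSketch missing = new CreateFlowSketch(false);
        missing.create("tp");
        System.out.println(missing.create("tp")); // conflict surfaces in the provisioning step

        CreateFlowSketch seeing = new CreateFlowSketch(true);
        seeing.create("tp");
        System.out.println(seeing.create("tp")); // conflict surfaces in the existence check
    }
}
```

In this model, the message of the second create call shows which step rejected the request, which is the same signal the log line gives for the real broker.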
Modifications
- Make checkTopicExistsAsync call org.apache.pulsar.broker.namespace.NamespaceService#checkTopicExists.
Verifying this change
org.apache.pulsar.broker.namespace.NamespaceServiceTest#testCheckTopicExists covers this change.
Documentation
- [ ] doc
- [ ] doc-required
- [x] doc-not-needed
- [ ] doc-complete
/pulsarbot rerun-failure-checks