[BUG] Topics with no subscriptions are not deleted from zk
Describe the bug
When a topic that has never been consumed is deleted through KafkaAdminClient, a child node for that topic is created under the zk path /kop//delete_topics. Because the topic has no subscription information, the node is never deleted by io.streamnative.pulsar.handlers.kop.KopEventManager.DeleteTopicsEvent.
The following test method verifies that the zk node still exists after the topic has been deleted.
To Reproduce
This is an existing test method from io.streamnative.pulsar.handlers.kop.KafkaRequestHandlerTest#testDeleteTopicsAndCheckChildPath:
    @Test(timeOut = 10000)
    public void testDeleteTopicsAndCheckChildPath() throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort());

        @Cleanup
        AdminClient kafkaAdmin = AdminClient.create(props);
        Map<String, Integer> topicToNumPartitions = new HashMap<String, Integer>(){{
            put("testCreateTopics-0", 1);
            put("testCreateTopics-1", 3);
            put("my-tenant/my-ns/testCreateTopics-2", 1);
            put("persistent://my-tenant/my-ns/testCreateTopics-3", 5);
        }};
        // create
        createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions);
        verifyTopicsCreatedByPulsarAdmin(topicToNumPartitions);
        // delete
        deleteTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions.keySet());
        verifyTopicsDeletedByPulsarAdmin(topicToNumPartitions);
        // check deleted topics path
        Set<String> deletedTopics = handler.getPulsarService()
                .getBrokerService()
                .getPulsar()
                .getLocalMetadataStore()
                .getChildren(KopEventManager.getDeleteTopicsPath())
                .join()
                .stream()
                .map((TopicNameUtils::getTopicNameWithUrlDecoded))
                .collect(Collectors.toSet());
        assertEquals(topicToNumPartitions.keySet(), deletedTopics);
    }
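To make the suspected failure mode easier to discuss, here is a toy model of it in plain Java. It is not KoP code: the class `DeleteTopicsModel`, its methods, and the topic and group names are all hypothetical, and the rule "only clean up topics that have subscription information" is an assumption taken from the description above, not from the actual `KopEventManager` implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy model (hypothetical, not KoP code) of a delete-topics node leak. */
public class DeleteTopicsModel {
    // Stand-in for the child nodes under the delete-topics zk path.
    private final Set<String> deleteTopicsChildren = new TreeSet<>();
    // Stand-in for subscription information: topic -> consumer groups.
    private final Map<String, Set<String>> groupsByTopic = new HashMap<>();

    public void subscribe(String topic, String group) {
        groupsByTopic.computeIfAbsent(topic, t -> new HashSet<>()).add(group);
    }

    /** Models the admin delete: a child node is recorded for the topic. */
    public void deleteTopic(String topic) {
        deleteTopicsChildren.add(topic);
    }

    /** Assumed faulty handling: only topics with subscriptions are cleaned up. */
    public void handleDeleteTopicsEvent() {
        for (String topic : new ArrayList<>(deleteTopicsChildren)) {
            Set<String> groups = groupsByTopic.remove(topic);
            if (groups == null) {
                continue; // no subscription info: the child node is left behind
            }
            deleteTopicsChildren.remove(topic);
        }
    }

    public Set<String> remainingChildren() {
        return new TreeSet<>(deleteTopicsChildren);
    }

    public static void main(String[] args) {
        DeleteTopicsModel model = new DeleteTopicsModel();
        model.subscribe("consumed-topic", "group-a");
        model.deleteTopic("consumed-topic");
        model.deleteTopic("idle-topic"); // never consumed
        model.handleDeleteTopicsEvent();
        System.out.println(model.remainingChildren()); // prints [idle-topic]
    }
}
```

In this model, a fix would amount to removing the child node regardless of whether any group was found for the topic. Whether the real handler is structured this way would need to be checked against the KoP source.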