kop icon indicating copy to clipboard operation
kop copied to clipboard

[BUG] Topics that are not subscribed will not be deleted from zk

Open wenbingshen opened this issue 3 years ago • 0 comments

Describe the bug A clear and concise description of what the bug is.

When a topic is not consumed, call KafkaAdminClient to delete this topic. This topic will create a child node under the zk path /kop//delete_topics. Since there is no subscription information, it will not be deleted from io.streamnative.pulsar. handlers.kop.KopEventManager.DeleteTopicsEvent

The following test method can verify that the zk node will always exist on zk after deleting this topic.

To Reproduce This is an existing test method which from io.streamnative.pulsar.handlers.kop.KafkaRequestHandlerTest#testDeleteTopicsAndCheckChildPath

    @Test(timeOut = 10000)
    public void testDeleteTopicsAndCheckChildPath() throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort());

        @Cleanup
        AdminClient kafkaAdmin = AdminClient.create(props);
        Map<String, Integer> topicToNumPartitions = new HashMap<String, Integer>(){{
            put("testCreateTopics-0", 1);
            put("testCreateTopics-1", 3);
            put("my-tenant/my-ns/testCreateTopics-2", 1);
            put("persistent://my-tenant/my-ns/testCreateTopics-3", 5);
        }};
        // create
        createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions);
        verifyTopicsCreatedByPulsarAdmin(topicToNumPartitions);
        // delete
        deleteTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions.keySet());
        verifyTopicsDeletedByPulsarAdmin(topicToNumPartitions);
        // check deleted topics path
        Set<String> deletedTopics = handler.getPulsarService()
                .getBrokerService()
                .getPulsar()
                .getLocalMetadataStore()
                .getChildren(KopEventManager.getDeleteTopicsPath())
                .join()
                .stream()
                .map((TopicNameUtils::getTopicNameWithUrlDecoded))
                .collect(Collectors.toSet());

        assertEquals(topicToNumPartitions.keySet(), deletedTopics);
    }

wenbingshen avatar Nov 24 '21 10:11 wenbingshen