How to check if topic exists?

Open jdconley opened this issue 7 years ago • 2 comments

Is there a quick way to check if a topic exists? I understand not wanting to support auto create, but in my integration test scripts I typically use the auto-create feature with other libraries.

So, with no-kafka, in lieu of auto-create I am using the wurstmeister/kafka Docker image which has a nice KAFKA_CREATE_TOPICS environment variable which lets you auto-create topics at startup.

Right now I just have an arbitrary timeout at the beginning of the test scripts, set to roughly how long the topics take to become available. I would rather poll Kafka at the start of the script so the tests run as quickly as possible.

jdconley avatar Jul 19 '17 18:07 jdconley

You can, for example, create a Producer instance and call producer.client.getTopicPartitions() until it stops being rejected.

oleksiyk avatar Jul 19 '17 19:07 oleksiyk

Ok, this has been a huge pain, but I finally figured it out. Hopefully this code can help others.

const _ = require('lodash')

// Resolves after `ms` milliseconds
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

// Takes a list of strategies, e.g.
// [{ subscriptions: ['a', 'b'], handler: a }, { subscriptions: ['c', 'd'], handler: b }],
// and keeps requesting topic partitions until every request resolves.
// Note: this retries forever if a topic never becomes available.
const allTopicsCreated = async (producer, strategies, interval = 1000) => {
    const topics = _.flatten(_.map(strategies, (s) => s.subscriptions))
    try {
        await Promise.all(_.map(topics, (t) => producer.client.getTopicPartitions(t)))
        return true
    } catch (err) {
        // At least one topic is not available yet: wait, then try again
        await sleep(interval)
        return allTopicsCreated(producer, strategies, interval)
    }
}

To use it, you first need to create and init a Producer, for example (inside an async function):

const { Producer } = require('no-kafka')

// creates a producer for the sole purpose of checking when topics are ready
const producer = new Producer({
    clientId: "check-topics",
    connectionString: "kafka:9092"
})

await producer.init()
// returns once all topics have been created
await allTopicsCreated(producer, strategies)
// finally init your consumer(s)
await consumer.init(strategies)
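
One caveat with allTopicsCreated is that it keeps retrying indefinitely, so a misspelled topic name would hang the test run. Below is a minimal sketch of a bounded variant, not part of no-kafka itself. The name waitForTopics and the intervalMs / timeoutMs options are hypothetical, and it assumes only that the client object exposes getTopicPartitions(topic) returning a promise that rejects while the topic is missing, as producer.client is used above.

```javascript
// Resolves after `ms` milliseconds
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

// Hypothetical helper: polls until every topic resolves, or throws once
// another retry would go past the deadline.
// Assumption: `client.getTopicPartitions(topic)` returns a promise that
// rejects while the topic does not exist yet.
const waitForTopics = async (client, topics, { intervalMs = 1000, timeoutMs = 30000 } = {}) => {
    const deadline = Date.now() + timeoutMs
    for (;;) {
        try {
            await Promise.all(topics.map((t) => client.getTopicPartitions(t)))
            return
        } catch (err) {
            // Give up if sleeping for another interval would exceed the deadline
            if (Date.now() + intervalMs > deadline) {
                throw new Error(`Topics not available after ${timeoutMs} ms: ${err.message}`)
            }
            await sleep(intervalMs)
        }
    }
}
```

It would be called as await waitForTopics(producer.client, ['a', 'b'], { timeoutMs: 10000 }) after producer.init(), so a missing topic fails the test script with an error instead of blocking it.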

ale316 avatar Dec 15 '17 06:12 ale316