How to check if topic exists?
Is there a quick way to check if a topic exists? I understand not wanting to support auto create, but in my integration test scripts I typically use the auto-create feature with other libraries.
So, with no-kafka, in lieu of auto-create I am using the wurstmeister/kafka Docker image, which has a nice KAFKA_CREATE_TOPICS environment variable that lets you auto-create topics at startup.
Right now I just have an arbitrary timeout at the beginning of the test scripts, set to roughly how long the topics take to become available. I would rather poll Kafka at the beginning of the script so the tests run as quickly as possible.
You can, for example, create a Producer instance and call producer.client.getTopicPartitions() until it stops being rejected.
Ok, this has been a huge pain, but I finally figured it out. Hopefully this code can help others.
const _ = require('lodash')

// Resolves after ms milliseconds (a promisified setTimeout)
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

// Takes a list of strategies, e.g.
// [{ subscriptions: ['a', 'b'], handler: a }, { subscriptions: ['c', 'd'], handler: b }],
// and keeps requesting topic partitions until none of the requests rejects
const allTopicsCreated = async (producer, strategies, interval = 1000) => {
  const topics = _.flatten(_.map(strategies, (s) => s.subscriptions))
  try {
    await Promise.all(_.map(topics, (t) => producer.client.getTopicPartitions(t)))
    return true
  } catch (err) {
    // at least one topic is not available yet: wait, then try again
    await sleep(interval)
    return allTopicsCreated(producer, strategies, interval)
  }
}
First you need to create and init a Producer, for example (in some async function):
const { Producer } = require('no-kafka')

// creates a producer for the sole purpose of checking when topics are ready
const producer = new Producer({
  clientId: "check-topics",
  connectionString: "kafka:9092"
})
await producer.init()
// returns when topics have all been created
await allTopicsCreated(producer, strategies)
// finally init your consumer(s); `consumer` is assumed to be created elsewhere
consumer.init(strategies)
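One caveat with the snippet above: it retries forever, so a misspelled topic name will hang the test run. Below is a rough sketch of a bounded variant that gives up after a deadline. The names `waitForTopics` and `getPartitions` and the `timeout` option are my own and not part of no-kafka; the lookup is passed in as a function so the helper can be exercised without a broker.

```javascript
// Resolves after `ms` milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

// Hypothetical helper (not a no-kafka API): polls until every topic lookup
// resolves, or throws once `timeout` ms have elapsed.
// `getPartitions` is an assumed injection point: any function that takes a
// topic name and returns a promise that rejects while the topic is missing.
const waitForTopics = async (getPartitions, topics, { interval = 1000, timeout = 30000 } = {}) => {
  const deadline = Date.now() + timeout
  let lastError
  do {
    try {
      // all lookups must succeed in the same round
      await Promise.all(topics.map((t) => getPartitions(t)))
      return true
    } catch (err) {
      lastError = err
      await sleep(interval)
    }
  } while (Date.now() < deadline)
  throw new Error(`Topics not ready after ${timeout} ms: ${lastError && lastError.message}`)
}
```

With the producer from this thread it would presumably be called as `await waitForTopics((t) => producer.client.getTopicPartitions(t), topics, { timeout: 15000 })`, where `topics` is the flattened list of subscriptions.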