question: Correct way to check consumer/producer health using KafkaJS API
First of all, I just want to say how happy I am to see Confluent release an official JS client. Love the idea of making it compatible with KafkaJS/node-rdkafka. :+1:
Now for the issue
I don't see a way to get the current state / health of a consumer or producer using the KafkaJS api.
When using the node-rdkafka api, it looks like I can hook into lifecycle events using consumer.on('ready',...) etc.
But when using the KafkaJS API, .on is not implemented, and the state is marked private. With KafkaJS we were previously hooking into these lifecycle events to keep track of the consumer state, which we used for our Kubernetes readiness probes.
Is there any plan to implement events on the KafkaJS api?
Thanks for the feedback!
We don't currently plan to implement the .on method in the near future. Awaiting the connect method is equivalent to consumer.on('ready', ...), since internally we use that lifecycle event to determine when the connect promise resolves.
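Since awaiting connect() is equivalent to waiting for 'ready', a readiness flag can be derived from the promises alone. A minimal sketch, assuming only that the client exposes promise-returning connect() and disconnect() (as the KafkaJS-style consumer, producer and admin do); the ReadinessTracker class and its state names are hypothetical.

```typescript
// The only shape we rely on: promise-returning connect()/disconnect().
interface Connectable {
  connect(): Promise<void>;
  disconnect(): Promise<void>;
}

type ClientState = "disconnected" | "connecting" | "connected" | "disconnecting";

// Hypothetical wrapper that tracks state purely from promise resolution.
class ReadinessTracker {
  private state: ClientState = "disconnected";
  private readonly client: Connectable;

  constructor(client: Connectable) {
    this.client = client;
  }

  get current(): ClientState {
    return this.state;
  }

  isReady(): boolean {
    return this.state === "connected";
  }

  async connect(): Promise<void> {
    this.state = "connecting";
    try {
      await this.client.connect(); // resolves once the client is 'ready'
      this.state = "connected";
    } catch (err) {
      this.state = "disconnected";
      throw err;
    }
  }

  async disconnect(): Promise<void> {
    this.state = "disconnecting";
    try {
      await this.client.disconnect();
    } finally {
      this.state = "disconnected";
    }
  }
}
```

A readiness probe handler would then just return tracker.isReady(). Note that this only reflects explicit connect/disconnect calls; it will not flip on its own if the brokers become unreachable.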
What states are you looking to track? I can try to help you with those particular cases (for instance, for rebalances we have made a rebalance callback available), and think about whether a more generalized approach is possible.
Thanks for the quick response.
We need to know the ongoing health / connectivity of the clients. For example, if our consumer gets removed from the group, or disconnects for some reason, our service would report this to Kubernetes as unhealthy and the service would be gracefully shut down.
Specifying rebalance_cb won't work since as soon as that callback is specified then we lose the default rebalancing behaviour, and need to implement it ourselves (we could, but that's overkill when all you want is the current state).
We're doing something like this with KafkaJS. Basically, we just need to know whether the consumer is connected.
// kafkaConsumer is a KafkaJS consumer, e.g. kafka.consumer({ groupId })
let consumerConnected = false
let rebalancing = false

export function isConsumerHealthy() {
  return consumerConnected || rebalancing
}

const { CONNECT, DISCONNECT, REBALANCING, GROUP_JOIN } = kafkaConsumer.events
kafkaConsumer.on(CONNECT, () => {
  consumerConnected = true
  rebalancing = false
})
kafkaConsumer.on(GROUP_JOIN, () => {
  consumerConnected = true
  rebalancing = false
})
kafkaConsumer.on(DISCONNECT, () => {
  consumerConnected = false
  rebalancing = false
})
// Treat an in-progress rebalance as healthy so the probe doesn't flap
kafkaConsumer.on(REBALANCING, () => {
  rebalancing = true
})
It would be much easier to just be able to access the state of the consumer/producer/admin client. But events do give us a chance to react to state changes if necessary.
Is there any reason there are no plans to expose events in the KafkaJS API? It's supported through the node-rdkafka API, so it doesn't seem too difficult to implement.
Possible solutions
- Make state a public property getter on consumer/producer/admin. Example:
  kafkaConsumer.state === ConsumerStates.CONNECTED
- Expose isConnected() etc. from internalClient. Example:
  kafkaProducer.isConnected()
- Expose an internalClient event emitter passthrough. Example:
  kafkaConsumer.on('rebalancing', () => {})
  This could be implemented by just passing the listener to internalClient without having to expose internalClient. I don't think we need to support event parity with KafkaJS, so just exposing KafkaConsumerEvents would be fine.
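Maybe something like this:
// lib/kafkajs/_consumer.js
// Sorry for the TypeScript :)
on(event: KafkaConsumerEvents, listener: EventListener<KafkaConsumerEvents>) {
  // Forward to the private internal client, but return the wrapper itself
  // so the internal client isn't leaked to callers.
  this.#internalClient.on(event, listener)
  return this
}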
Hey @SeanReece, we're generally very slow to expose any public interface, because it means that we'll have to support it forever (we'll go as far as deprecating something, but removing it completely and breaking something that previously worked is almost impossible).
In general, the idea of re-emitting events actually does make sense, because one can get them through the node-rdkafka API already. It's probably possible to emit a subset of the 'on' events, and throw an error if someone tries to register a function for an unsupported event type.
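The subset idea could look roughly like this: forward an allow-list of event names to the internal emitter, throw for anything else, and return the wrapper rather than the internal client. A sketch only; the event names in SUPPORTED_EVENTS, the ConsumerFacade class, and the plain EventEmitter standing in for the internal node-rdkafka-style client are assumptions for illustration, not the library's actual list or classes.

```typescript
import { EventEmitter } from "node:events";

// Hypothetical subset of events the wrapper agrees to re-emit.
const SUPPORTED_EVENTS = ["ready", "disconnected", "rebalance", "event.error"] as const;

// Stand-in for the KafkaJS-style wrapper around an internal client.
class ConsumerFacade {
  #internalClient: EventEmitter;

  constructor(internalClient: EventEmitter) {
    this.#internalClient = internalClient;
  }

  on(event: string, listener: (...args: unknown[]) => void): this {
    if (!(SUPPORTED_EVENTS as readonly string[]).includes(event)) {
      throw new Error(`Unsupported event type: ${event}`);
    }
    // Pass the listener straight through without exposing the internal client.
    this.#internalClient.on(event, listener);
    return this;
  }
}
```

Throwing on unknown names keeps the public surface small: supporting another event later is additive, whereas silently accepting names that never fire would be hard to take back.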
I'll discuss this internally and see if we can possibly expose such an API. We also need a way to expose client-level errors, which isn't present at the moment, so maybe we could use this for that reason too.
Two things specifically for your case though:
- since we're using librdkafka internally, the consumer will never disconnect without the user calling disconnect explicitly. Even if there is a network issue and we lose connectivity to the brokers, it'll keep on trying to connect forever (though eventually it'll leave the group on its own, and there will be no messages).
- there's no way to indicate the start of a rebalance. We raise an event for the rebalance only after a few network calls with the broker, when some partitions are actually assigned (or revoked). If there's a misbehaving client, it can take up to sessionTimeout (30 seconds by default) between the start of the rebalance and the assignment of partitions.
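Because the consumer never reports a disconnect on its own, one option for a health check is staleness: record a timestamp whenever there is evidence of liveness (a message processed, partitions assigned) and report unhealthy if nothing has happened for longer than a threshold. A minimal sketch; the ActivityHealth class, the threshold, and what counts as "activity" are all assumptions. A topic with genuinely no traffic will also look stale, so the threshold needs care.

```typescript
// Hypothetical staleness-based health check; the clock is injectable for testing.
class ActivityHealth {
  private lastActivity: number;
  private readonly maxIdleMs: number;
  private readonly now: () => number;

  constructor(maxIdleMs: number, now: () => number = Date.now) {
    this.maxIdleMs = maxIdleMs;
    this.now = now;
    this.lastActivity = now();
  }

  // Call from eachMessage/eachBatch, or when partitions are assigned.
  recordActivity(): void {
    this.lastActivity = this.now();
  }

  // Healthy while the time since the last activity is within the threshold.
  isHealthy(): boolean {
    return this.now() - this.lastActivity <= this.maxIdleMs;
  }
}
```

In a consumer, health.recordActivity() would typically be called at the top of the eachMessage handler passed to consumer.run().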
You can specify rebalance_cb in the KafkaJS consumer interface without needing to implement rebalancing yourself; I've been testing this for a while. As long as you return undefined, the client will take that as a signal that you aren't changing the assignment and will use the default implementation, or whatever partitionAssigners you configured.
In my rebalance_cb, I apply a small debounce and then emit an event to higher levels of the system.
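A sketch of that approach: a rebalance_cb that tracks which partitions are currently assigned, debounces a notification upward, and returns undefined so the default assignment logic stays in charge. The (err, assignment) argument shape and the two error-code values are assumptions modelled on node-rdkafka's rebalance callback and librdkafka's RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS / RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS; verify them against the client's typings and ErrorCodes. createRebalanceTracker is a hypothetical helper.

```typescript
// Assumed librdkafka codes; check against the client's ErrorCodes before use.
const ERR_ASSIGN_PARTITIONS = -175;
const ERR_REVOKE_PARTITIONS = -174;

interface TopicPartition {
  topic: string;
  partition: number;
}

type Schedule = (fn: () => void, ms: number) => unknown;
type Cancel = (handle: unknown) => void;

// Hypothetical helper: returns a rebalance_cb and a query for the current assignment.
function createRebalanceTracker(
  onChange: (assigned: TopicPartition[]) => void,
  debounceMs = 250,
  schedule: Schedule = (fn, ms) => setTimeout(fn, ms),
  cancel: Cancel = (handle) => clearTimeout(handle as ReturnType<typeof setTimeout>),
) {
  const assigned = new Map<string, TopicPartition>();
  const key = (tp: TopicPartition) => `${tp.topic}:${tp.partition}`;
  let pending: unknown = undefined;

  const rebalanceCb = (err: { code: number }, assignment: TopicPartition[]): undefined => {
    // Add/remove only the listed partitions, assuming the callback receives
    // exactly the partitions being assigned or revoked.
    if (err.code === ERR_ASSIGN_PARTITIONS) {
      for (const tp of assignment) assigned.set(key(tp), tp);
    } else if (err.code === ERR_REVOKE_PARTITIONS) {
      for (const tp of assignment) assigned.delete(key(tp));
    }
    // Debounce: a burst of revoke/assign callbacks yields one notification.
    if (pending !== undefined) cancel(pending);
    pending = schedule(() => {
      pending = undefined;
      onChange(Array.from(assigned.values()));
    }, debounceMs);
    // Returning undefined leaves the assignment itself to the default handling.
    return undefined;
  };

  return { rebalanceCb, hasAssignment: () => assigned.size > 0 };
}
```

The returned rebalanceCb would be passed as rebalance_cb in the consumer config, with hasAssignment() (or the debounced onChange notifications) feeding the readiness probe. The scheduler is injectable only so the debounce can be tested without real timers.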