`KafkaConsumer.consume()` Timeout Not Respected
Description
It appears that the consume method in Kafka.KafkaConsumer does not adhere to the specified setDefaultConsumeTimeout. Instead, it waits for the full batch to be filled before triggering the callback. This behaviour causes delays when consuming from low-traffic topics, as the consumer remains idle until the batch size is satisfied.
Expected Behaviour
The consume method should respect the setDefaultConsumeTimeout and return the available messages within the given time frame, even if the batch is not fully populated.
Actual Behaviour
The consumer only returns after the batch size (1000 in this case) is completely filled, ignoring the timeout.
Code to Reproduce
```js
const Kafka = require("@confluentinc/kafka-javascript");

const consumer = new Kafka.KafkaConsumer({
  "metadata.broker.list": "localhost:19092",
  "group.id": "confluent-kafka-javascript-consumer-flow-example",
});

const topicName = "slowtopic";
const batch = 1000;

consumer.setDefaultConsumeTimeout(100);

consumer.on("ready", function () {
  console.log("Consumer ready.");
  consumer.subscribe([topicName]);
  consume();
});

function consume() {
  if (!consumer.isConnected()) return;
  console.time("Consume batch timing");
  consumer.consume(batch, (err, messages) => {
    if (err) {
      console.error(err);
      return;
    }
    console.log(`Received ${messages.length} messages`);
    console.timeEnd("Consume batch timing");
    consume();
  });
}

consumer.connect();
```
Output:
```
Received 1000 messages
Consume batch timing: 10.068s
Received 1000 messages
Consume batch timing: 10.083s
Received 1000 messages
Consume batch timing: 10.074s
```
Environment
- Node.js Version: v22.4.0
- confluent-kafka-javascript version: 1.2.0
- OS: Linux
Impact
This issue affects consumers in low-traffic environments, causing significant delays and inefficient processing during quiet periods.
Potential Workarounds
Currently, we need to write code that dynamically adjusts the batch size based on traffic.
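A rough sketch of what that looks like for us (illustrative only; the doubling/halving heuristic and the MIN_BATCH/MAX_BATCH constants are made up for the example, and it reuses the consumer from the repro above):
```js
// Illustrative workaround: shrink the batch size when the topic is quiet
// so consume() isn't stuck waiting for a full batch, and grow it again
// under load. The heuristic below is invented for this example.
const MIN_BATCH = 10;
const MAX_BATCH = 1000;
let batchSize = MAX_BATCH;

function consumeAdaptive() {
  if (!consumer.isConnected()) return;
  consumer.consume(batchSize, (err, messages) => {
    if (err) {
      console.error(err);
      return;
    }
    // Full batch => high traffic, grow; partial batch => quiet, shrink.
    batchSize = messages.length === batchSize
      ? Math.min(batchSize * 2, MAX_BATCH)
      : Math.max(Math.floor(batchSize / 2), MIN_BATCH);
    // ... process messages ...
    consumeAdaptive();
  });
}
```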
Many thanks, Pat
Thanks for reporting this, @patrykwegrzyn. I can see it.
While we fix it, another potential workaround (with less work required on your side) is to use consumer.setDefaultIsTimeoutOnlyForFirstMessage(true).
It makes the timeout applicable only to the first message: we wait up to timeout ms for the first message, and after that we don't wait any further, we only gather up any already-prefetched messages for that topic. Whether this works will ultimately depend on your use case, but I think it's worth testing out, especially if the topic is occasionally low-traffic.
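For example (a minimal sketch; the batch size and timeout values are placeholders):
```js
// Wait up to 100ms for the first message only; after that, return
// immediately with whatever has already been prefetched for the topic.
consumer.setDefaultConsumeTimeout(100);
consumer.setDefaultIsTimeoutOnlyForFirstMessage(true);

consumer.consume(1000, (err, messages) => {
  if (err) {
    console.error(err);
    return;
  }
  console.log(`Received ${messages.length} messages`);
});
```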
@milindl Thanks for coming back to me so quickly!!!
Regarding consumer.setDefaultIsTimeoutOnlyForFirstMessage(true), there are 2 issues here:
- no types; it does not appear in autocompletion (which is probably why I had not tried it)
- it does not respect the count at all and sends 1 message at a time (sort of the opposite problem)
```
Consume batch timing: 10.04ms
Received 1 messages
Consume batch timing: 9.087ms
Received 1 messages
Consume batch timing: 9.91ms
Received 1 messages
```
Test topic rate: at 10 msg/s, the callback receives 1 message at a time.
I can see this being useful for low-latency use cases, but the callback overhead may be quite high. I'll wait for the fix, as the intended setDefaultConsumeTimeout behaviour sounds like a good middle ground.
@milindl any expected ETA on this fix? It seems like a pretty bad bug; we have quite a lot of latency-sensitive setups where batch sizes are still important, and so far we've managed this via the timeout setting. We just migrated over from node-rdkafka and ran into this issue.
I could lend a hand in fixing, but I don't quite understand the bug (the code being called seems to be in librdkafka and my familiarity with that is quite minimal).
I ended up running into this, and for our use case (subscription API updating a frontend) it's not ideal behavior. I'll go into the details after the quick fix.
Quick Fix Solution
```js
const consumer = new KafkaConsumer(...);
consumer.setDefaultConsumeTimeout(0);
```
This will make the consume non-blocking. If you don't have your own delay on calls to JS.Confluent.KafkaConsumer.consume and rely on its blocking nature to prevent the next call, then I'd think twice about applying this, or at least do some testing first.
If you control your calls to consume with your own delay, then the above solution is probably preferred regardless, as it will prevent the calls to consume from blocking until messages arrive.
This solution still triggers a timeout error to exit the loop, so the behavior is similar to a non-zero timeout.
Also, if you aren't aware of this default timeout, there's a guaranteed 1-second delay on grabbing a ready message, so you may see better responsiveness at the call site by updating the timeout this way.
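For example, something along these lines (a sketch; the 500-message batch and 50ms pacing delay are arbitrary values to illustrate the pattern):
```js
// With a 0 timeout, consume() returns immediately, so we pace the polling
// loop ourselves instead of relying on consume() to block.
consumer.setDefaultConsumeTimeout(0);

function poll() {
  if (!consumer.isConnected()) return;
  consumer.consume(500, (err, messages) => {
    if (err) {
      console.error(err);
    } else if (messages.length > 0) {
      // ... process messages ...
    }
    // Arbitrary pacing delay; tune for your latency/CPU trade-off.
    setTimeout(poll, 50);
  });
}
poll();
```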
Expected Behavior
When consuming from a topic with:
```js
consumer.setDefaultConsumeTimeout(DEFAULT_CONSUME_TIME_OUT); // Default: 1000ms
consumer.consume(BATCH_COUNT); // e.g. 500
```
It's expected that after 1000ms, the call to consume will conclude and return whatever messages are available.
Actual Behavior
The timeout is not applied on a batch basis. Instead, the call returns either when BATCH_COUNT messages are available, or when 1000ms have passed since the last message arrived on the topic: a message arriving within the timeout window resets the clock.
Essentially, the batch consume is debounced until messages stop appearing.
My Situation
I have a topic that consistently receives around 250-500 rpm. These messages are processed very quickly, < 8ms. I noticed two things:
- The lag on the topic would keep increasing to our BATCH_COUNT and then immediately disappear
- Our consumer was only processing messages in small 3-5 second chunks and then idling
This resulted in our consumer, which has a theoretical throughput of 3000 rpm, being capped at around 500 rpm.
What's Happening
Callstack
```
JS.Confluent.KafkaConsumer.consume
  -> JS.Confluent.KafkaConsumer._consumeNum
  -> Cpp.Confluent.KafkaConsumer.NodeConsume
  -> Cpp.Confluent.Worker.KafkaConsumerConsumeNum (Main Loop)
  -> Cpp.Confluent.KafkaConsumer.Consume
  -> Cpp.librdkafka.KafkaConsumerImpl.consume
  -> C.librdkafka.rd_kafka_consume
  -> C.librdkafka.rd_kafka_consume0
  -> etc.
```
Words
The timeout gets passed to Cpp.Confluent.Worker.KafkaConsumerConsumeNum, which passes it on to the individual Cpp.Confluent.KafkaConsumer.Consume calls, so the timeout applies to each call rather than to the whole batch. The loop essentially gets debounced: each newly received message resets the clock, and the loop only exits once a call errors with a timeout.
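To make that concrete, here's the difference in pseudocode (JS rather than the actual C++; fetchOne is a made-up stand-in for a single Cpp.Confluent.KafkaConsumer.Consume call that returns null on timeout):
```js
// Current behaviour (simplified): every fetch gets the full timeout, so
// each arriving message resets the clock and the loop is debounced.
function consumeNumCurrent(count, timeoutMs) {
  const messages = [];
  while (messages.length < count) {
    const msg = fetchOne(timeoutMs); // full timeout granted per message
    if (msg === null) break;         // only a timeout exits early
    messages.push(msg);
  }
  return messages;
}

// Expected behaviour: one deadline for the whole batch, with each fetch
// given only the remaining time.
function consumeNumExpected(count, timeoutMs) {
  const messages = [];
  const deadline = Date.now() + timeoutMs;
  while (messages.length < count) {
    const remaining = deadline - Date.now();
    if (remaining <= 0) break;       // whole-batch deadline reached
    const msg = fetchOne(remaining);
    if (msg === null) break;
    messages.push(msg);
  }
  return messages;
}
```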
Shorter Term Solution
Behave as librdkafka does, check the timeout on a batch basis. I have a version of this here: https://github.com/confluentinc/confluent-kafka-javascript/pull/330
Longer Term Solution
librdkafka does have a batch consume in its API that could be used instead, but shifting to it would probably take a lot of validation of functionality, and I'm not sure about the EOF event implications.