node-rdkafka
Unable to consume a specific number of messages without waiting with a setTimeout
Environment Information
- OS [e.g. Mac, Arch, Windows 10]: RedHat
- Node Version [e.g. 8.2.1]: 10.6.0
- NPM Version [e.g. 5.4.2]: 6.1.0
- C++ Toolchain [e.g. Visual Studio, llvm, g++]:
- node-rdkafka version [e.g. 2.3.3]: 2.3.2
// Non-flowing mode
consumer.connect();

consumer
  .on('ready', function() {
    consumer.subscribe(['librdtesting-01']);
    // Read only 1000 messages
    consumer.consume(1000); // <---------- This is not working...
  })
  .on('data', function(data) {
    console.log('Message found! Contents below.');
    console.log(data.value.toString());
  });
If I replace the line I've marked above with the following, then it does work:
setTimeout(() => consumer.consume(1000), 60000); // <-- this is working..
Could this be related to node-rdkafka, or is it something to do with my Kafka configuration?
You need to handle the potential error case in consume through its callback. There is a chance, especially directly after a consume, that the consume will fail because of a timeout or because a rebalance is happening.
In the version of your code that waits on a setTimeout before consuming, the fetcher is likely already set up by the time consume runs, so it works coincidentally.
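To make that concrete, here is a minimal sketch of consuming through the callback rather than relying only on the data event, with a retry when nothing comes back yet. The topic name is the one from the report above; the retry delay and the retry-on-empty behaviour are arbitrary choices for illustration.

consumer.on('ready', function() {
  consumer.subscribe(['librdtesting-01']);

  function fetchBatch() {
    // Ask for up to 1000 messages and inspect the callback arguments
    consumer.consume(1000, function(err, messages) {
      if (err) {
        // e.g. a timeout or an in-progress rebalance
        console.error('consume failed:', err);
        return setTimeout(fetchBatch, 1000);
      }
      if (messages.length === 0) {
        // Partitions may not be assigned yet; try again shortly
        return setTimeout(fetchBatch, 1000);
      }
      messages.forEach(function(message) {
        console.log(message.value.toString());
      });
    });
  }

  fetchBatch();
});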
Is there not an error event emitted? I am seeing something very similar, but error is null and the messages array is empty. It seems like ready is being fired before the consumer has fetched partition offsets, according to the logs when I set debug to `consumer`. Also, it appears that the offset is being incremented even though I am not receiving the messages through the data or consume callbacks.
@thebigredgeek This is exactly what I was experiencing as well: error is null, the messages array is empty, and the offset is still incremented (unless the consumer is configured not to handle offsets automatically).
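For reference, the consumer-level debug output mentioned above can be enabled through the librdkafka `debug` configuration property and read from the `event.log` event. This is a minimal sketch; the broker address and group id are placeholders.

var Kafka = require('node-rdkafka');

var consumer = new Kafka.KafkaConsumer({
  'metadata.broker.list': 'localhost:9092',  // placeholder broker
  'group.id': 'example-group',               // placeholder group id
  'debug': 'consumer'                        // emit consumer-level debug logs
}, {});

// librdkafka log lines are surfaced as 'event.log' events
consumer.on('event.log', function(log) {
  console.log(log.severity, log.fac, log.message);
});

consumer.connect();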
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.
Multiple people have the same problem (myself included). This should not be closed.
I'm also facing this issue at the moment. I'm using a setTimeout, but the timeout is not reliable.
Same issue. Is there any fix for this problem? Without it we can't manually consume data and commit.
We are experiencing the same issue. Please re-open this issue.