node-rdkafka icon indicating copy to clipboard operation
node-rdkafka copied to clipboard

unable to consume specific amount of messages without waiting with a setTimeout

Open danrevah opened this issue 7 years ago • 8 comments

Environment Information

  • OS [e.g. Mac, Arch, Windows 10]: RedHat
  • Node Version [e.g. 8.2.1]: 10.6.0
  • NPM Version [e.g. 5.4.2]: 6.1.0
  • C++ Toolchain [e.g. Visual Studio, llvm, g++]:
  • node-rdkafka version [e.g. 2.3.3]: 2.3.2
// Non-flowing mode
consumer.connect();

consumer
  .on('ready', function() {
    consumer.subscribe(['librdtesting-01']);

    // Read only 1000 messages
    consumer.consume(1000); // <---------- This is not working...
  })
  .on('data', function(data) {
    console.log('Message found!  Contents below.');
    console.log(data.value.toString());
  });

If i'm replacing the line that I've marked above with the following, then it does work:

setTimeout(() => consumer.consume(1000), 60000); // <-- this is working..

Could this be related to node-rdkafka? or is it something that has to do with my kafka configuration?

danrevah avatar Jul 12 '18 17:07 danrevah

You need to handle the potential error case in consume through its callback. There is a chance, especially directly after a consume, that the consume will fail because a timeout or because a rebalance is happening.

In your code that executes a timeout and then does it, it is likely that the fetcher is already set up and it works coincidentally.

webmakersteve avatar Jul 19 '18 02:07 webmakersteve

Is there not an error event emitted? I am seeing something very similar, but error is null and the messages array is empty. It seems like ready is being fired before the consumer has fetched partition offsets, according to the logs when i set debug to `consumer. Also, it appears that the offset is being incremented even though I am not receiving the messages through the data or consume callbacks

thebigredgeek avatar Oct 08 '18 16:10 thebigredgeek

@thebigredgeek This is also exactly what I was experiencing: Error is null, messages array is empty and offset is being incremented (unless configured to not have automatic offset handling).

noderat avatar Nov 05 '18 18:11 noderat

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.

stale[bot] avatar Feb 03 '19 23:02 stale[bot]

Multiple people have the same problem (myself included). This should not be closed.

rosshadden avatar Mar 18 '19 18:03 rosshadden

I'm also facing this issue atm. Using a setTimeout, but the timeout is not reliable

philipyoo avatar Mar 22 '19 18:03 philipyoo

Same issue, Any fix for this problem, without this we cant manually consume data and commit

Albinzr avatar Jan 02 '20 13:01 Albinzr

We are experiencing the same issue. Please re-open this issue.

talaltoukan avatar May 23 '22 10:05 talaltoukan