confluent-kafka-javascript

fix: Apply timeout correctly to consumeNum

Open andrewhessler opened this issue 6 months ago • 3 comments

What

Applies the timeout to the entire batch instead of on a per-message basis.

How

The individual consume calls share the batch timeout: each call to consume can use only the portion of the timeout that the batch has remaining.
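A minimal sketch of the difference between the two loops, assuming a simulated topic and a virtual clock rather than a real broker. `makeSimulatedConsume`, `consumeNumLegacy`, and `consumeNumBatchTimeout` are hypothetical names for illustration, not functions from confluent-kafka-javascript, and error handling is omitted.

```javascript
// Simulated topic: one message every gapMs of virtual time, the first at t = gapMs.
// No broker involved; `clock.now` is a virtual timestamp in milliseconds.
function makeSimulatedConsume(gapMs, clock) {
  let nextArrival = gapMs;
  // Stand-in for one consume(timeoutMs) call: returns the next message if it
  // arrives within timeoutMs, otherwise returns null after waiting timeoutMs.
  return function consumeOne(timeoutMs) {
    const waitMs = Math.max(0, nextArrival - clock.now);
    if (waitMs > timeoutMs) {
      clock.now += timeoutMs;
      return null; // timed out
    }
    clock.now += waitMs;
    nextArrival += gapMs;
    return { arrivedAt: clock.now };
  };
}

// Pre-change behavior: every call gets the full timeout, so the loop only
// stops early when a single call times out.
function consumeNumLegacy(consumeOne, count, timeoutMs) {
  const messages = [];
  while (messages.length < count) {
    const msg = consumeOne(timeoutMs);
    if (msg === null) break;
    messages.push(msg);
  }
  return messages;
}

// Post-change behavior: all calls share one deadline, and each call may wait
// only for the time the batch has left.
function consumeNumBatchTimeout(consumeOne, count, timeoutMs, clock) {
  const deadline = clock.now + timeoutMs;
  const messages = [];
  while (messages.length < count) {
    const remainingMs = deadline - clock.now;
    if (remainingMs <= 0) break; // batch budget exhausted
    const msg = consumeOne(remainingMs);
    if (msg === null) break;
    messages.push(msg);
  }
  return messages;
}

// d = 1000 ms, c = 100, a constant 60 rpm (one message per second).
const legacyClock = { now: 0 };
const legacy = consumeNumLegacy(makeSimulatedConsume(1000, legacyClock), 100, 1000);
console.log(legacy.length, legacyClock.now); // 100 100000

const batchClock = { now: 0 };
const batch = consumeNumBatchTimeout(makeSimulatedConsume(1000, batchClock), 100, 1000, batchClock);
console.log(batch.length, batchClock.now); // 1 1000
```

With a constant 60 rpm, the legacy loop returns all 100 messages only after 100 s of virtual time, while the shared-deadline loop returns after 1 s with whatever has arrived.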

Why

The existing behavior is described in detail in issue #262 (linked under References).

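In short, for a delay (the consume timeout) of d milliseconds, a batch count of c, and a blocking time of b: b = d * c, given a constant topic rate of 60000 / d messages per minute (rpm), i.e. one message every d milliseconds, because each of the c consume calls waits its full d ms for the next message.

For any rpm > 60000 / d, each consume call returns as soon as the next message arrives, without timing out, so b = c * (60 / rpm) seconds.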

For example, a 1000 ms delay with a batch count of 100 causes the consumeNum loop to block for up to 100 seconds when the topic receives a constant 60 rpm.
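The formulas above can be checked with a small calculation. This is a sketch assuming a perfectly steady message rate; `legacyBlockingMs` and `batchBlockingMs` are hypothetical helper names, and the post-change bound min(d, c * 60000 / rpm) is derived here from the shared-timeout description rather than stated in the PR.

```javascript
// Closed-form blocking times, in milliseconds.
// Assumes messages arrive at a perfectly steady rate of `rpm` per minute.

// Pre-change: valid for rpm >= 60000 / d, where no consume call times out,
// so the loop waits one inter-arrival gap for each of the c messages.
function legacyBlockingMs(d, c, rpm) {
  if (rpm < 60000 / d) {
    throw new RangeError("formula assumes rpm >= 60000 / d");
  }
  const gapMs = 60000 / rpm; // time between consecutive messages
  return c * gapMs;
}

// Post-change: the same wait, but capped by the shared batch timeout d.
function batchBlockingMs(d, c, rpm) {
  return Math.min(d, c * (60000 / rpm));
}

console.log(legacyBlockingMs(1000, 100, 60));  // 100000 (the 100 s example)
console.log(legacyBlockingMs(1000, 100, 700)); // about 8571.43 (roughly 8.6 s)
console.log(batchBlockingMs(1000, 100, 700));  // 1000
```

Adding the 50 ms cooldown from the Test & Review section to batchBlockingMs(1000, 100, 700) gives the expected 1050 ms cadence.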

References

Issue: https://github.com/confluentinc/confluent-kafka-javascript/issues/262
PR that introduced the behavior: https://github.com/Blizzard/node-rdkafka/pull/34

PRs in node-rdkafka addressing the same issue:
https://github.com/Blizzard/node-rdkafka/pull/1061
https://github.com/Blizzard/node-rdkafka/pull/1053

Test & Review

With c = 100, d = 1000, and rpm = 700, the expectation is that before the change we'd block for about 8.6 seconds (100 * 60 / 700 ≈ 8.57 s) before returning the 100 messages, though it seems to take longer than that in the example.

My testing environment has a 50 ms cooldown between calls to consume, so after the change we expect a batch of messages to be returned every 1050 ms (the 1000 ms batch timeout plus the 50 ms cooldown).

Pre-Change: (screenshot: confluent-node-rdkafka-pre2)

Post-Change: (screenshot: confluent-node-rdkafka-post2)

andrewhessler • Jun 08 '25 15:06

I've not reviewed the PR completely yet, but this bug has been there long enough that I assume someone might have started accidentally relying on this behaviour. Could you make this change conditional on calling a setter before consuming, similar to how "setDefaultIsTimeoutOnlyForFirstMessage" currently works in the code? We can keep it 'false' for now and make it 'true' when we do a major version release.

Besides that, could you also add a test case to the automated tests that fails before this change and passes after it? Let me know if you have any issues adding that.

Thanks a lot for this PR!

milindl • Aug 01 '25 11:08
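The opt-in pattern suggested above can be sketched as a flag that defaults to the legacy behavior. This is a toy model, not the library's code: `ToyConsumer` and `setDefaultIsTimeoutForWholeBatch` are hypothetical names (the setter the PR actually adds may be named differently); only `setDefaultIsTimeoutOnlyForFirstMessage` is mentioned in the thread.

```javascript
// Toy model of an opt-in switch; NOT the confluent-kafka-javascript API.
// `setDefaultIsTimeoutForWholeBatch` is a hypothetical setter name.
class ToyConsumer {
  constructor() {
    // Default stays 'false' so existing callers keep the legacy behavior.
    this.timeoutForWholeBatch = false;
  }

  setDefaultIsTimeoutForWholeBatch(enabled) {
    this.timeoutForWholeBatch = Boolean(enabled);
  }

  // Timeout that the next single consume call may use, given how many
  // milliseconds of the current batch have already elapsed.
  nextCallTimeoutMs(batchTimeoutMs, elapsedMs) {
    if (!this.timeoutForWholeBatch) {
      return batchTimeoutMs; // legacy: every call gets the full timeout
    }
    return Math.max(0, batchTimeoutMs - elapsedMs); // opt-in: share the budget
  }
}

const consumer = new ToyConsumer();
console.log(consumer.nextCallTimeoutMs(1000, 400)); // 1000
consumer.setDefaultIsTimeoutForWholeBatch(true);
console.log(consumer.nextCallTimeoutMs(1000, 400)); // 600
```

Keeping the flag off by default means nothing changes for existing callers until they opt in, and flipping the default later is a one-line change.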

Sure sure, that makes sense lol. Updated so the new behavior is opt-in.

Also added two e2e tests that spec both behaviors as if they're desired. Let me know what you think! :pray:

andrewhessler • Aug 03 '25 18:08

@milindl I don't really know the etiquette around this, but it crossed my mind today, so I thought I'd check in. Let me know if there are any other updates I can make.

andrewhessler • Oct 12 '25 18:10