fix: Apply timeout correctly to consumeNum
What
Applies the timeout to the entire batch instead of to each message individually.
How
The individual consume calls share the batch timeout: each call to consume can wait only for as long as the batch has remaining.
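As a rough illustration of the shared-deadline idea, here is a minimal sketch in plain JavaScript. It is not the code from this PR: `consumeBatch`, `consumeOne`, and the injectable `now` clock are hypothetical names used only for this example, and `consumeOne(timeoutMs)` is assumed to return a message or `null` when it times out.

```javascript
// Hypothetical helper, not the library's API: collects up to `count` messages
// while sharing one timeout budget across all individual consume calls.
function consumeBatch(consumeOne, count, batchTimeoutMs, now = Date.now) {
  const deadline = now() + batchTimeoutMs;
  const messages = [];
  while (messages.length < count) {
    const remainingMs = deadline - now();
    if (remainingMs <= 0) break; // batch budget is used up
    // Each call may only wait for what is left of the batch budget.
    const message = consumeOne(remainingMs);
    if (message === null) break; // this call timed out
    messages.push(message);
  }
  return messages;
}

// Simulated topic on a fake clock: one message every 400 ms.
let clock = 0;
const fakeConsume = (timeoutMs) => {
  const waitMs = 400;
  if (waitMs > timeoutMs) {
    clock += timeoutMs; // waited out the remaining budget, no message
    return null;
  }
  clock += waitMs;
  return { arrivedAtMs: clock };
};

const batch = consumeBatch(fakeConsume, 100, 1000, () => clock);
console.log(batch.length, clock); // prints: 2 1000
```

With a 1000ms batch timeout, the simulated loop returns after 1000ms with the two messages that arrived in time, rather than waiting up to 1000ms for each of the 100 requested messages.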
Why
The existing behavior is described in detail in the issue linked under References.
In short, for a delay of d milliseconds, a batch count of c, and a blocking time of b: b = d * c milliseconds, given a constant topic rate of 60000 / d messages per minute (rpm).
For any rpm > 60000 / d: b = c * (60 / rpm) seconds.
For example, a 1000ms delay with a batch count of 100 will cause the consumeNum loop to block for up to 100 seconds given a constant topic rpm of 60.
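The arithmetic above can be checked with a small sketch. `preChangeBlockingMs` is a hypothetical helper written for this description, not part of the library, and it assumes messages arrive at a perfectly constant rate of at least 60000 / d per minute.

```javascript
// Hypothetical helper: worst-case blocking time (in ms) of the pre-change
// consumeNum loop, assuming evenly spaced messages and rpm >= 60000 / delayMs.
function preChangeBlockingMs(count, delayMs, rpm) {
  const intervalMs = 60000 / rpm; // time between consecutive messages
  if (intervalMs > delayMs) {
    throw new RangeError("formula only covers rpm >= 60000 / delayMs");
  }
  // Every one of the `count` consume calls waits one full message interval.
  return count * intervalMs;
}

console.log(preChangeBlockingMs(100, 1000, 60)); // prints: 100000
console.log((preChangeBlockingMs(100, 1000, 700) / 1000).toFixed(1)); // prints: 8.6
```

The first call reproduces the 100 second example above; the second gives the roughly 8.6 seconds for rpm = 700.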
References
Issue: https://github.com/confluentinc/confluent-kafka-javascript/issues/262
PR Introduced: https://github.com/Blizzard/node-rdkafka/pull/34
PRs in node-rdkafka addressing the same issue:
https://github.com/Blizzard/node-rdkafka/pull/1061
https://github.com/Blizzard/node-rdkafka/pull/1053
Test & Review
With c = 100, d = 1000, and rpm = 700, the expectation is that pre-change we'd block for about 8.6 seconds (though it seems to take longer in the example) before returning the 100 messages.
My testing environment has a 50ms cooldown on calling consume, so after the change we expect a batch of messages to be returned every 1050ms.
Pre-Change
Post-Change
I've not reviewed the PR completely yet, but this bug has been there long enough that I assume someone might have started accidentally relying on this behaviour. Could you make this change conditional on calling a setter before consuming? Similar to how "setDefaultIsTimeoutOnlyForFirstMessage" works currently in the code. We can keep it 'false' for now and make it 'true' when we do a major version release.
Besides that, could you also add a test case to the automated tests that fails before this change and passes after it? Let me know if you have any issues adding that.
Thanks a lot for this PR!
Sure sure, that makes sense lol. Updated so the new behavior is opt-in.
Also added two e2e tests that spec both behaviors as if they're desired. Let me know what you think! :pray:
@milindl I don't really know the etiquette around this, but it crossed my mind today, so I thought I'd check in. Let me know if there are any other updates I can make.