
HeartbeatInterval not respected for parallel heartbeat calls

Open · Nevon opened this issue 3 years ago · 6 comments

If you make multiple calls to heartbeat in parallel, the heartbeatInterval will not be respected. For example:

```js
await Promise.all([
  heartbeat(),
  heartbeat(),
  heartbeat(),
])
```

This will cause 3 heartbeat requests to be made, even though after the first one succeeds, the rest should be skipped because the last heartbeat was less than heartbeatInterval ms ago. This could be an issue if, for example, you heartbeat after each message in a batch with parallel message processing and end up making 50 heartbeats at the same time.
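A minimal sketch of the race, assuming a simplified heartbeat with the same check-then-send shape (this is not the linked consumerGroup.js code). `sendHeartbeatRequest`, `requestsSent`, and the 3000 ms interval are hypothetical stand-ins. Because `lastRequest` is read before the request is awaited and written only after it completes, all three parallel callers pass the interval check:

```javascript
// Hypothetical stand-in for the broker heartbeat request: it counts calls
// and resolves after a short delay to simulate network latency.
let requestsSent = 0
const sendHeartbeatRequest = () =>
  new Promise(resolve => {
    requestsSent++
    setTimeout(resolve, 10)
  })

const heartbeatInterval = 3000 // ms, hypothetical value
let lastRequest = 0

// Naive check-then-act: every caller that arrives while the first request
// is still in flight sees the old lastRequest and sends its own request.
async function heartbeat() {
  if (Date.now() - lastRequest < heartbeatInterval) {
    return // skipped: last heartbeat was recent enough
  }
  await sendHeartbeatRequest()
  lastRequest = Date.now()
}

const demo = (async () => {
  await Promise.all([heartbeat(), heartbeat(), heartbeat()])
  console.log(`requests sent: ${requestsSent}`)
  return requestsSent
})()
```

Running it with Node.js prints `requests sent: 3` rather than 1.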

https://github.com/tulios/kafkajs/blob/e1a9d9eecd4cfd29e719f1c02cae6e544df0894a/src/consumer/consumerGroup.js#L386-L401

We'll need to add a lock before acquiring the value of lastRequest.

Nevon · Feb 11 '21 09:02

@Nevon, it looks like debouncing should be implemented here. What do you think?

XBeg9 · Feb 11 '21 09:02

Kind of, but it's not just a matter of a regular debounce, as there are some additional concerns to keep in mind:

  1. We should still not resolve the waiting promises until one heartbeat has actually resolved.
  2. If the promise rejects, should we reject all waiting promises as well, or let the next one try (I'm leaning towards the latter).
  3. We should only update the lastRequest if the heartbeat actually succeeded.

I think it's as simple as just adding a lock on line 389 and then releasing it after you either succeed or fail to heartbeat. We already have the lock functionality built, so it's just a matter of dropping that in.
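One way such a lock could look, as a sketch only: the `Mutex` class is a hypothetical minimal helper written for this example (not the lock utility that already exists in KafkaJS), and `sendHeartbeatRequest` and `failNext` are stand-ins. The lock is taken before `lastRequest` is read and released in `finally`, so a failed request leaves `lastRequest` untouched and the next waiter tries again (the second option in point 2):

```javascript
// Minimal promise-based mutex written for this sketch: acquire() resolves
// with a release function once every earlier holder has released.
class Mutex {
  constructor() {
    this.tail = Promise.resolve()
  }

  acquire() {
    let release
    const held = new Promise(resolve => { release = resolve })
    const previous = this.tail
    this.tail = previous.then(() => held)
    return previous.then(() => release)
  }
}

let requestsSent = 0
let failNext = false // hypothetical switch to simulate one failed request
const sendHeartbeatRequest = () =>
  new Promise((resolve, reject) => {
    requestsSent++
    setTimeout(() => {
      if (failNext) {
        failNext = false
        reject(new Error('heartbeat failed'))
      } else {
        resolve()
      }
    }, 10)
  })

const heartbeatInterval = 3000 // ms, hypothetical value
const lock = new Mutex()
let lastRequest = 0

async function heartbeat() {
  const release = await lock.acquire()
  try {
    // lastRequest is read only while holding the lock, so each waiter sees
    // the value written by the previous holder.
    if (Date.now() - lastRequest < heartbeatInterval) return
    await sendHeartbeatRequest()
    lastRequest = Date.now() // updated only on success
  } finally {
    release() // released on success or failure
  }
}

const demo = (async () => {
  // Parallel calls: only the first one sends a request.
  await Promise.all([heartbeat(), heartbeat(), heartbeat()])
  const afterParallel = requestsSent

  // First request fails, lastRequest stays stale, so the next waiter retries.
  lastRequest = 0
  failNext = true
  const results = await Promise.allSettled([heartbeat(), heartbeat(), heartbeat()])
  return { afterParallel, total: requestsSent, statuses: results.map(r => r.status) }
})()
```

Here `afterParallel` is 1, and in the failure round the statuses are `rejected`, `fulfilled`, `fulfilled`, with one retry request sent.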

Nevon · Feb 11 '21 09:02

I have an implementation for this as part of a POC I'm working on for independent broker fetches.

> or let the next one try

I didn't go with this; I let them all fail. (They actually all share the same pending promise.) I'll raise a draft PR for discussion.
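A sketch of the shared pending promise variant, using hypothetical stand-ins again (this is not the code from the PR). Concurrent callers join the in-flight request, so a single rejection fails all of them, and the first call after it settles starts a fresh attempt:

```javascript
let requestsSent = 0
let shouldFail = false // hypothetical switch to simulate a failing broker
const sendHeartbeatRequest = () =>
  new Promise((resolve, reject) => {
    requestsSent++
    setTimeout(() => (shouldFail ? reject(new Error('heartbeat failed')) : resolve()), 10)
  })

const heartbeatInterval = 3000 // ms, hypothetical value
let lastRequest = 0
let pending = null // the in-flight heartbeat, shared by concurrent callers

function heartbeat() {
  if (pending) return pending // join the request already in flight
  if (Date.now() - lastRequest < heartbeatInterval) return Promise.resolve()
  pending = sendHeartbeatRequest()
    .then(() => { lastRequest = Date.now() }) // only on success
    .finally(() => { pending = null }) // allow a new attempt once settled
  return pending
}

const demo = (async () => {
  shouldFail = true
  const failed = await Promise.allSettled([heartbeat(), heartbeat(), heartbeat()])
  const afterFailure = requestsSent
  shouldFail = false
  await heartbeat() // next attempt starts a fresh request
  return { statuses: failed.map(r => r.status), afterFailure, total: requestsSent }
})()
```

All three parallel callers are rejected by one request, and the follow-up call sends a second request that succeeds.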

t-d-d · Feb 12 '21 08:02

> We should only update the lastRequest if the heartbeat actually succeeded.

This is already the case, @Nevon.

> If the promise rejects, should we reject all waiting promises as well, or let the next one try (I'm leaning towards the latter).

In this case, it doesn't matter. One failed heartbeat will not break the consumer, and a new heartbeat will happen on the next event, so I would say the best approach is to reject all and wait for the next attempt.

I think @t-d-d's PR can work, or a lock as you mentioned previously.

tulios · Feb 18 '21 10:02

Yes, in this case (for the heartbeat) a lock would work, as there is an interval and subsequent calls will short-circuit and not heartbeat. In the POC I'm working on, I actually use the 'shared promise' pattern in other places as well (refreshMetaData, joinAndSync). So I guess I am making the case for adding it to utils, as it is a useful abstraction.
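If the pattern were extracted into a reusable helper, it might look like the sketch below. `shareInFlight` is a hypothetical name, and `refreshMetadata` here is a stand-in, not KafkaJS's own method. All callers that join an in-flight call get the result computed with the first caller's arguments, which is fine for argument-less operations such as a heartbeat or a metadata refresh:

```javascript
// Hypothetical helper: concurrent calls share one in-flight promise; once it
// settles (resolve or reject), the next call starts a new one.
function shareInFlight(asyncFn) {
  let inFlight = null
  return (...args) => {
    if (!inFlight) {
      inFlight = Promise.resolve()
        .then(() => asyncFn(...args)) // also turns synchronous throws into rejections
        .finally(() => { inFlight = null })
    }
    return inFlight
  }
}

let metadataFetches = 0
// Hypothetical stand-in for a metadata refresh against the cluster.
const refreshMetadata = shareInFlight(async () => {
  metadataFetches++
  await new Promise(resolve => setTimeout(resolve, 10))
  return { brokers: 3 }
})

const demo = (async () => {
  const results = await Promise.all([refreshMetadata(), refreshMetadata()])
  // Both callers received the same object from a single fetch.
  return { fetches: metadataFetches, sameResult: results[0] === results[1] }
})()
```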

t-d-d · Feb 18 '21 10:02

Hey all, it looks like @t-d-d's implementation to fix this bug has been merged to master. What is the cadence for releasing a new version of KafkaJS with this fix? Thanks in advance!

hermanator608 · Oct 12 '21 15:10