remove: redundant backoff timer in heartbeat loop
The timer `retryBackoff` here is redundant, because in the underlying logic of `func Coordinator(consumerGroup string) (*Broker, error)`, `findCoordinator` already uses a retry mechanism driven by `client.conf.Metadata.Retry.Backoff` or `client.conf.Metadata.Retry.BackoffFunc`.
```go
func (client *client) findCoordinator(coordinatorKey string, coordinatorType CoordinatorType, attemptsRemaining int) (*FindCoordinatorResponse, error) {
	retry := func(err error) (*FindCoordinatorResponse, error) {
		if attemptsRemaining > 0 {
			backoff := client.computeBackoff(attemptsRemaining)
			Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
			time.Sleep(backoff)
			return client.findCoordinator(coordinatorKey, coordinatorType, attemptsRemaining-1)
		}
		return nil, err
	}
	...
}
```
```go
func (client *client) computeBackoff(attemptsRemaining int) time.Duration {
	if client.conf.Metadata.Retry.BackoffFunc != nil {
		maxRetries := client.conf.Metadata.Retry.Max
		retries := maxRetries - attemptsRemaining
		return client.conf.Metadata.Retry.BackoffFunc(retries, maxRetries)
	}
	return client.conf.Metadata.Retry.Backoff
}
```
I don't think the timer is entirely redundant, but I don't think it is currently working either.

The timer that is created will obviously have fired during `findCoordinator` due to the retries with backoff. But it is reset before the select, so I think the intention is that it will make sure we sleep before trying again. (I don't think this works, though, as the channel isn't drained, so the select triggers immediately.)

If we remove the timer, then every `Retry.Max` attempts we will make a new attempt without any backoff (as we unintentionally do already). So, rather than removing the timer, perhaps it should be drained before being reset, or perhaps we should just change the scope and create it at the point where it is currently reset. The latter might make the intention more obvious?
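For reference, the drain-before-reset idea is roughly the standard `time.Timer` idiom shown below. This is a self-contained sketch with generic names, not Sarama's actual heartbeat code:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	backoff := 250 * time.Millisecond

	retryBackoff := time.NewTimer(backoff)
	defer retryBackoff.Stop()

	// Simulate work (e.g. the retries inside findCoordinator) during which
	// the timer fires and leaves a stale value in its channel.
	time.Sleep(300 * time.Millisecond)

	// On older Go runtimes, a select on retryBackoff.C could return
	// immediately after Reset because the old tick is still buffered;
	// draining the channel first avoids that.
	if !retryBackoff.Stop() {
		select {
		case <-retryBackoff.C:
		default:
		}
	}
	retryBackoff.Reset(backoff)

	start := time.Now()
	<-retryBackoff.C
	fmt.Println("slept for", time.Since(start)) // ~250ms rather than returning immediately
}
```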
Thanks, @hindessm, for the insights.
I find the current logic here a bit weird for a couple of reasons.
- Firstly, `Metadata.Retry.Max` is designed as the maximum retry count for metadata refresh, so using it for the heartbeat might not be appropriate. After reviewing the Java Kafka client code, it appears that the correct approach is to retry until the session timeout (`session.timeout.ms`) elapses, rather than relying on the maximum retry count specified in `Metadata.Retry.Max`.
- Secondly, the Java Kafka client includes a config named `retry.backoff.ms`, which exists to avoid repeatedly sending requests in a tight loop under some failure scenarios. Sarama lacks such a universal config; instead, it uses `Metadata.Retry.Backoff` in the heartbeat loop. However, Sarama also provides another config, `Metadata.Retry.BackoffFunc`, which takes precedence over `Metadata.Retry.Backoff`. If users set `Metadata.Retry.Backoff = 0` and a non-nil `Metadata.Retry.BackoffFunc`, the heartbeat retry loop will have no backoff at all under the current logic (see the sketch after this list). Moreover, given that the function signature is `BackoffFunc func(retries, maxRetries int) time.Duration`, it is odd to use `Metadata.Retry.Max` as the `maxRetries` parameter when computing the heartbeat backoff.
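To make the second point concrete, here is a small sketch of that configuration and the value the heartbeat loop currently feeds its timer (the import path is assumed to be `github.com/IBM/sarama`; adjust for whichever Sarama module path you build against):

```go
package main

import (
	"fmt"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Metadata.Retry.Backoff = 0
	cfg.Metadata.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
		return time.Duration(retries+1) * 100 * time.Millisecond
	}

	// The heartbeat loop resets its retry timer with Metadata.Retry.Backoff
	// directly, so this user ends up with no backoff between attempts.
	fmt.Println("heartbeat timer duration:", cfg.Metadata.Retry.Backoff) // 0s

	// findCoordinator, by contrast, goes through computeBackoff and honours
	// the BackoffFunc.
	fmt.Println("computeBackoff equivalent:", cfg.Metadata.Retry.BackoffFunc(0, cfg.Metadata.Retry.Max)) // 100ms
}
```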
Based on these observations, I recommend the following changes:
- Avoid calling `coordinator, err := s.parent.client.Coordinator(s.parent.groupID)` inside the loop, because the consumer should already have a valid coordinator at this point, after the `JoinGroup` and `SyncGroup` operations. In the `newSession` method, after obtaining the coordinator, we should store it as a field on the consumer and reuse it during heartbeats.
- Instead of using `Metadata.Retry.Max`, retry until the session timeout elapses.
- Introduce a new config, `Heartbeat.Backoff`, to handle the backoff time dedicated to heartbeats.
I believe these adjustments should lead to a more efficient and reliable implementation (a rough sketch of the intended shape follows below), and I'll push new commits later involving the above changes.
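As a rough illustration of the changes proposed above, here is a self-contained sketch. The `session` type, its fields, and the proposed `Heartbeat.Backoff`-style setting are illustrative stand-ins, not existing Sarama API:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// session is an illustrative stand-in for the consumer group session; the
// real implementation would cache the coordinator obtained in newSession.
type session struct {
	sessionTimeout   time.Duration // e.g. the consumer group session timeout
	heartbeatBackoff time.Duration // the proposed dedicated heartbeat backoff
	sendHeartbeat    func() error  // sends a heartbeat to the cached coordinator
}

// heartbeatOnce retries a failed heartbeat until the session timeout elapses,
// sleeping heartbeatBackoff between attempts, instead of counting down from
// Metadata.Retry.Max.
func (s *session) heartbeatOnce() error {
	deadline := time.Now().Add(s.sessionTimeout)
	for {
		err := s.sendHeartbeat()
		if err == nil {
			return nil
		}
		if time.Now().Add(s.heartbeatBackoff).After(deadline) {
			return fmt.Errorf("heartbeat failed within session timeout: %w", err)
		}
		time.Sleep(s.heartbeatBackoff)
	}
}

func main() {
	attempts := 0
	s := &session{
		sessionTimeout:   2 * time.Second,
		heartbeatBackoff: 300 * time.Millisecond,
		sendHeartbeat: func() error {
			attempts++
			if attempts < 3 {
				return errors.New("transient failure")
			}
			return nil
		},
	}
	fmt.Println(s.heartbeatOnce(), "after", attempts, "attempts")
}
```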
Thank you for your contribution! However, this pull request has not had any activity in the past 90 days and will be closed in 30 days if no updates occur. If you believe the changes are still valid then please verify your branch has no conflicts with main and rebase if needed. If you are awaiting a (re-)review then please let us know.