
remove: redundant backoff timer in heartbeat loop

Open · napallday opened this issue · 4 comments

The timer retryBackoff here is redundant.

Because in the underlying logic of func Coordinator(consumerGroup string) (*Broker, error), findCoordinator already applies its own retry mechanism using client.conf.Metadata.Retry.Backoff or client.conf.Metadata.Retry.BackoffFunc.

func (client *client) findCoordinator(coordinatorKey string, coordinatorType CoordinatorType, attemptsRemaining int) (*FindCoordinatorResponse, error) {
	retry := func(err error) (*FindCoordinatorResponse, error) {
		if attemptsRemaining > 0 {
			backoff := client.computeBackoff(attemptsRemaining)
			Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
			time.Sleep(backoff)
			return client.findCoordinator(coordinatorKey, coordinatorType, attemptsRemaining-1)
		}
		return nil, err
	}
	...
}

func (client *client) computeBackoff(attemptsRemaining int) time.Duration {
	if client.conf.Metadata.Retry.BackoffFunc != nil {
		maxRetries := client.conf.Metadata.Retry.Max
		retries := maxRetries - attemptsRemaining
		return client.conf.Metadata.Retry.BackoffFunc(retries, maxRetries)
	}
	return client.conf.Metadata.Retry.Backoff
}

napallday (Jul 18 '23)

I don't think the timer is entirely redundant, but I don't think it is currently working either.

The timer that is created will obviously have fired during the findCoordinator call, due to the retries with backoff. But it is reset before the select, so I think the intention is to make sure we sleep before trying again. (I don't think this works, though, as the channel isn't drained, so the select triggers immediately.)

If we remove the timer, then after every Retry.Max attempts we will start a new attempt without any backoff (as we unintentionally do already). So rather than removing the timer, perhaps it should be drained before being reset, or perhaps we should just change the scope and create it at the point where it is currently reset. The latter might make the intention more obvious?
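For illustration, the two options might look something like this (a sketch written against the hypothetical field names used in the loop above, not an actual patch):

// Option 1: keep the shared timer but drain any pending fire before
// resetting it, so the following select cannot return immediately.
if !retryBackoff.Stop() {
	select {
	case <-retryBackoff.C:
	default:
	}
}
retryBackoff.Reset(s.parent.config.Metadata.Retry.Backoff)
select {
case <-s.hbDying:
	return
case <-retryBackoff.C:
	retries--
}

// Option 2: narrow the scope and create the timer right where it is
// needed, which makes the intended extra sleep explicit.
backoffTimer := time.NewTimer(s.parent.config.Metadata.Retry.Backoff)
select {
case <-s.hbDying:
	backoffTimer.Stop()
	return
case <-backoffTimer.C:
	retries--
}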

hindessm (Jul 25 '23)

Thanks, @hindessm, for the insights.

I find the current logic here a bit weird for a couple of reasons.

  1. Firstly, Metadata.Retry.Max is designed as the maximum retry count for metadata refresh, so reusing it for heartbeats may not be appropriate. After reviewing the Java Kafka client code, it appears the correct approach is to retry until the session timeout (session.timeout.ms) expires, rather than relying on the maximum retry count specified in Metadata.Retry.Max.

  2. Secondly, the Java Kafka client has a config named retry.backoff.ms, whose purpose is to avoid repeatedly sending requests in a tight loop under some failure scenarios. Sarama lacks such a universal config and instead reuses Metadata.Retry.Backoff in the heartbeat loop. However, Sarama also offers another config, Metadata.Retry.BackoffFunc, which takes precedence over Metadata.Retry.Backoff. If users set Metadata.Retry.Backoff = 0 and provide a Metadata.Retry.BackoffFunc, the heartbeat retry loop ends up with no backoff at all under the current logic (a configuration illustrating this is sketched just below). Moreover, given that the function signature is BackoffFunc func(retries, maxRetries int) time.Duration, it is odd to use Metadata.Retry.Max as the maxRetries parameter when computing a heartbeat backoff.
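A minimal configuration demonstrating that pitfall (assuming the IBM/sarama import path; the backoff values are arbitrary):

package main

import (
	"time"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	// Rely entirely on BackoffFunc and leave the fixed backoff at zero.
	cfg.Metadata.Retry.Backoff = 0
	cfg.Metadata.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
		return time.Duration(250*(retries+1)) * time.Millisecond
	}
	// findCoordinator honours BackoffFunc via computeBackoff, but the
	// heartbeat retry loop resets its timer with Metadata.Retry.Backoff
	// alone, so its extra backoff collapses to zero with this config.
}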

Based on these observations, I recommend the following changes:

  1. Avoid calling coordinator, err := s.parent.client.Coordinator(s.parent.groupID) inside the loop, because the consumer should already have a valid coordinator at this point, after the JoinGroup and SyncGroup operations. In the newSession method, after obtaining the coordinator, we should store it as a field on the consumer and reuse it during heartbeats.

  2. Instead of capping retries at Metadata.Retry.Max, retry until the session timeout (session.timeout.ms) expires.

  3. Introduce a new config, Heartbeat.Backoff, to hold the backoff dedicated to heartbeat retries. A rough sketch combining points 2 and 3 follows below.
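The sketch below shows the proposed policy of retrying until the session deadline with a dedicated backoff. The function, parameter names, and the stubbed sender are hypothetical stand-ins for illustration, not sarama's actual API:

package main

import (
	"errors"
	"fmt"
	"time"
)

// heartbeatWithSessionDeadline retries heartbeats with a dedicated backoff
// until the session timeout elapses, instead of counting attempts against
// Metadata.Retry.Max. sendHeartbeat would use the coordinator stored during
// newSession, per point 1.
func heartbeatWithSessionDeadline(
	sendHeartbeat func() error, // hypothetical: issues one heartbeat to the stored coordinator
	sessionTimeout time.Duration, // e.g. Consumer.Group.Session.Timeout
	heartbeatBackoff time.Duration, // the proposed Heartbeat.Backoff
	stop <-chan struct{}, // closed when the session is shutting down
) error {
	deadline := time.Now().Add(sessionTimeout)
	for {
		err := sendHeartbeat()
		if err == nil {
			return nil
		}
		if time.Now().After(deadline) {
			return errors.New("heartbeat: session timeout exceeded: " + err.Error())
		}
		select {
		case <-stop:
			return nil
		case <-time.After(heartbeatBackoff):
			// back off, then try again until the deadline passes
		}
	}
}

func main() {
	// Example wiring with a stubbed heartbeat sender that always succeeds.
	stop := make(chan struct{})
	err := heartbeatWithSessionDeadline(
		func() error { return nil },
		10*time.Second,       // would come from the session timeout config
		250*time.Millisecond, // would come from the proposed Heartbeat.Backoff
		stop,
	)
	fmt.Println("heartbeat result:", err)
}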

I believe these adjustments should lead to a more efficient and reliable implementation, and I'll push new commits implementing the above changes later.

napallday (Jul 27 '23)

Thank you for your contribution! However, this pull request has not had any activity in the past 90 days and will be closed in 30 days if no updates occur. If you believe the changes are still valid then please verify your branch has no conflicts with main and rebase if needed. If you are awaiting a (re-)review then please let us know.

github-actions[bot] (Oct 25 '23)

Thank you for your contribution! However, this pull request has not had any activity in the past 90 days and will be closed in 30 days if no updates occur. If you believe the changes are still valid then please verify your branch has no conflicts with main and rebase if needed. If you are awaiting a (re-)review then please let us know.

github-actions[bot] (Feb 01 '24)