watermill
watermill copied to clipboard
Retry Middleware blocked message
When I use a single consumer, subsequent messages cannot be consumed normally after Retry is triggered.
retryLoop:
for {
waitTime := expBackoff.NextBackOff()
select {
case <-ctx.Done():
return producedMessages, err
case <-time.After(waitTime):
// go on
}
producedMessages, err = h(msg)
if err == nil {
return producedMessages, nil
}
if r.Logger != nil {
r.Logger.Error("Error occurred, retrying", err, watermill.LogFields{
"retry_no": retryNum,
"max_retries": r.MaxRetries,
"wait_time": waitTime,
"elapsed_time": expBackoff.GetElapsedTime(),
})
}
if r.OnRetryHook != nil {
r.OnRetryHook(retryNum, waitTime)
}
retryNum++
if retryNum > r.MaxRetries {
break retryLoop
}
}
Do you mean after entering retryLoop
section?
yes, when I set InitialInterval 30min, it causes message blocking
@guihouchang if you set the initial interval to 30 minutes, the middleware waits for 30 minutes after a message processing fails. This is by design, and it blocks further messages. If you need to unblock the following messages, you need something more complex, like publishing the broken message on a poison queue.