elastic
elastic copied to clipboard
Bulk processor retries indefinitely on failure
Please use the following questions as a guideline to help me answer your issue/question without further inquiry. Thank you.
Which version of Elastic are you using?
- [ ] elastic.v7 (for Elasticsearch 7.x)
- [x] elastic.v6 (for Elasticsearch 6.x)
- [ ] elastic.v5 (for Elasticsearch 5.x)
- [ ] elastic.v3 (for Elasticsearch 2.x)
- [ ] elastic.v2 (for Elasticsearch 1.x)
Please describe the expected behavior
This issue is similar to #1247 but with a bigger scope.
When an individual request fails in a bulk processor, it should not retry when the backoff returns false.
Please describe the actual behavior
When the backoff returns false and there is an error in the individual request , the failing request is added back to the bulkprocessor before the commit function returns. This causes the indefinite retries.
The following code in bulk_processor.go is responsible for this behaviour.
// commitFunc will commit bulk requests and, on failure, be retried
// via exponential backoff
commitFunc := func() error {
var err error
// Save requests because they will be reset in service.Do
reqs := w.service.requests
res, err = w.service.Do(ctx)
if err == nil {
// Overall bulk request was OK. But each bulk response item also has a status
if w.p.retryItemStatusCodes != nil && len(w.p.retryItemStatusCodes) > 0 {
// Check res.Items since some might be soft failures
if res.Items != nil && res.Errors {
// res.Items will be 1 to 1 with reqs in same order
for i, item := range res.Items {
for _, result := range item {
if _, found := w.p.retryItemStatusCodes[result.Status]; found {
w.service.Add(reqs[i]) //here the failing request is added back to the processor regardless of the backoff flag
if err == nil {
err = ErrBulkItemRetry
}
}
}
}
}
}
}
return err
}
Any steps to reproduce the behavior?
One easy way to recreate this is to retry on client side errors, and send wrong data type to a field in the index.
hi @raiRaiyan I believe I contributed the code that you referenced.
The library will check the status on each response line item as shown above before adding the request corresponding to the line item back. The default list of status codes this applies to is
defaultRetryItemStatusCodes = []int{408, 429, 503, 507}
if _, found := w.p.retryItemStatusCodes[result.Status]; found {
// add back
}
These correspond to a set of status codes I looked up which could be temporary errors. E.g. 429 too many requests. I would be surprised if a wrong data type would cause one of those response status to be returned, but I would be interested to find out if that's wrong.
You can override this slice of status codes to add back with the RetryItemStatusCodes method on the bulk processor.
@rwynn Thank you for the reply. The code works as expected when the requests succeed after the initial failure, but the issue is of infinite retries when the requests do not succeed. The wrong data type was just an example for how to recreate the issue.
My understanding is that the backoff function defines how the requests are retried and it can control when to stop retrying by returning false from the Next function. But this is not the case, as the commitFunc keeps adding the failed requests (provided it belongs to the list of status codes defined) to the worker, regardless of the output of the backoff function.
I want the processor to stop retrying when the max time is reached for exponential backoff and discard the failing request, but currently that doesn't seem to happen.
@raiRaiyan I think to get the behavior you are expecting, one way to do it would be to add a 4th argument to RetryNotify defined here which is a reset function.
The reset function would be called when all the retries have been exhausted without success and it would clear the queue of requests.
So, I'm thinking something like this...
func RetryNotify(operation Operation, b Backoff, notify Notify, reset ResetFunc) error {
var err error
var wait time.Duration
var retry bool
var n int
for {
if err = operation(); err == nil {
return nil
}
n++
wait, retry = b.Next(n)
if !retry {
if reset != nil {
reset()
}
return err
}
if notify != nil {
notify(err)
}
time.Sleep(wait)
}
}
The reset function passed in would be a function that closes over BulkService and calls Reset on it to clear the line items.
To clarify further, without inspecting and checking the response item lines in the commit function you get a scenario where the overall bulk request succeeds, but the response line items contain soft (temporary) failures. If you don't inspect those and return an error on them, then the RetryNotify thinks the operation() call succeeded and does not initiate retry. Furthermore, without the "add back" logic you actually lose those line items since the Do function calls Reset if the overall response was OK.
A much more direct solution would be to simply add the following line after the call to RetryNotify.
err := RetryNotify(commitFunc, w.p.backoff, notifyFunc)
w.service.Reset()
Since all the retries happen inside RetryNotify.
Yes. Resetting the service queue after the RetryNotify makes sense. I was thinking of passing a flag to the commitFunc to control whether or not the request is added back, but just resetting the queue is enough and the overhead is very minimal.
@rwynn @raiRaiyan I'm very busy with my day job currently, so I'd be more than happy to get a PR to be merged for the next release. Thanks for your committment.