go-elasticsearch
go-elasticsearch copied to clipboard
OnError callback in esutil.BulkIndexerConfig is invoked duplicately
In bulk_indexer.go, I see logic below:
func (w *worker) run() {
go func() {
...
if err := w.flush(ctx); err != nil {
w.mu.Unlock()
if w.bi.config.OnError != nil {
w.bi.config.OnError(ctx, err)
}
continue
}
...
}()
}
func (w *worker) flush(ctx context.Context) error {
...
if res.IsError() {
atomic.AddUint64(&w.bi.stats.numFailed, uint64(len(w.items)))
// TODO(karmi): Wrap error (include response struct)
if w.bi.config.OnError != nil {
w.bi.config.OnError(ctx, fmt.Errorf("flush: %s", err))
}
return fmt.Errorf("flush: %s", res.String())
}
...
}
When flush fails, bi.config.OnError is called twice. One happens in flush function, and the other in the run function.