go-elasticsearch
BulkIndexer does not provide info to BulkIndexerItem on global worker.flush error
Hello,
I recently started using esutil.BulkIndexer (with Elasticsearch v6) and got an error because I had not provided a type on an index action. I expected to receive the details through the OnFailure callback, but got nothing. I now understand that OnFailure does not report errors affecting the whole worker.flush() call, and that only the BulkIndexer's OnError callback does.
I have not found an easy workaround to detect when the worker.flush() in charge of my BulkIndexerItem fails.
Do you have a workaround for this situation, or does it require an evolution of the BulkIndexer? If it needs a PR, what would you recommend?
- Calling OnFailure for each item when an error occurs
- Adding another callback (for example OnFlushError) to the items, dedicated to this situation
- Something else...
Regards
Hello! This is strange, as all kinds of errors should be propagated. I've tried the executable example _examples/bulk/indexer.go against the 6.x branch, and there's a subtle difference between the OnError callback for the whole indexer and the callbacks for individual items. When the _type is omitted on Elasticsearch 6.x, no items are returned at all (the logging output shows it); therefore, calling OnFailure wouldn't help.
I've added some logging and an error callback to the example, so you can try it locally:
```diff
diff --git i/_examples/bulk/indexer.go w/_examples/bulk/indexer.go
index e5a45b91..163eed29 100644
--- i/_examples/bulk/indexer.go
+++ w/_examples/bulk/indexer.go
@@ -17,8 +17,10 @@ import (
 	"context"
 	"encoding/json"
 	"flag"
+	"fmt"
 	"log"
 	"math/rand"
+	"os"
 	"runtime"
 	"strconv"
 	"strings"
@@ -30,6 +32,7 @@ import (
 	"github.com/elastic/go-elasticsearch/v6"
 	"github.com/elastic/go-elasticsearch/v6/esapi"
+	"github.com/elastic/go-elasticsearch/v6/estransport"
 	"github.com/elastic/go-elasticsearch/v6/esutil"
 )
@@ -110,6 +113,8 @@ func main() {
 		// Retry up to 5 attempts
 		//
 		MaxRetries: 5,
+
+		Logger: &estransport.ColorLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true},
 	})
 	if err != nil {
 		log.Fatalf("Error creating the client: %s", err)
@@ -124,12 +129,13 @@ func main() {
 	// See an example in the "benchmarks" folder.
 	//
 	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
-		Index:         indexName, // The default index name
-		DocumentType:  "_doc",    // The default document type
+		Index: indexName, // The default index name
+		// DocumentType: "_doc", // The default document type
 		Client:        es,               // The Elasticsearch client
 		NumWorkers:    numWorkers,       // The number of worker goroutines
 		FlushBytes:    int(flushBytes),  // The flush threshold in bytes
 		FlushInterval: 30 * time.Second, // The periodic flush interval
+		OnError:       func(ctx context.Context, err error) { fmt.Printf("BulkIndexer Error: %s\n", err) },
 	})
 	if err != nil {
 		log.Fatalf("Error creating the indexer: %s", err)
```
The OnError callback for the indexer should handle the situation you describe.
I'm out of office this month, so I have limited time to look into this, but I can continue the conversation in this ticket.
Hello, any news here?
We are facing a similar issue. In the flush function, errors are not propagated to individual items via the OnFailure callback:

```go
func (w *worker) flush(ctx context.Context) error {
	// ...
	res, err := req.Do(ctx, w.bi.config.Client)
	if err != nil {
		atomic.AddUint64(&w.bi.stats.numFailed, uint64(len(w.items)))
		if w.bi.config.OnError != nil {
			w.bi.config.OnError(ctx, fmt.Errorf("flush: %s", err))
		}
		return fmt.Errorf("flush: %s", err)
	}
```
If the API call fails due to a timeout or connection error, this will only be conveyed through w.bi.config.OnError. It should also notify each individual item via item.OnFailure(ctx, item, info, nil).
Hi any update on this issue?
Any update on this? This would be very helpful for us in many use cases.
Hi, any update on this issue?
Hello! Any updates on this?
Hi guys, Any update on this issue?
Any update on this issue?
Hi Team
Is it possible to expedite this critical requirement?