go-elasticsearch

BulkIndexer does not provide info to BulkIndexerItem on global worker.flush error

Status: Open. ValentinVERGEZ opened this issue 3 years ago • 12 comments

Hello,

I recently started using esutil.BulkIndexer (with Elasticsearch v6) and got an error because I did not provide a document type on an index action. I expected to receive the error through the item's OnFailure callback, but got nothing. I now understand that OnFailure does not report errors that affect the whole worker.flush() call, and that only the indexer-level OnError callback (BulkIndexerConfig.OnError) does.
I could not find an easy workaround to detect when the worker.flush() call responsible for my BulkIndexerItem fails.

Is there a workaround for this situation, or does it require a change to BulkIndexer?
If it needs a PR, which approach would you recommend?

  • Calling OnFailure for each item when an error occurs
  • Adding another callback (for example OnFlushError) to the items, dedicated to this situation
  • Something else ...

Regards

ValentinVERGEZ, Aug 07 '20

Hello! This is strange, as all kinds of errors should be propagated. I've tried the executable example _examples/bulk/indexer.go against the 6.x branch, and there's a subtle difference between the OnError callback for the whole indexer and the OnFailure callback for individual items. When _type is omitted on Elasticsearch 6.x, no items are returned at all (the logging output shows this), so calling OnFailure wouldn't help.
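To illustrate why per-item callbacks stay silent here: they are driven by the entries of the bulk response's "items" array, so a response that rejects the whole request carries nothing to dispatch. The sketch below is a simplified, hypothetical model (responseItem, queuedItem and dispatch are not esutil types), and treating any status above 201 as a failure is an assumption of this sketch.

```go
package main

import "fmt"

// responseItem is a simplified stand-in for one entry of a bulk
// response's "items" array.
type responseItem struct {
	Status int
	Error  string
}

// queuedItem is a simplified stand-in for a queued item with its
// per-item callbacks.
type queuedItem struct {
	ID        string
	OnSuccess func(id string)
	OnFailure func(id, reason string)
}

// dispatch pairs response entries with queued items by position and fires
// the matching callback, returning how many callbacks were fired.
// If the response has no items (the whole request was rejected, e.g. because
// _type is missing on 6.x), the loop body never runs and nothing fires.
func dispatch(resp []responseItem, queued []queuedItem) int {
	fired := 0
	for i, r := range resp {
		if i >= len(queued) {
			break
		}
		q := queued[i]
		if r.Status > 201 { // assumption: statuses above 201 are failures
			if q.OnFailure != nil {
				q.OnFailure(q.ID, r.Error)
				fired++
			}
		} else if q.OnSuccess != nil {
			q.OnSuccess(q.ID)
			fired++
		}
	}
	return fired
}

func main() {
	logFail := func(id, reason string) { fmt.Println("failed:", id, reason) }
	queued := []queuedItem{{ID: "1", OnFailure: logFail}, {ID: "2", OnFailure: logFail}}

	// Whole request rejected: no "items" in the response.
	fmt.Println("callbacks fired:", dispatch(nil, queued))

	// Normal response with one per-item error.
	resp := []responseItem{{Status: 201}, {Status: 400, Error: "mapper_parsing_exception"}}
	fmt.Println("callbacks fired:", dispatch(resp, queued))
}
```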

I've added logging and an error callback to the example, so you can try it locally:

diff --git i/_examples/bulk/indexer.go w/_examples/bulk/indexer.go
index e5a45b91..163eed29 100644
--- i/_examples/bulk/indexer.go
+++ w/_examples/bulk/indexer.go
@@ -17,8 +17,10 @@ import (
        "context"
        "encoding/json"
        "flag"
+       "fmt"
        "log"
        "math/rand"
+       "os"
        "runtime"
        "strconv"
        "strings"
@@ -30,6 +32,7 @@ import (
 
        "github.com/elastic/go-elasticsearch/v6"
        "github.com/elastic/go-elasticsearch/v6/esapi"
+       "github.com/elastic/go-elasticsearch/v6/estransport"
        "github.com/elastic/go-elasticsearch/v6/esutil"
 )
 
@@ -110,6 +113,8 @@ func main() {
                // Retry up to 5 attempts
                //
                MaxRetries: 5,
+
+               Logger: &estransport.ColorLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true},
        })
        if err != nil {
                log.Fatalf("Error creating the client: %s", err)
@@ -124,12 +129,13 @@ func main() {
        //       See an example in the "benchmarks" folder.
        //
        bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
-               Index:         indexName,        // The default index name
-               DocumentType:  "_doc",           // The default document type
+               Index: indexName, // The default index name
+               // DocumentType:  "_doc",           // The default document type
                Client:        es,               // The Elasticsearch client
                NumWorkers:    numWorkers,       // The number of worker goroutines
                FlushBytes:    int(flushBytes),  // The flush threshold in bytes
                FlushInterval: 30 * time.Second, // The periodic flush interval
+               OnError:       func(ctx context.Context, err error) { fmt.Printf("BulkIndexer Error: %s\n", err) },
        })
        if err != nil {
                log.Fatalf("Error creating the indexer: %s", err)

The OnError callback for the indexer should handle the situation you describe.

I'm out of office this month, so I have limited time to look into this, but I can continue the conversation in this ticket.

karmi, Aug 08 '20

Hello, any news here?

karmi, Sep 28 '20

We are facing a similar issue. In the flush function, errors are not propagated to individual items through the OnFailure callback:

func (w *worker) flush(ctx context.Context) error {
        // ...
        res, err := req.Do(ctx, w.bi.config.Client)
        if err != nil {
                atomic.AddUint64(&w.bi.stats.numFailed, uint64(len(w.items)))
                if w.bi.config.OnError != nil {
                        w.bi.config.OnError(ctx, fmt.Errorf("flush: %s", err))
                }
                return fmt.Errorf("flush: %s", err)
        }
        // ...
}

If the request fails due to a timeout or connection error, the error is only reported through w.bi.config.OnError. It should also be passed to each individual item via item.OnFailure(ctx, item, info, err).

sumanthakannantha, Feb 23 '21

Hi, any update on this issue?

itzikYeret, Mar 14 '21

Any update on this? This change would be very helpful for us in many use cases.

pundliksarafdar, May 25 '21

Hi, any update on this issue?

sumanthakannantha, May 25 '21

Hello! Any updates on this?

brouillette, Aug 05 '22

Hi guys, any update on this issue?

ansssu, Feb 22 '23

Hi guys, any update on this issue?

tornaci45, Oct 16 '23

Hi guys, any update on this issue?

rpecb, Dec 08 '23

Any update on this issue?

amalic, Jan 21 '24

Hi Team

Is it possible to expedite this critical requirement?

sumantha-kannantha-hpe, Jan 22 '24