
Bug: Context timeout error is swallowed when closing BulkIndexer in v7.17

Open • karatekaneen opened this issue 1 year ago • 1 comment

While trying to handle timeouts gracefully, I noticed that the context error is swallowed: closing the BulkIndexer returns nil, so everything looks fine even though the timeout fired and the whole operation was aborted.

To reproduce

package elastic

import (
	"context"
	"errors"
	"log"
	"strings"
	"testing"
	"time"

	"github.com/elastic/go-elasticsearch/v7"
	"github.com/elastic/go-elasticsearch/v7/esutil"
)

func TestNewBulkIndexer(t *testing.T) {
	// I don't have any elasticsearch running so this will definitely time out
	es, err := elasticsearch.NewClient(elasticsearch.Config{
		RetryBackoff: func(i int) time.Duration { return time.Duration(i) * 1000 * time.Millisecond },
		MaxRetries:   5,
	})
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Client:     es,     // The Elasticsearch client
		Index:      "test", // The default index name
		NumWorkers: 4,      // The number of worker goroutines (default: number of CPUs)
		FlushBytes: 5e+6,   // The flush threshold in bytes (default: 5M)
	})
	if err != nil {
		log.Fatalf("Error creating the indexer: %s", err)
	}
	err = indexer.Add(
		context.Background(),
		esutil.BulkIndexerItem{
			Action:     "index",
			DocumentID: "1",
			Body:       strings.NewReader(`{"title":"Test"}`),
		},
	)
	if err != nil {
		log.Fatalf("Unexpected error: %s", err)
	}

	// Add a very narrow timeout
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	// Close the bulkIndexer
	err = indexer.Close(ctx)
	// This should be context.DeadlineExceeded ("context deadline exceeded") but is nil for some reason
	if !errors.Is(err, context.DeadlineExceeded) {
		t.Errorf(`Expected "context deadline exceeded" - Got %v`, err)
	}
}

karatekaneen • May 16 '23 15:05

We ran into the same issue, and it was tricky and confusing to debug. We only identified it because the pubsub subscription that feeds the bulk indexer was timing out while the logs showed no errors.

In our case we have a continuous indexing job running, so under normal circumstances we almost never reach the "close" step. It would be good to have the error available in the OnFailure handler for an individual item or the OnError handler for the bulk indexer itself.

floretan • May 24 '23 14:05