
Bulk stats show no failures, but some documents are missing from the index; the import has to be run multiple times to get them all

Open • vitroz opened this issue 4 years ago • 2 comments

Please use the following questions as a guideline to help me answer your issue/question without further inquiry. Thank you.

Which version of Elastic are you using?

[ ] elastic.v7 (for Elasticsearch 7.x)
[x] elastic.v6 (for Elasticsearch 6.x)
[ ] elastic.v5 (for Elasticsearch 5.x)
[ ] elastic.v3 (for Elasticsearch 2.x)
[ ] elastic.v2 (for Elasticsearch 1.x)

Please describe the expected behavior

I use bulkProcessor.Add(r) to queue an index request for each document and call bulkProcessor.Flush() once all documents have been added. I expect bulkProcessor.Stats() to report every request as successfully created, and every record to show up in Elasticsearch.

Please describe the actual behavior

Not all records show up, even though the stats show no errors. If I run the code multiple times, all records eventually show up.

Any steps to reproduce the behavior?

```go
// Fragment of the importer. Besides the application's own packages
// (index, apiCtx), it uses context, fmt, math, reflect and
// gopkg.in/olivere/elastic.v6.

// Runs SELECT COUNT(*) FROM <index table> on the database.
count, err := index.DocumentsTotalCount(apiCtx, sqlWhereStmt)
if err != nil {
	panic(err)
}
var limit float64 = 1000.0
totalPages := int(math.Ceil(float64(count) / limit))

bulkProcessor, err := apiCtx.ElasticClient.BulkProcessor().
	Name("ESImporterWorker").
	Workers(2).
	BulkActions(1000). // commit after 1000 queued requests...
	BulkSize(2 << 20). // ...or after 2 MiB of queued request data
	Stats(true).       // collect the counters printed below
	Do(context.Background())
if err != nil {
	panic(err)
}
defer bulkProcessor.Close()

for page := 0; page < totalPages; page++ {
	offset := int(limit) * page
	// Fetches records from the database in sets of 1000 (LIMIT/OFFSET).
	result, err := index.Documents(apiCtx, sqlWhereStmt, int(limit), offset)
	if err != nil {
		panic(err)
	}
	documents := reflect.ValueOf(result)

	for i := 0; i < documents.Len(); i++ {
		doc := documents.Index(i)
		// reflect.Value.String() returns the value itself only for string
		// fields; for other kinds it returns a placeholder like "<int Value>".
		r := elastic.NewBulkIndexRequest().
			Index(index.Name()).
			Type(index.MappingType()).
			Id(doc.FieldByName("ID").String()).
			Doc(doc.Interface())
		bulkProcessor.Add(r)
	}
}

// Ask all workers to commit their outstanding requests.
err = bulkProcessor.Flush()
if err != nil {
	panic(err)
}
stats := bulkProcessor.Stats()
fmt.Printf("Number of times flush has been invoked: %d\n", stats.Flushed)
fmt.Printf("Number of times workers committed reqs: %d\n", stats.Committed)
fmt.Printf("Number of requests indexed            : %d\n", stats.Indexed)
fmt.Printf("Number of requests reported as created: %d\n", stats.Created)
fmt.Printf("Number of requests reported as updated: %d\n", stats.Updated)
fmt.Printf("Number of requests reported as success: %d\n", stats.Succeeded)
fmt.Printf("Number of requests reported as failed : %d\n", stats.Failed)
fmt.Printf("\n\n")
```

Any idea where I should start debugging this?

I see no errors whatsoever, but some records are always missing. If I keep running the exact same code, it eventually works.
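One thing worth ruling out: the loop pages through the database with LIMIT/OFFSET, and the SQL behind index.Documents is not shown. If that query has no ORDER BY on a unique column, SQL does not guarantee the same row order on each call, so consecutive pages can overlap (some rows fetched twice) while other rows are never fetched at all. The stdlib-only sketch below models that assumption; fetchPage and collect are hypothetical helpers, and rotating the rows by the page number is an arbitrary stand-in for "the order changed between queries".

```go
package main

import "fmt"

// fetchPage is a hypothetical stand-in for a LIMIT/OFFSET query. Without an
// ORDER BY, row order is not guaranteed between calls; here that is modelled
// by rotating the rows by `shift` before slicing out the requested page.
func fetchPage(rows []string, limit, offset, shift int) []string {
	n := len(rows)
	if offset >= n {
		return nil
	}
	ordered := make([]string, n)
	for i := range rows {
		ordered[i] = rows[(i+shift)%n]
	}
	end := offset + limit
	if end > n {
		end = n
	}
	return ordered[offset:end]
}

// collect pages through all rows the way the importer does and reports how
// many rows were fetched and how many distinct IDs were among them.
func collect(rows []string, limit int, stable bool) (fetched, distinct int) {
	seen := map[string]bool{}
	for page := 0; page*limit < len(rows); page++ {
		shift := 0
		if !stable {
			shift = page // the order changes between queries
		}
		for _, id := range fetchPage(rows, limit, page*limit, shift) {
			fetched++
			seen[id] = true
		}
	}
	return fetched, len(seen)
}

func main() {
	rows := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}
	f, d := collect(rows, 3, false)
	fmt.Printf("unordered: fetched=%d distinct=%d\n", f, d)
	f, d = collect(rows, 3, true)
	fmt.Printf("ordered:   fetched=%d distinct=%d\n", f, d)
}
```

With 10 rows and pages of 3, the unordered run fetches 10 rows but only 8 distinct IDs, which has the same shape as every bulk request succeeding while fewer documents end up in the index. If this turns out to be the cause, ordering the query by a unique column (or paging with `WHERE id > last_id ORDER BY id`) is the usual fix.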

Stats example:

```
Total completion (%): 100.0000
offset 26000

Number of times flush has been invoked: 1
Number of times workers committed reqs: 26
Number of requests indexed            : 25935
Number of requests reported as created: 0
Number of requests reported as updated: 0
Number of requests reported as success: 25935
Number of requests reported as failed : 0
```

However, when I request the document count, I get:

```json
{
    "total_hits": 25906
}
```
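As a sanity check, the processor's own counters can be compared against each other using only the figures printed above (a worked sketch; expectedCommits is a hypothetical helper). With BulkActions(1000), 25935 requests need ceil(25935 / 1000) = 26 commits, matching Committed, assuming the 2 MiB BulkSize limit never triggered an earlier commit. Succeeded + Failed also equals Indexed, so the 29-document difference only shows up when comparing against Elasticsearch's count, not inside the bulk processor's statistics.

```go
package main

import "fmt"

// expectedCommits is the number of bulk commits needed when a commit happens
// every bulkActions requests and a final flush commits the remainder.
// It assumes the BulkSize threshold is never reached first.
func expectedCommits(requests, bulkActions int) int {
	return (requests + bulkActions - 1) / bulkActions // integer ceiling
}

func main() {
	// Figures copied from the stats output and the count response above.
	const (
		bulkActions = 1000
		committed   = 26
		indexed     = 25935
		succeeded   = 25935
		failed      = 0
		totalHits   = 25906
	)
	fmt.Println("commits expected/reported:", expectedCommits(indexed, bulkActions), committed)
	fmt.Println("every request accounted for:", succeeded+failed == indexed)
	fmt.Println("indexed minus total_hits:", indexed-totalHits)
}
```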

I noticed that if I run the same code multiple times on the same data (same query), the count eventually adds up to the actual size.
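A cheap way to test whether repeated IDs explain the gap between Indexed (25935) and total_hits (25906) is to record each ID right before bulkProcessor.Add(r) and count the distinct values. An Elasticsearch index holds at most one document per _id, so sending the same ID twice overwrites the earlier document instead of adding a new one, while both requests still count as successful. The helper below is a hypothetical, stdlib-only sketch; the sample IDs are made up.

```go
package main

import (
	"fmt"
	"sort"
)

// duplicateIDs returns the number of distinct IDs and, sorted, every ID
// that appears more than once in ids.
func duplicateIDs(ids []string) (distinct int, dups []string) {
	seen := make(map[string]int, len(ids))
	for _, id := range ids {
		seen[id]++
	}
	for id, n := range seen {
		if n > 1 {
			dups = append(dups, id)
		}
	}
	sort.Strings(dups) // map iteration order is random; sort for stable output
	return len(seen), dups
}

func main() {
	// Hypothetical IDs collected just before each bulkProcessor.Add(r) call.
	ids := []string{"1", "2", "3", "2", "4", "3"}
	distinct, dups := duplicateIDs(ids)
	fmt.Printf("added=%d distinct=%d duplicates=%v\n", len(ids), distinct, dups)
}
```

If distinct comes out lower than the number of Add calls, the missing documents were overwritten rather than lost, and the question becomes why the same rows were fetched more than once.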

vitroz, Nov 11 '20 03:11

Does your program stop after the flush? If so, does adding a delay of, say, 5 seconds at the end of main help in any way?

olivere, Dec 10 '20 13:12
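olivere's question is about whether the process exits before the processor's background workers have finished sending their requests. The toy model below is a hypothetical simplification, not olivere/elastic's implementation; it only illustrates the general pattern of waiting deterministically for worker goroutines to drain their queue (here with sync.WaitGroup) instead of sleeping for a fixed time.

```go
package main

import (
	"fmt"
	"sync"
)

// toyProcessor is a hypothetical stand-in for a background bulk processor:
// Add hands items to worker goroutines, and Close waits for them to finish.
type toyProcessor struct {
	queue chan int
	wg    sync.WaitGroup
	mu    sync.Mutex
	done  int
}

func newToyProcessor(workers int) *toyProcessor {
	p := &toyProcessor{queue: make(chan int, 100)}
	for w := 0; w < workers; w++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for range p.queue {
				p.mu.Lock()
				p.done++ // stand-in for sending a request to Elasticsearch
				p.mu.Unlock()
			}
		}()
	}
	return p
}

// Add queues one item; it blocks while the queue buffer is full.
func (p *toyProcessor) Add(item int) { p.queue <- item }

// Close stops accepting work and blocks until every queued item has been
// processed. It must be called exactly once.
func (p *toyProcessor) Close() int {
	close(p.queue)
	p.wg.Wait() // all workers have exited, so reading done is race-free
	return p.done
}

func main() {
	p := newToyProcessor(2)
	for i := 0; i < 25935; i++ {
		p.Add(i)
	}
	fmt.Println("processed:", p.Close())
}
```

A fixed delay only helps if it happens to be long enough; waiting on the workers themselves removes the guesswork.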

Hi @olivere! Sorry for the late response; I had a problem with my setup. My program does not stop after the flush. Do you think adding the delay might help, even though it is not currently crashing?

vitroz, Dec 14 '20 20:12