elastigo
elastigo copied to clipboard
Broken BulkIndexer Flush
If you have something like this:
indexer := core.NewBulkIndexerErrors(200, 60)
indexer.BulkMaxBuffer = 10485760
indexer.BulkMaxDocs = 6000
done := make(chan bool)
indexer.Run(done)
// Clean up on exit as the indexer doesn't seem to do it by itself
defer func() { indexer.ErrorChannel = nil }()
defer close(indexer.ErrorChannel)
go func() {
for errBuf := range indexer.ErrorChannel {
// just blissfully print errors forever
log.Println(errBuf.Err)
}
log.Println("No more errors")
}()
for i := 0; i < 20; i++ {
indexer.Index("twitter", "user", strconv.Itoa(i), "", nil, `{"alias":"babbsan"}`)
}
done <- true
indexer.Flush()
And don't call the Flush in the end, it will not save those documents. I'm guessing it is because Run() is creating its own goroutine which won't block on shutdown, where calling Flush will block.
Also see my note in the code about indexer not closing down the error channel on shutdown
Maybe a bit related to #30