elastigo
Fundamental flaw in bulk indexer?
We've been trying to track down some issues where there are holes in our imported data, yet no errors come through. I think I've identified a race condition where the last documents may never be flushed to Elasticsearch.
In startHttpSender, we wait on both b.sendBuf and b.httpDoneChan inside a select loop. If we receive on httpDoneChan we return and the goroutine finishes. During the Stop process, we first push the last docs, if there are any, onto sendBuf and then send true on httpDoneChan.
Go guarantees FIFO ordering within a single channel, but from our tests that guarantee does not extend to a select over multiple channels: when more than one case is ready, one is chosen at random. I believe that in some cases the receive on httpDoneChan will beat the receive on sendBuf, the goroutine will close, Stop will return, and sendBuf will be left with data in it that is never received. This is more likely to happen, I imagine, if the sendBuf case of the select is still processing at the moment the Stop method is called, since neither channel will be ready to receive on immediately, and then the race occurs.
Here's some sample code that shows how the ordering of input to the channels does not matter when selecting on them, and that this can't be solved by closing the channels either :(. The playground link doesn't show the close issue, but running it on a live machine will show that the count output varies.
http://play.golang.org/p/a0RbfN8uQ8
package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string, 10)
    ch2 := make(chan string, 10)

    go func() {
        count := 0
    LOOP:
        for {
            select {
            case a, more := <-ch1:
                if !more {
                    // ch1 closed: exit even though ch2 may still hold data
                    break LOOP
                }
                count++
                fmt.Println(a)
            case a, more := <-ch2:
                if !more {
                    // ch2 closed: exit even though ch1 may still hold data
                    break LOOP
                }
                count++
                fmt.Println(a)
            }
        }
        // should be 20 if nothing were dropped; on a live machine it varies
        fmt.Println(count)
    }()

    for i := 0; i < 10; i++ {
        ch1 <- "a"
        ch2 <- "b"
    }
    close(ch1)
    close(ch2)
    time.Sleep(time.Second)
}
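For what it's worth, one way to avoid losing buffered work in a pattern like this is to keep draining the data channel after the done signal arrives, and only return once it's empty. A minimal sketch of that idea (the sendBuf/done names are illustrative, not elastigo's actual code):

package main

import "fmt"

func main() {
    sendBuf := make(chan string, 10)
    done := make(chan bool)
    finished := make(chan int)

    go func() {
        count := 0
        for {
            select {
            case <-sendBuf:
                count++ // normal path: "send" the queued batch
            case <-done:
                // Drain whatever is still buffered before returning, so
                // documents queued during Stop are never silently dropped.
                for {
                    select {
                    case <-sendBuf:
                        count++
                    default:
                        finished <- count
                        return
                    }
                }
            }
        }
    }()

    // Stop-style shutdown: queue the final docs, then signal done.
    for i := 0; i < 10; i++ {
        sendBuf <- "doc"
    }
    done <- true

    fmt.Println(<-finished) // always 10, even when done wins the select
}

This works here because Stop queues the final documents before signalling done, so by the time the done signal is received everything is already sitting in the buffer.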
This is quite possible. This code needs some love. If you have some proposals, I'm open to helping.
My personal opinion is that it's trying to be a little too "asynchronous". We've moved to a simpler system that has a synchronous interface for any single "batch" of entities, (which is what we were trying to use the BulkIndexer for), whereas the BulkIndexer feels like it's designed to be more of a long running process.
We communicate via a docChan and an errChan. A goroutine runs and pulls from the docChan until it's closed, sending the docs it receives to Elasticsearch in batches whenever it reaches the threshold, sending any errors (and nil) back on the errChan, and using a sync.WaitGroup to ensure all requests are processed before closing the errChan. In the main thread the entities are looped through and sent on the docChan, then the docChan is closed, and the errChan is looped over until it's closed. This has worked seamlessly for us, and it was a very simple interface.
Obviously this was domain specific to our code, since we knew how to parse our entities into bulk indexer compatible documents, but I wouldn't imagine this would be too hard to replicate as BatchIndexer-style code. That could be implemented in parallel to the BulkIndexer.
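For concreteness, here's a rough sketch of the shape described above; the names (BatchIndex, indexBatch, threshold) are made up and the Elasticsearch call is stubbed out, so treat it as an outline of a possible BatchIndexer rather than working elastigo code:

package main

import (
    "fmt"
    "sync"
)

// indexBatch stands in for the real bulk request to Elasticsearch.
func indexBatch(docs []string) error {
    fmt.Printf("indexing %d docs\n", len(docs))
    return nil
}

// BatchIndex drains docChan, flushes every `threshold` docs, and reports
// one result per flush on the returned error channel, which is closed
// once every request has completed.
func BatchIndex(docChan <-chan string, threshold int) <-chan error {
    errChan := make(chan error)
    go func() {
        var wg sync.WaitGroup
        batch := make([]string, 0, threshold)

        flush := func(docs []string) {
            wg.Add(1)
            go func() {
                defer wg.Done()
                errChan <- indexBatch(docs)
            }()
        }

        for doc := range docChan {
            batch = append(batch, doc)
            if len(batch) >= threshold {
                flush(batch)
                batch = make([]string, 0, threshold)
            }
        }
        if len(batch) > 0 {
            flush(batch) // final partial batch after docChan is closed
        }

        // Close errChan only after every in-flight request has reported.
        go func() {
            wg.Wait()
            close(errChan)
        }()
    }()
    return errChan
}

func main() {
    docChan := make(chan string)
    errChan := BatchIndex(docChan, 100)

    go func() {
        for i := 0; i < 250; i++ {
            docChan <- fmt.Sprintf("doc-%d", i)
        }
        close(docChan)
    }()

    for err := range errChan {
        if err != nil {
            fmt.Println("bulk request failed:", err)
        }
    }
}

The synchronous shape means that once errChan is closed, the caller knows every batch has either been indexed or has reported an error.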
The BulkIndexer has a lot of issues; a simpler bulk indexer would be appreciated. The current one does not allow the client to deal with errors. It's also very likely to drop data on shutdown of an app, since you don't know whether something was indexed or not.
Yes, I've been trying to track down errors in the bulk indexer and it's really difficult. Also, there seems to be a memory leak in NewBulkIndexerErrors: if an update or index operation fails and retries, it appears to leak memory.
The background goroutine stuff really doesn't belong in the library, in my opinion. The library should give you a simple bulk API that the caller can wrap in a goroutine if they want to make it async.
@vrecan @kristen1980 How about setting a custom Sender function attribute on the BulkIndexer? Is that not a good way to deal with errors? What kind of errors are you talking about that can't be caught this way?
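If it helps to make that concrete, the idea (as I understand it) is that a caller-supplied send hook sees every bulk request and can forward failures back to the application. The sketch below uses a made-up Indexer type with a Sender field so it stays self-contained; elastigo's real BulkIndexer is more involved, and the exact field name and signature there may differ:

package main

import (
    "bytes"
    "errors"
    "fmt"
)

// Indexer is a stand-in for a bulk indexer that exposes a pluggable
// Sender hook; this only illustrates the error-handling idea, it is
// not elastigo's actual type.
type Indexer struct {
    Sender func(*bytes.Buffer) error
}

func (ix *Indexer) flush(buf *bytes.Buffer) {
    if err := ix.Sender(buf); err != nil {
        // In a background-goroutine indexer, errors raised here are easy
        // to lose unless the hook forwards them to the caller.
        fmt.Println("flush failed:", err)
    }
}

func main() {
    bulkErrs := make(chan error, 10)

    ix := &Indexer{
        Sender: func(buf *bytes.Buffer) error {
            // Pretend the HTTP request to Elasticsearch failed and
            // forward the error to the application.
            err := errors.New("bulk request rejected")
            bulkErrs <- err
            return err
        },
    }

    ix.flush(bytes.NewBufferString(`{"index":{}}` + "\n"))

    fmt.Println("caller saw:", <-bulkErrs)
}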
@snikch Were you using an old version of elastigo? httpDoneChan was removed back in Nov 2014 (9fd168ffce5dfbc43e76e669136c2c6c2ac4ffc6) and startHttpSender looks quite different from what you describe. Does this race issue still exist?
Hrm, it's possible. This was over a year ago so I've really got no memory of the whole situation. Sorry!