elastigo
elastigo copied to clipboard
Fix data race in the PendingDocuments function
A data race sometimes occurs in PendingDocuments().
.
I'm assuming you found this with the race detector? Do you still have the output from the tool?
Yes, see the output below.
==================
WARNING: DATA RACE
Write by goroutine 30:
my.corp.com/me/log-server.git/vendor/github.com/mattbaird/elastigo/lib.(*BulkIndexer).startDocChannel.func1()
/Users/me/Golang/src/my.corp.com/me/log-server.git/vendor/github.com/mattbaird/elastigo/lib/corebulk.go:250 +0x12a
Previous read by goroutine 14:
my.corp.com/me/log-server.git/storage/es.PendingChecker.func1()
/Users/me/Golang/src/my.corp.com/me/log-server.git/storage/es/client.go:81 +0x2a4
Goroutine 30 (running) created at:
my.corp.com/me/log-server.git/vendor/github.com/mattbaird/elastigo/lib.(*BulkIndexer).startDocChannel()
/Users/me/Golang/src/my.corp.com/me/log-server.git/vendor/github.com/mattbaird/elastigo/lib/corebulk.go:259 +0x42
my.corp.com/me/log-server.git/vendor/github.com/mattbaird/elastigo/lib.(*BulkIndexer).Start.func1()
/Users/me/Golang/src/my.corp.com/me/log-server.git/vendor/github.com/mattbaird/elastigo/lib/corebulk.go:142 +0x104
Goroutine 14 (running) created at:
my.corp.com/me/log-server.git/storage/es.PendingChecker()
/Users/me/Golang/src/my.corp.com/me/log-server.git/storage/es/client.go:84 +0xf1
main.main()
/Users/me/Golang/src/my.corp.com/me/log-server.git/main.go:85 +0xc41
==================
And this is my simple client code. If I'm doing something wrong, please tell me.
import (
"sync"
"time"
"log"
elastigo "github.com/mattbaird/elastigo/lib"
)
// Tunables for the Elasticsearch bulk-indexing client.
const (
	// DefaultHost is the host:port that Setup appends to the connection's host list.
	// NOTE(review): port 0092 looks like a transposition of 9200 (main passes
	// "localhost:9200" to SetHost) — confirm before relying on this default.
	DefaultHost = "localhost:0092"

	// esMaxConns is the maximum number of connections to ES.
	esMaxConns = 15

	// retryForSeconds is how long to keep retrying; 0 disables retries.
	retryForSeconds = 0

	// bulkMaxBuffer is the ES bulk buffer size in bytes (5 MB).
	bulkMaxBuffer = 5 * 1024 * 1024

	// bulkMaxDocs is the value assigned to the indexer's BulkMaxDocs in Setup.
	bulkMaxDocs = 3000

	// pendingCheckDuration is the interval between pending-document reports.
	pendingCheckDuration = time.Second
)
// Package-level Elasticsearch handles. Both are declared without an
// initializer, so they are nil until Setup assigns them.
var (
	// esClient is the connection created by Setup and reconfigured by SetHost.
	esClient *elastigo.Conn

	// ESBulkIndexer is the bulk indexer created from esClient in Setup.
	ESBulkIndexer *elastigo.BulkIndexer
)
// check is the shared state of the background pending-documents checker.
// The embedded mutex guards the fields below; lock the struct itself
// (c.Lock / c.Unlock) before touching them.
type check struct {
	sync.Mutex

	// enable reports whether a checker goroutine is currently running.
	enable bool

	// done carries the stop request to the checker goroutine.
	done chan struct{}
}
// pendingCheck is the package-wide checker state. The literal spells out the
// starting condition explicitly: the checker is disabled (and done is nil)
// until PendingChecker turns it on.
var pendingCheck = check{enable: false}
// Setup creates the Elasticsearch connection and its bulk indexer and stores
// them in the package-level esClient and ESBulkIndexer variables.
//
// DefaultHost is appended to whatever hosts the new connection already has,
// and the indexer's buffer and document limits are taken from the package
// constants.
func Setup() {
	conn := elastigo.NewConn()
	conn.Hosts = append(conn.Hosts, DefaultHost)
	esClient = conn

	indexer := conn.NewBulkIndexerErrors(esMaxConns, retryForSeconds)
	indexer.BulkMaxBuffer = bulkMaxBuffer
	indexer.BulkMaxDocs = bulkMaxDocs
	ESBulkIndexer = indexer
}
// SetHost replaces the client's host list with a copy of host.
//
// The slice is copied so that later changes to the caller's slice cannot
// silently alter the client's hosts. Setup must have been called first:
// esClient is nil until then and this function dereferences it.
func SetHost(host []string) {
	esClient.Hosts = append([]string(nil), host...)
}
// Start launches a goroutine that logs every error delivered on the bulk
// indexer's ErrorChannel, then starts the indexer itself.
//
// The logging goroutine runs until ErrorChannel is closed. Setup must have
// been called first so that ESBulkIndexer is non-nil.
//
// Logging goes through log.Printf/log.Print because the standard log package
// has no leveled helpers such as Warnf or Info.
func Start() {
	// Read the channel once, before spawning, so the goroutine does not touch
	// the package-level variable and a nil indexer fails here in the caller.
	errCh := ESBulkIndexer.ErrorChannel

	go func() {
		for errBuf := range errCh {
			log.Printf("ES Error : %v", errBuf.Err)
		}
		log.Print("ES Error Channel closed.")
	}()

	ESBulkIndexer.Start()
	log.Print("ES BulkIndexer start.")
}
func PendingChecker() {
ticker := time.NewTicker(pendingCheckDuration)
pendingCheck.Lock()
pendingCheck.enable = true
pendingCheck.done = make(chan struct{}, 1)
pendingCheck.Unlock()
go func() {
L:
for range ticker.C {
select {
case <-pendingCheck.done:
pendingCheck.Lock()
close(pendingCheck.done)
pendingCheck.enable = false
pendingCheck.Unlock()
ticker.Stop()
log.Info("ES Pending checker stop.")
break L
default:
log.Infof("ES Pending documents : %d.", ESBulkIndexer.PendingDocuments())
}
}
}()
}
// Stop flushes and stops the bulk indexer, closes its ErrorChannel so the
// error-logging goroutine started by Start can finish, and asks the pending
// checker (if one is running) to stop.
//
// Stop is not safe to call twice: the second call would close ErrorChannel
// again, which panics.
//
// NOTE(review): closing ErrorChannel here is only safe if the indexer sends
// nothing on it after its Stop method returns — confirm against the library.
func Stop() {
	ESBulkIndexer.Flush()
	ESBulkIndexer.Stop()
	close(ESBulkIndexer.ErrorChannel)

	pendingCheck.Lock()
	if pendingCheck.enable {
		// Non-blocking send: the mutex is held here, so never wait on the
		// channel. If a stop request is already queued, one is enough.
		select {
		case pendingCheck.done <- struct{}{}:
		default:
		}
	}
	pendingCheck.Unlock()

	// Short grace period for the background goroutines to log and exit.
	time.Sleep(time.Millisecond * 100)
	log.Print("ES BulkIndexer stop.")
}
// main wires the client together: configure it, point it at the cluster,
// start the indexer and the pending-documents checker, then shut down.
func main() {
	Setup()
	// SetHost takes a slice of hosts, so the single address is wrapped in one.
	SetHost([]string{"localhost:9200"})
	Start()
	PendingChecker()
	// running http server
	Stop()
}
I can look into this more tonight, but the elastigo connection can't be shared across goroutines right now.
Thank you. I will test again. :)