elastigo

Fix datarace issue for PendingDocuments function

Open mazing80 opened this issue 8 years ago • 5 comments

A data race sometimes occurs in PendingDocuments().

mazing80 avatar Mar 16 '16 17:03 mazing80

I'm assuming you found this with the race detector? Do you still have the output from the tool?

vrecan avatar Mar 17 '16 19:03 vrecan

Yes, the output is below.

==================
WARNING: DATA RACE
Write by goroutine 30:
  my.corp.com/me/log-server.git/vendor/github.com/mattbaird/elastigo/lib.(*BulkIndexer).startDocChannel.func1()
      /Users/me/Golang/src/my.corp.com/me/log-server.git/vendor/github.com/mattbaird/elastigo/lib/corebulk.go:250 +0x12a

Previous read by goroutine 14:
  my.corp.com/me/log-server.git/storage/es.PendingChecker.func1()
      /Users/me/Golang/src/my.corp.com/me/log-server.git/storage/es/client.go:81 +0x2a4

Goroutine 30 (running) created at:
  my.corp.com/me/log-server.git/vendor/github.com/mattbaird/elastigo/lib.(*BulkIndexer).startDocChannel()
      /Users/me/Golang/src/my.corp.com/me/log-server.git/vendor/github.com/mattbaird/elastigo/lib/corebulk.go:259 +0x42
  my.corp.com/me/log-server.git/vendor/github.com/mattbaird/elastigo/lib.(*BulkIndexer).Start.func1()
      /Users/me/Golang/src/my.corp.com/me/log-server.git/vendor/github.com/mattbaird/elastigo/lib/corebulk.go:142 +0x104

Goroutine 14 (running) created at:
  my.corp.com/me/log-server.git/storage/es.PendingChecker()
      /Users/me/Golang/src/my.corp.com/me/log-server.git/storage/es/client.go:84 +0xf1
  main.main()
      /Users/me/Golang/src/my.corp.com/me/log-server.git/main.go:85 +0xc41
==================

mazing80 avatar Mar 18 '16 06:03 mazing80

And this is my simple client code. If I'm doing something wrong, please tell me.


import (
	"log"
	"sync"
	"time"

	elastigo "github.com/mattbaird/elastigo/lib"
)

const (
	// DefaultHost is the Elasticsearch node Setup appends to the connection's
	// host list. 9200 is Elasticsearch's default HTTP port.
	DefaultHost = "localhost:9200"
	// esMaxConns is the max connections for ES (passed to NewBulkIndexerErrors).
	esMaxConns = 15
	// retryForSeconds: if 0 the bulk indexer will not retry.
	retryForSeconds = 0
	// bulkMaxBuffer is the ES bulk buffer size: 5 MiB (5242880).
	bulkMaxBuffer = 5 << 20
	// bulkMaxDocs is assigned to ESBulkIndexer.BulkMaxDocs in Setup.
	bulkMaxDocs = 3000
	// pendingCheckDuration is how often PendingChecker logs the pending count.
	pendingCheckDuration = time.Second
)

var (
	// esClient is the shared elastigo connection; Setup creates it.
	esClient *elastigo.Conn
	// ESBulkIndexer is the shared bulk indexer; Setup creates it from esClient.
	ESBulkIndexer *elastigo.BulkIndexer
)

// check tracks the lifecycle of the PendingChecker goroutine. Its fields are
// mutated while holding the embedded mutex.
type check struct {
	// done carries a stop request from Stop to the checker goroutine.
	done chan struct{}
	// enable reports whether a checker goroutine is currently running.
	enable bool
	sync.Mutex
}

// pendingCheck is the shared checker state. Its zero value (enable == false,
// done == nil) already means "not running", so it needs no init function.
var pendingCheck check

// Setup creates the package-level Elasticsearch connection (esClient) and the
// bulk indexer (ESBulkIndexer) and applies the buffer/document limits.
// SetHost, Start, PendingChecker and Stop all dereference these, so Setup
// must run first.
func Setup() {
	conn := elastigo.NewConn()
	conn.Hosts = append(conn.Hosts, DefaultHost)

	indexer := conn.NewBulkIndexerErrors(esMaxConns, retryForSeconds)
	indexer.BulkMaxBuffer = bulkMaxBuffer
	indexer.BulkMaxDocs = bulkMaxDocs

	esClient = conn
	ESBulkIndexer = indexer
}

// SetHost replaces the connection's host list (including the DefaultHost
// entry added by Setup) with host. esClient is nil until Setup has run.
func SetHost(host []string) {
	conn := esClient
	conn.Hosts = host
}

// Start launches a goroutine that logs every error received on
// ESBulkIndexer.ErrorChannel until that channel is closed (Stop closes it),
// then starts the bulk indexer.
//
// Logging uses Printf/Print: the stdlib "log" package imported by this file
// has no Warnf/Info, and these two calls also exist in logrus.
func Start() {
	go func() {
		for errBuf := range ESBulkIndexer.ErrorChannel {
			log.Printf("ES Error : %v", errBuf.Err)
		}
		log.Print("ES Error Channel closed.")
	}()
	ESBulkIndexer.Start()
	log.Print("ES BulkIndexer start.")
}

func PendingChecker() {
    ticker := time.NewTicker(pendingCheckDuration)
    pendingCheck.Lock()
    pendingCheck.enable = true
    pendingCheck.done = make(chan struct{}, 1)
    pendingCheck.Unlock()
    go func() {
    L:
        for range ticker.C {
            select {
            case <-pendingCheck.done:
                pendingCheck.Lock()
                close(pendingCheck.done)
                pendingCheck.enable = false
                pendingCheck.Unlock()
                ticker.Stop()
                log.Info("ES Pending checker stop.")
                break L
            default:
                log.Infof("ES Pending documents : %d.", ESBulkIndexer.PendingDocuments())
            }
        }
    }()
}

func Stop() {
    ESBulkIndexer.Flush()
    ESBulkIndexer.Stop()
    close(ESBulkIndexer.ErrorChannel)
    pendingCheck.Lock()
    if pendingCheck.enable {
        pendingCheck.done <- struct{}{}
    }
    pendingCheck.Unlock()
    time.Sleep(time.Millisecond * 100)
    log.Info("ES BulkIndexer stop.")
}

// main configures the client, points it at the local Elasticsearch node,
// starts the indexer and the pending checker, and shuts everything down.
func main() {
	Setup()
	// SetHost takes a slice of hosts; passing a bare string does not compile.
	SetHost([]string{"localhost:9200"})
	Start()
	PendingChecker()
	// running http server
	Stop()
}

mazing80 avatar Mar 18 '16 06:03 mazing80

I can look into this more tonight, but the elastigo connection can't be shared across goroutines right now.

vrecan avatar Mar 19 '16 18:03 vrecan

Thank you. I will test again. :)

mazing80 avatar Mar 24 '16 17:03 mazing80