fix: race condition (may panic) when closing consumer group
Background
We encountered a data race when closing a consumer group.
Versions
| Sarama | Kafka | Go     |
|--------|-------|--------|
| 1.36.0 | 2.8.1 | 1.17.7 |
Logs
```
WARNING: DATA RACE
Write at 0x00c000f448b0 by goroutine 750:
  runtime.closechan()
      /Users/xxxxxxxxx/.gvm/gos/go1.17.7/src/runtime/chan.go:355 +0x0
  github.com/Shopify/sarama.(*consumerGroup).Close.func1.1()
      /Users/xxxxxxxxx/go/pkg/mod/github.com/!shopify/[email protected]/consumer_group.go:161 +0x44

Previous read at 0x00c000f448b0 by goroutine 1238:
  runtime.chansend()
      /Users/xxxxxxxxx/.gvm/gos/go1.17.7/src/runtime/chan.go:158 +0x0
  github.com/Shopify/sarama.(*consumerGroup).handleError()
      /Users/xxxxxxxxx/go/pkg/mod/github.com/!shopify/[email protected]/consumer_group.go:568 +0x22d
  github.com/Shopify/sarama.newConsumerGroupSession.func1()
      /Users/xxxxxxxxx/go/pkg/mod/github.com/!shopify/[email protected]/consumer_group.go:724 +0xb7
  github.com/Shopify/sarama.newConsumerGroupSession·dwrap·79()
      /Users/xxxxxxxxx/go/pkg/mod/github.com/!shopify/[email protected]/consumer_group.go:726 +0x63
```
Investigation Results
After investigation, we found the root cause:
- `handleError` is used to collect errors from `heartBeatLoop`, `partitionConsumer`, `offsetManager`, etc. Several goroutines are also spawned for error collecting.
- If `c.config.Consumer.Return.Errors=true`, those errors are sent to a shared channel `c.errors`.
- The order of `c.errors <- err` and `close(c.errors)` is non-deterministic and could, in theory, even cause a panic. A possible panic scenario is the flow below (see the model sketched right after this list): after `handleError` checks that the consumer group is not closed, the goroutine is switched out and `close(c.errors)` is called; when the goroutine switches back, `c.errors <- err` panics because it sends on a closed channel.

(screenshot of the race flow omitted)
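To make the interleaving concrete, here is a small standalone model of the pre-fix pattern. It is not Sarama code, just a condensed imitation of the `handleError`/`Close` paths above; running it with `go run -race` should typically report the same kind of "close vs. send" race as the log above, and it can occasionally panic.

```go
// Standalone model of the pre-fix pattern (not Sarama code).
package main

import (
	"errors"
	"sync"
)

type group struct {
	errs      chan error
	closed    chan struct{}
	closeOnce sync.Once
}

func (g *group) handleError(err error) {
	select {
	case <-g.closed:
		return // consumer is closed
	default:
	}
	// The goroutine may be preempted here; Close() can close g.errs in the meantime.
	select {
	case g.errs <- err: // races with close(g.errs) below; panics if it already happened
	default: // no error listener
	}
}

func (g *group) Close() {
	g.closeOnce.Do(func() {
		close(g.closed)
		// drain errors
		go func() {
			close(g.errs) // unsynchronized with the send in handleError
		}()
		for range g.errs {
		}
	})
}

func main() {
	g := &group{errs: make(chan error), closed: make(chan struct{})}

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			g.handleError(errors.New("boom"))
		}()
	}
	g.Close()
	wg.Wait()
}
```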
Solution
To prevent the deadlock issue raised in #1581, I introduce a dedicated lock instead of reusing the existing `c.lock`.
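Roughly, the change amounts to adding a dedicated read-write lock next to the errors channel. A fragment sketch of the struct change, using the `errorsLock` field name from the snippets later in this thread (exact placement is illustrative):

```go
type consumerGroup struct {
	// ...
	errors chan error

	lock       sync.Mutex   // existing lock, left untouched to avoid the deadlock from #1581
	errorsLock sync.RWMutex // new: guards c.errors <- err against close(c.errors)
	closed     chan none
	closeOnce  sync.Once
	// ...
}
```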
@Jacob-bzx thanks for spotting and debugging this problem
I can see that the mutex approach will solve the data race, but I'm less sure about the potential panic.
I wonder if we might want to consider using a sync.Once and permitting the close in either handleError (if it has spotted that the consumer is being closed) or alternatively within the current func after the client Close has returned,
i.e.,
```diff
diff --git a/consumer_group.go b/consumer_group.go
index 7d755ea..3a8a713 100644
--- a/consumer_group.go
+++ b/consumer_group.go
@@ -86,9 +86,10 @@ type consumerGroup struct {
 	memberID string
 	errors   chan error
 
-	lock      sync.Mutex
-	closed    chan none
-	closeOnce sync.Once
+	lock            sync.Mutex
+	closed          chan none
+	closeOnce       sync.Once
+	closeErrorsOnce sync.Once
 
 	userData []byte
 }
@@ -157,9 +158,6 @@ func (c *consumerGroup) Close() (err error) {
 		}
 
 		// drain errors
-		go func() {
-			close(c.errors)
-		}()
 		for e := range c.errors {
 			err = e
 		}
@@ -167,6 +165,10 @@ func (c *consumerGroup) Close() (err error) {
 		if e := c.client.Close(); e != nil {
 			err = e
 		}
+
+		c.closeErrorsOnce.Do(func() {
+			close(c.errors)
+		})
 	})
 
 	return
 }
@@ -559,7 +561,10 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) {
 
 	select {
 	case <-c.closed:
-		// consumer is closed
+		// consumer is closed, close errors chan
+		c.closeErrorsOnce.Do(func() {
+			close(c.errors)
+		})
 		return
 	default:
 	}
```
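For context on the suggestion: sync.Once.Do runs its function at most once no matter how many goroutines call it, so whichever of the two paths reaches it first closes the channel and the other call becomes a no-op. A minimal standalone illustration of that property (not Sarama code):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var once sync.Once
	errs := make(chan error)

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			once.Do(func() { close(errs) }) // runs at most once across all callers
		}()
	}
	wg.Wait()

	_, ok := <-errs
	fmt.Println("errors channel closed:", !ok) // prints: errors channel closed: true
}
```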
Hi @dnwe, I'm afraid the approach below with `sync.Once` doesn't actually resolve the data race issue.
```go
func (c *consumerGroup) handleError(err error, topic string, partition int32) {
	...
	select {
	case <-c.closed:
+		c.closeErrorsOnce.Do(func() {
+			close(c.errors)
+		})
		return
	default:
	}
	// == step 1: switch to step 2 ==
	// == step 4: from step 3 ==
	select {
	case c.errors <- err:
	default:
		// no error listener
	}
}
```
```go
func (c *consumerGroup) Close() (err error) {
	c.closeOnce.Do(func() {
		// == step 2: from step 1 ==
		close(c.closed)
		...
		// drain errors
=>1:		for e := range c.errors {
			err = e
		}
		if e := c.client.Close(); e != nil {
			err = e
		}
=>2:
+		c.closeErrorsOnce.Do(func() {
+			close(c.errors)
+		})
	})
	// == step 3: switch to step 4 ==
	return
}
```
I have marked a possible interleaving that causes the data race as steps 1~4; it could also panic by writing to a closed channel.
Moreover, if `handleError` is never called, then because we drain the errors (=>1) before closing the channel (=>2), the `c.Close()` method may block in the draining for-loop (=>1) forever and never return.
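A standalone illustration of that hang (not Sarama code): a `range` over a channel only terminates once the channel is closed, so draining before closing can block forever.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

func main() {
	errs := make(chan error, 1)
	errs <- errors.New("last error")

	done := make(chan struct{})
	go func() {
		defer close(done)
		for e := range errs { // never exits: nothing ever closes errs
			fmt.Println("drained:", e)
		}
	}()

	select {
	case <-done:
		fmt.Println("drain loop returned")
	case <-time.After(500 * time.Millisecond):
		fmt.Println("still draining after 500ms: blocked waiting for close(errs)")
	}
}
```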
I believe the initial solution with an `RWMutex`, shown below, is able to avoid the panic:
```go
func (c *consumerGroup) handleError(err error, topic string, partition int32) {
	...
	c.errorsLock.RLock()
	defer c.errorsLock.RUnlock()
	select {
	case <-c.closed:
		// consumer is closed
		return
	default:
	}
	select {
	case c.errors <- err:
	default:
		// no error listener
	}
}

func (c *consumerGroup) Close() (err error) {
	c.closeOnce.Do(func() {
=>3		close(c.closed)
		...
		go func() {
			c.errorsLock.Lock()
			defer c.errorsLock.Unlock()
=>4			close(c.errors)
		}()
		...
	})
	return
}
```
because:

- `close(c.closed)` (=>3) happens before `close(c.errors)` (=>4);
- when `handleError` holds the read lock, it first checks whether the consumer is closed (`case <-c.closed:`) and, if so, returns there instead of sending err to the closed channel; and since `close(c.errors)` is only called while the write lock is held, it cannot sneak in between the closed-check and the send.
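To double-check that argument, below is a small standalone model of the locking pattern (not Sarama code) that can be exercised with `go run -race`: the closer takes the write lock before closing the errors channel, and each sender holds the read lock across both the closed-check and the send, so the send can never observe a closed channel.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type group struct {
	errs      chan error
	errsLock  sync.RWMutex
	closed    chan struct{}
	closeOnce sync.Once
}

func (g *group) handleError(err error) {
	g.errsLock.RLock()
	defer g.errsLock.RUnlock()

	select {
	case <-g.closed:
		// already closed: never send on the (possibly) closed errs channel
		return
	default:
	}

	select {
	case g.errs <- err:
	default:
		// no error listener
	}
}

func (g *group) Close() {
	g.closeOnce.Do(func() {
		close(g.closed) // =>3: closed is closed first
		go func() {
			g.errsLock.Lock()
			defer g.errsLock.Unlock()
			close(g.errs) // =>4: only once no sender holds the read lock
		}()
		for range g.errs { // drain until closed
		}
	})
}

func main() {
	g := &group{errs: make(chan error), closed: make(chan struct{})}

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			g.handleError(errors.New("boom"))
		}()
	}
	g.Close()
	wg.Wait()
	fmt.Println("no panic, no data race")
}
```

Running this repeatedly under the race detector should report neither a data race nor a send-on-closed-channel panic.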