sarama icon indicating copy to clipboard operation
sarama copied to clipboard

fix: race condition(may panic) when closing consumer group

Open napallday opened this issue 2 years ago • 2 comments

Background

We encounter a data race issue when closing consumer group.

Versions

Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.

Sarama Kafka Go
1.36.0 2.8.1 1.17.7
Logs
logs: CLICK ME


WARNING: DATA RACE
Write at 0x00c000f448b0 by goroutine 750:
  runtime.closechan()
      /Users/xxxxxxxxx/.gvm/gos/go1.17.7/src/runtime/chan.go:355 +0x0
  github.com/Shopify/sarama.(*consumerGroup).Close.func1.1()
      /Users/xxxxxxxxx/go/pkg/mod/github.com/!shopify/[email protected]/consumer_group.go:161 +0x44

Previous read at 0x00c000f448b0 by goroutine 1238:
  runtime.chansend()
      /Users/xxxxxxxxx/.gvm/gos/go1.17.7/src/runtime/chan.go:158 +0x0
  github.com/Shopify/sarama.(*consumerGroup).handleError()
      /Users/xxxxxxxxx/go/pkg/mod/github.com/!shopify/[email protected]/consumer_group.go:568 +0x22d
  github.com/Shopify/sarama.newConsumerGroupSession.func1()
      /Users/xxxxxxxxx/go/pkg/mod/github.com/!shopify/[email protected]/consumer_group.go:724 +0xb7
  github.com/Shopify/sarama.newConsumerGroupSession·dwrap·79()
      /Users/xxxxxxxxx/go/pkg/mod/github.com/!shopify/[email protected]/consumer_group.go:726 +0x63

Investigation Results

After investigation, we find the root cause.

  • handleError is used to collect errors from heartBeatLoop, partitionConsumer, offsetManager, etc. Some goroutines are also spawned for error collecting, including example1, example1, example3 ....
  • If c.config.Consumer.Return.Errors=true, those errors will be sent to a collective channel c.errors.
  • The order of c.errors <- err and close(c.errors) is non-deterministic and could even cause panic theoretically. A possible panic circumstance is shown in the below flow -- after checking the consumer group is not closed, the goroutine switches. And close(c.errors) is called. When goroutine switches back, c.errors <- err could cause panic as an error is sent to a closed channel.

image(here is a screenshot)

Solution

To prevent deadlock issue raised in #1581, I just create another dedicated lock instead of reusing the current c.lock one.

napallday avatar Sep 09 '22 16:09 napallday

@Jacob-bzx thanks for spotting and debugging this problem

I can see that the mutex approach will solve the data race, but I'm less sure about the potential panic.

I wonder if we might want to consider using a sync.Once and permitting the close in either handlerError (if it has spotted that the consumer is being closed) or alternatively within the current func after the client Close has returned

i.e.,

diff --git a/consumer_group.go b/consumer_group.go
index 7d755ea..3a8a713 100644
--- a/consumer_group.go
+++ b/consumer_group.go
@@ -86,9 +86,10 @@ type consumerGroup struct {
 	memberID        string
 	errors          chan error
 
-	lock      sync.Mutex
-	closed    chan none
-	closeOnce sync.Once
+	lock            sync.Mutex
+	closed          chan none
+	closeOnce       sync.Once
+	closeErrorsOnce sync.Once
 
 	userData []byte
 }
@@ -157,9 +158,6 @@ func (c *consumerGroup) Close() (err error) {
 		}
 
 		// drain errors
-		go func() {
-			close(c.errors)
-		}()
 		for e := range c.errors {
 			err = e
 		}
@@ -167,6 +165,10 @@ func (c *consumerGroup) Close() (err error) {
 		if e := c.client.Close(); e != nil {
 			err = e
 		}
+
+		c.closeErrorsOnce.Do(func() {
+			close(c.errors)
+		})
 	})
 	return
 }
@@ -559,7 +561,10 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) {
 
 	select {
 	case <-c.closed:
-		// consumer is closed
+		// consumer is closed, close errors chan
+		c.closeErrorsOnce.Do(func() {
+			close(c.errors)
+		})
 		return
 	default:
 	}

dnwe avatar Sep 15 '22 18:09 dnwe

hi @dnwe master, I doubt that the below way with sync.Once doesn't actually resolve the data race issue.

func (c *consumerGroup) handleError(err error, topic string, partition int32) {
        ...
	select {
	case <-c.closed:
+		c.closeErrorsOnce.Do(func() {
+			close(c.errors)
+		})
                 return
	default:
	}

== step 1: switch to step 2 ==

== step 4: from step 3 ==

	select {
	case c.errors <- err:
	default:
		// no error listener
	}
}
func (c *consumerGroup) Close() (err error) {
	c.closeOnce.Do(func() {
== step 2: from step 1 ==
		close(c.closed)
                 ...

		// drain errors
=>1:		for e := range c.errors {
			err = e
		}

		if e := c.client.Close(); e != nil {
			err = e
		}

=>2:
+		c.closeErrorsOnce.Do(func() {
+			close(c.errors)
+		})
	})

== step 3: switch to step4 ==
	return

I mark a possible workflow that would cause data race as step 1~4, which would also cause panic due to writing to a closed channel. Moreover, if handleError is never called, since we drain errors(=>1) before closing the channel(=>2), c.Close() method may stay in the draining for-loop forever(=>1) and never returned.

I believe the initial solution with RWMutex shown below is able to avoid panic

func (c *consumerGroup) handleError(err error, topic string, partition int32) {
        ...
	c.errorsLock.RLock()
	defer c.errorsLock.RUnlock()

	select {
	case <-c.closed:
		// consumer is closed
		return
	default:
	}

	select {
	case c.errors <- err:
	default:
		// no error listener
	}

func (c *consumerGroup) Close() (err error) {
	c.closeOnce.Do(func() {
=>3		close(c.closed)
                 ...
		go func() {
			c.errorsLock.Lock()
			defer c.errorsLock.Unlock()
=>4			close(c.errors)
		}()

	...
	return
}

because:

  • close(c.closed) (=>3) happens before close(c.errors)(=>4)
  • when handleError holds the read lock, it will first judge whether the consumer is closed(case <- c.closed:), if so, it will return here instead of sending err to the closed channel.

napallday avatar Sep 16 '22 04:09 napallday