`reader.Close()` stuck randomly

Open sumerc opened this issue 1 year ago • 1 comments

Problem

This does not happen all the time, maybe 1 out of every 5 or 10 shutdowns.

We have a consumer application with the following Reader config:

kafkaReaderConfig := kafka.ReaderConfig{
	Brokers:  config.KafkaHosts,
	Topic:    config.KafkaTopic,
	GroupID:  kafkaGroupId,
	MaxWait:  kafkaMaxWait * time.Second,
	MinBytes: kafkaMinBytes,
	MaxBytes: kafkaMaxBytes,
	Dialer:   kafka.DefaultDialer,
}

and the ReadMessage loop:

m, err := w.kafkaReader.ReadMessage(ctx)
if err != nil {
	if errors.Is(err, context.Canceled) {
		break
	}
	...
}

In a graceful shutdown scenario, we do the following:

  1. We shut down the HTTP server, so there is no incoming Kafka data.
  2. We cancel the ctx passed to ReadMessage() upon receiving a signal.
  3. We wait 10 seconds for everything to finish.
  4. After 10 seconds, we log the goroutine stacks. The graceful shutdown appears to be stuck waiting on the following call:
func (r *Reader) Close() error {
	atomic.StoreUint32(&r.once, 1)

	r.mutex.Lock()
	closed := r.closed
	r.closed = true
	r.mutex.Unlock()

	r.cancel()
	r.stop()
	r.join.Wait() // <- we are either stuck here

	if r.done != nil {
		<-r.done // <- or here
	}
	// ...
}

sumerc, Dec 08 '23 16:12

Additional info:

Every time this happens, I see the same pattern: we are waiting inside consumergroup.run. It seems the consumer group cannot be cancelled easily during a rebalance?

Either:

goroutine 585773 [semacquire]:
sync.runtime_Semacquire(0x1d90c50?)
	/usr/local/go/src/runtime/sema.go:62 +0x25
sync.(*WaitGroup).Wait(0x0?)
	/usr/local/go/src/sync/waitgroup.go:116 +0x48
github.com/segmentio/kafka-go.(*Reader).unsubscribe(0xc00016d340)
	/go/pkg/mod/github.com/segmentio/[email protected]/reader.go:113 +0x2f
github.com/segmentio/kafka-go.(*Reader).run.func4({0x1493200?, 0xc002a60580?})
	/go/pkg/mod/github.com/segmentio/[email protected]/reader.go:345 +0x95
github.com/segmentio/kafka-go.(*Generation).Start.func1()
	/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:396 +0x2e
created by github.com/segmentio/kafka-go.(*Generation).Start in goroutine 32
	/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:395 +0x15d

goroutine 32 [semacquire]:
sync.runtime_Semacquire(0xc000162440?)
	/usr/local/go/src/runtime/sema.go:62 +0x25
sync.(*WaitGroup).Wait(0x1486ac0?)
	/usr/local/go/src/sync/waitgroup.go:116 +0x48
github.com/segmentio/kafka-go.(*ConsumerGroup).Close(0xc000162360?)
	/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:689 +0x5a
github.com/segmentio/kafka-go.(*Reader).run(0xc00016d340, 0xc000162360)
	/go/pkg/mod/github.com/segmentio/[email protected]/reader.go:310 +0x3c2
created by github.com/segmentio/kafka-go.NewReader in goroutine 1
	/go/pkg/mod/github.com/segmentio/[email protected]/reader.go:742 +0x7bf

or

goroutine 38 [semacquire]:
sync.runtime_Semacquire(0xc0003be200?)
	/usr/local/go/src/runtime/sema.go:62 +0x25
sync.(*WaitGroup).Wait(0x1486ac0?)
	/usr/local/go/src/sync/waitgroup.go:116 +0x48
github.com/segmentio/kafka-go.(*ConsumerGroup).Close(0xc0003be120?)
	/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:689 +0x5a
github.com/segmentio/kafka-go.(*Reader).run(0xc000127340, 0xc0003be120)
	/go/pkg/mod/github.com/segmentio/[email protected]/reader.go:310 +0x3c2
created by github.com/segmentio/kafka-go.NewReader in goroutine 1
	/go/pkg/mod/github.com/segmentio/[email protected]/reader.go:742 +0x7bf

goroutine 37 [select]:
net.(*Resolver).lookupIPAddr(0x1dac0a0, {0x1492f60?, 0xc000545b90}, {0x12823c2, 0x3}, {0xc00121b140, 0x33})
	/usr/local/go/src/net/lookup.go:332 +0x3fe
net.(*Resolver).internetAddrList(0x1492f60?, {0x1492f60?, 0xc000545b90?}, {0x12823c2, 0x3}, {0xc00121b140?, 0xc0004a5a00?})
	/usr/local/go/src/net/ipsock.go:288 +0x4e5
net.(*Resolver).resolveAddrList(0x1de0860?, {0x1492f60, 0xc000545b90}, {0x12828de, 0x4}, {0x12823c2?, 0x2?}, {0xc00121b140, 0x38}, {0x0, ...})
	/usr/local/go/src/net/dial.go:282 +0x405
net.(*Dialer).DialContext(0xc000255a78, {0x1492f60, 0xc000545b90}, {0x12823c2, 0x3}, {0xc00121b140, 0x38})
	/usr/local/go/src/net/dial.go:488 +0x42c
github.com/segmentio/kafka-go.(*Dialer).dialContext(0x1d9f580, {0x1492f60, 0xc000545b90}, {0x12823c2, 0x3}, {0xc00004a00c?, 0x4aef92?})
	/go/pkg/mod/github.com/segmentio/[email protected]/dialer.go:357 +0x19b
github.com/segmentio/kafka-go.(*Dialer).connect(0x1d9f580, {0x1492e80?, 0x1de0860?}, {0x12823c2, 0x3}, {0xc00004a00c, 0x38}, {{0x0, 0x0}, {0x0, ...}, ...})
	/go/pkg/mod/github.com/segmentio/[email protected]/dialer.go:278 +0x1ac
github.com/segmentio/kafka-go.(*Dialer).DialContext(...)
	/go/pkg/mod/github.com/segmentio/[email protected]/dialer.go:113
github.com/segmentio/kafka-go.(*Dialer).Dial(0xc00024fd18?, {0x12823c2?, 0x0?}, {0xc00004a00c?, 0x401?})
	/go/pkg/mod/github.com/segmentio/[email protected]/dialer.go:96 +0x8b
github.com/segmentio/kafka-go.(*ConsumerGroupConfig).Validate.makeConnect.func1(0x1000000010000?, {0xc0000c44a0?, 0x2, 0x0?})
	/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:883 +0x8b
github.com/segmentio/kafka-go.(*ConsumerGroup).coordinator(0xc0003be120)
	/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:903 +0x66
github.com/segmentio/kafka-go.(*ConsumerGroup).leaveGroup(0xc0003be120, {0xc00015f960, 0x63})
	/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:1220 +0x12f
github.com/segmentio/kafka-go.(*ConsumerGroup).run(0xc0003be120)
	/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:737 +0x225
github.com/segmentio/kafka-go.NewConsumerGroup.func1()
	/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:661 +0x1c
created by github.com/segmentio/kafka-go.NewConsumerGroup in goroutine 1
	/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:660 +0x168

sumerc, Dec 11 '23 09:12