kafka-go
`reader.Close()` stuck randomly
Problem
This does not happen every time; roughly 1 in 5 to 10 shutdowns is affected.
We have a consumer application with the following Reader config:
kafkaReaderConfig := kafka.ReaderConfig{
    Brokers:  config.KafkaHosts,
    Topic:    config.KafkaTopic,
    GroupID:  kafkaGroupId,
    MaxWait:  kafkaMaxWait * time.Second,
    MinBytes: kafkaMinBytes,
    MaxBytes: kafkaMaxBytes,
    Dialer:   kafka.DefaultDialer,
}
and the ReadMessage loop:
for {
    m, err := w.kafkaReader.ReadMessage(ctx)
    if err != nil {
        // Leave the loop once the shutdown path cancels ctx.
        if errors.Is(err, context.Canceled) {
            break
        }
        // ...
    }
    // ...
}
In a graceful shutdown scenario, what we do is:
- We shut down the HTTP server, so there is no incoming Kafka data.
- We cancel the `ctx` of `ReadMessage()` upon receiving a signal.
- We wait 10 seconds for everything to finish.
- After 10 seconds, we log the goroutine stacks, and the graceful shutdown appears to be stuck waiting in the following call:
func (r *Reader) Close() error {
    atomic.StoreUint32(&r.once, 1)
    r.mutex.Lock()
    closed := r.closed
    r.closed = true
    r.mutex.Unlock()
    r.cancel()
    r.stop()
    r.join.Wait() // <- we are stuck either here
    if r.done != nil {
        <-r.done // <- or here
    }
    // ...
}
Additional info:
Every time this happens, I see the same pattern: the reader is waiting on the consumer group, which is still inside `(*ConsumerGroup).run`. It seems the consumer group cannot be cancelled easily during a rebalance?
Either:
goroutine 585773 [semacquire]:
sync.runtime_Semacquire(0x1d90c50?)
/usr/local/go/src/runtime/sema.go:62 +0x25
sync.(*WaitGroup).Wait(0x0?)
/usr/local/go/src/sync/waitgroup.go:116 +0x48
github.com/segmentio/kafka-go.(*Reader).unsubscribe(0xc00016d340)
/go/pkg/mod/github.com/segmentio/[email protected]/reader.go:113 +0x2f
github.com/segmentio/kafka-go.(*Reader).run.func4({0x1493200?, 0xc002a60580?})
/go/pkg/mod/github.com/segmentio/[email protected]/reader.go:345 +0x95
github.com/segmentio/kafka-go.(*Generation).Start.func1()
/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:396 +0x2e
created by github.com/segmentio/kafka-go.(*Generation).Start in goroutine 32
/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:395 +0x15d
goroutine 32 [semacquire]:
sync.runtime_Semacquire(0xc000162440?)
/usr/local/go/src/runtime/sema.go:62 +0x25
sync.(*WaitGroup).Wait(0x1486ac0?)
/usr/local/go/src/sync/waitgroup.go:116 +0x48
github.com/segmentio/kafka-go.(*ConsumerGroup).Close(0xc000162360?)
/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:689 +0x5a
github.com/segmentio/kafka-go.(*Reader).run(0xc00016d340, 0xc000162360)
/go/pkg/mod/github.com/segmentio/[email protected]/reader.go:310 +0x3c2
created by github.com/segmentio/kafka-go.NewReader in goroutine 1
/go/pkg/mod/github.com/segmentio/[email protected]/reader.go:742 +0x7bf
or
goroutine 38 [semacquire]:
sync.runtime_Semacquire(0xc0003be200?)
/usr/local/go/src/runtime/sema.go:62 +0x25
sync.(*WaitGroup).Wait(0x1486ac0?)
/usr/local/go/src/sync/waitgroup.go:116 +0x48
github.com/segmentio/kafka-go.(*ConsumerGroup).Close(0xc0003be120?)
/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:689 +0x5a
github.com/segmentio/kafka-go.(*Reader).run(0xc000127340, 0xc0003be120)
/go/pkg/mod/github.com/segmentio/[email protected]/reader.go:310 +0x3c2
created by github.com/segmentio/kafka-go.NewReader in goroutine 1
/go/pkg/mod/github.com/segmentio/[email protected]/reader.go:742 +0x7bf
goroutine 37 [select]:
net.(*Resolver).lookupIPAddr(0x1dac0a0, {0x1492f60?, 0xc000545b90}, {0x12823c2, 0x3}, {0xc00121b140, 0x33})
/usr/local/go/src/net/lookup.go:332 +0x3fe
net.(*Resolver).internetAddrList(0x1492f60?, {0x1492f60?, 0xc000545b90?}, {0x12823c2, 0x3}, {0xc00121b140?, 0xc0004a5a00?})
/usr/local/go/src/net/ipsock.go:288 +0x4e5
net.(*Resolver).resolveAddrList(0x1de0860?, {0x1492f60, 0xc000545b90}, {0x12828de, 0x4}, {0x12823c2?, 0x2?}, {0xc00121b140, 0x38}, {0x0, ...})
/usr/local/go/src/net/dial.go:282 +0x405
net.(*Dialer).DialContext(0xc000255a78, {0x1492f60, 0xc000545b90}, {0x12823c2, 0x3}, {0xc00121b140, 0x38})
/usr/local/go/src/net/dial.go:488 +0x42c
github.com/segmentio/kafka-go.(*Dialer).dialContext(0x1d9f580, {0x1492f60, 0xc000545b90}, {0x12823c2, 0x3}, {0xc00004a00c?, 0x4aef92?})
/go/pkg/mod/github.com/segmentio/[email protected]/dialer.go:357 +0x19b
github.com/segmentio/kafka-go.(*Dialer).connect(0x1d9f580, {0x1492e80?, 0x1de0860?}, {0x12823c2, 0x3}, {0xc00004a00c, 0x38}, {{0x0, 0x0}, {0x0, ...}, ...})
/go/pkg/mod/github.com/segmentio/[email protected]/dialer.go:278 +0x1ac
github.com/segmentio/kafka-go.(*Dialer).DialContext(...)
/go/pkg/mod/github.com/segmentio/[email protected]/dialer.go:113
github.com/segmentio/kafka-go.(*Dialer).Dial(0xc00024fd18?, {0x12823c2?, 0x0?}, {0xc00004a00c?, 0x401?})
/go/pkg/mod/github.com/segmentio/[email protected]/dialer.go:96 +0x8b
github.com/segmentio/kafka-go.(*ConsumerGroupConfig).Validate.makeConnect.func1(0x1000000010000?, {0xc0000c44a0?, 0x2, 0x0?})
/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:883 +0x8b
github.com/segmentio/kafka-go.(*ConsumerGroup).coordinator(0xc0003be120)
/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:903 +0x66
github.com/segmentio/kafka-go.(*ConsumerGroup).leaveGroup(0xc0003be120, {0xc00015f960, 0x63})
/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:1220 +0x12f
github.com/segmentio/kafka-go.(*ConsumerGroup).run(0xc0003be120)
/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:737 +0x225
github.com/segmentio/kafka-go.NewConsumerGroup.func1()
/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:661 +0x1c
created by github.com/segmentio/kafka-go.NewConsumerGroup in goroutine 1
/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:660 +0x168