kafka-go

Goroutines running readers appear to stall, resulting in large lag on the consumer groups

Status: Open · opened by Sparkz0629 1 year ago · 13 comments

Describe the bug: We start 30 goroutines, each running its own reader. The readers consume from topics (not always unique) using unique consumer group IDs. Every day we see cases where some of these goroutines appear to stall; we notice this because a separate service monitors consumer group lag and alerts when specific groups fall behind. There doesn't appear to be a pattern; the stalls seem quite random.

Restarting the microservice (and therefore all 30 processes) fixes the issue until a couple of hours later, when we start getting consumer group lag alerts again.

Kafka version: 2.8.0

We use the following Kafka reader configuration:

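func newKafkaReader(config *util.Config, groupID string, topic string) *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers: url_of_broker, // placeholder for the []string of broker addresses
		GroupID: groupID,
		Topic:   topic,
		Dialer:  newKafkaDialer(config),
		// StartOffset only applies to partitions where this group has no committed offset.
		StartOffset: kafka.LastOffset,
	})
}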
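`StartOffset: kafka.LastOffset` only matters for partitions where the group has no committed offset; once the group has committed, the reader resumes from the committed position. The following is a simplified model of that rule, based on kafka-go's documented `ReaderConfig.StartOffset` behaviour. `resumeOffset` is a hypothetical function, and out-of-range committed offsets are deliberately not modelled.

```go
package main

import "fmt"

// These mirror kafka-go's kafka.LastOffset (-1) and kafka.FirstOffset (-2);
// they are redeclared here so the sketch runs without the library.
const (
	lastOffset  int64 = -1
	firstOffset int64 = -2
)

// resumeOffset is a simplified, hypothetical model of where a consumer-group
// reader starts on one partition. A committed offset always wins; StartOffset
// is consulted only when the group has never committed on that partition.
func resumeOffset(committed int64, hasCommitted bool, startOffset, oldest, highWatermark int64) int64 {
	if hasCommitted {
		return committed
	}
	if startOffset == firstOffset {
		return oldest // replay everything still retained
	}
	return highWatermark // lastOffset: only messages produced from now on
}

func main() {
	fmt.Println(resumeOffset(0, false, lastOffset, 0, 45000))   // 45000
	fmt.Println(resumeOffset(0, false, firstOffset, 0, 45000))  // 0
	fmt.Println(resumeOffset(1200, true, lastOffset, 0, 45000)) // 1200
}
```

In this model, a new group created with `LastOffset` skips anything published before it first joins, while an established group picks up where it last committed.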

Creating the kafkaDialer:

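func newKafkaDialer(config *util.Config) *kafka.Dialer {
	// Load the CA bundle used to verify the brokers.
	caCert, err := os.ReadFile(config.Kafka.CaCert)
	if err != nil {
		fmt.Println(err) // the error is only printed; dialer construction continues
	}

	// Load the client certificate and key for mutual TLS.
	cert, err := util.LoadX509KeyPair(config.Kafka.UserCert, config.Kafka.UserKey)
	if err != nil {
		fmt.Println(err)
	}

	caCertPool := x509.NewCertPool()
	if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
		fmt.Println("Error appending cert from PEM")
	}

	return &kafka.Dialer{
		Timeout:   20 * time.Second, // maximum time to wait for a connect to complete
		KeepAlive: 20 * time.Second, // TCP keep-alive period
		DualStack: true,
		TLS: &tls.Config{
			Certificates:       []tls.Certificate{cert},
			RootCAs:            caCertPool,
			InsecureSkipVerify: true, // skips broker certificate verification, so RootCAs is not used
		},
	}
}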

Creating and populating our reader wrapper (service.KafkaReader):

groupId := "unique.consumer.group.id"
reader := &service.KafkaReader{
	Config:    config, // config struct populated from a file with our broker strings, cert locations, etc.
	TopicName: "topic_name",
	GroupId:   groupId,
}

And this is how we read it:

for {
	msg, err := reader.ReadMessage(context.Background())
	if err != nil {
		fmt.Println("Error reading message:", err)
		return
	}

	// Processing of msg happens here.
	// When the process stalls, execution never reaches this section,
	// and ReadMessage above does not return an error either.
	// The loop also never exits, hence our assumption that it has stalled.
}

Expected behavior: I would expect the reader to keep consuming messages reliably as they arrive on the topic.

Additional context: 12 unique topics and 30 unique processes in total.

These are batch processes, so some of them only receive messages every 23-24 hours, as bulk publishes of around 40-50k records.

19 of the processes read from the same audit topic, which receives around 50-100 messages spread throughout the day, with a maximum gap of about 3 hours. Each of them uses a unique consumer group.

These failures/stalls appear random and have happened in both of the scenarios above.

Reproduce the issue

Start the app reading from a multi-node Kafka cluster. While it is running, restart the broker that is the leader for your topic's partition. You should see an error similar to:

[6] Not Leader For Partition: the client attempted to send messages to a replica that is not the leader for some partition, the client's metadata are likely out of date

OR

dial tcp xx.xx.xx.xx:yyyyyy: connect: operation timed out

Once this happens, the consumer group stops consuming.

Here is the configuration for the topic:

Topic: topic_name	TopicId: V-JUHTXtSCW3sfUnfCBRNA	PartitionCount: 1	ReplicationFactor: 3	Configs: retention.ms=345600000,message.format.version=3.0-IV1,max.message.bytes=20971520
	Topic: cib-integration.stage.audit	Partition: 0	Leader: 4	Replicas: 1,4,2	Isr: 4,2,1
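For scale: `retention.ms=345600000` is 4 days and `max.message.bytes=20971520` is 20 MiB. With batches arriving every 23-24 hours, a stalled group therefore has roughly four days before its unread messages become eligible for deletion. The conversion, as a small sketch (`retentionDays` and `toMiB` are hypothetical helper names):

```go
package main

import (
	"fmt"
	"time"
)

// retentionDays converts a retention.ms value into days.
func retentionDays(retentionMs int64) float64 {
	return (time.Duration(retentionMs) * time.Millisecond).Hours() / 24
}

// toMiB converts a byte count (e.g. max.message.bytes) into mebibytes.
func toMiB(n int64) float64 {
	return float64(n) / (1 << 20)
}

func main() {
	fmt.Println(retentionDays(345600000)) // 4
	fmt.Println(toMiB(20971520))          // 20
}
```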

Goroutine stack traces captured before the issue:

goroutine 59 [running]:
runtime/pprof.writeGoroutineStacks({0x1029449c0, 0x14000466460})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/runtime/pprof/pprof.go:693 +0x8c
runtime/pprof.writeGoroutine({0x1029449c0, 0x14000466460}, 0x2)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/runtime/pprof/pprof.go:682 +0x3c
runtime/pprof.(*Profile).WriteTo(0x102cf5700, {0x1029449c0, 0x14000466460}, 0x2)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/runtime/pprof/pprof.go:331 +0x158
net/http/pprof.handler.ServeHTTP({0x14000430581, 0x9}, {0x10294ef68, 0x14000466460}, 0x14000424300)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/pprof/pprof.go:253 +0x520
net/http/pprof.Index({0x10294ef68, 0x14000466460}, 0x14000424300)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/pprof/pprof.go:371 +0x104
net/http.HandlerFunc.ServeHTTP(0x10293d2b8, {0x10294ef68, 0x14000466460}, 0x14000424300)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:2047 +0x40
net/http.(*ServeMux).ServeHTTP(0x102d19f00, {0x10294ef68, 0x14000466460}, 0x14000424300)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:2425 +0x18c
net/http.serverHandler.ServeHTTP({0x140001d61c0}, {0x10294ef68, 0x14000466460}, 0x14000424300)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:2879 +0x444
net/http.(*conn).serve(0x1400045e5a0, {0x102951a20, 0x140002c6c60})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:1930 +0xb6c
created by net/http.(*Server).Serve
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:3034 +0x4b8

goroutine 1 [IO wait]:
internal/poll.runtime_pollWait(0x129f1ce98, 0x72)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/runtime/netpoll.go:234 +0xa4
internal/poll.(*pollDesc).wait(0x140000ff418, 0x72, 0x0)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_poll_runtime.go:84 +0x38
internal/poll.(*pollDesc).waitRead(...)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Accept(0x140000ff400)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_unix.go:402 +0x1ec
net.(*netFD).accept(0x140000ff400)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/fd_unix.go:173 +0x2c
net.(*TCPListener).accept(0x14000256d50)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/tcpsock_posix.go:140 +0x2c
net.(*TCPListener).Accept(0x14000256d50)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/tcpsock.go:262 +0x34
net/http.(*Server).Serve(0x140001d61c0, {0x10294ed88, 0x14000256d50})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:3002 +0x37c
net/http.(*Server).ListenAndServe(0x140001d61c0)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:2931 +0xb0
net/http.ListenAndServe(...)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:3185
main.main()
	/location/of/app/cmd/ingest-server/main.go:96 +0x8fc

goroutine 4 [select]:
github.com/segmentio/kafka-go.(*Reader).FetchMessage(0x14000374000, {0x1029519b0, 0x140000380a8})
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:826 +0x290
github.com/segmentio/kafka-go.(*Reader).ReadMessage(0x14000374000, {0x1029519b0, 0x140000380a8})
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:791 +0x70
sbsa/kaas/its/pkg/service.(*KafkaReader).ReadMessage(0x1400030e000, {0x1029519b0, 0x140000380a8})
	/location/of/app/pkg/service/kafka.go:192 +0xc8
sbsa/kaas/its/pkg/batch.processStageAudit({0x10294d480, 0x1400030e000}, 0x14000281900, 0x14000308000, {0x140000cf290, 0x140000bf500, 0x0, {{0x0, 0x0}, {0x0, ...}}, ...})
	/location/of/app/pkg/batch/listeners.go:40 +0x54
sbsa/kaas/its/pkg/batch.FileWatchListener(0x14000281900, {{0x14000043d20, 0x1a}, {0x140002c0870, 0xf}, {0x140002c096c, 0x3}, {0x140002c0905, 0x9}, {0x140002c0980, ...}, ...}, ...)
	/location/of/app/pkg/batch/listeners.go:35 +0x34c
created by main.main
	/location/of/app/cmd/ingest-server/main.go:89 +0xa40

goroutine 34 [select]:
github.com/segmentio/kafka-go.(*ConsumerGroup).nextGeneration(0x14000376100, {0x0, 0x0})
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:859 +0x7cc
github.com/segmentio/kafka-go.(*ConsumerGroup).run(0x14000376100)
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:721 +0x44
github.com/segmentio/kafka-go.NewConsumerGroup.func1(0x14000376100)
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:659 +0x28
created by github.com/segmentio/kafka-go.NewConsumerGroup
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:658 +0x168

goroutine 35 [select]:
github.com/segmentio/kafka-go.(*ConsumerGroup).Next(0x14000376100, {0x102951978, 0x1400034e080})
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:700 +0xa4
github.com/segmentio/kafka-go.(*Reader).run(0x14000374000, 0x14000376100)
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:320 +0x338
created by github.com/segmentio/kafka-go.NewReader
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:742 +0x5e0

goroutine 53 [select]:
github.com/segmentio/kafka-go.(*Generation).heartbeatLoop.func1({0x102951dd8, 0x14000418100})
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:477 +0x198
github.com/segmentio/kafka-go.(*Generation).Start.func1(0x14000416018, 0x14000418100)
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:396 +0x3c
created by github.com/segmentio/kafka-go.(*Generation).Start
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:395 +0x130

goroutine 54 [IO wait]:
internal/poll.runtime_pollWait(0x129f1cf80, 0x72)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/runtime/netpoll.go:234 +0xa4
internal/poll.(*pollDesc).wait(0x140000ff718, 0x72, 0x0)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_poll_runtime.go:84 +0x38
internal/poll.(*pollDesc).waitRead(...)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x140000ff700, {0x1400043a000, 0x268b, 0x268b})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_unix.go:167 +0x1dc
net.(*netFD).Read(0x140000ff700, {0x1400043a000, 0x268b, 0x268b})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/fd_posix.go:56 +0x44
net.(*conn).Read(0x1400040a018, {0x1400043a000, 0x268b, 0x268b})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/net.go:183 +0x4c
crypto/tls.(*atLeastReader).Read(0x14000416408, {0x1400043a000, 0x268b, 0x268b})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/crypto/tls/conn.go:777 +0x58
bytes.(*Buffer).ReadFrom(0x1400042a278, {0x102942f40, 0x14000416408})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/bytes/buffer.go:204 +0xa4
crypto/tls.(*Conn).readFromUntil(0x1400042a000, {0x1030b4a18, 0x1400040a018}, 0x5)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/crypto/tls/conn.go:799 +0xe0
crypto/tls.(*Conn).readRecordOrCCS(0x1400042a000, 0x0)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/crypto/tls/conn.go:606 +0xf4
crypto/tls.(*Conn).readRecord(...)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/crypto/tls/conn.go:574
crypto/tls.(*Conn).Read(0x1400042a000, {0x14000486000, 0x1000, 0x1000})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/crypto/tls/conn.go:1277 +0x164
bufio.(*Reader).fill(0x14000488030)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/bufio/bufio.go:101 +0x10c
bufio.(*Reader).Peek(0x14000488030, 0x8)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/bufio/bufio.go:139 +0x6c
github.com/segmentio/kafka-go.(*Conn).peekResponseSizeAndID(0x14000488000)
	/location/of/app/vendor/github.com/segmentio/kafka-go/conn.go:1250 +0x38
github.com/segmentio/kafka-go.(*Conn).waitResponse(0x14000488000, 0x14000488130, 0xa)
	/location/of/app/vendor/github.com/segmentio/kafka-go/conn.go:1341 +0xec
github.com/segmentio/kafka-go.(*Conn).ReadBatchWith(0x14000488000, {0x1, 0xf4240, 0x0, 0x0})
	/location/of/app/vendor/github.com/segmentio/kafka-go/conn.go:839 +0x64c
github.com/segmentio/kafka-go.(*reader).read(0x14000125ed0, {0x102951978, 0x1400040e0c0}, 0x22ff, 0x14000488000)
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:1467 +0x104
github.com/segmentio/kafka-go.(*reader).run(0x14000125ed0, {0x102951978, 0x1400040e0c0}, 0x22ff)
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:1310 +0x658
github.com/segmentio/kafka-go.(*Reader).start.func1(0x14000374000, {0x102951978, 0x1400040e0c0}, {{0x14000043680, 0x1b}, 0x0}, 0x22fe, 0x14000374138)
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:1211 +0x12c
created by github.com/segmentio/kafka-go.(*Reader).start
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:1191 +0x178

goroutine 55 [select]:
github.com/segmentio/kafka-go.(*Reader).commitLoopImmediate(0x14000374000, {0x102951dd8, 0x14000418100}, 0x14000418100)
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:213 +0x148
github.com/segmentio/kafka-go.(*Reader).commitLoop(0x14000374000, {0x102951dd8, 0x14000418100}, 0x14000418100)
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:296 +0x148
github.com/segmentio/kafka-go.(*Reader).run.func3({0x102951dd8, 0x14000418100})
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:350 +0x40
github.com/segmentio/kafka-go.(*Generation).Start.func1(0x14000416030, 0x14000418100)
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:396 +0x3c
created by github.com/segmentio/kafka-go.(*Generation).Start
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:395 +0x130

goroutine 56 [select]:
github.com/segmentio/kafka-go.(*Reader).run.func4({0x102951dd8, 0x14000418100})
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:354 +0x98
github.com/segmentio/kafka-go.(*Generation).Start.func1(0x140004040e0, 0x14000418100)
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:396 +0x3c
created by github.com/segmentio/kafka-go.(*Generation).Start
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:395 +0x130

goroutine 60 [IO wait]:
internal/poll.runtime_pollWait(0x129f1cbe0, 0x72)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/runtime/netpoll.go:234 +0xa4
internal/poll.(*pollDesc).wait(0x14000418618, 0x72, 0x0)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_poll_runtime.go:84 +0x38
internal/poll.(*pollDesc).waitRead(...)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x14000418600, {0x1400048c000, 0x1000, 0x1000})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_unix.go:167 +0x1dc
net.(*netFD).Read(0x14000418600, {0x1400048c000, 0x1000, 0x1000})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/fd_posix.go:56 +0x44
net.(*conn).Read(0x1400040a038, {0x1400048c000, 0x1000, 0x1000})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/net.go:183 +0x4c
net/http.(*connReader).Read(0x14000411770, {0x1400048c000, 0x1000, 0x1000})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:780 +0x260
bufio.(*Reader).fill(0x14000407440)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/bufio/bufio.go:101 +0x10c
bufio.(*Reader).ReadSlice(0x14000407440, 0xa)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/bufio/bufio.go:360 +0x38
bufio.(*Reader).ReadLine(0x14000407440)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/bufio/bufio.go:389 +0x30
net/textproto.(*Reader).readLineSlice(0x140004117d0)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/textproto/reader.go:57 +0x80
net/textproto.(*Reader).ReadLine(...)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/textproto/reader.go:38
net/http.readRequest(0x14000407440)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/request.go:1029 +0x74
net/http.(*conn).readRequest(0x1400045e640, {0x102951978, 0x1400040e900})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:966 +0x224
net/http.(*conn).serve(0x1400045e640, {0x102951a20, 0x140002c6c60})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:1856 +0x84c
created by net/http.(*Server).Serve
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:3034 +0x4b8

goroutine 61 [IO wait]:
internal/poll.runtime_pollWait(0x129f1ccc8, 0x72)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/runtime/netpoll.go:234 +0xa4
internal/poll.(*pollDesc).wait(0x14000418598, 0x72, 0x0)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_poll_runtime.go:84 +0x38
internal/poll.(*pollDesc).waitRead(...)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x14000418580, {0x14000411841, 0x1, 0x1})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_unix.go:167 +0x1dc
net.(*netFD).Read(0x14000418580, {0x14000411841, 0x1, 0x1})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/fd_posix.go:56 +0x44
net.(*conn).Read(0x1400040a030, {0x14000411841, 0x1, 0x1})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/net.go:183 +0x4c
net/http.(*connReader).backgroundRead(0x14000411830)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:672 +0x50
created by net/http.(*connReader).startBackgroundRead
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:668 +0xc4

And the stack traces once the issue has started:

goroutine 215 [running]:
runtime/pprof.writeGoroutineStacks({0x1036fec80, 0x140001e0000})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/runtime/pprof/pprof.go:693 +0x80
runtime/pprof.writeGoroutine({0x1036fec80, 0x140001e0000}, 0x2)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/runtime/pprof/pprof.go:682 +0x44
runtime/pprof.(*Profile).WriteTo(0x103ae1700, {0x1036fec80, 0x140001e0000}, 0x2)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/runtime/pprof/pprof.go:331 +0x7c
net/http/pprof.handler.ServeHTTP({0x1400003a2b1, 0x9}, {0x103708a50, 0x140001e0000}, 0x140003fe200)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/pprof/pprof.go:253 +0x470
net/http/pprof.Index({0x103708a50, 0x140001e0000}, 0x140003fe200)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/pprof/pprof.go:371 +0xf0
net/http.HandlerFunc.ServeHTTP(0x1036f76e8, {0x103708a50, 0x140001e0000}, 0x140003fe200)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:2047 +0x40
net/http.(*ServeMux).ServeHTTP(0x103b05f00, {0x103708a50, 0x140001e0000}, 0x140003fe200)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:2425 +0x174
net/http.serverHandler.ServeHTTP({0x140001e01c0}, {0x103708a50, 0x140001e0000}, 0x140003fe200)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:2879 +0x468
net/http.(*conn).serve(0x140003e8000, {0x10370ad58, 0x140002b2d00})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:1930 +0x17d0
created by net/http.(*Server).Serve
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:3034 +0x828

goroutine 1 [IO wait]:
internal/poll.runtime_pollWait(0x12ad34e98, 0x72)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/runtime/netpoll.go:234 +0x60
internal/poll.(*pollDesc).wait(0x140000d8118, 0x72, 0x0)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_poll_runtime.go:84 +0x90
internal/poll.(*pollDesc).waitRead(0x140000d8118, 0x0)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_poll_runtime.go:89 +0x3c
internal/poll.(*FD).Accept(0x140000d8100)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_unix.go:402 +0x330
net.(*netFD).accept(0x140000d8100)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/fd_unix.go:173 +0x4c
net.(*TCPListener).accept(0x140000de018)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/tcpsock_posix.go:140 +0x44
net.(*TCPListener).Accept(0x140000de018)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/tcpsock.go:262 +0x58
net/http.(*Server).Serve(0x140001e01c0, {0x103708870, 0x140000de018})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:3002 +0x40c
net/http.(*Server).ListenAndServe(0x140001e01c0)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:2931 +0x154
net/http.ListenAndServe({0x103590673, 0xe}, {0x0, 0x0})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:3185 +0x100
main.main()
	/location/of/app/cmd/ingest-server/main.go:96 +0xd48

goroutine 35 [select, 4 minutes]:
github.com/segmentio/kafka-go.(*Reader).FetchMessage(0x140003fc8c0, {0x10370ad90, 0x14000138008})
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:826 +0x1d4
github.com/segmentio/kafka-go.(*Reader).ReadMessage(0x140003fc8c0, {0x10370ad90, 0x14000138008})
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:791 +0x7c
sbsa/kaas/its/pkg/service.(*KafkaReader).ReadMessage(0x14000278000, {0x10370ad90, 0x14000138008})
	/location/of/app/pkg/service/kafka.go:197 +0x370
sbsa/kaas/its/pkg/batch.processStageAudit({0x103706f68, 0x14000278000}, 0x140002a5900, 0x1400007e000, {0x14000157320, 0x1400012f500, 0x0, {{0x0, 0x0}, {0x0, ...}}, ...})
	/location/of/app/pkg/batch/listeners.go:40 +0x74
sbsa/kaas/its/pkg/batch.FileWatchListener(0x140002a5900, {{0x1400014bda0, 0x1a}, {0x140002d07d0, 0xf}, {0x140002d08cc, 0x3}, {0x140002d0865, 0x9}, {0x140002d08e0, ...}, ...}, ...)
	/location/of/app/pkg/batch/listeners.go:35 +0x604
created by main.main
	/location/of/app/cmd/ingest-server/main.go:89 +0xeb8

goroutine 3 [select, 3 minutes]:
github.com/segmentio/kafka-go.(*ConsumerGroup).nextGeneration(0x1400034e100, {0x14000188150, 0x68})
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:859 +0x908
github.com/segmentio/kafka-go.(*ConsumerGroup).run(0x1400034e100)
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:721 +0x58
github.com/segmentio/kafka-go.NewConsumerGroup.func1(0x1400034e100)
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:659 +0x28
created by github.com/segmentio/kafka-go.NewConsumerGroup
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:658 +0x244

goroutine 4 [select, 3 minutes]:
github.com/segmentio/kafka-go.(*ConsumerGroup).Next(0x1400034e100, {0x10370ad58, 0x14000328080})
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:700 +0xfc
github.com/segmentio/kafka-go.(*Reader).run(0x1400034c000, 0x1400034e100)
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:320 +0x170
created by github.com/segmentio/kafka-go.NewReader
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:742 +0xa5c

goroutine 142 [select]:
github.com/segmentio/kafka-go.(*Generation).heartbeatLoop.func1({0x10370b1b8, 0x14000314880})
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:477 +0x248
github.com/segmentio/kafka-go.(*Generation).Start.func1(0x1400025f320, 0x14000314880)
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:396 +0x44
created by github.com/segmentio/kafka-go.(*Generation).Start
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:395 +0x114

goroutine 53 [select, 3 minutes]:
github.com/segmentio/kafka-go.(*ConsumerGroup).nextGeneration(0x140003fe100, {0x14000850150, 0x68})
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:859 +0x908
github.com/segmentio/kafka-go.(*ConsumerGroup).run(0x140003fe100)
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:721 +0x58
github.com/segmentio/kafka-go.NewConsumerGroup.func1(0x140003fe100)
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:659 +0x28
created by github.com/segmentio/kafka-go.NewConsumerGroup
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:658 +0x244

goroutine 54 [select, 3 minutes]:
github.com/segmentio/kafka-go.(*ConsumerGroup).Next(0x140003fe100, {0x10370ad58, 0x14000390880})
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:700 +0xfc
github.com/segmentio/kafka-go.(*Reader).run(0x140003fc000, 0x140003fe100)
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:320 +0x170
created by github.com/segmentio/kafka-go.NewReader
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:742 +0xa5c

goroutine 143 [IO wait]:
internal/poll.runtime_pollWait(0x12ad34af8, 0x72)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/runtime/netpoll.go:234 +0x60
internal/poll.(*pollDesc).wait(0x14000186798, 0x72, 0x0)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_poll_runtime.go:84 +0x90
internal/poll.(*pollDesc).waitRead(0x14000186798, 0x0)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_poll_runtime.go:89 +0x3c
internal/poll.(*FD).Read(0x14000186780, {0x14000520000, 0x268b, 0x268b})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_unix.go:167 +0x34c
net.(*netFD).Read(0x14000186780, {0x14000520000, 0x268b, 0x268b})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/fd_posix.go:56 +0x54
net.(*conn).Read(0x1400012c0d8, {0x14000520000, 0x268b, 0x268b})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/net.go:183 +0x78
crypto/tls.(*atLeastReader).Read(0x140003941f8, {0x14000520000, 0x268b, 0x268b})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/crypto/tls/conn.go:777 +0xa0
bytes.(*Buffer).ReadFrom(0x140002b1078, {0x1036fd200, 0x140003941f8})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/bytes/buffer.go:204 +0x10c
crypto/tls.(*Conn).readFromUntil(0x140002b0e00, {0x12ad35068, 0x1400012c0d8}, 0x5)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/crypto/tls/conn.go:799 +0x148
crypto/tls.(*Conn).readRecordOrCCS(0x140002b0e00, 0x0)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/crypto/tls/conn.go:606 +0x19c
crypto/tls.(*Conn).readRecord(0x140002b0e00)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/crypto/tls/conn.go:574 +0x30
crypto/tls.(*Conn).Read(0x140002b0e00, {0x140003c9000, 0x1000, 0x1000})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/crypto/tls/conn.go:1277 +0x160
bufio.(*Reader).fill(0x140006387b0)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/bufio/bufio.go:101 +0x25c
bufio.(*Reader).Peek(0x140006387b0, 0x8)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/bufio/bufio.go:139 +0x140
github.com/segmentio/kafka-go.(*Conn).peekResponseSizeAndID(0x14000638780)
	/location/of/app/vendor/github.com/segmentio/kafka-go/conn.go:1250 +0x54
github.com/segmentio/kafka-go.(*Conn).waitResponse(0x14000638780, 0x140006388b0, 0x19)
	/location/of/app/vendor/github.com/segmentio/kafka-go/conn.go:1341 +0xac
github.com/segmentio/kafka-go.(*Conn).ReadBatchWith(0x14000638780, {0x1, 0xf4240, 0x0, 0x0})
	/location/of/app/vendor/github.com/segmentio/kafka-go/conn.go:839 +0xb70
github.com/segmentio/kafka-go.(*reader).read(0x14000373ee0, {0x10370ad58, 0x140000b0e40}, 0x2323, 0x14000638780)
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:1467 +0x160
github.com/segmentio/kafka-go.(*reader).run(0x14000373ee0, {0x10370ad58, 0x140000b0e40}, 0x2323)
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:1310 +0x390
github.com/segmentio/kafka-go.(*Reader).start.func1(0x140003fc000, {0x10370ad58, 0x140000b0e40}, {{0x1400014b700, 0x1b}, 0x0}, 0x2322, 0x140003fc138)
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:1211 +0x2a0
created by github.com/segmentio/kafka-go.(*Reader).start
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:1191 +0x22c

goroutine 119 [select, 3 minutes]:
github.com/segmentio/kafka-go.(*ConsumerGroup).Next(0x140002c2200, {0x10370ad58, 0x14000390440})
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:700 +0xfc
github.com/segmentio/kafka-go.(*Reader).run(0x140003fc8c0, 0x140002c2200)
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:320 +0x170
created by github.com/segmentio/kafka-go.NewReader
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:742 +0xa5c

goroutine 118 [select, 3 minutes]:
github.com/segmentio/kafka-go.(*ConsumerGroup).nextGeneration(0x140002c2200, {0x1400004a1c0, 0x68})
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:859 +0x908
github.com/segmentio/kafka-go.(*ConsumerGroup).run(0x140002c2200)
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:721 +0x58
github.com/segmentio/kafka-go.NewConsumerGroup.func1(0x140002c2200)
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:659 +0x28
created by github.com/segmentio/kafka-go.NewConsumerGroup
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:658 +0x244

goroutine 159 [IO wait]:
internal/poll.runtime_pollWait(0x12ad34be0, 0x72)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/runtime/netpoll.go:234 +0x60
internal/poll.(*pollDesc).wait(0x140003c0298, 0x72, 0x0)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_poll_runtime.go:84 +0x90
internal/poll.(*pollDesc).waitRead(0x140003c0298, 0x0)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_poll_runtime.go:89 +0x3c
internal/poll.(*FD).Read(0x140003c0280, {0x140001d6461, 0x1, 0x1})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_unix.go:167 +0x34c
net.(*netFD).Read(0x140003c0280, {0x140001d6461, 0x1, 0x1})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/fd_posix.go:56 +0x54
net.(*conn).Read(0x140000c8000, {0x140001d6461, 0x1, 0x1})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/net.go:183 +0x78
net/http.(*connReader).backgroundRead(0x140001d6450)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:672 +0x84
created by net/http.(*connReader).startBackgroundRead
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:668 +0x100

goroutine 144 [select, 3 minutes]:
github.com/segmentio/kafka-go.(*Reader).commitLoopImmediate(0x140003fc000, {0x10370b1b8, 0x14000314880}, 0x14000314880)
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:213 +0x114
github.com/segmentio/kafka-go.(*Reader).commitLoop(0x140003fc000, {0x10370b1b8, 0x14000314880}, 0x14000314880)
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:296 +0xf0
github.com/segmentio/kafka-go.(*Reader).run.func3({0x10370b1b8, 0x14000314880})
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:350 +0x4c
github.com/segmentio/kafka-go.(*Generation).Start.func1(0x1400025f338, 0x14000314880)
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:396 +0x44
created by github.com/segmentio/kafka-go.(*Generation).Start
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:395 +0x114

goroutine 145 [select, 3 minutes]:
github.com/segmentio/kafka-go.(*Reader).run.func4({0x10370b1b8, 0x14000314880})
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:354 +0xbc
github.com/segmentio/kafka-go.(*Generation).Start.func1(0x14000518490, 0x14000314880)
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:396 +0x44
created by github.com/segmentio/kafka-go.(*Generation).Start
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:395 +0x114

goroutine 121 [select]:
github.com/segmentio/kafka-go.(*Generation).heartbeatLoop.func1({0x10370b1b8, 0x14000186080})
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:477 +0x248
github.com/segmentio/kafka-go.(*Generation).Start.func1(0x140003c4120, 0x14000186080)
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:396 +0x44
created by github.com/segmentio/kafka-go.(*Generation).Start
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:395 +0x114

goroutine 122 [select, 3 minutes]:
github.com/segmentio/kafka-go.(*Reader).commitLoopImmediate(0x1400034c000, {0x10370b1b8, 0x14000186080}, 0x14000186080)
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:213 +0x114
github.com/segmentio/kafka-go.(*Reader).commitLoop(0x1400034c000, {0x10370b1b8, 0x14000186080}, 0x14000186080)
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:296 +0xf0
github.com/segmentio/kafka-go.(*Reader).run.func3({0x10370b1b8, 0x14000186080})
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:350 +0x4c
github.com/segmentio/kafka-go.(*Generation).Start.func1(0x140003c4138, 0x14000186080)
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:396 +0x44
created by github.com/segmentio/kafka-go.(*Generation).Start
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:395 +0x114

goroutine 123 [select, 3 minutes]:
github.com/segmentio/kafka-go.(*Reader).run.func4({0x10370b1b8, 0x14000186080})
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:354 +0xbc
github.com/segmentio/kafka-go.(*Generation).Start.func1(0x140000320e0, 0x14000186080)
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:396 +0x44
created by github.com/segmentio/kafka-go.(*Generation).Start
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:395 +0x114

goroutine 211 [select]:
github.com/segmentio/kafka-go.(*Generation).heartbeatLoop.func1({0x10370b1b8, 0x14000314b00})
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:477 +0x248
github.com/segmentio/kafka-go.(*Generation).Start.func1(0x1400025f380, 0x14000314b00)
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:396 +0x44
created by github.com/segmentio/kafka-go.(*Generation).Start
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:395 +0x114

goroutine 212 [select, 3 minutes]:
github.com/segmentio/kafka-go.(*Reader).commitLoopImmediate(0x140003fc8c0, {0x10370b1b8, 0x14000314b00}, 0x14000314b00)
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:213 +0x114
github.com/segmentio/kafka-go.(*Reader).commitLoop(0x140003fc8c0, {0x10370b1b8, 0x14000314b00}, 0x14000314b00)
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:296 +0xf0
github.com/segmentio/kafka-go.(*Reader).run.func3({0x10370b1b8, 0x14000314b00})
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:350 +0x4c
github.com/segmentio/kafka-go.(*Generation).Start.func1(0x1400025f398, 0x14000314b00)
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:396 +0x44
created by github.com/segmentio/kafka-go.(*Generation).Start
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:395 +0x114

goroutine 213 [select, 3 minutes]:
github.com/segmentio/kafka-go.(*Reader).run.func4({0x10370b1b8, 0x14000314b00})
	/location/of/app/vendor/github.com/segmentio/kafka-go/reader.go:354 +0xbc
github.com/segmentio/kafka-go.(*Generation).Start.func1(0x14000518520, 0x14000314b00)
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:396 +0x44
created by github.com/segmentio/kafka-go.(*Generation).Start
	/location/of/app/vendor/github.com/segmentio/kafka-go/consumergroup.go:395 +0x114

goroutine 216 [IO wait]:
internal/poll.runtime_pollWait(0x12ad34f80, 0x72)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/runtime/netpoll.go:234 +0x60
internal/poll.(*pollDesc).wait(0x140003c0318, 0x72, 0x0)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_poll_runtime.go:84 +0x90
internal/poll.(*pollDesc).waitRead(0x140003c0318, 0x0)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_poll_runtime.go:89 +0x3c
internal/poll.(*FD).Read(0x140003c0300, {0x140002a0000, 0x1000, 0x1000})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/internal/poll/fd_unix.go:167 +0x34c
net.(*netFD).Read(0x140003c0300, {0x140002a0000, 0x1000, 0x1000})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/fd_posix.go:56 +0x54
net.(*conn).Read(0x140000c8010, {0x140002a0000, 0x1000, 0x1000})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/net.go:183 +0x78
net/http.(*connReader).Read(0x14000200660, {0x140002a0000, 0x1000, 0x1000})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:780 +0x1e4
bufio.(*Reader).fill(0x140004e00c0)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/bufio/bufio.go:101 +0x25c
bufio.(*Reader).ReadSlice(0x140004e00c0, 0xa)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/bufio/bufio.go:360 +0x360
bufio.(*Reader).ReadLine(0x140004e00c0)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/bufio/bufio.go:389 +0x4c
net/textproto.(*Reader).readLineSlice(0x140002006c0)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/textproto/reader.go:57 +0x70
net/textproto.(*Reader).ReadLine(0x140002006c0)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/textproto/reader.go:38 +0x3c
net/http.readRequest(0x140004e00c0)
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/request.go:1029 +0x74
net/http.(*conn).readRequest(0x140003e80a0, {0x10370ad58, 0x1400043ad40})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:966 +0x400
net/http.(*conn).serve(0x140003e80a0, {0x10370ad58, 0x1400043ad40})
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:1856 +0xb94
created by net/http.(*Server).Serve
	/opt/homebrew/Cellar/go/1.17.6/libexec/src/net/http/server.go:3034 +0x828

Sparkz0629 Jul 25 '22 08:07

We're seeing the exact same thing. Randomly a consumer will stop dead, and we have to roll the instances to get it to come back to life. We're investigating a switch back to Sarama at this point because it seems this issue has been in the driver for a while.


steve-gray Jul 28 '22 00:07

Hi! Thanks for the detailed report!

Would you also be able to try https://github.com/rhansen2/kafka-go/tree/consumer-group-client to see if the issue reproduces with it as well?

It's still a WIP, but it may help us narrow down what's happening.

rhansen2 Aug 02 '22 22:08

Thanks @rhansen2.

I've been trying to use your forked repo instead, but I keep getting the following:

go: finding module for package github.com/rhansen2/kafka-go
go: downloading github.com/rhansen2/kafka-go v0.4.16
go: found github.com/rhansen2/kafka-go in github.com/rhansen2/kafka-go v0.4.16
go: sbsa/kaas/its/pkg/service imports
        github.com/rhansen2/kafka-go: github.com/rhansen2/kafka-go@v0.4.16: parsing go.mod:
        module declares its path as: github.com/segmentio/kafka-go
                but was required as: github.com/rhansen2/kafka-go

I'm struggling to get this sorted, even after following additional suggestions I found online.

Sparkz0629 Aug 04 '22 07:08

I've added the following to the bottom of my go.mod file:

replace github.com/segmentio/kafka-go v0.4.27 => github.com/rhansen2/kafka-go consumer-group-client

Running go mod vendor changed it to:

replace github.com/segmentio/kafka-go v0.4.27 => github.com/rhansen2/kafka-go v0.4.17-0.20220729022204-94e492457379

There were no errors, but I'm not sure whether it's actually using the forked repo, or whether it's using the correct branch.

Either way, I'll test it and see if there are any differences.

Sparkz0629 Aug 04 '22 09:08

@rhansen2, I have tested with the above changes, and it still appears to do the same thing.

I restarted the Kafka node that is the leader for my topic, and as a result my service stopped consuming messages (the lag increases). The consumer still shows as connected (I can't delete the consumer group), but it no longer consumes any messages.

Also, if I configure it to connect to a single instance in the cluster and that instance goes down, it does not reconnect once the instance is back up, even though my ReadMessage call is in an endless for loop (with reconnects):

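// ReadMessage returns the next message value. If the read fails, it replaces
// the reader with a new one and retries once.
func (kr *KafkaReader) ReadMessage(ctx context.Context) ([]byte, error) {
	msg, err := kr.kreader.ReadMessage(ctx)
	if err != nil {
		fmt.Println(err)
		fmt.Println("Establishing new reader kafka connection")
		// Close the failed reader first; otherwise its background
		// consumer-group goroutines keep running next to the new reader.
		if cerr := kr.kreader.Close(); cerr != nil {
			fmt.Println(cerr)
		}
		kr.kreader = newKafkaReader(kr.Config, kr.GroupId, kr.TopicName)
		msg, err = kr.kreader.ReadMessage(ctx)

		if err != nil {
			return nil, err
		}
	}
	return msg.Value, nil
}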

In this case the consumer group is completely dead and deletable.

Sparkz0629 Aug 04 '22 13:08

I had the same problem

wifiwang777 Aug 10 '22 03:08

An update on the above: we switched over to the franz driver and the issue went away, which seems to narrow it down to either this driver or our usage of it. @Sparkz0629, if you've still got a workload showing this, it might be good to capture a pprof goroutine dump at two points:

  • when it's all working
  • when it's stalled

From my own analysis, I'm leaning towards the problem being some activity, timer, or goroutine in this driver that exits its loop or stops firing. I looked for hung goroutines with long waits on mutexes, selects, etc., but nothing leapt out.

steve-gray Aug 13 '22 11:08

@steve-gray, could you please detail what you have done? We might be experiencing the same thing at the moment and are interested to know exactly which changes or driver you used. When you said you switched to the franz driver, I had no idea what that was, hence this question. Forgive me if I'm being ignorant, but I did read the whole thread.

Kind regards

sunshine69 Aug 20 '22 23:08

I think you mean you use this Go Kafka client: https://github.com/twmb/franz-go

sunshine69 Aug 22 '22 09:08

Hi @Sparkz0629, thanks for your patience and for trying that branch.

Unfortunately I'm struggling to reproduce this issue locally but I am still looking into it.

Could you possibly provide the logs from the reader when the stall occurs?

Thanks!

rhansen2 Aug 26 '22 15:08

@rhansen2, I'd be happy to provide any logs. Is there something specific you need? I don't get any verbose logs at all, just the logs that were posted in the initial issue.

Sparkz0629 Aug 26 '22 18:08

@Sparkz0629, are you setting a logger on the Reader you're using? Something like https://pkg.go.dev/github.com/segmentio/kafka-go#readme-logging ?

With the logger configured, you should see pretty chatty output describing what the Reader is doing.
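For anyone following along, wiring a logger into the reader from the original report might look roughly like the fragment below. It is a config sketch, not a drop-in: kafka-go's Logger interface only needs a Printf(string, ...interface{}) method, so a standard-library *log.Logger can be passed for both Logger and ErrorLogger. The package name consumer, the function newLoggingReader, and the group-ID prefixes are hypothetical, and the Dialer from the original newKafkaReader is left out for brevity.

```go
package consumer

import (
	"log"
	"os"

	"github.com/segmentio/kafka-go"
)

// newLoggingReader mirrors the reader from the original report with
// kafka-go's internal logging enabled. The group ID is used as a prefix so
// output from many readers in one process can be told apart.
func newLoggingReader(brokers []string, groupID, topic string) *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:     brokers,
		GroupID:     groupID,
		Topic:       topic,
		StartOffset: kafka.LastOffset,
		// Dialer omitted here; the original passes newKafkaDialer(config).

		// *log.Logger satisfies kafka.Logger, which has a single Printf method.
		Logger:      log.New(os.Stdout, "kafka ["+groupID+"] ", log.LstdFlags),
		ErrorLogger: log.New(os.Stderr, "kafka error ["+groupID+"] ", log.LstdFlags),
	})
}
```

With around 30 readers in one service, the per-group prefix is what makes it possible to tell which consumer group stopped logging when a lag alert fires.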

rhansen2 Aug 26 '22 19:08

Hi @Sparkz0629,

Just wanted to check in to see if you had any updates or additional information on this issue?

rhansen2 Sep 21 '22 16:09

Hi @rhansen2, my apologies for the delay; I somehow missed this. I'll try to get some additional logs and send them through.

Sparkz0629 Sep 23 '22 02:09

Hi @rhansen2 ,

Thanks for the info on the reader logger.

As requested, here you go:



entering loop for consumer group, "CONSUMER_GROUP_ID"

joined group "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-c22e4a10-d1b9-4254-b3fe-5090a0ffb323 in generation 9
selected as leader for group, "CONSUMER_GROUP_ID"

using 'range' balancer to assign group, "CONSUMER_GROUP_ID"
found member: main@localhostname (github.com/segmentio/kafka-go)-c22e4a10-d1b9-4254-b3fe-5090a0ffb323/[]byte(nil)
found topic/partition: TOPIC_NAME/0
assigned member/topic/partitions main@localhostname (github.com/segmentio/kafka-go)-c22e4a10-d1b9-4254-b3fe-5090a0ffb323/TOPIC_NAME/[0]
joinGroup succeeded for response, "CONSUMER_GROUP_ID".  generationID=9, memberID=main@localhostname (github.com/segmentio/kafka-go)-c22e4a10-d1b9-4254-b3fe-5090a0ffb323
Joined group "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-c22e4a10-d1b9-4254-b3fe-5090a0ffb323 in generation 9
Syncing 1 assignments for generation 9 as member main@localhostname (github.com/segmentio/kafka-go)-c22e4a10-d1b9-4254-b3fe-5090a0ffb323
sync group finished for group, "CONSUMER_GROUP_ID"
subscribed to topics and partitions: map[{topic:TOPIC_NAME partition:0}:40334]
initializing kafka reader for partition 0 of TOPIC_NAME starting at offset 40334
started heartbeat for group, "CONSUMER_GROUP_ID" [3s]
started commit for group "CONSUMER_GROUP_ID"

the kafka reader for partition 0 of TOPIC_NAME is seeking to offset 40334
no messages received from kafka within the allocated time for partition 0 of TOPIC_NAME at offset 40334
no messages received from kafka within the allocated time for partition 0 of TOPIC_NAME at offset 40334



########### Here I placed a message on the topic for consumption ###########



no messages received from kafka within the allocated time for partition 0 of TOPIC_NAME at offset 40334
committed offsets for group "CONSUMER_GROUP_ID": 
        topic: TOPIC_NAME
                partition 0: 40335

########### The reads below are the reader consuming additional messages that were placed on the topic by processes triggered by the above message ###########

committed offsets for group "CONSUMER_GROUP_ID": 
        topic: TOPIC_NAME
                partition 0: 40336
committed offsets for group "CONSUMER_GROUP_ID": 
        topic: TOPIC_NAME
                partition 0: 40337

########### Here the reader goes back to polling the topic ###########

no messages received from kafka within the allocated time for partition 0 of TOPIC_NAME at offset 40337
no messages received from kafka within the allocated time for partition 0 of TOPIC_NAME at offset 40337

########### Here I restart all 5 kafka nodes at once ###########


stopped heartbeat for group "CONSUMER_GROUP_ID"

stopped commit for group "CONSUMER_GROUP_ID"

no messages received from kafka within the allocated time for partition 0 of TOPIC_NAME at offset 40337
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
Leaving group "CONSUMER_GROUP_ID", member main@localhostname (github.com/segmentio/kafka-go)-c22e4a10-d1b9-4254-b3fe-5090a0ffb323
dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
Establishing new reader kafka connection
entering loop for consumer group, "CONSUMER_GROUP_ID"

Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
Error on message: dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
dial tcp IP_OF_KAKFA:PORT_OF_KAFKA: connect: operation timed out
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Establishing new reader kafka connection
entering loop for consumer group, "CONSUMER_GROUP_ID"

Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Error on message: [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Establishing new reader kafka connection
entering loop for consumer group, "CONSUMER_GROUP_ID"

Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Error on message: [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Unable to establish connection to consumer group coordinator for group "CONSUMER_GROUP_ID": [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
[15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
Failed to join group "CONSUMER_GROUP_ID": [16] Not Coordinator For Group: the broker returns this error code if it receives an offset fetch or commit request for a group that it is not a coordinator for
[16] Not Coordinator For Group: the broker returns this error code if it receives an offset fetch or commit request for a group that it is not a coordinator for
Failed to join group "CONSUMER_GROUP_ID": [16] Not Coordinator For Group: the broker returns this error code if it receives an offset fetch or commit request for a group that it is not a coordinator for
[16] Not Coordinator For Group: the broker returns this error code if it receives an offset fetch or commit request for a group that it is not a coordinator for
Failed to join group "CONSUMER_GROUP_ID": [16] Not Coordinator For Group: the broker returns this error code if it receives an offset fetch or commit request for a group that it is not a coordinator for
[16] Not Coordinator For Group: the broker returns this error code if it receives an offset fetch or commit request for a group that it is not a coordinator for
Failed to join group "CONSUMER_GROUP_ID": [16] Not Coordinator For Group: the broker returns this error code if it receives an offset fetch or commit request for a group that it is not a coordinator for
[16] Not Coordinator For Group: the broker returns this error code if it receives an offset fetch or commit request for a group that it is not a coordinator for
[16] Not Coordinator For Group: the broker returns this error code if it receives an offset fetch or commit request for a group that it is not a coordinator for
Establishing new reader kafka connection
entering loop for consumer group, "CONSUMER_GROUP_ID"

Failed to join group "CONSUMER_GROUP_ID": [16] Not Coordinator For Group: the broker returns this error code if it receives an offset fetch or commit request for a group that it is not a coordinator for
[16] Not Coordinator For Group: the broker returns this error code if it receives an offset fetch or commit request for a group that it is not a coordinator for
joined group "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-f4864b83-310e-49b2-b6ed-737868bb65c0 in generation 10
joinGroup succeeded for response, "CONSUMER_GROUP_ID".  generationID=10, memberID=main@localhostname (github.com/segmentio/kafka-go)-f4864b83-310e-49b2-b6ed-737868bb65c0
Joined group "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-f4864b83-310e-49b2-b6ed-737868bb65c0 in generation 10
joined group "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-863aab20-f9d3-4933-befb-35f50430f251 in generation 10
joinGroup succeeded for response, "CONSUMER_GROUP_ID".  generationID=10, memberID=main@localhostname (github.com/segmentio/kafka-go)-863aab20-f9d3-4933-befb-35f50430f251
Joined group "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-863aab20-f9d3-4933-befb-35f50430f251 in generation 10
joined group "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-bc9d65d9-c1e7-4361-847f-33de1f6b6394 in generation 10
selected as leader for group, "CONSUMER_GROUP_ID"

joined group "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-0f036499-549e-45b7-b85b-fad504ae7cf0 in generation 10
joinGroup succeeded for response, "CONSUMER_GROUP_ID".  generationID=10, memberID=main@localhostname (github.com/segmentio/kafka-go)-0f036499-549e-45b7-b85b-fad504ae7cf0
Joined group "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-0f036499-549e-45b7-b85b-fad504ae7cf0 in generation 10
joined group "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-9c8b7ba9-3595-44cb-a08d-035309ecfa93 in generation 10
joinGroup succeeded for response, "CONSUMER_GROUP_ID".  generationID=10, memberID=main@localhostname (github.com/segmentio/kafka-go)-9c8b7ba9-3595-44cb-a08d-035309ecfa93
Joined group "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-9c8b7ba9-3595-44cb-a08d-035309ecfa93 in generation 10
using 'range' balancer to assign group, "CONSUMER_GROUP_ID"
found member: main@localhostname (github.com/segmentio/kafka-go)-bc9d65d9-c1e7-4361-847f-33de1f6b6394/[]byte(nil)
found member: main@localhostname (github.com/segmentio/kafka-go)-0f036499-549e-45b7-b85b-fad504ae7cf0/[]byte(nil)
found member: main@localhostname (github.com/segmentio/kafka-go)-863aab20-f9d3-4933-befb-35f50430f251/[]byte(nil)
found member: main@localhostname (github.com/segmentio/kafka-go)-9c8b7ba9-3595-44cb-a08d-035309ecfa93/[]byte(nil)
found member: main@localhostname (github.com/segmentio/kafka-go)-f4864b83-310e-49b2-b6ed-737868bb65c0/[]byte(nil)
found topic/partition: TOPIC_NAME/0
assigned member/topic/partitions main@localhostname (github.com/segmentio/kafka-go)-f4864b83-310e-49b2-b6ed-737868bb65c0/TOPIC_NAME/[0]
joinGroup succeeded for response, "CONSUMER_GROUP_ID".  generationID=10, memberID=main@localhostname (github.com/segmentio/kafka-go)-bc9d65d9-c1e7-4361-847f-33de1f6b6394
Joined group "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-bc9d65d9-c1e7-4361-847f-33de1f6b6394 in generation 10
Syncing 5 assignments for generation 10 as member main@localhostname (github.com/segmentio/kafka-go)-bc9d65d9-c1e7-4361-847f-33de1f6b6394
sync group finished for group, "CONSUMER_GROUP_ID"
received empty assignments for group, "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-9c8b7ba9-3595-44cb-a08d-035309ecfa93 for generation 10
sync group finished for group, "CONSUMER_GROUP_ID"
received empty assignments for group, "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-863aab20-f9d3-4933-befb-35f50430f251 for generation 10
sync group finished for group, "CONSUMER_GROUP_ID"
received empty assignments for group, "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-bc9d65d9-c1e7-4361-847f-33de1f6b6394 for generation 10
sync group finished for group, "CONSUMER_GROUP_ID"
received empty assignments for group, "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-0f036499-549e-45b7-b85b-fad504ae7cf0 for generation 10
sync group finished for group, "CONSUMER_GROUP_ID"
subscribed to topics and partitions: map[]
started commit for group "CONSUMER_GROUP_ID"

started heartbeat for group, "CONSUMER_GROUP_ID" [3s]
subscribed to topics and partitions: map[{topic:TOPIC_NAME partition:0}:40337]
started heartbeat for group, "CONSUMER_GROUP_ID" [3s]
initializing kafka reader for partition 0 of TOPIC_NAME starting at offset 40337
started commit for group "CONSUMER_GROUP_ID"

subscribed to topics and partitions: map[]
started heartbeat for group, "CONSUMER_GROUP_ID" [3s]
started commit for group "CONSUMER_GROUP_ID"

subscribed to topics and partitions: map[]
started heartbeat for group, "CONSUMER_GROUP_ID" [3s]
started commit for group "CONSUMER_GROUP_ID"

started heartbeat for group, "CONSUMER_GROUP_ID" [3s]
subscribed to topics and partitions: map[]
started commit for group "CONSUMER_GROUP_ID"

the kafka reader for partition 0 of TOPIC_NAME is seeking to offset 40337
no messages received from kafka within the allocated time for partition 0 of TOPIC_NAME at offset 40337
no messages received from kafka within the allocated time for partition 0 of TOPIC_NAME at offset 40337


########### All Kafka nodes are now available, so the reader goes back to polling. ###########

no messages received from kafka within the allocated time for partition 0 of TOPIC_NAME at offset 40337
no messages received from kafka within the allocated time for partition 0 of TOPIC_NAME at offset 40338
no messages received from kafka within the allocated time for partition 0 of TOPIC_NAME at offset 40338

########### At this point, the reader is no longer functional, and won't consume messages until I restart it. ###########

The only weirdness I see is that the offset was bumped up by one once the Kafka nodes were brought back online, without any logging of the reader committing that offset to Kafka.

Not sure if I'm just seeing issues where there aren't any...

Sparkz0629, Sep 23 '22 15:09

Thanks for supplying the logs. It's still not entirely clear what's going on. There are several "subscribed to topics and partitions: map[]" lines, which seem to indicate that certain group members aren't getting any assignments.

found member: main@localhostname (github.com/segmentio/kafka-go)-bc9d65d9-c1e7-4361-847f-33de1f6b6394/[]byte(nil)
found member: main@localhostname (github.com/segmentio/kafka-go)-0f036499-549e-45b7-b85b-fad504ae7cf0/[]byte(nil)
found member: main@localhostname (github.com/segmentio/kafka-go)-863aab20-f9d3-4933-befb-35f50430f251/[]byte(nil)
found member: main@localhostname (github.com/segmentio/kafka-go)-9c8b7ba9-3595-44cb-a08d-035309ecfa93/[]byte(nil)
found member: main@localhostname (github.com/segmentio/kafka-go)-f4864b83-310e-49b2-b6ed-737868bb65c0/[]byte(nil)
found topic/partition: TOPIC_NAME/0

It looks a bit like 5 consumers in the same group are trying to consume from a topic with a single partition.
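For illustration, here is a simplified, hypothetical sketch of range-style assignment for a single topic. It is not kafka-go's actual RangeGroupBalancer code; it assumes members are sorted by ID and partition indexes are split across them with integer arithmetic. With 5 members and 1 partition, four members end up with nothing, which is consistent with the "received empty assignments" lines. Under this assumed rule the member whose ID sorts last gets partition 0, which happens to match the log, where the ...-f4864b83 member was assigned TOPIC_NAME/[0].

```go
package main

import (
	"fmt"
	"sort"
)

// assignRange is a simplified, hypothetical sketch of range-style assignment
// for a single topic. It is not kafka-go's RangeGroupBalancer implementation.
// Members are sorted by ID, then member i receives partitions[i*p/m : (i+1)*p/m].
func assignRange(memberIDs []string, partitions []int) map[string][]int {
	assignments := make(map[string][]int, len(memberIDs))
	if len(memberIDs) == 0 {
		return assignments
	}

	sorted := append([]string(nil), memberIDs...)
	sort.Strings(sorted)

	p, m := len(partitions), len(sorted)
	for i, id := range sorted {
		lo, hi := i*p/m, (i+1)*p/m
		// With more members than partitions, lo == hi for most members,
		// so they receive an empty assignment.
		assignments[id] = append([]int{}, partitions[lo:hi]...)
	}
	return assignments
}

func main() {
	// UUID prefixes of the five member IDs from the log above.
	members := []string{"bc9d65d9", "0f036499", "863aab20", "9c8b7ba9", "f4864b83"}
	got := assignRange(members, []int{0})

	ids := make([]string, 0, len(got))
	for id := range got {
		ids = append(ids, id)
	}
	sort.Strings(ids) // map iteration order is random, so print in a stable order
	for _, id := range ids {
		fmt.Printf("%s -> %v\n", id, got[id])
	}
}
```

Running it prints an empty list for four of the members and [0] for f4864b83. With 5 partitions, the same rule would give each member exactly one partition.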

Are you redacting multiple consumer group ids and topic names with the same CONSUMER_GROUP_ID and TOPIC_NAME values?

rhansen2, Sep 30 '22 16:09

@rhansen2 ,

It is a single CONSUMER_GROUP_ID and TOPIC_NAME.

The topic is, however, split across 5 partitions on Kafka.

Sparkz0629, Sep 30 '22 20:09

It looks like the reader is only seeing a single partition for that topic ("found topic/partition: TOPIC_NAME/0"). That may be why it's stalling. Are you only seeing lag grow on the other 4 partitions?

Something else that stands out: before the Kafka restart, your consumer group appears to have 1 member, but afterwards 5 members try to join.

Before:

joined group "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-c22e4a10-d1b9-4254-b3fe-5090a0ffb323 in generation 9
selected as leader for group, "CONSUMER_GROUP_ID"

using 'range' balancer to assign group, "CONSUMER_GROUP_ID"
found member: main@localhostname (github.com/segmentio/kafka-go)-c22e4a10-d1b9-4254-b3fe-5090a0ffb323/[]byte(nil)
found topic/partition: TOPIC_NAME/0
assigned member/topic/partitions main@localhostname (github.com/segmentio/kafka-go)-c22e4a10-d1b9-4254-b3fe-5090a0ffb323/TOPIC_NAME/[0]
joinGroup succeeded for response, "CONSUMER_GROUP_ID".  generationID=9, memberID=main@localhostname (github.com/segmentio/kafka-go)-c22e4a10-d1b9-4254-b3fe-5090a0ffb323
Joined group "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-c22e4a10-d1b9-4254-b3fe-5090a0ffb323 in generation 9
Syncing 1 assignments for generation 9 as member main@localhostname (github.com/segmentio/kafka-go)-c22e4a10-d1b9-4254-b3fe-5090a0ffb323
sync group finished for group, "CONSUMER_GROUP_ID"
subscribed to topics and partitions: map[{topic:TOPIC_NAME partition:0}:40334]
initializing kafka reader for partition 0 of TOPIC_NAME starting at offset 40334
started heartbeat for group, "CONSUMER_GROUP_ID" [3s]
started commit for group "CONSUMER_GROUP_ID"

After:

joined group "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-f4864b83-310e-49b2-b6ed-737868bb65c0 in generation 10
joinGroup succeeded for response, "CONSUMER_GROUP_ID".  generationID=10, memberID=main@localhostname (github.com/segmentio/kafka-go)-f4864b83-310e-49b2-b6ed-737868bb65c0
Joined group "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-f4864b83-310e-49b2-b6ed-737868bb65c0 in generation 10
joined group "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-863aab20-f9d3-4933-befb-35f50430f251 in generation 10
joinGroup succeeded for response, "CONSUMER_GROUP_ID".  generationID=10, memberID=main@localhostname (github.com/segmentio/kafka-go)-863aab20-f9d3-4933-befb-35f50430f251
Joined group "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-863aab20-f9d3-4933-befb-35f50430f251 in generation 10
joined group "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-bc9d65d9-c1e7-4361-847f-33de1f6b6394 in generation 10
selected as leader for group, "CONSUMER_GROUP_ID"

joined group "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-0f036499-549e-45b7-b85b-fad504ae7cf0 in generation 10
joinGroup succeeded for response, "CONSUMER_GROUP_ID".  generationID=10, memberID=main@localhostname (github.com/segmentio/kafka-go)-0f036499-549e-45b7-b85b-fad504ae7cf0
Joined group "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-0f036499-549e-45b7-b85b-fad504ae7cf0 in generation 10
joined group "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-9c8b7ba9-3595-44cb-a08d-035309ecfa93 in generation 10
joinGroup succeeded for response, "CONSUMER_GROUP_ID".  generationID=10, memberID=main@localhostname (github.com/segmentio/kafka-go)-9c8b7ba9-3595-44cb-a08d-035309ecfa93
Joined group "CONSUMER_GROUP_ID" as member main@localhostname (github.com/segmentio/kafka-go)-9c8b7ba9-3595-44cb-a08d-035309ecfa93 in generation 10
using 'range' balancer to assign group, "CONSUMER_GROUP_ID"
found member: main@localhostname (github.com/segmentio/kafka-go)-bc9d65d9-c1e7-4361-847f-33de1f6b6394/[]byte(nil)
found member: main@localhostname (github.com/segmentio/kafka-go)-0f036499-549e-45b7-b85b-fad504ae7cf0/[]byte(nil)
found member: main@localhostname (github.com/segmentio/kafka-go)-863aab20-f9d3-4933-befb-35f50430f251/[]byte(nil)
found member: main@localhostname (github.com/segmentio/kafka-go)-9c8b7ba9-3595-44cb-a08d-035309ecfa93/[]byte(nil)
found member: main@localhostname (github.com/segmentio/kafka-go)-f4864b83-310e-49b2-b6ed-737868bb65c0/[]byte(nil)
found topic/partition: TOPIC_NAME/0

Based on the "Establishing new reader kafka connection" line in the logs, I think you're hitting the corresponding line in your code snippet below.

func (kr *KafkaReader) ReadMessage(ctx context.Context) ([]byte, error) {
	msg, err := kr.kreader.ReadMessage(ctx)
	if err != nil {
		fmt.Println(err)
		fmt.Println("Establishing new reader kafka connection")
		kr.kreader = newKafkaReader(kr.Config, kr.GroupId, kr.TopicName)
		msg, err = kr.kreader.ReadMessage(ctx)

		if err != nil {
			return nil, err
		}
	}
	return msg.Value, nil
}

If that's the case, I believe you should call Close() on the old reader before creating the new one. That way it cleans up any resources associated with it, including the old consumer group member. I also don't believe creating a new reader is necessary whenever ReadMessage() returns an error: the Reader should keep trying to re-establish the consumer group in the background until it succeeds.
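A toy model of why the group can grow from 1 to 5 members: each reader joins the group when it is created and only leaves when it is closed. The group and fakeReader types below are hypothetical stand-ins, not the kafka-go API; the sketch only contrasts replacing a reader without closing it against closing it first.

```go
package main

import (
	"errors"
	"fmt"
)

// group is a hypothetical stand-in for a consumer group; it only tracks how
// many members are currently joined.
type group struct{ members int }

// fakeReader is a hypothetical stand-in for a group-based reader: creating it
// joins the group and Close leaves it. It is not the kafka-go API.
type fakeReader struct {
	g      *group
	closed bool
}

func newFakeReader(g *group) *fakeReader {
	g.members++
	return &fakeReader{g: g}
}

func (r *fakeReader) Close() error {
	if r.closed {
		return errors.New("reader already closed")
	}
	r.closed = true
	r.g.members--
	return nil
}

// replaceLeaky mirrors the original pattern: a new reader is created but the
// old one is never closed, so its group membership is left behind.
func replaceLeaky(g *group, _ *fakeReader) *fakeReader {
	return newFakeReader(g)
}

// replaceClosing closes the old reader before creating the new one.
func replaceClosing(g *group, old *fakeReader) *fakeReader {
	if old != nil {
		if err := old.Close(); err != nil {
			fmt.Println("close error:", err)
		}
	}
	return newFakeReader(g)
}

// simulate starts one reader, replaces it n times and returns how many
// members are still joined to the group.
func simulate(n int, replace func(*group, *fakeReader) *fakeReader) int {
	g := &group{}
	r := newFakeReader(g)
	for i := 0; i < n; i++ {
		r = replace(g, r)
	}
	return g.members
}

func main() {
	fmt.Println("leaky:", simulate(4, replaceLeaky))     // leaky: 5
	fmt.Println("closing:", simulate(4, replaceClosing)) // closing: 1
}
```

Four error-driven replacements without Close leave 5 members behind, while closing first keeps the group at 1.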

rhansen2, Sep 30 '22 23:09

@rhansen2 ,

I removed the creation of a new reader in the ReadMessage function, so it now looks like this:

func (kr *KafkaReader) ReadMessage(ctx context.Context) ([]byte, error) {
	if kr.kreader == nil {
		kr.kreader = newKafkaReader(kr.Config, kr.GroupId, kr.TopicName)
	}
	msg, err := kr.kreader.ReadMessage(ctx)
	if err != nil {
		return nil, err
	}
	return msg.Value, nil
}

This seems to have completely solved the reader issue.

No matter how many times I restart the brokers, and in whatever order, it always recovers.
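A minimal sketch of a read loop that keeps one long-lived reader and simply calls ReadMessage again after a transient error, instead of replacing the reader. The messageReader interface and the scriptedReader test double are hypothetical (kafka-go's Reader.ReadMessage returns a kafka.Message rather than []byte), and treating io.EOF as "the reader was closed" is an assumption about the library.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"
)

// messageReader is a hypothetical interface covering only what this sketch needs.
type messageReader interface {
	ReadMessage(ctx context.Context) ([]byte, error)
}

// consume keeps using the same reader after transient errors. It stops when
// the context is done or the reader returns io.EOF (assumed to mean closed).
func consume(ctx context.Context, r messageReader, handle func([]byte), pause time.Duration) error {
	for {
		msg, err := r.ReadMessage(ctx)
		switch {
		case err == nil:
			handle(msg)
		case ctx.Err() != nil:
			return ctx.Err()
		case errors.Is(err, io.EOF):
			return nil
		default:
			// Transient error (e.g. coordinator unavailable): log it and read
			// again from the same reader after a short pause.
			fmt.Println("read error, retrying:", err)
			select {
			case <-time.After(pause):
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
}

// scriptedReader is a hypothetical test double that replays queued results
// and then reports io.EOF.
type scriptedReader struct {
	msgs [][]byte
	errs []error
	i    int
}

func (s *scriptedReader) ReadMessage(ctx context.Context) ([]byte, error) {
	if s.i >= len(s.msgs) {
		return nil, io.EOF
	}
	m, e := s.msgs[s.i], s.errs[s.i]
	s.i++
	return m, e
}

// runScripted drives consume with one transient error between two messages
// and returns the handled payloads.
func runScripted() ([]string, error) {
	r := &scriptedReader{
		msgs: [][]byte{[]byte("a"), nil, []byte("b")},
		errs: []error{nil, errors.New("[15] Group Coordinator Not Available"), nil},
	}
	var got []string
	err := consume(context.Background(), r, func(m []byte) { got = append(got, string(m)) }, time.Millisecond)
	return got, err
}

func main() {
	got, err := runScripted()
	fmt.Println(got, err)
}
```

Both messages are handled even though the middle read fails; in a real service the reader would be created once and closed on shutdown.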

I assume I should do the same with the writer? We also recreate the writer on failure. Could I just loop the writing of the message until I get a success response back?

Thank you so much for taking the time to help with this.

I will continue testing it and report back if I run into any issues, but for now it looks great!

Sparkz0629, Oct 01 '22 20:10

I assume I should do the same with the writer? We also recreate the writer on failure. Could I just loop the writing of the message until I get a success response back?

Yes, you're correct: you don't need to recreate the writer, just retry until the write succeeds.
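A minimal sketch of retrying a write on the same writer with exponential backoff, an attempt cap and context cancellation. The write callback is hypothetical, standing in for a call such as kafka-go's Writer.WriteMessages; the cap and backoff values are assumptions (drop the cap if you really want to retry until success). kafka-go's Writer also has its own MaxAttempts setting, so an outer loop like this may only be needed for errors that survive the writer's internal retries.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// writeWithRetry calls write until it succeeds, the attempt budget runs out,
// or the context is cancelled. The same writer is reused on every attempt.
func writeWithRetry(ctx context.Context, write func(context.Context) error, maxAttempts int, backoff time.Duration) error {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = write(ctx); err == nil {
			return nil
		}
		if attempt == maxAttempts {
			break
		}
		select {
		case <-time.After(backoff):
		case <-ctx.Done():
			return ctx.Err()
		}
		backoff *= 2 // exponential backoff between attempts
	}
	return fmt.Errorf("write failed after %d attempts: %w", maxAttempts, err)
}

// flakyWrites returns a hypothetical write callback that fails its first n
// calls, plus a pointer to the number of calls made.
func flakyWrites(n int) (func(context.Context) error, *int) {
	calls := 0
	return func(context.Context) error {
		calls++
		if calls <= n {
			return errors.New("broker unavailable")
		}
		return nil
	}, &calls
}

// demo runs writeWithRetry against a flaky callback and reports the call count.
func demo(failures, maxAttempts int) (int, error) {
	write, calls := flakyWrites(failures)
	err := writeWithRetry(context.Background(), write, maxAttempts, time.Millisecond)
	return *calls, err
}

func main() {
	calls, err := demo(2, 5)
	fmt.Println(calls, err) // 3 <nil>
	calls, err = demo(10, 3)
	fmt.Println(calls, err)
}
```

The first call succeeds on the third attempt; the second gives up after three attempts and wraps the last error.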

Glad I was able to help get things working!

rhansen2, Oct 02 '22 00:10

I think it's safe to close this off.

The logs get very unhappy during the instability (which is kinda expected), but other than that, it is working as intended.

Thanks for the help @rhansen2

Sparkz0629, Oct 03 '22 09:10