kafka-go
kafka-go copied to clipboard
Messages not consumed when reader connects during topic creation
Describe the bug
I am using Reader with GroupID defined to read a kafka topic. The topic is being created in parallel to reader start. Sometimes it happens reader joins consumer group when the topic creation on kafka side is not completed yet and is assigned with no partitions.
In this case the reader never starts consuming the topic. When I change the topic config (e.g. change of number of partitions) partition watcher correctly triggers rebalance and reader starts to read.
Kafka Version 2.7.0 (locally using bitnami/kafka:2-debian-10)
To Reproduce
- Start kafka consumer using Reader
kafka.ReaderConfig{
Brokers: []string{"127.0.0.1:9092"},
GroupID: "regatta-test-local",
Topic: "regatta-test",
Dialer: dialer,
MaxBytes: 10e6, // 10MB
MaxWait: 3 * time.Second,
CommitInterval: 1 * time.Second,
RetentionTime: 7 * 24 * time.Hour,
WatchPartitionChanges: true,
}
- Reader is not able to connect to kafka because it is not started yet. Logs:
2021-01-12T10:05:06.924+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 entering loop for consumer group, regatta-test-local
2021-01-12T10:05:06.925+0100 ERROR consumer:regatta-test [email protected]/logger.go:17 Unable to establish connection to consumer group coordinator for group regatta-test-local: dial tcp 127.0.0.1:9092: connect: connection refused
github.com/segmentio/kafka-go.LoggerFunc.Printf
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/logger.go:17
github.com/segmentio/kafka-go.(*ConsumerGroup).nextGeneration.func1
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:638
github.com/segmentio/kafka-go.(*ConsumerGroup).withErrorLogger
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:1090
github.com/segmentio/kafka-go.(*ConsumerGroup).nextGeneration
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:637
github.com/segmentio/kafka-go.(*ConsumerGroup).run
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:579
github.com/segmentio/kafka-go.NewConsumerGroup.func1
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:517
2021-01-12T10:05:06.925+0100 ERROR consumer:regatta-test [email protected]/logger.go:17 dial tcp 127.0.0.1:9092: connect: connection refused
github.com/segmentio/kafka-go.LoggerFunc.Printf
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/logger.go:17
github.com/segmentio/kafka-go.(*Reader).run.func2
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/reader.go:263
github.com/segmentio/kafka-go.(*Reader).withErrorLogger
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/reader.go:1021
github.com/segmentio/kafka-go.(*Reader).run
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/reader.go:262
- Start empty kafka.
- Reader can't read
regatta-testtopic. Logs:
2021-01-12T10:05:32.218+0100 ERROR consumer:regatta-test [email protected]/logger.go:17 Unable to establish connection to consumer group coordinator for group regatta-test-local: [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
github.com/segmentio/kafka-go.LoggerFunc.Printf
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/logger.go:17
github.com/segmentio/kafka-go.(*ConsumerGroup).nextGeneration.func1
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:638
github.com/segmentio/kafka-go.(*ConsumerGroup).withErrorLogger
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:1090
github.com/segmentio/kafka-go.(*ConsumerGroup).nextGeneration
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:637
github.com/segmentio/kafka-go.(*ConsumerGroup).run
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:579
github.com/segmentio/kafka-go.NewConsumerGroup.func1
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:517
2021-01-12T10:05:37.312+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 joined group regatta-test-local as member [email protected] (github.com/segmentio/kafka-go)-41f1f0bc-e20a-40f6-8370-6b362813e55e in generation 1
2021-01-12T10:05:37.312+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 selected as leader for group, regatta-test-local
2021-01-12T10:05:37.331+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 using 'range' balancer to assign group, regatta-test-local
2021-01-12T10:05:37.331+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 found member: [email protected] (github.com/segmentio/kafka-go)-41f1f0bc-e20a-40f6-8370-6b362813e55e/[]byte(nil)
2021-01-12T10:05:37.331+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 joinGroup succeeded for response, regatta-test-local. generationID=1, [email protected] (github.com/segmentio/kafka-go)-41f1f0bc-e20a-40f6-8370-6b362813e55e
2021-01-12T10:05:37.331+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 Joined group regatta-test-local as member [email protected] (github.com/segmentio/kafka-go)-41f1f0bc-e20a-40f6-8370-6b362813e55e in generation 1
2021-01-12T10:05:37.331+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 Syncing 1 assignments for generation 1 as member [email protected] (github.com/segmentio/kafka-go)-41f1f0bc-e20a-40f6-8370-6b362813e55e
2021-01-12T10:05:37.438+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 received empty assignments for group, regatta-test-local as member [email protected] (github.com/segmentio/kafka-go)-41f1f0bc-e20a-40f6-8370-6b362813e55e for generation 1
2021-01-12T10:05:37.438+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 sync group finished for group, regatta-test-local
2021-01-12T10:05:37.446+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 subscribed to partitions: map[]
2021-01-12T10:05:37.447+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 started commit for group regatta-test-local
2021-01-12T10:05:37.447+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 started partition watcher for group, regatta-test-local, topic regatta-test [5s]
2021-01-12T10:05:37.453+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 started heartbeat for group, regatta-test-local [3s]
2021-01-12T10:05:37.466+0100 ERROR consumer:regatta-test [email protected]/logger.go:17 Problem getting partitions during startup, [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker
, Returning and setting up nextGeneration
github.com/segmentio/kafka-go.LoggerFunc.Printf
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/logger.go:17
github.com/segmentio/kafka-go.(*Generation).partitionWatcher.func1.3
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:448
github.com/segmentio/kafka-go.(*ConsumerGroup).withErrorLogger
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:1090
github.com/segmentio/kafka-go.(*Generation).partitionWatcher.func1
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:447
github.com/segmentio/kafka-go.(*Generation).Start.func1
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:338
2021-01-12T10:05:37.466+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 stopped partition watcher for group, regatta-test-local, topic regatta-test
2021-01-12T10:05:37.467+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 stopped heartbeat for group regatta-test-local
- The above error keeps occurring permanently.
- Create topic
regatta-testin kafka. - Error
Unknown Topic Or Partitionno longer occurs but it is stucked with empty assignments:
2021-01-12T10:13:35.777+0100 ERROR consumer:regatta-test [email protected]/logger.go:17 Problem getting partitions during startup, [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker
, Returning and setting up nextGeneration
github.com/segmentio/kafka-go.LoggerFunc.Printf
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/logger.go:17
github.com/segmentio/kafka-go.(*Generation).partitionWatcher.func1.3
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:448
github.com/segmentio/kafka-go.(*ConsumerGroup).withErrorLogger
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:1090
github.com/segmentio/kafka-go.(*Generation).partitionWatcher.func1
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:447
github.com/segmentio/kafka-go.(*Generation).Start.func1
/Users/jizi/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:338
2021-01-12T10:13:35.777+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 stopped partition watcher for group, regatta-test-local, topic regatta-test
2021-01-12T10:13:35.778+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 stopped commit for group regatta-test-local
2021-01-12T10:13:35.778+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 stopped heartbeat for group regatta-test-local
2021-01-12T10:13:35.807+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 joined group regatta-test-local as member [email protected] (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e in generation 355
2021-01-12T10:13:35.807+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 selected as leader for group, regatta-test-local
2021-01-12T10:13:35.825+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 using 'range' balancer to assign group, regatta-test-local
2021-01-12T10:13:35.825+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 found member: [email protected] (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e/[]byte(nil)
2021-01-12T10:13:35.825+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 joinGroup succeeded for response, regatta-test-local. generationID=355, [email protected] (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e
2021-01-12T10:13:35.825+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 Joined group regatta-test-local as member [email protected] (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e in generation 355
2021-01-12T10:13:35.825+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 Syncing 1 assignments for generation 355 as member [email protected] (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e
2021-01-12T10:13:35.833+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 received empty assignments for group, regatta-test-local as member [email protected] (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e for generation 355
2021-01-12T10:13:35.833+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 sync group finished for group, regatta-test-local
2021-01-12T10:13:35.842+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 started heartbeat for group, regatta-test-local [3s]
2021-01-12T10:13:35.842+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 subscribed to partitions: map[]
2021-01-12T10:13:35.842+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 started partition watcher for group, regatta-test-local, topic regatta-test [5s]
2021-01-12T10:13:35.842+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 started commit for group regatta-test-local
- Change topic config and the reader recovers. Logs:
2021-01-12T10:14:45.849+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 Partition changes found, reblancing group: regatta-test-local.
2021-01-12T10:14:45.849+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 stopped partition watcher for group, regatta-test-local, topic regatta-test
2021-01-12T10:14:45.849+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 stopped commit for group regatta-test-local
2021-01-12T10:14:45.849+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 stopped heartbeat for group regatta-test-local
2021-01-12T10:14:45.866+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 joined group regatta-test-local as member [email protected] (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e in generation 356
2021-01-12T10:14:45.866+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 selected as leader for group, regatta-test-local
2021-01-12T10:14:45.869+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 using 'range' balancer to assign group, regatta-test-local
2021-01-12T10:14:45.869+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 found member: [email protected] (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e/[]byte(nil)
2021-01-12T10:14:45.869+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 found topic/partition: regatta-test/0
2021-01-12T10:14:45.869+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 found topic/partition: regatta-test/1
2021-01-12T10:14:45.869+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 assigned member/topic/partitions [email protected] (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e/regatta-test/[0 1]
2021-01-12T10:14:45.869+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 joinGroup succeeded for response, regatta-test-local. generationID=356, [email protected] (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e
2021-01-12T10:14:45.869+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 Joined group regatta-test-local as member [email protected] (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e in generation 356
2021-01-12T10:14:45.869+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 Syncing 1 assignments for generation 356 as member [email protected] (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e
2021-01-12T10:14:45.873+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 sync group finished for group, regatta-test-local
2021-01-12T10:14:45.876+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 subscribed to partitions: map[0:-2 1:-2]
2021-01-12T10:14:45.876+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 started partition watcher for group, regatta-test-local, topic regatta-test [5s]
2021-01-12T10:14:45.876+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 started commit for group regatta-test-local
2021-01-12T10:14:45.876+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 initializing kafka reader for partition 0 of regatta-test starting at offset -2
2021-01-12T10:14:45.876+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 initializing kafka reader for partition 1 of regatta-test starting at offset -2
2021-01-12T10:14:45.876+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 started heartbeat for group, regatta-test-local [3s]
2021-01-12T10:14:45.920+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 the kafka reader for partition 0 of regatta-test is seeking to offset 0
2021-01-12T10:14:45.926+0100 DEBUG consumer:regatta-test [email protected]/logger.go:17 the kafka reader for partition 1 of regatta-test is seeking to offset 0
Sometimes it happens that when the topic is created in kafka the reader gets the partitions immediately and than the reader works as expected without any further intervention needed. It seems that topic creation is not atomic operation and when the reader connects "in the middle" it is not able to recover. Maybe it can bedetected here when the assignments are empty?
Expected behavior The reader is always able to reconnect and consume kafka topic even if kafka is not ready during startup.
Hello @jizi, and thank you for the detailed bug report.
I think your suggestion on how to fix seems sound, would you have time to submit a pull request that addresses the issue?
Hello @jizi, I just wanted to check whether you were still experiencing this issue with kafka-go?