franz-go
Struggling to catch retryable errors despite KeepRetryableFetchErrors() option
Hi,
We keep misconfiguring topic names, and I'd like to catch that error and exit fatally. From the docs, my understanding is that KeepRetryableFetchErrors() should surface it, so that I can examine each error and decide whether to fatal. However, I'm having no luck with 0.15.3 on Go 1.21.4. kgo is definitely seeing the error, because the log says as much, but it doesn't seem to be passed through to fetches.Errors().
Quick example:
package main

import (
	"context"
	"crypto/tls"
	"fmt"
	"net"
	"os"
	"time"

	"github.com/davecgh/go-spew/spew"
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/sasl/plain"
)

// host, user, and pass are defined elsewhere (omitted from this example).

func main() {
	opts := []kgo.Opt{
		kgo.SeedBrokers(host),
		kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelInfo, nil)),
		kgo.ConsumerGroup(`test`),
		kgo.SASL(plain.Auth{User: user, Pass: pass}.AsMechanism()),
		kgo.FetchMaxWait(10 * time.Millisecond),
		kgo.KeepRetryableFetchErrors(),
		kgo.ConsumeTopics("test_nonexistent_topic"),
		kgo.Dialer((&tls.Dialer{NetDialer: &net.Dialer{Timeout: 10 * time.Second}}).DialContext),
	}
	cl, err := kgo.NewClient(opts...)
	if err != nil {
		fmt.Printf("ERROR: %s\n", err)
		os.Exit(1)
	}
	for {
		// Each poll gets its own 30s deadline.
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		fetches := cl.PollFetches(ctx)
		cancel() // release the timer now; a defer inside the loop would pile up
		if fetches.Err() != nil {
			// Print every per-partition error, then stop.
			for _, err := range fetches.Errors() {
				fmt.Printf("ERROR: %s\n", err.Err)
			}
			break
		}
		for _, r := range fetches.Records() {
			spew.Dump(r)
		}
	}
}
Output:
[INFO] beginning autocommit loop; group: test
[INFO] immediate metadata update triggered; why: querying metadata for consumer initialization
[INFO] metadata update triggered; why: re-updating due to inner errors: UNKNOWN_TOPIC_OR_PARTITION{test_nonexistent}
[INFO] metadata update triggered; why: re-updating due to inner errors: UNKNOWN_TOPIC_OR_PARTITION{test_nonexistent}
[INFO] metadata update triggered; why: re-updating due to inner errors: UNKNOWN_TOPIC_OR_PARTITION{test_nonexistent}
[INFO] metadata update triggered; why: re-updating due to inner errors: UNKNOWN_TOPIC_OR_PARTITION{test_nonexistent}
[INFO] metadata update triggered; why: re-updating due to inner errors: UNKNOWN_TOPIC_OR_PARTITION{test_nonexistent}
[INFO] metadata update triggered; why: re-updating due to inner errors: UNKNOWN_TOPIC_OR_PARTITION{test_nonexistent}
ERROR: context deadline exceeded