franz-go icon indicating copy to clipboard operation
franz-go copied to clipboard

Struggling to catch retryable errors despite KeepRetryableFetchErrors() option

Open jadamscn opened this issue 6 months ago • 5 comments

Hi,

We have a problem with misconfiguring topic names, and I'd like to catch and fatal on this error. Reading the docs, my understanding is KeepRetryableFetchErrors() should do this, and then I can examine each error to decide whether or not I want to fatal. However, I'm having no luck with 0.15.3 on Go 1.21.4. I know kgo is definitely seeing the error, because the log says as much, but doesn't seem to be passing it to my fetches.Errors.

Quick example:

func main() {
	var opts = []kgo.Opt{
		kgo.SeedBrokers(host),
		kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelInfo, nil)),
		kgo.ConsumerGroup(`test`),
		kgo.SASL(plain.Auth{User: user, Pass: pass}.AsMechanism()),
		kgo.FetchMaxWait(10 * time.Millisecond),
		kgo.KeepRetryableFetchErrors(),
		kgo.ConsumeTopics("test_nonexistent_topic"),
		kgo.Dialer((&tls.Dialer{NetDialer: &net.Dialer{Timeout: 10 * time.Second}}).DialContext),
	}

	cl, err := kgo.NewClient(opts...)
	if err != nil {
		fmt.Printf("ERROR: %s\n", err)
		os.Exit(1)
	}

	for {
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()

		fetches := cl.PollFetches(ctx)
		if fetches.Err() != nil {
			for _, err := range fetches.Errors() {
				fmt.Printf("ERROR: %s\n", err.Err)
			}
			break
		}

		for _, r := range fetches.Records() {
			spew.Dump(r)
		}
	}
}

Output:

[INFO] beginning autocommit loop; group: test
[INFO] immediate metadata update triggered; why: querying metadata for consumer initialization
[INFO] metadata update triggered; why: re-updating due to inner errors: UNKNOWN_TOPIC_OR_PARTITION{test_nonexistent}
[INFO] metadata update triggered; why: re-updating due to inner errors: UNKNOWN_TOPIC_OR_PARTITION{test_nonexistent}
[INFO] metadata update triggered; why: re-updating due to inner errors: UNKNOWN_TOPIC_OR_PARTITION{test_nonexistent}
[INFO] metadata update triggered; why: re-updating due to inner errors: UNKNOWN_TOPIC_OR_PARTITION{test_nonexistent}
[INFO] metadata update triggered; why: re-updating due to inner errors: UNKNOWN_TOPIC_OR_PARTITION{test_nonexistent}
[INFO] metadata update triggered; why: re-updating due to inner errors: UNKNOWN_TOPIC_OR_PARTITION{test_nonexistent}
ERROR: context deadline exceeded

jadamscn avatar Dec 20 '23 03:12 jadamscn