confluent-kafka-go icon indicating copy to clipboard operation
confluent-kafka-go copied to clipboard

producer Flush() doesn't respect timeout and returns wrong messages len

Open kurennoy16 opened this issue 1 year ago • 5 comments

Description

I'm writing a wrapper to use this kafka client in my project. I noticed that in case of specifying the wrong broker address, the producer Flush() method blocks my app termination. No messages were sent via the producer client, it simply exits immediately on retrieving kafka.ErrTransport error.

I see a few issues here:

  1. Flush() doesn't respect the provided timeout
  2. Flush() always return value > 0 even if no messages were sent

How to reproduce

This is simplified code reproducing the issue.

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
	"github.com/pkg/errors"
	"golang.org/x/sync/errgroup"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	gracefulShutdown(cancel)

	eg, ctx := errgroup.WithContext(ctx)

	if _, err := initProducer(ctx, eg); err != nil {
		panic(err)
	}

	if err := eg.Wait(); err != nil {
		panic(err)
	}
}

func gracefulShutdown(stop func()) {
	signalChannel := make(chan os.Signal, 1)
	signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM)

	go func() {
		<-signalChannel
		log.Print("got interrupt signal")
		stop()
	}()
}

func initProducer(ctx context.Context, eg *errgroup.Group) (*kafka.Producer, error) {
	producerClient, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "127.0.0.1:88888"}) // <--- uses wrong broker address
	if err != nil {
		return nil, errors.Wrap(err, "kafka.NewProducer")
	}

	eg.Go(func() error {
		<-ctx.Done()

		// wait until all in send process messages are sent to internal librdkafka queue before Flush()
		time.Sleep(1 * time.Second)

		for producerClient.Flush(5000) > 0 {
			log.Print("Still waiting to flush outstanding messages\n")
		}

		producerClient.Close()

		return nil
	})

	eg.Go(func() error {
		ev := <-producerClient.Events()
		// Broker transport failure, so I want to shutdown application
		// err.IsFatal() is false, so it's useless. err.IsRetriable() seems works only with transaction, so it useless too.
		// Is there better approach to handle such errors?
		if err := ev.(kafka.Error); err.Code() == kafka.ErrTransport {
			err.IsRetriable()
			return errors.Wrapf(err, "broker transport error, code: %d", err.Code())
		}

		return nil
	})

	return producerClient, nil
}

Checklist

Please provide the following information:

  • [x] confluent-kafka-go and librdkafka version (LibraryVersion()): 33620479 2.1.1
  • [x] Apache Kafka broker version: confluentinc/cp-kafka:7.3.0
  • [x] Client configuration: ConfigMap{...} &kafka.ConfigMap{"bootstrap.servers": "127.0.0.1:88888"}
  • [x] Operating system: Darwin arm64
  • [ ] Provide client logs (with "debug": ".." as necessary)
  • [ ] Provide broker log excerpts
  • [x] Critical issue

kurennoy16 avatar May 11 '23 09:05 kurennoy16

Hello, thanks for reporting this.

  1. I tried using this locally, the timeout for Flush seems to be working fine, it takes 5 seconds between flushes. If it's not the same for you, please attach some logs with timing so I can understand it better.

  2. The Flush is not timing out, because there are pending Events in Events(): namely, the connection errors. It's expected in normal operations for there to be transport errors with brokers (because of things like connectivity issues or cluster rolls). So, a broker transport failure can't be considered a fatal error, and the underlying library tries to keep reconnecting. This reconnection causes further errors which then are put inside the Events() channel. This channel needs to be drained by the user, since while it's common for Flush to be called before Close, it can also be called anytime, and we don't want users to miss any error report by draining the channel ourselves when Flush is called.

  3. Finally, about looping with a Flush: it's probably a bit risky to flush inside a loop, especially if the broker is actually down and not just having a temporary failure. There might be actual messages produced and waiting for delivery, and no amount of Flushing will cause them to be delivered. They will end up as failed delivery reports after a certain period of time, which are also something which the user has to drain from the Events() channel. I see that some of our examples we have used the looping while Flush()>0 pattern, I will change it, so thank you for bringing it to notice.

milindl avatar May 15 '23 05:05 milindl

Hey @milindl Thank you so much for your response.

Yeah, I believe I was confused with this loop over Flush() in the examples. That's a great idea to fix examples everywhere.

Could you advise me on the way on handling errors, like credentials issues, etc? I tried to use kafka.Error.IsFatal(), but it doesn't return anything. I believe isFatal() has a narrow use case for now, right?

kurennoy16 avatar May 18 '23 07:05 kurennoy16

2. The Flush is not timing out, because there are pending Events in Events(): namely, the connection errors. It's expected in normal operations for there to be transport errors with brokers (because of things like connectivity issues or cluster rolls). So, a broker transport failure can't be considered a fatal error, and the underlying library tries to keep reconnecting. This reconnection causes further errors which then are put inside the Events() channel. This channel needs to be drained by the user, since while it's common for Flush to be called before Close, it can also be called anytime, and we don't want users to miss any error report by draining the channel ourselves when Flush is called.

This is an important piece of information that could be included in the docs.

danielo-golangieli avatar May 19 '23 13:05 danielo-golangieli

@milindl What would be the correct course of action in this case to Flush events? I was trying something along the lines of this:

defer func() {
		log.Infof("draining kafka events")
		// drain remaining events, because it prevents flushing
		for ev := range controller.producer.Events() {
			handleKafkaEvent(ev, true)
		}

		log.Infof("flushing kafka messages")
		controller.producer.Flush(timeout)
	}()

but the problem with this is that the events channel is not closed prior to closing the producer, but closing the producer would probably prevent flushing from happening.

danielo-golangieli avatar May 19 '23 14:05 danielo-golangieli

Hey @kurennoy16 👋 . Did you figure out the error handling for the fatal errors? Also, what errors are considered fatal errors?

Sam-sad-Sajid avatar Aug 15 '23 15:08 Sam-sad-Sajid