franz-go icon indicating copy to clipboard operation
franz-go copied to clipboard

Can not flush when MaxBufferedRecords reached

Open evanzhang87 opened this issue 1 month ago • 0 comments

https://github.com/twmb/franz-go/issues/591 According this, I think when records will be flushed when linger is hit or batch is full.

	seeds := strings.Split(brokers, ",")
	cl, err := kgo.NewClient(
		kgo.SeedBrokers(seeds...),
		kgo.ProducerLinger(time.Second*10),
		kgo.MaxBufferedRecords(5),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
	ctx := context.Background()

	record := &kgo.Record{Topic: "cbd_yzhang_test", Value: []byte("this is a message")}
	wg := sync.WaitGroup{}
	for i := 0; i < 11; i++ {
		fmt.Println("send msg", time.Now())
		wg.Add(1)
		cl.Produce(ctx, record, func(r *kgo.Record, err error) {
			if err != nil {
				fmt.Println(err)
			}
			wg.Done()
		})
	}
	wg.Wait()

But it always flush per 10s, when I reach buffer size, it will just block and do not flush.

send msg 2024-05-10 14:49:39.630297 +0800 CST m=+0.001410626
send msg 2024-05-10 14:49:39.630429 +0800 CST m=+0.001543085
send msg 2024-05-10 14:49:39.630432 +0800 CST m=+0.001545793
send msg 2024-05-10 14:49:39.630433 +0800 CST m=+0.001547251
send msg 2024-05-10 14:49:39.630435 +0800 CST m=+0.001548543
send msg 2024-05-10 14:49:39.630436 +0800 CST m=+0.001549710
send msg 2024-05-10 14:49:49.722552 +0800 CST m=+10.094001585
send msg 2024-05-10 14:49:49.722603 +0800 CST m=+10.094051835
send msg 2024-05-10 14:49:49.722609 +0800 CST m=+10.094058335
send msg 2024-05-10 14:49:49.722614 +0800 CST m=+10.094063085
send msg 2024-05-10 14:49:49.722618 +0800 CST m=+10.094067001

evanzhang87 avatar May 10 '24 06:05 evanzhang87