franz-go
Records are not flushed when MaxBufferedRecords is reached
According to https://github.com/twmb/franz-go/issues/591, I think records will be flushed when the linger is hit or the batch is full.
seeds := strings.Split(brokers, ",")
cl, err := kgo.NewClient(
	kgo.SeedBrokers(seeds...),
	kgo.ProducerLinger(time.Second*10), // linger for 10s before sending
	kgo.MaxBufferedRecords(5),          // buffer at most 5 records
)
if err != nil {
	panic(err)
}
defer cl.Close()

ctx := context.Background()
record := &kgo.Record{Topic: "cbd_yzhang_test", Value: []byte("this is a message")}
wg := sync.WaitGroup{}

// Produce 11 records, more than twice the buffer limit.
for i := 0; i < 11; i++ {
	fmt.Println("send msg", time.Now())
	wg.Add(1)
	cl.Produce(ctx, record, func(r *kgo.Record, err error) {
		if err != nil {
			fmt.Println(err)
		}
		wg.Done()
	})
}
wg.Wait()
But it only ever flushes every 10s. When I reach the buffered-record limit, Produce just blocks and does not flush. Output:
send msg 2024-05-10 14:49:39.630297 +0800 CST m=+0.001410626
send msg 2024-05-10 14:49:39.630429 +0800 CST m=+0.001543085
send msg 2024-05-10 14:49:39.630432 +0800 CST m=+0.001545793
send msg 2024-05-10 14:49:39.630433 +0800 CST m=+0.001547251
send msg 2024-05-10 14:49:39.630435 +0800 CST m=+0.001548543
send msg 2024-05-10 14:49:39.630436 +0800 CST m=+0.001549710
send msg 2024-05-10 14:49:49.722552 +0800 CST m=+10.094001585
send msg 2024-05-10 14:49:49.722603 +0800 CST m=+10.094051835
send msg 2024-05-10 14:49:49.722609 +0800 CST m=+10.094058335
send msg 2024-05-10 14:49:49.722614 +0800 CST m=+10.094063085
send msg 2024-05-10 14:49:49.722618 +0800 CST m=+10.094067001