kafka-go icon indicating copy to clipboard operation
kafka-go copied to clipboard

Why does FetchMessage commit offsets automatically?

Open yangtongbing opened this issue 1 year ago • 7 comments

I tried several approaches, including setting CommitInterval, but offsets still appear to be committed automatically instead of only through CommitMessages as I expected. Here is my sample code.

package main

import (
	"context"
	"fmt"
	kafka "github.com/segmentio/kafka-go"
)

// main reads messages from the "test" topic using GroupID "9" and prints
// each one. It calls CommitMessages only for messages that shouldCommit
// approves. The loop stops on the first fetch error, which is reported
// rather than dropped so the reason for exiting is visible.
func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"127.0.0.1:9092"},
		Topic:    "test",
		GroupID:  "9",
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	})
	// Report a failed Close instead of discarding its error.
	defer func() {
		if err := r.Close(); err != nil {
			fmt.Println("close failed", err)
		}
	}()

	ctx := context.Background()
	for {
		m, err := r.FetchMessage(ctx)
		if err != nil {
			// Surface the error before leaving the loop; previously the
			// program exited here without saying why.
			fmt.Println("fetch failed", err)
			break
		}
		fmt.Printf("message at offset %d:%s\n", m.Offset, string(m.Value))

		if !shouldCommit(m) {
			continue
		}
		if err := r.CommitMessages(ctx, m); err != nil {
			fmt.Println("commit failed", err)
		}
	}
}

// shouldCommit reports whether the offset of message should be committed.
// This sample never commits, so it returns false for every message.
func shouldCommit(message kafka.Message) bool {
	const commit = false
	return commit
}

yangtongbing avatar Dec 05 '24 08:12 yangtongbing