kafka-go icon indicating copy to clipboard operation
kafka-go copied to clipboard

Reader using GroupID can miss messages when restarting (does not set offsets for all partitions)

Open wwade opened this issue 3 years ago • 1 comments

Describe the bug

While it is running, the Reader does not commit the consumer group offsets until it receives the first message on a partition. The result of this is that any messages delivered on those partitions before it starts again will be lost. This only happens when starting with kafka.LastOffset.

For example, if I read two messages on partitions 0 and 1 respectively, and then exit, it will have committed the following offsets:

Partition Offset
0 1
1 1
2 <not set>

I would have expected to see, instead:

Partition Offset
0 1
1 1
2 0

Kafka Version

  • What version(s) of Kafka are you testing against? wurstmeister/kafka:2.12-2.3.1
  • What version of kafka-go are you using? github.com/segmentio/kafka-go v0.4.31

To Reproduce

Resources to reproduce the behavior:

# Single Kafka broker plus ZooKeeper used to reproduce the issue.
# Both services use host networking, and the broker's PLAINTEXT listener is on
# port 19092 -- the same address the Go test program's `broker` constant uses.
version: "3"
services:
  kafka:
    image: wurstmeister/kafka:2.12-2.3.1
    restart: on-failure:3
    network_mode: "host"
    environment:
      KAFKA_VERSION: '2.3.1'
      KAFKA_BROKER_ID: '1'
      KAFKA_DELETE_TOPIC_ENABLE: 'true'
      KAFKA_ADVERTISED_HOST_NAME: 'localhost'
      KAFKA_ADVERTISED_PORT: '19092'
      KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
      # Topics are not auto-created; the test program creates its topic explicitly.
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
      KAFKA_LISTENERS: 'PLAINTEXT://:19092'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:19092'
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'

  zookeeper:
    image: wurstmeister/zookeeper
    network_mode: "host"
package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"os/signal"
	"sort"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/google/uuid"
	"github.com/segmentio/kafka-go"
)

const broker = "localhost:19092"

var (
	topic  = ""
	logger *log.Logger
)

// main drives the reproduction scenario end to end:
//
//  1. create a uniquely named topic and publish one message each with keys 0
//     and 1;
//  2. run a group consumer starting from kafka.FirstOffset until it has read
//     those messages;
//  3. with the consumer stopped, publish one message each with keys 0, 1 and 2;
//  4. restart the same consumer group with kafka.LastOffset and expect to read
//     the three new messages.
//
// SIGINT and SIGTERM cancel the context shared by every step.
func main() {
	logger = log.New(log.Writer(), "[TESTCASE OUTPUT] ", log.Lshortfile)
	topic = "TOPIC-" + uuid.NewString()
	logger.Print("using topic name ", topic)

	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	createTopic(ctx)

	// Only keys 0 and 1 are published up front, so no message is sent with
	// key 2 before the first consumer run.
	total := produce(ctx, 1, []byte{0})
	total += produce(ctx, 1, []byte{1})
	// total += produce(ctx, 1, []byte{2})
	countMessages(ctx, total)

	const readTimeout = 10 * time.Second
	group := "legit-consumer-" + uuid.NewString()

	logger.Print("first time, will create the consumer group and read all the messages")
	consume(ctx, group, readTimeout, total, kafka.FirstOffset)

	logger.Print("consumer is stopped, produce 3 new messages")
	for _, key := range []byte{0, 1, 2} {
		total += produce(ctx, 1, []byte{key})
	}
	logger.Print("confirm that the messages showed up in the topic")
	countMessages(ctx, total)

	logger.Print("restart consumer, expect to see the 3 new messages")
	consume(ctx, group, readTimeout, 3, kafka.LastOffset)
}

// createTopic creates the package-level topic with 3 partitions and a
// replication factor of 1.
//
// It first dials the bootstrap broker to discover the cluster controller, then
// opens a second connection to the controller and issues the create request
// there. Any error panics with the error's message. ctx is currently unused.
func createTopic(ctx context.Context) {
	bootstrap, err := kafka.Dial("tcp", broker)
	if err != nil {
		panic(err.Error())
	}
	defer bootstrap.Close()

	ctrl, err := bootstrap.Controller()
	if err != nil {
		panic(err.Error())
	}

	ctrlAddr := net.JoinHostPort(ctrl.Host, strconv.Itoa(ctrl.Port))
	ctrlConn, err := kafka.Dial("tcp", ctrlAddr)
	if err != nil {
		panic(err.Error())
	}
	defer ctrlConn.Close()

	logger.Print("create topic ", topic)
	cfg := kafka.TopicConfig{Topic: topic, NumPartitions: 3, ReplicationFactor: 1}
	if err := ctrlConn.CreateTopics(cfg); err != nil {
		panic(err.Error())
	}
}

// check panics with err when err is non-nil and does nothing otherwise. It is
// the test program's one-line "abort on any failure" helper.
func check(err error) {
	if err == nil {
		return
	}
	panic(err)
}

// printGroupOffsets logs the committed offset of every partition of the
// package-level topic for consumer group g, ordered by partition number.
//
// The broker address is split into host and port, the host is resolved, and a
// kafka.Client pointed at the first resolved IP lists the known groups. Nothing
// is logged when g is not among them. Any error panics via check.
func printGroupOffsets(ctx context.Context, g string) {
	parts := strings.SplitN(broker, ":", 2)
	hostName, portText := parts[0], parts[1]
	portNum, err := strconv.Atoi(portText)
	check(err)
	ips, err := net.LookupIP(hostName)
	check(err)

	client := kafka.Client{Addr: &net.TCPAddr{IP: ips[0], Port: portNum}}
	listing, err := client.ListGroups(ctx, &kafka.ListGroupsRequest{})
	check(err)

	for _, group := range listing.Groups {
		if group.GroupID != g {
			continue
		}

		offsets, err := client.ConsumerOffsets(ctx, kafka.TopicAndGroup{Topic: topic, GroupId: g})
		check(err)

		// Map iteration order is random; sort the partition numbers so the
		// log output is stable.
		partitionIDs := make([]int, 0, len(offsets))
		for id := range offsets {
			partitionIDs = append(partitionIDs, id)
		}
		sort.Ints(partitionIDs)

		logger.Print("GroupID: ", group.GroupID)
		for _, id := range partitionIDs {
			logger.Print("   Partition: ", id, ", offset: ", offsets[id])
		}
	}
}

// keyToPartition is a balancer function that routes a message by the first
// byte of its key: the byte value modulo the number of available partitions
// selects an entry of partitions, and that entry is returned.
//
// Messages without a key go to the first available partition. If partitions is
// empty there is nothing to choose from, so 0 is returned instead of dividing
// by zero.
func keyToPartition(msg kafka.Message, partitions ...int) (partition int) {
	if len(partitions) == 0 {
		return 0
	}
	if len(msg.Key) == 0 {
		return partitions[0]
	}
	// Return the partition ID stored at the computed index rather than the
	// index itself, so the result is always one of the offered partitions.
	return partitions[int(msg.Key[0])%len(partitions)]
}

// produce writes count messages carrying the given key to the package-level
// topic and returns count so callers can keep a running total.
//
// A dedicated writer is created per call; keyToPartition decides the target
// partition from the key. Each message value is "message <key>.<index>". Any
// write or close error panics via check.
func produce(ctx context.Context, count int, key []byte) int {
	writer := kafka.NewWriter(kafka.WriterConfig{
		Brokers:   []string{broker},
		Topic:     topic,
		BatchSize: 1,
		Balancer:  kafka.BalancerFunc(keyToPartition),
		Logger:    kafka.LoggerFunc(log.New(log.Writer(), "[kafka writer] ", 0).Printf),
	})
	// Close the writer so its resources are not leaked on every call; this
	// mirrors how the readers in this program are closed.
	defer func() { check(writer.Close()) }()

	msgs := make([]kafka.Message, 0, count)
	for i := 0; i < count; i++ {
		msgs = append(msgs, kafka.Message{
			Key:   key,
			Value: []byte(fmt.Sprintf("message %v.%v", key, i)),
		})
	}
	check(writer.WriteMessages(ctx, msgs...))
	return count
}

// countMessages verifies that the package-level topic holds at least count
// messages by reading that many from the first offset with a throwaway
// consumer group, then logs how many were seen per partition.
//
// The whole operation is bounded by a 10 second timeout. Any read error, or the
// context ending before count messages were read, panics via check.
func countMessages(ctx context.Context, count int) {
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	reader := kafka.NewReader(kafka.ReaderConfig{
		StartOffset: kafka.FirstOffset,
		GroupID:     uuid.NewString(),
		Brokers:     []string{broker},
		MaxWait:     1 * time.Second,
		Topic:       topic,
		Logger:      kafka.LoggerFunc(log.New(log.Writer(), "[message counter] ", 0).Printf),
	})
	defer func() { check(reader.Close()) }()

	partitions := map[int]int{}
	for i := 0; i < count; i++ {
		// Fail fast once the context has ended, reporting the actual cause
		// (deadline exceeded vs. cancelled) rather than always "canceled".
		// ctx.Err() is nil while the context is live, making this a no-op.
		check(ctx.Err())

		msg, err := reader.ReadMessage(ctx)
		check(err)
		partitions[msg.Partition]++
	}
	logger.Printf("partition message counts: %#v", partitions)
}

// consume reads up to c messages from the package-level topic as a member of
// consumer group g, logging each message's offset, key and value.
//
// d bounds the whole call and so is the reader's StartOffset. Before reading,
// the group's currently committed offsets are logged. The function returns
// early, without error, if the context has already ended at the top of an
// iteration; a failed read panics via check.
func consume(ctx context.Context, g string, d time.Duration, c int, so int64) {
	ctx, cancel := context.WithTimeout(ctx, d)
	defer cancel()

	cfg := kafka.ReaderConfig{
		Brokers:     []string{broker},
		Topic:       topic,
		GroupID:     g,
		MaxWait:     2 * time.Second,
		StartOffset: so,
		Logger:      kafka.LoggerFunc(log.New(log.Writer(), "[legit consumer] ", 0).Printf),
	}
	reader := kafka.NewReader(cfg)
	defer func() { check(reader.Close()) }()

	printGroupOffsets(ctx, g)

	for remaining := c; remaining > 0; remaining-- {
		if ctx.Err() != nil {
			return
		}
		logger.Print("expecting ", remaining, " more messages...")
		msg, err := reader.ReadMessage(ctx)
		check(err)
		logger.Printf("offset=%v key=%v value=%q", msg.Offset, msg.Key, string(msg.Value))
	}
}

Expected Behavior

I expected the consumer group to have an offset for every partition being read by the Reader after the reader runs the first time and exits cleanly. Basically, during reader init, commit offset=current.

Observed Behavior

The reader only sets offsets for a partition when it commits a message on that partition.

Additional Context

The expected behaviour is seen, for example, when using kafka-console-consumer.sh.

$ docker-compose exec  kafka kafka-console-consumer.sh  --bootstrap-server localhost:19092 --group console-consumer --topic TOPIC
Processed a total of 0 messages
$ docker-compose exec  kafka kafka-consumer-groups.sh  --bootstrap-server localhost:19092 --describe --group console-consumer

Consumer group 'console-consumer' has no active members.

GROUP            TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
console-consumer TOPIC           0          1               1               0               -               -               -
console-consumer TOPIC           2          0               0               0               -               -               -
console-consumer TOPIC           1          1               1               0               -               -               -
$ docker-compose exec  kafka kafka-consumer-groups.sh  --bootstrap-server localhost:19092 --describe --group kafka-go

Consumer group 'kafka-go' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
kafka-go        TOPIC           0          1               1               0               -               -               -
kafka-go        TOPIC           1          1               1               0               -               -               -
$ docker-compose exec  kafka kafka-console-consumer.sh  --bootstrap-server localhost:19092 --group console-consumer --topic TOPIC
message [0].0
message [2].0
message [1].0

So where kafka-console-consumer.sh received 3 messages after restarting, kafka-go received only 2.

wwade avatar Apr 30 '22 01:04 wwade

Here is a brutal hack I tried out to explain my suggestion further. It does pass my test case, at least (but it doesn't do a lot of other things properly).

See commit 8a1f956b0d2ebb09b4de9fd53d522603f9fe3dfc.

wwade avatar Apr 30 '22 02:04 wwade

It sounds like this is primarily a concern in cases of a very low-volume topic, so is there any particular reason using FirstOffset wouldn't be suitable here? Based on my understanding, this would probably be the right configuration for this scenario.

I can see the point you are making about committing at least a 0 for partitions we've not necessarily seen any messages for yet, but something tells me that implementing that logic internally could be tricky. Each consumer group generation is aware of which partitions it is processing, so it may be possible to leverage that to append "empty" partitions to the commit list. But that could be problematic on its own, since you may end up overwriting a previous commit if a partition just happens to not be included on subsequent reads/batches.

I would strongly recommend using FirstOffset as that would definitely fix any concerns you have about data loss based on the situation you've described. If you would like to see LastOffset have some handling for this then I would at least review a PR to make this change, but I'd want to test it pretty thoroughly to make sure we don't introduce other more serious data loss problems in other situations.

dominicbarnes avatar Oct 14 '22 16:10 dominicbarnes