
Production-grade sample code on how to read messages

Open silencer07 opened this issue 2 years ago • 3 comments

Describe the solution you would like

As developers using this framework, we would like the documentation to include sample code that is easy to understand and production grade.

The sample code in readme:

// make a new reader that consumes from topic-A
r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:   []string{"localhost:9092", "localhost:9093", "localhost:9094"},
    GroupID:   "consumer-group-id",
    Topic:     "topic-A",
    MinBytes:  10e3, // 10KB
    MaxBytes:  10e6, // 10MB
})

for {
    m, err := r.ReadMessage(context.Background())
    if err != nil {
        break
    }
    fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}

if err := r.Close(); err != nil {
    log.Fatal("failed to close reader:", err)
}

is fine when you are trying out the framework, but it caused bugs when we tested it: lost messages, improper offset commits, etc.

A better way of reading messages from Kafka

The code below does a better job of reading messages and ensures that messages will not be lost:

func main() {
	// Run the consumer loop on the main goroutine; if you start it with
	// `go` and return, main exits immediately and the process terminates.
	startConsumingMessages()
}

func startConsumingMessages() {
	log.Printf("start consuming messages... !!")
	for {
		// A new kafka reader is created every time readMessage returns.
		// The previous reader has already been closed, so it can be
		// garbage-collected, avoiding leaks.
		log.Printf("Creating new kafka reader...")
		reader := kafka.NewReader(kafka.ReaderConfig{
			Brokers:  []string{"localhost:9092", "localhost:9093", "localhost:9094"},
			GroupID:  "consumer-group-id",
			Topic:    "topic-A",
			MinBytes: 10e3, // 10KB
			MaxBytes: 10e6, // 10MB
		})

		readMessage(reader)
	}
}

func readMessage(reader *kafka.Reader) {
	defer func() {
		log.Printf("Kafka reader will be closed...")
		if err := reader.Close(); err != nil {
			log.Printf("Error while trying to close the reader -> %v", err)
		}
	}()

	for {
		if err := processMessage(reader); err != nil {
			log.Printf("Kafka reader encountered error, breaking the loop... -> %v", err)
			break
		}
	}
}

func processMessage(kafkaReader *kafka.Reader) error {
	perMessageContext := context.Background()
	kafkaMsg, fetchMessageError := kafkaReader.FetchMessage(perMessageContext)
	if fetchMessageError != nil {
		return fetchMessageError
	}

	// START - process/persist the message (or whatever you need to do with
	// it) here; on failure, return an error so the offset is not committed
	// and the message is redelivered.
	log.Printf("I am just logging the message value as sample code: %s", string(kafkaMsg.Value))
	// END

	commitMessageError := kafkaReader.CommitMessages(perMessageContext, kafkaMsg)
	if commitMessageError != nil {
		return commitMessageError
	}

	return nil
}

This code is easier to read, has no nasty surprises, and ensures that you process each message fully; otherwise Kafka will redeliver the message because its offset was not committed.

P.S. Apologies if it has compile issues here and there; I wrote this without an IDE, but the salient points are still there.

silencer07 avatar Mar 14 '23 23:03 silencer07

@silencer07: Love the code structure of your production-grade example. I would like to know how you would extend this for multiple consumers listening to multiple topics.

ankush-custiv avatar Mar 15 '23 09:03 ankush-custiv

Still the same. The difference is that you have a list of topics you want to listen to; you get the partitions for those topics, then create a consumer per partition per topic the same way.

However, this is not our problem, because we have consumer groups and we manage the parallelization of consumption via Kubernetes.

silencer07 avatar Mar 15 '23 10:03 silencer07

There are a few things regarding your issue:

  • A parent (root) context needs to be passed and used between functions to signal when the application is shutting down. This tells all inner resources to stop their tasks and start a graceful shutdown.
  • Scoped contexts (in the reader func) should use timeout contexts, context.WithTimeout(rootCtx, time.Second * n), so the actual message processing doesn't end in a deadlock.
  • Your reader (worker) scheduler MUST wait for workers to finish their execution. You may use sync.WaitGroup to deal with that in a concurrency-safe way. That way you don't lose messages unless the timeout expires (otherwise you'll get a deadlock).
  • Your first example lacks explicit commits.
  • Your second example allocates a lot of memory, potentially much less efficiently than the trivial approach: a new reader instance is allocated on every iteration, along with every field (which span a wide range of types, such as slices). Maybe sync.Pool could reduce GC pressure, but I'm not sure it's worth it compared to the trivial implementation. Finally, passing pointer values through many functions makes stack values escape to the heap, increasing GC pressure even more.

aruizeac avatar Mar 19 '23 09:03 aruizeac