kafka-go
Production-grade sample code for reading messages
Describe the solution you would like
As developers using this framework, we want sample code in the documentation that is easy to understand and production grade.
The sample code in the README:
```go
// make a new reader that consumes from topic-A
r := kafka.NewReader(kafka.ReaderConfig{
	Brokers:  []string{"localhost:9092", "localhost:9093", "localhost:9094"},
	GroupID:  "consumer-group-id",
	Topic:    "topic-A",
	MinBytes: 10e3, // 10KB
	MaxBytes: 10e6, // 10MB
})

for {
	m, err := r.ReadMessage(context.Background())
	if err != nil {
		break
	}
	fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}

if err := r.Close(); err != nil {
	log.Fatal("failed to close reader:", err)
}
```
is fine when you are trying out the framework, but it caused bugs when we tested it: lost messages, improperly committed offsets, etc. With a GroupID configured, ReadMessage commits the offset as part of the call, so a failure while processing the message afterwards can never trigger a redelivery.
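A minimal sketch of that failure mode, assuming the reader configuration above (the persist handler is hypothetical):

```go
package main

import (
	"context"

	kafka "github.com/segmentio/kafka-go"
)

// consumeUnsafe reproduces the README loop. With GroupID set, ReadMessage
// commits the offset as part of the call, so a processing error afterwards
// cannot trigger a redelivery: the message is effectively lost.
func consumeUnsafe(r *kafka.Reader, persist func(kafka.Message) error) error {
	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			return err
		}
		// The offset is already committed by this point; if persist fails,
		// the broker will not resend this message.
		if err := persist(m); err != nil {
			return err
		}
	}
}
```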
A better way of reading messages from Kafka
The code below does a better job of reading messages and ensures that messages will not be lost:
```go
package main

import (
	"context"
	"fmt"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	// run the consumer loop on the main goroutine so the program does not exit immediately
	startConsumingMessages()
}

func startConsumingMessages() {
	log.Printf("start consuming messages... !!")
	for {
		// this ensures that a new kafka reader is created every time we return from readMessage;
		// the previous reader was closed and can be properly garbage-collected, avoiding memory leaks
		log.Printf("Creating new kafka reader...")
		reader := kafka.NewReader(kafka.ReaderConfig{
			Brokers:  []string{"localhost:9092", "localhost:9093", "localhost:9094"},
			GroupID:  "consumer-group-id",
			Topic:    "topic-A",
			MinBytes: 10e3, // 10KB
			MaxBytes: 10e6, // 10MB
		})
		readMessage(reader)
	}
}

func readMessage(reader *kafka.Reader) {
	defer func() {
		log.Printf("Kafka reader will be closed...")
		readerCloseError := reader.Close()
		if readerCloseError != nil {
			fmt.Printf("Error while trying to close the reader -> %v\n", readerCloseError.Error())
		}
	}()
	for {
		err := processMessage(reader)
		if err != nil {
			fmt.Printf("Kafka reader encountered error. breaking the loop... -> %v\n", err.Error())
			break
		}
	}
}

func processMessage(kafkaReader *kafka.Reader) error {
	perMessageContext := context.Background()
	kafkaMsg, fetchMessageError := kafkaReader.FetchMessage(perMessageContext)
	if fetchMessageError != nil {
		return fetchMessageError
	}
	// START - make sure you were able to process/persist (or whatever you need to do with)
	// the message; if not successful, return an error here so the offset is never committed
	log.Printf("I am just logging the message value as sample code: %s", string(kafkaMsg.Value))
	// END
	commitMessageError := kafkaReader.CommitMessages(perMessageContext, kafkaMsg)
	if commitMessageError != nil {
		return commitMessageError
	}
	return nil
}
```
This code is easier to read and has no nasty surprises: it ensures you fully process a message before committing, and otherwise Kafka will resend the message because its offset was never committed.
P.S. Apologies if it has compile issues here and there; I did this with no IDE, but the salient points are still there.
@silencer07 : Love the code structure of your production-grade example. I would like to know how you would extend this for multiple consumers listening to multiple topics.
Still the same. The difference is that you have a list of topics you want to listen to; you get the partitions for those topics, then create a consumer per partition per topic the same way.
However, this is not our problem, because we have consumer groups and we manage the parallelization of consumption via Kubernetes.
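For completeness, here is a minimal sketch of the multi-topic variant: one reader goroutine per topic, all sharing the same consumer group, joined via a `sync.WaitGroup`. Broker addresses and topic names are illustrative, and recent kafka-go versions also offer a `GroupTopics` field on `ReaderConfig` as an alternative:

```go
package main

import (
	"context"
	"log"
	"sync"

	kafka "github.com/segmentio/kafka-go"
)

// startConsumers runs one reader per topic, all in the same consumer group.
func startConsumers(ctx context.Context, topics []string) {
	var wg sync.WaitGroup
	for _, topic := range topics {
		wg.Add(1)
		go func(topic string) {
			defer wg.Done()
			reader := kafka.NewReader(kafka.ReaderConfig{
				Brokers: []string{"localhost:9092"},
				GroupID: "consumer-group-id",
				Topic:   topic,
			})
			defer reader.Close()
			for {
				// fetch without committing, exactly as in the example above
				m, err := reader.FetchMessage(ctx)
				if err != nil {
					return // ctx was cancelled or the reader failed
				}
				log.Printf("topic %s: %s", m.Topic, string(m.Value))
				// commit only after the message has been handled
				if err := reader.CommitMessages(ctx, m); err != nil {
					return
				}
			}
		}(topic)
	}
	wg.Wait()
}

func main() {
	startConsumers(context.Background(), []string{"topic-A", "topic-B"})
}
```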
There are a few things regarding your issue (a sketch combining the first three points follows this list):
- A parent (root) context needs to be passed and used between functions to signal when the application is shutting down. This tells all inner resources to stop their tasks and start a graceful shutdown.
- Scoped contexts (in the reader func) should use timeout contexts, `context.WithTimeout(rootCtx, time.Second * n)`, so the actual message processing doesn't end in a deadlock.
- Your reader (worker) scheduler MUST wait for workers to finish their execution. You may use `sync.WaitGroup` to deal with that in a concurrency-safe way. That way you don't lose messages unless the timeout expires (otherwise you'll get a deadlock).
- Your first example lacks commits.
- Your second example allocates a lot of memory, potentially much more inefficiently than the trivial way: you allocate a new reader instance every time, along with every one of its fields (which span a wide range of types, such as slices). Maybe using `sync.Pool` could reduce GC back pressure, but I'm not sure it's worth it compared to the trivial implementation. Finally, the use of many functions taking pointer values just makes stacked values escape to the heap, increasing GC back pressure even more.
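Putting those points together, a minimal sketch of a shutdown-aware consumer (the handle function and the 10-second timeout are illustrative assumptions):

```go
package main

import (
	"context"
	"log"
	"os/signal"
	"sync"
	"syscall"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	// Root context: cancelled on SIGINT/SIGTERM, signalling every worker to shut down.
	rootCtx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		GroupID: "consumer-group-id",
		Topic:   "topic-A",
	})
	defer reader.Close()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			m, err := reader.FetchMessage(rootCtx)
			if err != nil {
				return // rootCtx was cancelled or the reader failed
			}
			// Bound each message's processing so a stuck handler cannot block shutdown forever.
			msgCtx, cancel := context.WithTimeout(rootCtx, 10*time.Second)
			err = handle(msgCtx, m)
			if err == nil {
				err = reader.CommitMessages(msgCtx, m)
			}
			cancel()
			if err != nil {
				log.Printf("processing failed, offset not committed, message will be redelivered: %v", err)
			}
		}
	}()
	// Wait for the worker to finish before exiting so no in-flight message is dropped.
	wg.Wait()
}

// handle stands in for the real per-message work.
func handle(ctx context.Context, m kafka.Message) error {
	log.Printf("%s", string(m.Value))
	return nil
}
```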