kafka-go
A new method for Batch called ReadIntoMessage
Describe the solution you would like
Could we have a method that combines Batch.Read() and Batch.ReadMessage()? If msg.Value and msg.Key have enough capacity for the incoming message, it would reuse those buffers; otherwise it would allocate new ones.
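To make the reuse-or-allocate rule concrete, here is a rough, standalone sketch. It does not touch kafka-go internals: the `message` type, `fit`, and `readInto` are hypothetical names used only for illustration, and the real method would read from the batch rather than take the key and value as arguments.

```go
package main

import "fmt"

// message is a hypothetical stand-in for kafka.Message,
// reduced to the two fields that matter here.
type message struct {
	Key   []byte
	Value []byte
}

// fit returns a slice of length n. It reuses buf's backing array when
// the capacity is sufficient and allocates a new slice otherwise.
func fit(buf []byte, n int) []byte {
	if cap(buf) >= n {
		return buf[:n]
	}
	return make([]byte, n)
}

// readInto copies key and value into msg, reusing msg's existing
// buffers when they are large enough (hypothetical helper).
func readInto(msg *message, key, value []byte) {
	msg.Key = fit(msg.Key, len(key))
	copy(msg.Key, key)
	msg.Value = fit(msg.Value, len(value))
	copy(msg.Value, value)
}

func main() {
	msg := message{Key: make([]byte, 4), Value: make([]byte, 8)}

	// Small payload: both buffers are reused.
	readInto(&msg, []byte("k1"), []byte("hello"))
	fmt.Println(string(msg.Key), string(msg.Value))

	// Payload larger than cap(msg.Value): a new Value buffer is allocated.
	readInto(&msg, []byte("k2"), []byte("a value longer than eight bytes"))
	fmt.Println(string(msg.Key), string(msg.Value))
}
```

One consequence of this design is that the caller must not hold on to msg.Key or msg.Value across calls, since the next read may overwrite them in place.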
Example usage.
// to consume messages
topic := "my-topic"
partition := 0

conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
	log.Fatal("failed to dial leader:", err)
}

conn.SetReadDeadline(time.Now().Add(10 * time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

// buffers allocated once and reused across iterations
msg := kafka.Message{
	Value: make([]byte, 10e3), // 10KB max per message
	Key:   make([]byte, 256),  // 256 bytes max per message
}

for {
	// ReadIntoMessage is the proposed method; it does not exist yet
	err := batch.ReadIntoMessage(&msg)
	if err != nil {
		break
	}
	fmt.Println(string(msg.Value))
}

if err := batch.Close(); err != nil {
	log.Fatal("failed to close batch:", err)
}

if err := conn.Close(); err != nil {
	log.Fatal("failed to close connection:", err)
}
I could provide a PR if you guys are interested.