A new method for Batch called ReadIntoMessage

Open: rouzier opened this issue 1 year ago • 0 comments

Describe the solution you would like

Could we have a method that combines Batch.Read() and Batch.ReadMessage()? If msg.Value and msg.Key have enough space for the next message, it would reuse those buffers; otherwise it would create new ones.

Example usage:

        // to consume messages
        topic := "my-topic"
        partition := 0

        conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
        if err != nil {
                log.Fatal("failed to dial leader:", err)
        }

        conn.SetReadDeadline(time.Now().Add(10 * time.Second))
        batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
        msg := kafka.Message{
                Value: make([]byte, 10e3), // 10KB initial buffer; a larger value would get a new buffer
                Key:   make([]byte, 256),  // 256-byte initial buffer; a larger key would get a new buffer
        }
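        for {
                // ReadIntoMessage is the proposed method; it does not exist in kafka-go today.
                err := batch.ReadIntoMessage(&msg)
                if err != nil {
                        break // the batch is exhausted or the read failed
                }
                fmt.Println(string(msg.Value))
        }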

        if err := batch.Close(); err != nil {
                log.Fatal("failed to close batch:", err)
        }

        if err := conn.Close(); err != nil {
                log.Fatal("failed to close connection:", err)
        }
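
To make the reuse-or-allocate rule concrete, here is a minimal sketch of the buffer handling I have in mind. It does not touch kafka-go internals: `message`, `fitBuffer`, and `readInto` are hypothetical names used only for illustration, and `message` is a stand-in for the Key and Value fields of kafka.Message. A real ReadIntoMessage would read from the batch instead of copying from slices.

```go
package main

import "fmt"

// message is a hypothetical stand-in for the Key and Value fields of
// kafka.Message, so this sketch compiles without importing kafka-go.
type message struct {
	Key   []byte
	Value []byte
}

// fitBuffer returns a slice of length n. It reuses buf's backing array
// when the capacity is sufficient and allocates a new one otherwise.
func fitBuffer(buf []byte, n int) []byte {
	if cap(buf) >= n {
		return buf[:n]
	}
	return make([]byte, n)
}

// readInto is a hypothetical illustration of the proposed behavior:
// it fills msg with key and value, reusing msg's buffers when they fit.
func readInto(msg *message, key, value []byte) {
	msg.Key = fitBuffer(msg.Key, len(key))
	copy(msg.Key, key)
	msg.Value = fitBuffer(msg.Value, len(value))
	copy(msg.Value, value)
}

func main() {
	msg := message{Key: make([]byte, 4), Value: make([]byte, 8)}

	// Fits in the existing buffers, so no allocation is needed.
	readInto(&msg, []byte("k1"), []byte("hello"))
	fmt.Printf("%s=%s (value cap %d)\n", msg.Key, msg.Value, cap(msg.Value))

	// Too large for the 8-byte value buffer, so a new one is allocated.
	readInto(&msg, []byte("k2"), []byte("a longer payload"))
	fmt.Printf("%s=%s (value cap %d)\n", msg.Key, msg.Value, cap(msg.Value))
}
```

Each slice is resliced to the length of the current message, so string(msg.Value) only contains the current payload and no stale bytes from a previous, longer message. The capacity is kept for the next call.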

I could provide a PR if you guys are interested.

rouzier, Feb 21 '24 18:02