Add Support for Key, Partition, Offset, and Headers in Kafka Source Plugin

Open wduczkowski opened this issue 11 months ago • 2 comments

Currently, the Kafka source plugin does not retrieve key, partition, offset or header information from Kafka messages, even though the underlying library (github.com/segmentio/kafka-go) already supports these parameters. This omission prevents users from accessing important metadata that can be crucial for debugging, routing, or processing logic.

Proposed Solution:

Extend the existing Kafka source plugin to include the retrieval of:

  • Message Key
  • Partition
  • Offset
  • Headers
  • Topic

Update any existing data structures or interfaces within the plugin to store and return these additional metadata fields. Ensure that backward compatibility is preserved if the plugin is already being used without this metadata.
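One way the opt-in behaviour could look is sketched below. This is only an illustration, not eKuiper's actual plugin code: the `message` and `header` types are local stand-ins that mirror the exported fields of kafka-go's `kafka.Message` and `kafka.Header`, and `sourceConfig`, `IncludeMeta` and `buildMeta` are hypothetical names. Metadata is produced only when the flag is set, so existing rules would see no change.

```go
package main

// header and message are local stand-ins mirroring the exported fields of
// kafka-go's kafka.Header and kafka.Message, so the sketch compiles without
// the library.
type header struct {
	Key   string
	Value []byte
}

type message struct {
	Topic     string
	Partition int
	Offset    int64
	Key       []byte
	Value     []byte
	Headers   []header
}

// sourceConfig is a hypothetical plugin configuration. IncludeMeta defaults
// to false, so existing deployments keep their current behaviour.
type sourceConfig struct {
	IncludeMeta bool
}

// buildMeta (hypothetical) returns the metadata for one message, or nil when
// metadata has not been requested.
func buildMeta(cfg sourceConfig, m message) map[string]any {
	if !cfg.IncludeMeta {
		return nil
	}
	headers := make(map[string]string, len(m.Headers))
	for _, h := range m.Headers {
		// If a key repeats, the last value wins in this simple sketch.
		headers[h.Key] = string(h.Value)
	}
	return map[string]any{
		"topic":     m.Topic,
		"partition": m.Partition,
		"offset":    m.Offset,
		"key":       string(m.Key),
		"headers":   headers,
	}
}
```

Calling `buildMeta(sourceConfig{IncludeMeta: true}, m)` would return a map with the `topic`, `partition`, `offset`, `key` and `headers` entries, while the zero-value config returns `nil`. The field names in the map are assumptions and would need to follow whatever convention the plugin settles on.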

Example Reference: Below is a sample code snippet using kafka-go that demonstrates how to read the topic, partition, offset, key, value and headers:

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"strings"

	kafka "github.com/segmentio/kafka-go"
)

func getKafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
	brokers := strings.Split(kafkaURL, ",")
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:  brokers,
		GroupID:  groupID,
		Topic:    topic,
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	})
}

func main() {
	kafkaURL := os.Getenv("kafkaURL")
	topic := os.Getenv("topic")
	groupID := os.Getenv("groupID")

	reader := getKafkaReader(kafkaURL, topic, groupID)
	defer reader.Close()

	fmt.Println("start consuming ... !!")
	for {
		m, err := reader.ReadMessage(context.Background())
		if err != nil {
			log.Fatalln(err)
		}
		fmt.Printf("message at topic:%v partition:%v offset:%v  %s = %s  headers:%v\n",
			m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value), m.Headers)
	}
}

Event on the Kafka topic:

key: dev000001
header : {
     "app.id": "best-app-ever"
}
value: { "temperature":32.7063, "createTime":1735896706186, "correlationId":"bb889204-4ad7-4652-b18a-bc465592302d"} 

Output:

consumer-logger-1  | message at topic:connect.edge.raw.temperature partition:0 offset:31106     dev000001 = {"temperature":32.7063,"createTime":1735896706186,"correlationId":"bb889204-4ad7-4652-b18a-bc465592302d"}  headers:[{app.id best-app-ever}]
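The output above shows headers as a list of key/value pairs rather than a map, and Kafka permits the same header key to appear more than once. If the plugin exposes headers to rules, one option is to group values per key so nothing is lost. The sketch below is an assumption about how that might be done: `header` is a local stand-in for kafka-go's `kafka.Header`, and `groupHeaders` is a hypothetical helper.

```go
package main

// header is a local stand-in mirroring kafka-go's kafka.Header
// (Key string, Value []byte), so the sketch compiles without the library.
type header struct {
	Key   string
	Value []byte
}

// groupHeaders (hypothetical) collects header values per key, preserving
// the order in which values for a repeated key appeared.
func groupHeaders(hs []header) map[string][]string {
	out := make(map[string][]string, len(hs))
	for _, h := range hs {
		out[h.Key] = append(out[h.Key], string(h.Value))
	}
	return out
}
```

For the event in this issue, the result would hold a single entry, `app.id`, with the one value `best-app-ever`. A flat `map[string]string` is simpler for rules to consume but would silently drop repeated keys, so the choice is a trade-off for the implementer.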

wduczkowski, Jan 03 '25 10:01

Hi @ngjaying, I'd like to work on this issue. Is it still available? Thanks!

Elia-Renzoni, Oct 08 '25 07:10

@Elia-Renzoni Thanks for your interest! It is indeed still open. Feel free to assign it to yourself (if applicable) and begin work. We appreciate you picking this up!

ngjaying, Oct 09 '25 00:10