alloy icon indicating copy to clipboard operation
alloy copied to clipboard

Add more configurable items for loki.source.kafka

Open marunrun opened this issue 4 months ago • 0 comments

Request

  1. config.Consumer.Offsets.Initial config

  2. config.Consumer.Return.Errors config

Use case

I have a long-running Kafka instance that has accumulated a large amount of logs. I don't need to store all the historical logs in Loki; I only want to capture the new logs. However, I noticed that the default value for config.Consumer.Offsets.Initial is set to sarama.OffsetOldest, which prevents me from selecting the newer logs. https://github.com/grafana/alloy/blob/b013bc598e572d1fef330bea2c71b85a0a020e05/internal/component/loki/source/internal/kafkatarget/target_syncer.go#L63

When testing Kafka consumption, I encountered some errors, but no logs were being printed, which left me quite puzzled. After reviewing Sarama's example, I discovered there is a way to log consumption exceptions.

https://pkg.go.dev/github.com/IBM/sarama#example-ConsumerGroup

	config.Consumer.Return.Errors = true

	// Track errors
	go func() {
		for err := range group.Errors() {
			fmt.Println("ERROR", err)
		}
	}()

marunrun avatar Oct 11 '24 09:10 marunrun