Add more configurable items for loki.source.kafka
Request
- config.Consumer.Offsets.Initial config
- config.Consumer.Return.Errors config
Use case
I have a long-running Kafka instance that has accumulated a large amount of logs. I don't need to store all the historical logs in Loki; I only want to capture new logs. However, config.Consumer.Offsets.Initial is hard-coded to sarama.OffsetOldest, so a new consumer group always starts from the oldest available offset and I cannot choose to start from the newest one.
https://github.com/grafana/alloy/blob/b013bc598e572d1fef330bea2c71b85a0a020e05/internal/component/loki/source/internal/kafkatarget/target_syncer.go#L63
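To illustrate the kind of option being requested, here is a minimal sketch of how a user-facing string could be mapped to Sarama's initial-offset values. The helper name `parseInitialOffset` and the accepted strings "oldest" and "newest" are assumptions made for illustration, not part of Alloy. The two constants mirror the values of sarama.OffsetNewest (-1) and sarama.OffsetOldest (-2), so the sketch runs without importing Sarama.

```go
package main

import (
	"fmt"
	"strings"
)

// These values mirror sarama.OffsetNewest and sarama.OffsetOldest.
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// parseInitialOffset is a hypothetical helper: it converts a config string
// into the value that would be assigned to config.Consumer.Offsets.Initial.
// An empty string falls back to "oldest" to preserve the current behavior.
func parseInitialOffset(s string) (int64, error) {
	switch strings.ToLower(strings.TrimSpace(s)) {
	case "", "oldest":
		return offsetOldest, nil
	case "newest":
		return offsetNewest, nil
	default:
		return 0, fmt.Errorf("invalid initial offset %q: want \"oldest\" or \"newest\"", s)
	}
}

func main() {
	for _, s := range []string{"", "oldest", "newest", "latest"} {
		off, err := parseInitialOffset(s)
		fmt.Printf("%q -> %d, err=%v\n", s, off, err)
	}
}
```

Defaulting to "oldest" keeps existing deployments unchanged, while "newest" would cover the use case described here.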
When testing Kafka consumption, I ran into errors, but nothing was logged, which left me quite puzzled. After reviewing Sarama's ConsumerGroup example, I found that consumer errors can be surfaced by setting config.Consumer.Return.Errors and reading from the group's Errors() channel.
https://pkg.go.dev/github.com/IBM/sarama#example-ConsumerGroup
config.Consumer.Return.Errors = true

// Track errors
go func() {
	for err := range group.Errors() {
		fmt.Println("ERROR", err)
	}
}()