confluent-kafka-go
confluent-kafka-go copied to clipboard
Provide a simple log consumer
Description
#421 added the option to consume logs from a channel instead of writing them to stderr.
As of Go 1.21 though, there is a fairly usable structured logging solution in the standard library.
I propose adding support for that logger in the form of a function that can consume logs from the Logs channel.
Here is the piece of code I wrote and use currently:
package kafka
import (
"context"
"log/slog"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
// LogEmitter emits logs from a [kafka.Consumer] or [kafka.Producer].
//
// Requires `go.logs.channel.enable` option set to true.
//
// This feature was implemented in [this PR].
//
// [this PR]: https://github.com/confluentinc/confluent-kafka-go/pull/421
type LogEmitter interface {
Logs() chan kafka.LogEvent
}
// ConsumeLogChannel is supposed to be called in a goroutine.
// It consumes a log channel returned by a [LogEmitter].
func ConsumeLogChannel(emitter LogEmitter, logger *slog.Logger) {
for e := range emitter.Logs() {
processLog(logger, e)
}
}
func processLog(logger *slog.Logger, e kafka.LogEvent) {
logger.Log(
context.Background(),
mapLogLevel(e.Level),
e.Message,
slog.String("name", e.Name),
slog.String("tag", e.Tag),
slog.Time("timestamp", e.Timestamp),
)
}
// According to [kafka.LogEvent] the Level field is an int that contains a syslog severity level.
func mapLogLevel(level int) slog.Level {
switch level {
case 7:
return slog.LevelDebug
case 6, 5:
return slog.LevelInfo
case 4:
return slog.LevelWarn
case 3, 2, 1, 0:
return slog.LevelError
default:
return slog.LevelInfo
}
}
Using the consumer:
go ConsumeLogChannel(producer, logger)
How to reproduce
Checklist
Please provide the following information:
- [x] confluent-kafka-go and librdkafka version (
LibraryVersion()
): 2.2.0 - [ ] Apache Kafka broker version:
- [ ] Client configuration:
ConfigMap{...}
- [ ] Operating system:
- [ ] Provide client logs (with
"debug": ".."
as necessary) - [ ] Provide broker log excerpts
- [ ] Critical issue