
Provide a simple log consumer

Open sagikazarmark opened this issue 1 year ago • 0 comments

Description

#421 added the option to consume logs from a channel instead of writing them to stderr.

As of Go 1.21, however, the standard library ships a structured logging package, log/slog.

I propose adding support for that logger in the form of a function that can consume logs from the Logs channel.

Here is the piece of code I wrote and use currently:

package kafka

import (
	"context"
	"log/slog"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// LogEmitter emits logs from a [kafka.Consumer] or [kafka.Producer].
//
// Requires `go.logs.channel.enable` option set to true.
//
// This feature was implemented in [this PR].
//
// [this PR]: https://github.com/confluentinc/confluent-kafka-go/pull/421
type LogEmitter interface {
	Logs() chan kafka.LogEvent
}

// ConsumeLogChannel consumes the log channel returned by a [LogEmitter] and
// forwards every event to logger. It blocks until the channel is closed,
// so it is meant to be run in its own goroutine.
func ConsumeLogChannel(emitter LogEmitter, logger *slog.Logger) {
	for e := range emitter.Logs() {
		processLog(logger, e)
	}
}

func processLog(logger *slog.Logger, e kafka.LogEvent) {
	logger.Log(
		context.Background(),
		mapLogLevel(e.Level),
		e.Message,
		slog.String("name", e.Name),
		slog.String("tag", e.Tag),
		slog.Time("timestamp", e.Timestamp),
	)
}

// mapLogLevel maps the syslog severity level carried in the Level field of
// [kafka.LogEvent] to the closest [slog.Level].
func mapLogLevel(level int) slog.Level {
	switch level {
	case 7: // debug
		return slog.LevelDebug

	case 6, 5: // info, notice
		return slog.LevelInfo

	case 4: // warning
		return slog.LevelWarn

	case 3, 2, 1, 0: // err, crit, alert, emerg
		return slog.LevelError

	default: // unknown severity
		return slog.LevelInfo
	}
}

Usage (the same call works for a producer or a consumer):

go ConsumeLogChannel(producer, logger)
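For anyone who wants to try the pattern without a broker, here is a rough, standard-library-only sketch of the same loop. The `logEvent` and `fakeEmitter` types, and the names `severityToLevel`, `drainLogs` and `runDemo`, are hypothetical stand-ins invented for this example. They only mirror the fields used above and are not part of confluent-kafka-go. The sketch assumes the log channel is closed when the client shuts down, and waits for the draining goroutine so trailing log lines are not lost.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"log/slog"
	"sync"
	"time"
)

// logEvent is a hypothetical stand-in mirroring the kafka.LogEvent fields used above.
type logEvent struct {
	Name      string
	Tag       string
	Message   string
	Level     int // syslog severity, 0 (emerg) to 7 (debug)
	Timestamp time.Time
}

// fakeEmitter is a hypothetical stand-in for a producer or consumer.
type fakeEmitter struct{ ch chan logEvent }

func (f fakeEmitter) Logs() chan logEvent { return f.ch }

// severityToLevel applies the same mapping as mapLogLevel above.
func severityToLevel(sev int) slog.Level {
	switch sev {
	case 7:
		return slog.LevelDebug
	case 6, 5:
		return slog.LevelInfo
	case 4:
		return slog.LevelWarn
	case 3, 2, 1, 0:
		return slog.LevelError
	default:
		return slog.LevelInfo
	}
}

// drainLogs forwards events to logger until the channel is closed.
func drainLogs(ch <-chan logEvent, logger *slog.Logger) {
	for e := range ch {
		logger.Log(context.Background(), severityToLevel(e.Level), e.Message,
			slog.String("name", e.Name),
			slog.String("tag", e.Tag),
			slog.Time("timestamp", e.Timestamp),
		)
	}
}

// runDemo pushes two events through a fake emitter and returns the text output.
func runDemo() string {
	var buf bytes.Buffer
	logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug}))
	em := fakeEmitter{ch: make(chan logEvent, 2)}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		drainLogs(em.Logs(), logger)
	}()

	em.ch <- logEvent{Name: "demo#producer-1", Tag: "INIT", Message: "starting", Level: 7, Timestamp: time.Now()}
	em.ch <- logEvent{Name: "demo#producer-1", Tag: "FAIL", Message: "broker down", Level: 3, Timestamp: time.Now()}
	close(em.ch) // stands in for the client shutting down
	wg.Wait()    // buf is only read after the draining goroutine has finished
	return buf.String()
}

func main() {
	fmt.Print(runDemo())
}
```

With the real client, the equivalent of `wg.Wait()` would come after closing the producer or consumer, assuming the library closes the Logs channel at that point.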

How to reproduce

Checklist

Please provide the following information:

  • [x] confluent-kafka-go and librdkafka version (LibraryVersion()): 2.2.0
  • [ ] Apache Kafka broker version:
  • [ ] Client configuration: ConfigMap{...}
  • [ ] Operating system:
  • [ ] Provide client logs (with "debug": ".." as necessary)
  • [ ] Provide broker log excerpts
  • [ ] Critical issue

sagikazarmark • Aug 23 '23 13:08