kafka-konsumer
Easy implementation of kafka consumer with built-in exception manager (kafka-cronsumer)
Kafka Konsumer

Description
Kafka Konsumer provides an easy implementation of a Kafka consumer with a built-in retry/exception manager (kafka-cronsumer).
Migration Guide
V2 Release Notes
- Added the ability to manipulate Kafka message headers.
- Added the transactional retry feature. Set it to false if you want to apply the exception/retry strategy only to failed messages.
- Enabled manual commit in both single and batch consuming modes.
- Enabled consumer pause/resume functionality. Please refer to its example and its "how it works" documentation.
- Bumped kafka-cronsumer to the latest version:
  - Backoff strategy support (linear, exponential options)
  - Added message key for retried messages
  - Added x-error-message to show which error occurred while the message was being processed
- Reduced memory allocation.
- Increased throughput by changing the internal concurrency structure.
How to migrate from v1 to v2?
You can get the latest version via go get github.com/Trendyol/kafka-konsumer/v2@latest
- You need to change the import path from github.com/Trendyol/kafka-konsumer to github.com/Trendyol/kafka-konsumer/v2
- You need to change your consume function to use the pointer signature.
- We moved messageGroupDuration from batchConfiguration.messageGroupDuration to the root level, because this field is also used by the single (non-batch) consumer.
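The signature change in the second step can be illustrated with a small, self-contained sketch. The Message type below is a hypothetical stand-in for kafka.Message, reduced to a few fields, and consumeByValue and consumeByPointer are illustrative names rather than library functions. One plausible benefit of the pointer form, assumed here rather than stated by the project, is that changes made inside the consume function stay visible to the caller.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for kafka.Message, reduced to the
// fields this sketch needs. It is not the library's type.
type Message struct {
	Topic    string
	Value    []byte
	IsFailed bool
}

// consumeByValue mimics a v1-style signature: the message is copied, so
// changes made here are lost when the function returns.
func consumeByValue(message Message) error {
	message.IsFailed = true
	return nil
}

// consumeByPointer mimics a v2-style signature: the caller observes
// changes made through the pointer.
func consumeByPointer(message *Message) error {
	message.IsFailed = true
	return nil
}

func main() {
	a := Message{Topic: "standart-topic", Value: []byte("hello")}
	b := Message{Topic: "standart-topic", Value: []byte("hello")}

	_ = consumeByValue(a)
	_ = consumeByPointer(&b)

	fmt.Println(a.IsFailed, b.IsFailed) // prints: false true
}
```

In practice the migration usually amounts to adding the * to the parameter type of your existing consume function.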
Installation
go get github.com/Trendyol/kafka-konsumer/v2@latest
Examples
You can find a number of ready-to-run examples at this directory.
After running the docker-compose up command, you can run any application you want.
Simple Consumer
package main

import (
	"fmt"

	kafka "github.com/Trendyol/kafka-konsumer/v2"
)

func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
			GroupID: "standart-cg",
		},
		ConsumeFn:    consumeFn,
		RetryEnabled: false,
	}

	// Error handling is omitted for brevity.
	consumer, _ := kafka.NewConsumer(consumerCfg)
	defer consumer.Stop()

	consumer.Consume()
}

// consumeFn uses the v2 pointer signature.
func consumeFn(message *kafka.Message) error {
	fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
	return nil
}
Simple Consumer With Retry/Exception Option
package main

import (
	"fmt"
	"time"

	kafka "github.com/Trendyol/kafka-konsumer/v2"
)

func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
			GroupID: "standart-cg",
		},
		RetryEnabled: true,
		RetryConfiguration: kafka.RetryConfiguration{
			Topic:         "retry-topic",
			StartTimeCron: "*/1 * * * *", // start the retry consumer every minute
			WorkDuration:  50 * time.Second,
			MaxRetry:      3,
		},
		ConsumeFn: consumeFn,
	}

	// Error handling is omitted for brevity.
	consumer, _ := kafka.NewConsumer(consumerCfg)
	defer consumer.Stop()

	consumer.Consume()
}

// consumeFn uses the v2 pointer signature; it also consumes retried messages.
func consumeFn(message *kafka.Message) error {
	fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
	return nil
}
With Batch Option
package main

import (
	"fmt"
	"time"

	kafka "github.com/Trendyol/kafka-konsumer/v2"
)

func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
			GroupID: "standart-cg",
		},
		LogLevel:     kafka.LogLevelDebug,
		RetryEnabled: true,
		RetryConfiguration: kafka.RetryConfiguration{
			Brokers:       []string{"localhost:29092"},
			Topic:         "retry-topic",
			StartTimeCron: "*/1 * * * *",
			WorkDuration:  50 * time.Second,
			MaxRetry:      3,
		},
		// Root-level field in v2 (moved out of BatchConfiguration).
		MessageGroupDuration: time.Second,
		BatchConfiguration: kafka.BatchConfiguration{
			MessageGroupLimit: 1000,
			BatchConsumeFn:    batchConsumeFn,
		},
	}

	// Error handling is omitted for brevity.
	consumer, _ := kafka.NewConsumer(consumerCfg)
	defer consumer.Stop()

	consumer.Consume()
}

// batchConsumeFn uses the v2 pointer signature.
func batchConsumeFn(messages []*kafka.Message) error {
	fmt.Printf("%d\n comes first %s", len(messages), messages[0].Value)
	return nil
}
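The batch example combines two limits: MessageGroupLimit caps the batch size and MessageGroupDuration caps the waiting time. A minimal sketch of that "whichever comes first" grouping is shown below. groupMessages is a hypothetical helper written only for illustration; it assumes limit is greater than zero and is not how kafka-konsumer is implemented internally.

```go
package main

import (
	"fmt"
	"time"
)

// groupMessages is a hypothetical helper, not part of kafka-konsumer.
// It collects values from in and calls flush when the batch reaches
// limit or when the wait ticker fires, whichever comes first.
func groupMessages(in <-chan string, limit int, wait time.Duration, flush func([]string)) {
	ticker := time.NewTicker(wait)
	defer ticker.Stop()

	batch := make([]string, 0, limit)
	emit := func() {
		if len(batch) == 0 {
			return // nothing collected, nothing to flush
		}
		flush(batch)
		batch = make([]string, 0, limit) // do not reuse the flushed slice
	}

	for {
		select {
		case msg, ok := <-in:
			if !ok {
				emit() // input closed: flush whatever is left
				return
			}
			batch = append(batch, msg)
			if len(batch) == limit {
				emit()
				ticker.Reset(wait) // restart the waiting period after a size-based flush
			}
		case <-ticker.C:
			emit() // waiting time elapsed: flush a partial batch
		}
	}
}

func main() {
	in := make(chan string, 5)
	for i := 1; i <= 5; i++ {
		in <- fmt.Sprintf("m%d", i)
	}
	close(in)

	// With a long wait, only the size limit and the channel close trigger flushes.
	groupMessages(in, 2, time.Hour, func(batch []string) {
		fmt.Println(len(batch), batch)
	})
}
```

With five buffered messages and a limit of 2, the sketch flushes batches of 2, 2 and 1 messages.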
With Disabling Transactional Retry
package main

import (
	"errors"
	"time"

	kafka "github.com/Trendyol/kafka-konsumer/v2"
)

func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
			GroupID: "standart-cg",
		},
		LogLevel:     kafka.LogLevelDebug,
		RetryEnabled: true,
		// false: only messages flagged as failed are retried.
		TransactionalRetry: kafka.NewBoolPtr(false),
		RetryConfiguration: kafka.RetryConfiguration{
			Brokers:       []string{"localhost:29092"},
			Topic:         "retry-topic",
			StartTimeCron: "*/1 * * * *",
			WorkDuration:  50 * time.Second,
			MaxRetry:      3,
		},
		MessageGroupDuration: time.Second,
		BatchConfiguration: kafka.BatchConfiguration{
			MessageGroupLimit: 1000,
			BatchConsumeFn:    batchConsumeFn,
		},
	}

	// Error handling is omitted for brevity.
	consumer, _ := kafka.NewConsumer(consumerCfg)
	defer consumer.Stop()

	consumer.Consume()
}

// batchConsumeFn uses the v2 pointer signature.
func batchConsumeFn(messages []*kafka.Message) error {
	// you can add custom error handling here & flag messages
	for i := range messages {
		if i%2 == 0 {
			messages[i].IsFailed = true
		}
	}

	// you must return err here to retry failed messages
	return errors.New("err")
}
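The effect of TransactionalRetry on a failed batch can be sketched as a simple selection step. The code below is an illustration under assumptions, not the library's implementation: Message is a hypothetical stand-in for kafka.Message, and messagesToRetry is an invented helper that returns the whole batch when the retry is transactional and only the messages flagged with IsFailed otherwise.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for kafka.Message.
type Message struct {
	Value    []byte
	IsFailed bool
}

// messagesToRetry is a hypothetical helper. For a batch whose consume
// function returned an error, it picks the messages that would be retried:
// every message when transactional is true, only flagged ones otherwise.
func messagesToRetry(messages []*Message, transactional bool) []*Message {
	if transactional {
		return messages
	}
	failed := make([]*Message, 0, len(messages))
	for _, m := range messages {
		if m.IsFailed {
			failed = append(failed, m)
		}
	}
	return failed
}

func main() {
	batch := []*Message{
		{Value: []byte("a"), IsFailed: true},
		{Value: []byte("b")},
		{Value: []byte("c"), IsFailed: true},
		{Value: []byte("d")},
	}

	fmt.Println(len(messagesToRetry(batch, true)), len(messagesToRetry(batch, false))) // prints: 4 2
}
```

This mirrors the example above, where every second message is flagged and the returned error triggers the retry of the flagged messages only.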
With Distributed Tracing Support
Please refer to Tracing Example
With Pause & Resume Consumer
Please refer to Pause Resume Example
With Grafana & Prometheus
In this example, we demonstrate how to create a Grafana dashboard and how to define alerts in Prometheus. You can
see the example by going to the with-grafana folder in the examples folder,
running the infrastructure with docker compose up,
and then running the application.
With SASL-PLAINTEXT Authentication
In the with-sasl-plaintext folder under examples, you can find an example
of a consumer integration with the SASL/PLAIN mechanism. To try the example, run the command docker compose up
in that folder and then start the application.
Configurations
config | description | default |
---|---|---|
reader | Describes all segmentio kafka reader configurations | |
consumeFn | Kafka consumer function; if retry is enabled, it is also used to consume retriable messages | |
skipMessageByHeaderFn | Function to filter messages based on headers; return true if you want to skip the message | nil |
logLevel | Describes log level; valid options are debug, info, warn, and error | info |
concurrency | Number of goroutines used at listeners | 1 |
retryEnabled | Whether the retry/exception consumer is enabled | false |
transactionalRetry | Set false if you want to apply the exception/retry strategy only to failed messages | true |
commitInterval | Indicates the interval at which offsets are committed to the broker | 1s |
rack | see doc | |
clientId | see doc | |
messageGroupDuration | Maximum time to wait for a batch | 1s |
metricPrefix | Prefix for the Prometheus fully qualified metric name. If not provided, the default metric prefix is kafka_konsumer. Currently, two Prometheus metrics are exposed: processed_messages_total and unprocessed_messages_total. With the default prefix, the metric names are kafka_konsumer_processed_messages_total_current and kafka_konsumer_unprocessed_messages_total_current. | kafka_konsumer |
dial.Timeout | see doc | no timeout |
dial.KeepAlive | see doc | not enabled |
transport.DialTimeout | see doc | 5s |
transport.IdleTimeout | see doc | 30s |
transport.MetadataTTL | see doc | 6s |
transport.MetadataTopics | see doc | all topics in cluster |
distributedTracingEnabled | Indicates whether OpenTelemetry support is on or off for consume and produce operations | false |
distributedTracingConfiguration.TracerProvider | see doc | otel.GetTracerProvider() |
distributedTracingConfiguration.Propagator | see doc | otel.GetTextMapPropagator() |
retryConfiguration.clientId | see doc | |
retryConfiguration.startTimeCron | Cron expression for when the retry consumer (kafka-cronsumer) starts to work | |
retryConfiguration.metricPrefix | Prefix for the Prometheus fully qualified metric name. If not provided, the default metric prefix is kafka_cronsumer. Currently, two Prometheus metrics are exposed: retried_messages_total_current and discarded_messages_total_current. With the default prefix, the metric names are kafka_cronsumer_retried_messages_total_current and kafka_cronsumer_discarded_messages_total_current. | kafka_cronsumer |
retryConfiguration.workDuration | Duration for which the exception consumer actively consumes messages | |
retryConfiguration.topic | Retry/Exception topic name | |
retryConfiguration.brokers | Retry topic broker URLs | |
retryConfiguration.maxRetry | Maximum number of attempts to retry a message | 3 |
retryConfiguration.tls.rootCAPath | see doc | "" |
retryConfiguration.tls.intermediateCAPath | Same as rootCAPath; use it together with rootCAPath if you want to specify two root CAs | "" |
retryConfiguration.sasl.authType | SCRAM or PLAIN | |
retryConfiguration.sasl.username | SCRAM or PLAIN username | |
retryConfiguration.sasl.password | SCRAM or PLAIN password | |
retryConfiguration.skipMessageByHeaderFn | Function to filter messages based on headers; return true if you want to skip the message | nil |
batchConfiguration.messageGroupLimit | Maximum number of messages in a batch | |
batchConfiguration.batchConsumeFn | Kafka batch consumer function; if retry is enabled, it is also used to consume retriable messages | |
batchConfiguration.preBatchFn | Function for transforming messages before batch consuming starts | |
batchConfiguration.balancer | see doc | leastBytes |
tls.rootCAPath | see doc | "" |
tls.intermediateCAPath | Same as rootCAPath; use it together with rootCAPath if you want to specify two root CAs | "" |
sasl.authType | SCRAM or PLAIN | |
sasl.username | SCRAM or PLAIN username | |
sasl.password | SCRAM or PLAIN password | |
logger | Custom logger, if you want to provide your own | info |
apiEnabled | Enables metrics | false |
apiConfiguration.port | Set API port | 8090 |
apiConfiguration.healtCheckPath | Set health check path | healthcheck |
metricConfiguration.path | Set metric endpoint path | /metrics |
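The skipMessageByHeaderFn option is described as a function that inspects headers and returns true to skip a message. A minimal sketch of such a predicate follows. The Header type is a hypothetical stand-in for the library's header type, and the x-source header with the load-test value is an invented example; check the library's source for the exact function signature before wiring it in.

```go
package main

import "fmt"

// Header is a hypothetical stand-in for a Kafka message header.
type Header struct {
	Key   string
	Value []byte
}

// skipTestTraffic returns true when the message should be skipped.
// The "x-source" key and "load-test" value are made up for this sketch.
func skipTestTraffic(headers []Header) bool {
	for _, h := range headers {
		if h.Key == "x-source" && string(h.Value) == "load-test" {
			return true
		}
	}
	return false
}

func main() {
	synthetic := []Header{{Key: "x-source", Value: []byte("load-test")}}
	regular := []Header{{Key: "x-source", Value: []byte("checkout")}}

	fmt.Println(skipTestTraffic(synthetic), skipTestTraffic(regular)) // prints: true false
}
```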
Monitoring
Kafka Konsumer offers an API that exposes several metrics.
Exposed Metrics
Metric Name | Description | Value Type |
---|---|---|
kafka_konsumer_processed_messages_total_current | Total number of processed messages. | Counter |
kafka_konsumer_unprocessed_messages_total_current | Total number of unprocessed messages. | Counter |
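The metric names above follow the pattern given in the metricPrefix description: the prefix, the base metric name, and a _current suffix joined with underscores. The sketch below reproduces that pattern with plain string handling. metricName is a hypothetical helper for illustration, and my_service is an invented custom prefix; the library itself may build the names differently.

```go
package main

import (
	"fmt"
	"strings"
)

// metricName is a hypothetical helper that joins a prefix, a base metric
// name and the "current" suffix with underscores.
func metricName(prefix, name string) string {
	return strings.Join([]string{prefix, name, "current"}, "_")
}

func main() {
	// Default prefix.
	fmt.Println(metricName("kafka_konsumer", "processed_messages_total"))
	fmt.Println(metricName("kafka_konsumer", "unprocessed_messages_total"))

	// Custom prefix, as it would look with metricPrefix set to "my_service".
	fmt.Println(metricName("my_service", "processed_messages_total"))
}
```

If you change metricPrefix, remember to update the dashboards and alert rules that reference the default names.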