franz-go
franz-go copied to clipboard
Batch and retry uncommitted records with group consumer and kotel
Hi,
I have 2 issues, one with batch and retry of records using group consumer, the second with making kotel
work.
I'm trying to understand how can I configure Batch fetch with group consumer, and for the messages that are not committed manually due to some logical error (AutoCommit is disabled) - retry the business logic.
Client configuration:
opts := []kgo.Opt{
kgo.SeedBrokers("localhost:9092"),
kgo.ConsumerGroup(myGroupId),
kgo.ConsumeTopics(myTopicName),
kgo.DisableAutoCommit(),
}
client, _ := kgo.NewClient(opts...)
A pseudo code example:
for {
fetches := client.PollFetches()
if fetches.Errors() > 0 {
continue
}
fetches.EachRecord(func(record){
res, err := businessLogic(record)
if err != nil {
// error occurred, hopes to try again next iteration (PollFetches)
log.Error("failed logic")
} else {
// logic finished successfully, commit message to prevent next iteration over the same message
client.CommitRecords(ctx, record)
}
})
}
There are a few issues I encountered:
- Once the records were fetched, they won't be iterateable again unless I restart the connection itself as no commits were done
- I can't manage batch polling myself due to same reason in point 1 above
So I have 2 questions:
- Is there a way I can poll the messages in selected batch using consumer group, and re-iterate messages that weren't committed without re-creating the client? I've seen the
client.PollRecords
withkgo.BlockRebalanceOnPoll
andclient.AllowRebalance
but didn't really understand that, nor was I sure it's what I needed. - I tried to instrument using
kotel
, but I just couldn't make it work. my local Jaeger didn't get any traces.
Example of the code I tried to use when implementing kotel
, I used the example file examples/hooks_and_logging/plugin_kotel/main.go
:
func jaegerTP() (*sdktrace.TracerProvider, error) {
exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint("http://localhost:14268/api/traces")))
if err != nil {
return nil, err
}
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
sdktrace.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String("my-service"),
semconv.DeploymentEnvironmentKey.String("my-env"),
)),
)
//otel.SetTracerProvider(tp) // tried with that too...
return tp, err
}
func newClientWithTracing(topicName string) (*kgo.Client, *kotel.Tracer) {
//tracerProvider, err := initTracerProvider() // tried with that too...
tracerProvider, err := jaegerTP()
if err != nil {
panic(err)
}
defer func() {
if err := tracerProvider.Shutdown(context.Background()); err != nil {
log.Printf("Error shutting down tracer provider: %v", err)
}
}()
kotelTracer := newKotelTracer(tracerProvider)
kotelService := newKotel(kotelTracer)
opts := []kgo.Opt{
kgo.SeedBrokers("localhost:9092"),
kgo.ConsumerGroup("my-service"),
kgo.ConsumeTopics(topicName),
kgo.DisableAutoCommit(),
kgo.WithHooks(kotelService.Hooks()...),
}
client, err := kgo.NewClient(opts...)
if err != nil {
panic(err)
}
return client, kotelTracer
}
func consumeMessages() {
// code...
fetches.EachRecord(func(msg *kgo.Record) {
_, span := ktracer.WithProcessSpan(msg)
log.Printf("received message: key=%s value=%s", string(msg.Key), string(msg.Value))
span.End()
})
// code...
}