
Batch and retry uncommitted records with group consumer and kotel

Open · yakirSalt opened this issue 4 months ago · 6 comments

Hi,

I have two issues: one with batching and retrying records using a group consumer, the other with getting kotel to work. I'm trying to understand how to configure batch fetching with a group consumer and, for messages that are not committed manually because of some logical error (auto-commit is disabled), how to retry the business logic.

Client configuration:

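opts := []kgo.Opt{
  kgo.SeedBrokers("localhost:9092"),
  kgo.ConsumerGroup(myGroupId),
  kgo.ConsumeTopics(myTopicName),
  kgo.DisableAutoCommit(), // offsets are committed manually after processing
}
client, err := kgo.NewClient(opts...)
if err != nil {
  panic(err)
}
defer client.Close()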

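A simplified example of the processing loop:

ctx := context.Background()
for {
  fetches := client.PollFetches(ctx)
  if errs := fetches.Errors(); len(errs) > 0 {
    // fetch errors are ignored here; just poll again
    continue
  }

  fetches.EachRecord(func(record *kgo.Record) {
    if _, err := businessLogic(record); err != nil {
      // an error occurred; the hope is that this record is retried on the next iteration (PollFetches)
      log.Printf("failed logic: %v", err)
      return
    }
    // the logic finished successfully; commit the record so it is not processed again
    if err := client.CommitRecords(ctx, record); err != nil {
      log.Printf("commit failed: %v", err)
    }
  })
}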

There are a few issues I encountered:

  1. Once records have been fetched, they are not delivered again unless I recreate the client, even though they were never committed.
  2. For the same reason, I can't manage batch polling myself.
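Both points match how Kafka consumers generally behave: the client keeps an in-memory fetch position per partition that advances as records are polled, while the committed offset is generally only read when a partition is (re)assigned, e.g. after a restart or a rebalance. Not committing therefore does not by itself cause redelivery; to re-read records, the fetch position has to be moved back explicitly. In franz-go, `Client.SetOffsets` appears to be the relevant call, but its docs should be checked for group-consumer caveats. The toy model below is a hypothetical illustration (`partitionCursor` is not a franz-go type) of the two offsets and a rewind to the committed offset.

```go
package main

import "fmt"

// partitionCursor is a hypothetical toy model of one partition as seen by a
// consumer: fetchPos is where the next poll starts, committed is what the
// group has durably recorded.
type partitionCursor struct {
	log       []string // the partition's records, indexed by offset
	fetchPos  int64
	committed int64
}

// poll returns up to max offsets starting at fetchPos and advances fetchPos
// past them, whether or not they are ever committed.
func (c *partitionCursor) poll(max int) []int64 {
	var offs []int64
	for len(offs) < max && c.fetchPos < int64(len(c.log)) {
		offs = append(offs, c.fetchPos)
		c.fetchPos++
	}
	return offs
}

// commit records that everything before offset+1 has been processed.
func (c *partitionCursor) commit(offset int64) { c.committed = offset + 1 }

// rewind moves the fetch position back to the committed offset so the
// uncommitted records are delivered again on the next poll.
func (c *partitionCursor) rewind() { c.fetchPos = c.committed }

func main() {
	c := &partitionCursor{log: []string{"a", "b", "c", "d"}}

	batch := c.poll(2) // offsets 0 and 1
	c.commit(batch[0]) // offset 0 succeeded; offset 1 failed and is not committed

	fmt.Println(c.poll(2)) // [2 3]: offset 1 is not redelivered just because it was not committed

	c.rewind()
	fmt.Println(c.poll(2)) // [1 2]: after an explicit rewind, offset 1 comes back
}
```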

So I have 2 questions:

  1. Is there a way to poll messages in batches of a chosen size with a consumer group, and to re-iterate over the messages that weren't committed, without re-creating the client? I've seen client.PollRecords together with kgo.BlockRebalanceOnPoll and client.AllowRebalance, but I didn't really understand them, and I'm not sure they're what I need.
  2. I tried to instrument the client with kotel, but I couldn't make it work: my local Jaeger instance didn't receive any traces.
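On question 1: per the kgo docs as I understand them, `client.PollRecords(ctx, n)` returns as soon as records are available and hands back at most n of them, leaving the rest buffered for the next poll (`PollFetches` behaves like an unlimited `PollRecords`). With `kgo.BlockRebalanceOnPoll()`, the client will not rebalance between a poll and the following `client.AllowRebalance()` call, which leaves a window to commit what was processed before partitions can move. The stdlib-only sketch below emulates just the "at most n per poll" contract with a hypothetical in-memory buffer; the real kgo loop shape appears only in comments and should be checked against the package docs.

```go
package main

import "fmt"

// recordBuffer is a hypothetical stand-in for records a client has already
// fetched from brokers but not yet handed to the application.
type recordBuffer struct {
	pending []string
}

// pollUpTo returns at most max buffered records and keeps the remainder for
// the next call. In this sketch, max <= 0 means "no limit". It never waits
// to fill the batch.
func (b *recordBuffer) pollUpTo(max int) []string {
	n := len(b.pending)
	if max > 0 && max < n {
		n = max
	}
	batch := b.pending[:n]
	b.pending = b.pending[n:]
	return batch
}

func main() {
	buf := &recordBuffer{pending: []string{"r0", "r1", "r2", "r3", "r4"}}

	// With franz-go the loop would look roughly like this (not compiled here):
	//   fetches := client.PollRecords(ctx, 2)
	//   ... process, then client.CommitRecords(ctx, succeeded...)
	//   client.AllowRebalance() // only relevant with kgo.BlockRebalanceOnPoll()
	for {
		batch := buf.pollUpTo(2)
		if len(batch) == 0 {
			break
		}
		fmt.Println(batch) // [r0 r1], then [r2 r3], then [r4]
	}
}
```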

Here is the code I tried when implementing kotel, based on the example file examples/hooks_and_logging/plugin_kotel/main.go:

func jaegerTP() (*sdktrace.TracerProvider, error) {
	exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint("http://localhost:14268/api/traces")))
	if err != nil {
		return nil, err
	}

	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exp),
		sdktrace.WithResource(resource.NewWithAttributes(
			semconv.SchemaURL,
			semconv.ServiceNameKey.String("my-service"),
			semconv.DeploymentEnvironmentKey.String("my-env"),
		)),
	)

	//otel.SetTracerProvider(tp) // tried with that too...

	return tp, nil
}

func newClientWithTracing(topicName string) (*kgo.Client, *kotel.Tracer) {
	//tracerProvider, err := initTracerProvider() // tried with that too...
	tracerProvider, err := jaegerTP()
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := tracerProvider.Shutdown(context.Background()); err != nil {
			log.Printf("Error shutting down tracer provider: %v", err)
		}
	}()

	kotelTracer := newKotelTracer(tracerProvider)
	kotelService := newKotel(kotelTracer)
	opts := []kgo.Opt{
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumerGroup("my-service"),
		kgo.ConsumeTopics(topicName),
		kgo.DisableAutoCommit(),
		kgo.WithHooks(kotelService.Hooks()...),
	}

	client, err := kgo.NewClient(opts...)
	if err != nil {
		panic(err)
	}
	return client, kotelTracer
}

func consumeMessages() {
  // code...
  fetches.EachRecord(func(msg *kgo.Record) {
    _, span := ktracer.WithProcessSpan(msg)
    log.Printf("received message: key=%s value=%s", string(msg.Key), string(msg.Value))
    span.End()
  })
  // code...
}
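A likely cause of the missing traces, worth checking first: `newClientWithTracing` defers `tracerProvider.Shutdown(...)`, and a deferred call runs when the enclosing function returns, not when the program exits. The provider is therefore shut down before any record is consumed, and the OpenTelemetry SDK does not export spans after `Shutdown`. A common fix is to return the provider (or a cleanup func) to the caller and defer the shutdown there. The stdlib-only sketch below shows the ordering; `provider`, `setupBroken` and `setupFixed` are hypothetical stand-ins, not the real `*sdktrace.TracerProvider`.

```go
package main

import "fmt"

// provider is a hypothetical stand-in for *sdktrace.TracerProvider: it only
// keeps spans while it has not been shut down.
type provider struct {
	shutdown bool
	exported []string
}

// endSpan records a finished span unless the provider was already shut down.
func (p *provider) endSpan(name string) {
	if p.shutdown {
		return // dropped: nothing is exported after shutdown
	}
	p.exported = append(p.exported, name)
}

func (p *provider) Shutdown() { p.shutdown = true }

// setupBroken mirrors the pattern in newClientWithTracing: the deferred
// Shutdown runs as soon as this function returns.
func setupBroken() *provider {
	p := &provider{}
	defer p.Shutdown()
	return p
}

// setupFixed returns the provider plus a cleanup func; the caller defers the
// cleanup so shutdown happens only when the caller (e.g. main) is done.
func setupFixed() (*provider, func()) {
	p := &provider{}
	return p, p.Shutdown
}

func main() {
	broken := setupBroken()
	broken.endSpan("process record")
	fmt.Println(len(broken.exported)) // 0

	fixed, cleanup := setupFixed()
	defer cleanup()
	fixed.endSpan("process record")
	fmt.Println(len(fixed.exported)) // 1
}
```

Applied to the code above, this would mean removing the `defer` from `newClientWithTracing`, returning `tracerProvider` (or a func that calls its `Shutdown`) alongside the client, and deferring the shutdown in the caller after the consume loop ends.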

yakirSalt, Mar 03 '24 21:03