
Kafka Receiver: Data Loss

pastequo opened this issue 4 years ago · 5 comments

Hello again,

Continuing the (small) discussion here: https://github.com/cloudevents/sdk-go/issues/642

The client here starts a new goroutine each time an event is pulled. At the end of "invoke" here, Finish() is called. For the Kafka receiver this calls MarkMessage() here. That method extracts the message offset, adds 1 to it, and finally (not synchronously) informs Kafka that this consumer group has processed all messages before message.Offset+1.

Since there is no control over the processing time (the time spent in invoke), the goroutine processing the last Kafka message (for example) could finish first. The offset of that message is then pushed "first", declaring that this consumer group is up to date with the current Kafka log for this partition. If the process is stopped or crashes before the other goroutines finish, then when it restarts (or when a rebalance hands the partition to another consumer in the same group), only new messages will be delivered. All the other messages that were still being processed by the different goroutines are lost.
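
To make the race concrete, here is a toy simulation (not sdk-go code; all names and numbers are made up) of a naive "latest mark wins" commit when handlers finish out of order:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex
	committed := int64(0) // what a naive MarkMessage-style commit would report

	var wg sync.WaitGroup
	for offset := int64(0); offset < 100; offset++ {
		wg.Add(1)
		go func(off int64) {
			defer wg.Done()
			// unpredictable per-message processing time
			time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
			mu.Lock()
			if off+1 > committed {
				committed = off + 1 // claims everything before off+1 is done -- untrue here
			}
			mu.Unlock()
		}(offset)
	}

	// simulate a crash partway through instead of waiting on wg
	time.Sleep(100 * time.Millisecond)
	fmt.Println("committed offset at crash time:", committed)
	// on restart, every unfinished message below that offset is skipped
}

Run it a few times: the committed offset routinely lands far beyond the handful of messages that actually completed.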

For example, I pushed 100 messages into a single-partition topic. After that, I started a consumer with this configuration:

	saramaConfig := sarama.NewConfig()
	saramaConfig.Version = sarama.V2_0_0_0
	saramaConfig.Consumer.Offsets.AutoCommit.Enable = true
	saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

For each message, I apply a pseudo-processing step with a random duration:

func receive(ctx context.Context, event cloudevents.Event) {
	// Random processing time
	n := rand.Intn(1000)
	time.Sleep(time.Duration(n) * time.Second)

	fmt.Printf("After %vs, %s\n", n, event.DataEncoded)
}

After a few seconds I stop the process using Ctrl-C.

Looking at the Kafka offsets, I see that the current offset of my consumer group is already 100.

Which is unexpected in many ways. I consumed 3 messages (ids 26, 10 & 42). The current offset of my consumer group shouldn't be 100; it should be at most 42+1.

After investigating, there is an issue here. I think it should be something like:

func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
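		// copy the loop variable so each Finish closure captures its own
		// message (the range variable is reused across iterations)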
		tmp := message
		m := NewMessageFromConsumerMessage(tmp)

		r.incoming <- msgErr{
			msg: binding.WithFinish(m, func(err error) {
				if protocol.IsACK(err) {
					session.MarkMessage(tmp, "")
				}
			}),
		}
	}
	return nil
}

(I would be happy to make a PR for this)

After applying this and replaying my scenario, I checked the console output and the Kafka offsets again.

Which is better, but still problematic in my opinion. I have only consumed messages 9 & 25, and my offset is now 26. When my consumer restarts, the messages [0-8] ∪ [10-24] won't be consumed.

I saw the notion of partitioning in CloudEvents; maybe it would be interesting to introduce it in the client (which should stay an abstraction above the different protocols) and consume partition by partition.

I don't plan to use the Kafka Receiver, as I don't have a use case (I'm more likely to follow @slinkydeveloper's advice and only focus on transforming a sarama message into a CloudEvent and vice versa). I just thought it was important enough to mention here.

pastequo avatar Dec 15 '20 18:12 pastequo

Feel free to go ahead and PR the fix for the loop.

Which is better, but still problematic in my opinion. I have only consumed messages 9 & 25, and my offset is now 26. When my consumer restarts, the messages [0-8] ∪ [10-24] won't be consumed.

This is weird. MarkMessage should not commit the offset immediately; it should perform offset tracking, that is, it should commit 7 only when all messages from 0 to 6 are marked... Maybe I'm missing something here...

slinkydeveloper avatar Dec 16 '20 09:12 slinkydeveloper

I don't think that's the actual behavior; sarama doesn't have this logic:

https://github.com/Shopify/sarama/blob/0616f68815691527d3b6ab4d95a2881e09400fe3/offset_manager.go#L515-L519

pastequo avatar Dec 16 '20 09:12 pastequo

Ah... well, then it doesn't work for sure :smile: We might need to implement an offset tracker ourselves...
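
A rough sketch of such a tracker, one instance per topic/partition (hypothetical and untested; offsetTracker and markDone are made-up names):

import "sync"

// offsetTracker only advances the committable offset once the
// processed prefix is contiguous.
type offsetTracker struct {
	mu   sync.Mutex
	next int64          // lowest offset not yet processed
	done map[int64]bool // processed offsets beyond next
}

func newOffsetTracker(initial int64) *offsetTracker {
	return &offsetTracker{next: initial, done: map[int64]bool{}}
}

// markDone records offset as processed and returns the new safe-to-commit
// offset (exclusive), or -1 if the contiguous prefix did not advance.
func (t *offsetTracker) markDone(offset int64) int64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.done[offset] = true
	advanced := int64(-1)
	for t.done[t.next] {
		delete(t.done, t.next)
		t.next++
		advanced = t.next
	}
	return advanced
}

The Finish callback would then call session.MarkOffset(tmp.Topic, tmp.Partition, safe, "") only when markDone returns safe >= 0, so the committed offset never runs ahead of an unprocessed message.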

slinkydeveloper avatar Dec 16 '20 09:12 slinkydeveloper

... Or ensure that messages are consumed & processed one by one per partition, as in the sketch below. I understand that the Client has to be an abstraction above protocols, but would it make sense to introduce the notion of partition into it?
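
ConsumeClaim is already invoked once per claimed partition, so processing synchronously inside the loop would give exactly that ordering. A sketch (handle is a hypothetical blocking handler, not part of the SDK):

func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		// block until this message is fully processed before reading the next one
		if err := handle(session.Context(), message); err != nil {
			return err
		}
		// safe to mark: every earlier offset in this partition is already done
		session.MarkMessage(message, "")
	}
	return nil
}

The trade-off is throughput: one in-flight message per partition instead of one goroutine per message.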

This notion is already present in the spec https://github.com/cloudevents/spec/blob/v1.0/extensions/partitioning.md

pastequo avatar Dec 16 '20 10:12 pastequo

I understand that the Client has to be an abstraction above protocols, but would it make sense to introduce the notion of partition into it?

I don't think so; to me this is outside the scope of the Client. It's not simple to abstract the partition concept, and it would be useful only for Kafka anyway. So why should this project provide an abstraction that only makes sense in the context of Kafka, and what would make that abstraction more useful than just using Sarama directly?

slinkydeveloper avatar Dec 16 '20 10:12 slinkydeveloper