Kafka Receiver: Data Loss
Hello again,
Continuing the (small) discussion here: https://github.com/cloudevents/sdk-go/issues/642
The client here starts a new goroutine each time an event is pulled. At the end of "invoke" here, Finish() is called. For the Kafka receiver, this calls MarkMessage() here. That method extracts the message offset, adds 1 to it and finally (not synchronously) informs Kafka that this consumer group has processed all messages before message.Offset+1.
Since there is no control over the processing time (the time spent in invoke), the goroutine processing the last Kafka message (for example) could finish first. The offset of that message is then pushed "first", telling Kafka that this consumer group is up to date with the current log for this partition. If the process is stopped, crashes, etc. before the other goroutines finish, then when it is restarted (or when a rebalance gives the partition to another consumer in the same group), only new messages will be delivered. All the other messages that were still being processed by the other goroutines are lost.
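To make the race concrete, here is a small, self-contained simulation (plain Go, not sdk-go code): each "message" is processed in its own goroutine, and marking keeps only the highest offset+1 seen so far, which is essentially what sarama's MarkOffset does.
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex
	committed := int64(0) // offset that would be committed to Kafka

	// mark mimics MarkMessage/MarkOffset: it keeps the highest offset+1,
	// without checking that the lower offsets were actually processed.
	mark := func(offset int64) int64 {
		mu.Lock()
		defer mu.Unlock()
		if offset+1 > committed {
			committed = offset + 1
		}
		return committed
	}

	var wg sync.WaitGroup
	for offset := int64(0); offset < 5; offset++ {
		wg.Add(1)
		go func(o int64) {
			defer wg.Done()
			// Random processing time, so completion order is arbitrary.
			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
			fmt.Printf("finished offset %d, committed watermark is now %d\n", o, mark(o))
		}(offset)
	}
	wg.Wait()

	// If the process had crashed right after the highest offset was marked,
	// the lower offsets still in flight would never be redelivered.
	fmt.Println("final committed offset:", committed)
}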
For example, I pushed 100 messages into Kafka, in a single-partition topic. After that, I start a consumer with this configuration:
saramaConfig := sarama.NewConfig()
saramaConfig.Version = sarama.V2_0_0_0
saramaConfig.Consumer.Offsets.AutoCommit.Enable = true
saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
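(For context, the rest of my wiring roughly follows the sdk-go kafka_sarama receiver sample; the broker address, consumer group and topic below are placeholders, receive is the handler shown just after, and the sarama config above is repeated so the snippet is self-contained.)
package main

import (
	"context"
	"log"

	"github.com/Shopify/sarama"
	"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
	cloudevents "github.com/cloudevents/sdk-go/v2"
)

func main() {
	saramaConfig := sarama.NewConfig()
	saramaConfig.Version = sarama.V2_0_0_0
	saramaConfig.Consumer.Offsets.AutoCommit.Enable = true
	saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	// Placeholder broker, consumer group and topic.
	consumer, err := kafka_sarama.NewConsumer([]string{"127.0.0.1:9092"}, saramaConfig, "test-group", "test-topic")
	if err != nil {
		log.Fatalf("failed to create protocol: %v", err)
	}
	defer consumer.Close(context.Background())

	c, err := cloudevents.NewClient(consumer)
	if err != nil {
		log.Fatalf("failed to create client: %v", err)
	}

	if err := c.StartReceiver(context.Background(), receive); err != nil {
		log.Fatalf("failed to start receiver: %v", err)
	}
}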
For each message, I apply a pseudo-processing step with a random duration:
func receive(ctx context.Context, event cloudevents.Event) {
	// Random processing time
	n := rand.Intn(1000)
	time.Sleep(time.Duration(n) * time.Second)
	fmt.Printf("After %vs, %s\n", n, event.DataEncoded)
}
After a few seconds, I stop the process with Ctrl-C.
Looking at the Kafka consumer group offsets, I see:
This is unexpected in several ways. I consumed 3 messages (ids 26, 10 & 42). The current offset of my consumer group shouldn't be 100; it should be 42+1.
After investigating, I think there is an issue here. It should be something like:
func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		// Capture the loop variable so that each Finish callback marks its
		// own message, not whatever `message` points to when it finally runs.
		tmp := message
		m := NewMessageFromConsumerMessage(tmp)
		r.incoming <- msgErr{
			msg: binding.WithFinish(m, func(err error) {
				if protocol.IsACK(err) {
					session.MarkMessage(tmp, "")
				}
			}),
		}
	}
	return nil
}
(I would be happy to make a PR for this)
After applying this and replaying my scenario, I have:
and in Kafka:
Which is better, but still problematic in my opinion. I have only consumed messages 9 & 25 and my offset is now 26. When my consumer restarts, the messages [0-8] U [10-24] won't be consumed.
I saw the notion of partitioning in CloudEvents; maybe it would be interesting to introduce it in the client (which should stay an abstraction above the different protocols) and consume partition by partition.
I don't plan to use the Kafka Receiver, as I don't have a use case (I'm more likely to follow @slinkydeveloper's advice and only focus on transforming a sarama Message into a CloudEvent and vice versa). I just thought it was important enough to mention it here.
Feel free to go ahead and PR the fix for the loop.
Which is better, but still problematic in my opinion. I have only consumed messages 9 & 25 and my offset is now 26. When my consumer restarts, the messages [0-8] U [10-24] won't be consumed.
This is weird: MarkMessage should not commit the offset immediately; it should perform offset tracking, that is, commit 7 only when all messages from 0 to 6 are marked... Maybe I'm missing something here...
I don't think that's the actual behavior; sarama doesn't have that logic:
https://github.com/Shopify/sarama/blob/0616f68815691527d3b6ab4d95a2881e09400fe3/offset_manager.go#L515-L519
Ah... well, then it doesn't work for sure :smile: We might need to implement an offset tracker ourselves...
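Something along these lines, maybe (just a rough sketch; none of these names exist in sarama or in this sdk): a per-partition tracker that only advances the committable offset once every lower offset has been acked.
package offsettracker // hypothetical package, just so the sketch compiles

import "sync"

// offsetTracker tracks acked offsets for a single partition and only moves
// the committable watermark forward when there are no gaps below it.
type offsetTracker struct {
	mu    sync.Mutex
	next  int64          // lowest offset not yet acked (the watermark)
	acked map[int64]bool // acks received above the watermark
}

func newOffsetTracker(initialOffset int64) *offsetTracker {
	return &offsetTracker{next: initialOffset, acked: map[int64]bool{}}
}

// Ack records that offset has been fully processed and returns the new
// watermark if it moved, or -1 if lower offsets are still in flight.
func (t *offsetTracker) Ack(offset int64) int64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.acked[offset] = true
	moved := false
	for t.acked[t.next] {
		delete(t.acked, t.next)
		t.next++
		moved = true
	}
	if !moved {
		return -1
	}
	return t.next
}
ConsumeClaim would create one tracker per claim from claim.InitialOffset(), the Finish callback would call Ack(tmp.Offset), and only when the returned watermark is >= 0 would it call session.MarkOffset(claim.Topic(), claim.Partition(), watermark, "").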
... Or ensure that messages are consumed & processed one by one per partition. I understand that the Client has to be an abstraction above protocols, but would it make sense to introduce the notion of partition into it?
This notion is already present in the spec https://github.com/cloudevents/spec/blob/v1.0/extensions/partitioning.md
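For illustration only (the type, source and key values below are made up), the extension is just another attribute on the event:
package main

import (
	"fmt"

	cloudevents "github.com/cloudevents/sdk-go/v2"
)

func main() {
	e := cloudevents.NewEvent()
	e.SetID("42")
	e.SetType("com.example.sample")
	e.SetSource("example/producer")
	// The partitioning extension is a single "partitionkey" attribute that
	// intermediaries and consumers may use to route or group events.
	e.SetExtension("partitionkey", "customer-123")
	fmt.Println(e)
}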
I understand that the Client has to be an abstraction above protocols, but would it make sense to introduce the notion of partition into it?
I don't think so; to me this is outside the scope of the Client. It's not simple to abstract the partition concept, and it would be useful only for Kafka anyway. So why should this project provide an abstraction that only makes sense in the context of Kafka, and what would make that abstraction more useful than just using Sarama directly?