
[watermill-kafka] - cannot process events concurrently without ACKing the event

Open nachogiljaldo opened this issue 1 year ago • 8 comments

When using watermill-kafka's subscriber, only one event can be processed at a time unless you ACK the event, regardless of the number of partitions you may have. And if you ACK the event, the offset is marked as done, and therefore so is the event.

The consequence is that if you want to ensure at-least-once processing, you can only process one event at a time.

I might be missing something, but I don't think I am; otherwise I would expect this test to pass:

Publish 20 events in 8 partitions and expect to process at least 2 events in 15s without ACK

func TestConcurrentProcessingDifferentPartitions(t *testing.T) {
	pub, sub := createPartitionedPubSub(t)
	topicName := "topic_" + watermill.NewUUID()

	var messagesToPublish []*message.Message
	for i := 0; i < 20; i++ {
		id := watermill.NewUUID()
		messagesToPublish = append(messagesToPublish, message.NewMessage(id, nil))
	}
	err := pub.Publish(topicName, messagesToPublish...)
	require.NoError(t, err, "cannot publish message")

	messages, err := sub.Subscribe(context.Background(), topicName)
	require.NoError(t, err)

	// Collect received messages without ACKing them; the mutex guards
	// the slice against a data race with the assertion below.
	var mu sync.Mutex
	var receivedMessages []*message.Message
	go func() {
		for msg := range messages {
			mu.Lock()
			receivedMessages = append(receivedMessages, msg)
			mu.Unlock()
		}
	}()

	time.Sleep(15 * time.Second)
	mu.Lock()
	defer mu.Unlock()
	require.GreaterOrEqual(t, len(receivedMessages), 2)
}

My expectation would be one of these (and ideally both configurable):

  • I can process N messages concurrently, where N is the number of partitions assigned to the consumer
  • I can get as many as I ask for (with a max buffer size) and I need to ACK / NACK them explicitly

Am I missing something? Is this currently possible?

nachogiljaldo avatar Jul 26 '23 15:07 nachogiljaldo

A single kafka consumer is created for a Subscribe call.

Your assumption is half-correct. You can process N messages in parallel if you have N partitions assigned to N consumers. Even if you have N partitions, a single kafka consumer may consume only one message at a time, and there may also be some additional overhead when the consumer switches between partitions.

You would need N consumers in the same consumer group so that each consumer can bind to a partition and read from it in parallel. If you have fewer consumers than partitions, your consumers will do a lot of partition switching; if you have more consumers than partitions, some of your consumers will sit idle, not receiving any messages at all.

Since the implementation creates a single consumer, it will get messages one by one and not in parallel as you expect.

sarkarshuvojit avatar Sep 19 '23 11:09 sarkarshuvojit

Yes, that part is clearly understood. But the subscriber works with any arbitrary number of partitions (it gets and uses those assigned to it by the broker), yet it consumes only one message concurrently, regardless of the number of partitions it has assigned.

I understand a workaround would be creating N subscribers, but that seems to put a lot of load on the consumer side:

  • you need to know the number of partitions
  • the number of partitions might change while the consumers run

I wonder if it would make sense to add some configuration for concurrency policies:

  • one (current behavior)
  • one per assigned partition
  • N from any partitions, relying on the consumer to apply concurrency checks, if any

nachogiljaldo avatar Sep 19 '23 11:09 nachogiljaldo

Correct, I was not suggesting creating 3 Subscriptions but rather 3 lightweight consumers. I don't think that can currently be done.

Also, just curious, as you mention the number of partitions changing while the consumer runs: in your use-case, does the partition count change frequently?

A solution may be implementing a configuration where, internally, one Subscribe call creates N kafka consumers and then fans in the messages to a single go channel.

sarkarshuvojit avatar Sep 19 '23 12:09 sarkarshuvojit

Also, just curious, as you mention the number of partitions changing while the consumer runs: in your use-case, does the partition count change frequently?

Not really, just saying it could happen :)

A solution may be implementing a configuration, where internally one Subscribe call will create N kafka consumers and then fans in the messages to a single go channel.

I will try to go ahead and create a PR for that in watermill-kafka in the next few weeks (I need to have a few days of "clear schedule"). Is that the way to go?

nachogiljaldo avatar Sep 19 '23 12:09 nachogiljaldo

Is that the way to go?

According to my limited knowledge, yes.

I will try to go ahead and create a PR for that in watermill-kafka in the next few weeks

Let me know if you need a hand, I'd be happy to help.

sarkarshuvojit avatar Sep 19 '23 13:09 sarkarshuvojit

Looking at your code, I see no consumer group provided. Therefore the consumer assignment protocol is used, and acking a record doesn't really do anything useful.

OneCricketeer avatar Sep 24 '23 04:09 OneCricketeer

Looking at your code, I see no consumer group provided. Therefore the consumer assignment protocol is used, and acking a record doesn't really do anything useful.

It was just an example to illustrate the problem, but you're right: because there's no consumer group, the ACK does not result in offset changes. However, without the ACK no further events are received, so it does indeed do something useful :)

nachogiljaldo avatar Sep 30 '23 21:09 nachogiljaldo

Hey @sarkarshuvojit I created https://github.com/ThreeDotsLabs/watermill-kafka/pull/29 to try and implement a few different consumption methods, feel free to have a look.

nachogiljaldo avatar Oct 13 '23 22:10 nachogiljaldo