nats-server icon indicating copy to clipboard operation
nats-server copied to clipboard

Message Group / Sequential Convoy pattern

Open stepin opened this issue 3 years ago • 30 comments

I need to process messages in parallel but in some cases, they should be processed in sequence. A good simple example is the order case: messages for different order ids should be processed in parallel but for the same should be processed in sequence. This pattern is described in more details at Microsoft https://docs.microsoft.com/en-us/azure/architecture/patterns/sequential-convoy and at ActiveMQ http://activemq.apache.org/message-groups.html . It's a common use case for Event Sourcing scenarios.

Requirements:

  • at least once delivery (and exactly one within a window is even better)
  • messages have a sequence key: if it's null no limitations on parallelization, if it's not null its name of a sequence
  • single workers' group (in worst case 2 groups for null and non-null sequence keys)
  • timeouts for processing
  • up to 5 attempts to process (in error case it's better to send failed messages into the dedicated topic)

I see several options:

  1. NATS JetStream Workers
  • NATS JetStream Workers for jobs that can run in parallel
  • messages with ID header for exactly once detection
  • all preprocessing in a custom code with external storage for sequences

In this case, it's unclear how my code will receive notification that some message was processed (to check if there are more messages to process in the same sequence).

  1. NATS Core Services
  • NATS Core Service for jobs that can run in parallel
  • all preprocessing in a custom code with external storage for all messages in progress

It's clear how to implement this option but too many queue-related code outside of the queue.

Is this possible to implement the original pattern with storage inside JetStream? Or maybe even without custom code (maybe I missed or misunderstood some options like MaxConsumers)? Is this pattern planned for implementation in the future?

stepin avatar Mar 27 '21 07:03 stepin

We've discussed this one a bunch, I think we will need a solution to this but do not have an elegant one today - almost there but we have no way to rebalance consumers.

One for the back burner for future releases I think

ripienaar avatar Mar 28 '21 12:03 ripienaar

See https://github.com/nats-io/jetstream/issues/103

ripienaar avatar Mar 28 '21 12:03 ripienaar

Ok, I will close this as duplicate.

stepin avatar Mar 28 '21 22:03 stepin

could this be reopened?

https://github.com/nats-io/jetstream/issues/103 and the repo itself were archived because jetstream is now in GA and part of the nats server, which leads back to here again?

cortopy avatar Aug 07 '21 17:08 cortopy

We have been discussing this one quite a bit. No resolutions yet but it's definitely on our radar and part of active discussions.

derekcollison avatar Aug 07 '21 19:08 derekcollison

Hi, I have an idea. What will say community?

We can use distributed atomic lock system (for example ZooKeeper or ETCD).

  1. Consumer pulls 20 messages.
  2. Consumer iterates over messages' payloads.
  3. Consumer looks to some special key inside each payload (e.g. order_id) and tries to set lock in zookeeper say in /orders/{order_id}.
  4. If lock already exists then we skip this message by sending msg.Nak() (not acknowledged) because that means a message with same key already handling by another consumer.
  5. If lock doesn't exist then we'll take it and go to handle the message.

This logic can be implemented also inside the nats-server. But I don't know how this affects on performance.

timsolov avatar Aug 24 '21 07:08 timsolov

JetStream already does this. When you pull 20 messages you have a lock on that message for AckWait time.

Is it that you want to arrange it so that one consumer always handle all messages related to a given order ID? If that’s what you are asking then we are looking at adding something to the server itself. Currently discussed here https://github.com/nats-io/nats-architecture-and-design/pull/36

ripienaar avatar Aug 24 '21 07:08 ripienaar

@ripienaar not really, when we have GroupSubscription and 3 parallel consumers we can receive different events simultaneously which in microservice arch should be handled one by one. Example:

  1. Customer creates invitation and removes this invitation straight off.
  2. First consumer starts handle creation of invitation but doesn't have time complete transaction when another event (remove invitation) comes to Second consumer. And the Second consumer tries to remove non-existing invitation.

So what the nats-server offers to handle this situation?

There are two ways as I see to resolve this:

  1. Like a Kafka: send messages by Key from header to different partitions.
  2. Something what I've suggested before in https://github.com/nats-io/nats-server/issues/2043#issuecomment-904405811

timsolov avatar Aug 24 '21 08:08 timsolov

Yes, this is the feature we are working on fleshing out in the link given https://github.com/nats-io/nats-architecture-and-design/pull/36

ripienaar avatar Aug 24 '21 08:08 ripienaar

Any news on the feature design? I am also looking for this capability.

rbkrabbe avatar Apr 23 '22 13:04 rbkrabbe

We have some updates in 2.8 for folks that are more aligned to the way other tech does partitioning. @jnmoyne might be able to give some insight.

We are still working on a the solution though described above.

derekcollison avatar Apr 23 '22 18:04 derekcollison

Yes, the new feature introduced in 2.8 is called deterministic subject token partitioning as an extension to the existing subject mapping functionality (which can be applied at the account level and during import/exports).

Deterministic token partitioning allows you to use subject based addressing to deterministically divide (partition) a flow of messages where one or more of the subject tokens make up the key upon which the partitioning will be based, into a number of smaller message flows.

For example: new customer orders are published on neworders.<customer id>, you can partition those messages over 3 partition numbers (buckets), using the partition(number of partitions, wildcard token positions...) function which returns a partition number (between 0 and number of partitions-1) by using the following mapping "neworders.*" : "neworders.{{wildcard(1)}}.{{partition(3,1)}}".

This particular mapping means that any message published on neworders.<customer id> will be mapped to subject.<customer id>.<a partition number 0, 1, or 2>. i.e.:

Published on Mapped to
neworders.customerid1 neworders.customerid1.0
neworders.customerid2 neworders.customerid1.2
neworders.customerid3 neworders.customerid3.1
neworders.customerid4 neworders.customerid4.2
neworders.customerid5 neworders.customerid5.1
neworders.customerid6 neworders.customerid6.0

The mapping is deterministic because (as long as the number of partitions is 3) 'customerid1' will always map to the same partition number. The mapping is hash based, it's distribution is random but tending towards 'perfectly balanced' distribution (i.e. the more keys you map the more the number of keys for each partition will tend to converge to the same number).

You can partition on more than one subject wildcard token at a time, e.g.: {{partition(10,1,2)}} distributes the union of token wildcards 1 and 2 over 10 partitions.

Published on Mapped to
foo.1.a foo.1.a.1
foo.1.b foo.1.b.0
foo.2.b foo.2.b.9
foo.2.a foo.2.a.2

What this deterministic partition mapping enables is the distribution of the messages that are subscribed to using a single subscriber (on neworders.*) into three separate subscribers (respectively on neworders.*.0, neworders.*.1 and neworders.*.2) that can operate in parallel.

jnmoyne avatar Apr 24 '22 22:04 jnmoyne

@derekcollison @jnmoyne this is extremely good and probably one of the biggest features that people migrating from kafka may be looking for. Thanks so much for this

However, I just wanted to say that I'm a bit puzzled by not being able to find anything about this in the docs at nats.io. It would be a shame if this feature went unnoticed

cortopy avatar Apr 25 '22 09:04 cortopy

Thanks. I also believe that this was one of the few things some people coming from Kafka to NATS have been wanting that we didn't have until now.

The feature is absolutely brand new (2.8.0 was literally released last week) and the doc update will happen very soon (what I wrote here is actually a preview of what the doc will say about that feature (comments welcomed:))

jnmoyne avatar Apr 25 '22 14:04 jnmoyne

@cortopy that was all @jnmoyne ! We will get docs updated as soon as we can.

derekcollison avatar Apr 25 '22 16:04 derekcollison

Yeah I have a large doc PR in the works

jnmoyne avatar Apr 25 '22 19:04 jnmoyne

FYI docs have been updated now. I think it may be ok to close this issue now?

jnmoyne avatar May 06 '22 19:05 jnmoyne

Thank you for the updates. Sorry for disappearing after asking a question 😅

While the partitioning is nice, and the docs look good, Ideally, I would want to be able to horizontally scale the consumers for a stream and still get in-order delivery by some key. With partitioning, I would be restricted to run a single consumer per partition to get the ordering I need.

In the ADR linked above, I also see this:

we might require a header to be set for example NATS-Consumer-ID: 1 for sensor 1

Which I will be tricky. It requires the publisher to be aware of the topology of the consumers, but that topology should ideally be dynamic.

I'm wondering if something like message de-duplication could be repurposed for this? If the publisher adds something like a NATS-Convoy-ID: 1 header (note that the ID is for the sensor, not the consumer), and a message is considered duplicate if there are in-flight messages with the same convoy id.

At least for my use-case, it would be acceptable to break the ordering guarantee if a message has not been ACK'ed, NACK'ed or marked In-progress for a few minutes.

rbkrabbe avatar May 07 '22 06:05 rbkrabbe

@jnmoyne can you provide the link to documentation? I think it would be good to have direct link from this issue. Thank you.

timsolov avatar May 07 '22 13:05 timsolov

@timsolov Of course, here is the link: https://docs.nats.io/nats-concepts/subject_mapping#deterministic-subject-token-partitioning

jnmoyne avatar May 18 '22 08:05 jnmoyne

@rbkrabbe Partitioning does indeed allow you to scale processing while still getting in-order delivery per key, you just need to use more partition to scale further.

You can have guaranteed strict in-order delivery per partition (even if you happen to have more than one subscriber per partition consumer) simply by setting "Max acks pending" to 1 on the consumer for the partition, no need to use any headers for that (the key the partitioning happens on is one (or more) token(s) in the subject name) or to try to re-use message de-duplication.

jnmoyne avatar May 18 '22 08:05 jnmoyne

Hi, thank for this feature, but I have a question, in kafka we have migration for partition if our instance of counsumer gone down, but in this case are there any solutions to this problem?

z0mb1ek avatar Jun 04 '22 19:06 z0mb1ek

@jnmoyne That would cause head-of-line blocking on the partition right? It would effectively mean that only 1 consumer in the group would be active at a time while the others idled.

How easy is it to change the deterministic partitioning to add more partitions and is there a limit to how many partitions it is practical to have?

I have 2 use-cases in mind for this:

  1. Replicate blobs between buckets. In this case I would ideally like convoy-per-blob, but could consider partition per bucket with "Max acks pending = 1", or deterministic partitioning by blob name. My concern with partition per bucket is that I would have hot/cold buckets, so some consumers would be busy and might suffer delays while others would be idling. Deterministic partitioning per blob name would probably work OK because the likelihood of the consumer failing would be quite low. If the consumer fails, the entire partition would be blocked as I understand it, which would mean that a failure on 1 bucket would leak onto other buckets.
  2. Event stream per object I have a set of objects, much like the "order_id"/"customerid" examples above where I need a convoy per id. In this case there is a higher risk that a consumer might fail, and I can't tolerate a failure on processing for 1 customer blocking processing of another customer. This seems to rule out deterministic partitioning, or am I missing some trick that would allow me to work around the head-of-line blocking?

rbkrabbe avatar Jun 08 '22 09:06 rbkrabbe

Note on the terminology in use when talking about JS: 'streams' are what record messages, and you create 'consumers' on streams, those consumers are a bit like a DataBase 'view' in the sense that they let applications receive all or a subset of the messages in a stream, consumers have some internal state (which is kept and maintained by the JS servers) which consists of sequence number counters and tables which are updated as the messages are sent by the consumer and acknowledged by the subscribers. The actual applications that are going to process the messages 'subscribe' to a consumer. So those applications although they can be seen as 'consuming messages' are not consumers, but subscribers to consumers. There are no 'consumer groups' like you have on Kafka and no 'rebalancing', rather you have subscribers to consumers and typically you would rely on your container orchestration system of choice to start/stop/monitor and restart your applications such that you have whatever number of subscriber per consumer (partition) you want, each subscriber being passed the consumer name (i.e. partition number) that it should subscribe to as part of it's runtime information (e.g. command line argument or environment variable, which is set by the container orchestration system you are using).

If you want a guaranteed strictly ordered processing then you indeed need to set the 'max acks pending' value for the partition's consumer to 1, meaning that only one of the subscribers to that consumer gets a message to process at a time. If you are using partitioning for some other reason than strictly ordered processing, then you can increase the number of max acks pending to more than 1 and have more than one subscriber to the consumer receiving a message at the same time.

Even with max acks pending set to 1, you can have more than one subscriber at a time on the consumer (but only one of them would get a message to process at a time) which means that if one of those subscribers were to die, processing would still continue as long as you have at least one subscriber to that consumer.

The partition mapping (e.g. the number of partitions) can be changed at any time, to increase the number of partitions you would: first create streams/consumers for the new partitions you are going to add, then change the mapping to increase the number of partitions, then start workers to subscribe to those new partitions (if you don't need strict ordering, you can even start them before changing the mapping). To decrease the number of partitions you would: first change the mapping to decrease the number of partitions, then monitor the consumers for the partitions you removed and once those partitions have delivered all the messages they may still have buffered to their subscribers you can stop them and remove the streams/consumers for those partitions you removed.

jnmoyne avatar Jun 08 '22 19:06 jnmoyne

@jnmoyne I understand your point of view, but if I have 3 instance I want to balance my work and have 1 consumer per 1 instance of my service, but in your case I can get 3 working consumers on 1 instance, and 2 instance will rest

z0mb1ek avatar Jun 09 '22 01:06 z0mb1ek

You control (for example using your container orchestration system of choice) how many instances of your worker you want to have per partition. So you can have 3 partitions and one subscriber/worker per partition and they will all be doing work.

There is a disconnect between the term "consumer" as it is used in Kafka (coming from the "consumer group" term) and the meaning of that term in JetStream. To simplify: a "consumer" in JetStream is like a "partition" in Kafka and a "subscriber" in JetStream is like a "consumer" in Kafka.

jnmoyne avatar Jun 09 '22 01:06 jnmoyne

If I have one subscriber/worker per partition and my instance go down, and there is no more free resources in my cluster, work will stop. Moreover for example in k8s we need StatefulSet for instance numbering and when the instance crashes - k8s will not start another. In kafka this case will work

z0mb1ek avatar Jun 09 '22 01:06 z0mb1ek

There is no concept of "consumer groups" in JetStream where that functionality happens at the administrative/ops level and can be easily implemented using container orchestration. And I would argue that it is the right model rather than having something in the streaming service itself because for example your container orchestration system can start or restart new instances (containers) of your workers automatically for if they die while the consumer group feature of Kafka will never be able to start new worker containers automatically.

jnmoyne avatar Jun 09 '22 02:06 jnmoyne

That's true, but in real life there are many different situations (some about resources i said) and changing consumers on the fly is good feature. We will try to make this like a framework with nats cluster api, thanks for your support

z0mb1ek avatar Jun 09 '22 02:06 z0mb1ek

Just wanted to chime in and say you can still have multiple subscribers (workers) deployed for a given consumer (for a given partition) and maintain ordering. You need to set MaxAckPending to 1 which means there will only be a single message in flight across all workers for that partition. The benefit is that if one of the workers goes offline or you need to do a rolling deployment, you will always have one worker up.. so its like HA for your workers per partition while still maintaining order.

bruth avatar Jun 21 '22 21:06 bruth