ziggurat
Addressing Out-of-Order Processing via a datastore solution
Problem and context
The out-of-order processing problem is best explained with an example. Assume the following is the timeline of events in a Kafka topic:
Message | Timestamp | Key (here, order id) | Record |
---|---|---|---|
M1 | 10:00 pm | 123 | ORDER_CREATED |
M2 | 10:01 pm | 123 | ORDER_MODIFIED |
M3 | 10:02 pm | 123 | ORDER_CANCELLED |
M4 | 10:01 pm | 456 | ORDER_CANCELLED |
Out-of-order processing can happen when a consumer
- consumes M1 and commits it
- consumes M2, but fails to process it and pushes it to the DLQ
- consumes M3 and commits it
Now the DLQ cannot simply be replayed: if it is, M2 might get processed, and that's a problem because a cancelled order cannot be modified. This could be handled in the consumer, but then all the rules spelling out an acceptable sequence of events would need to be written in the consumer, which might not be feasible.
Kafka guarantees that messages pushed to the same partition are always delivered in the order they were pushed. Here, messages for a single order land in the same partition because the key of the record is the order_id. Kafka encourages you to segment your data this way as well; after all, it allows for in-order processing.
However, developers do not always push the change events for the same resource to the same partition; sometimes they choose to generate a unique key for each Kafka record instead. The use case is to even out the partitions and avoid having one partition completely blocked when a single message for an order is causing a problem.
Solution
A universal solution for all consumers is to remember the last seen timestamp for each order. For the example above, that store would look like
Last seen timestamp | Key |
---|---|
10:02 pm | 123 |
10:01 pm | 456 |
Every new message that comes off the queue is checked against this table, and based on its staleness we take an action. For instance, since M2 (timestamp 10:01 pm) arrived after M3, comparison with the table shows we already processed a later message for key 123, so M2 is a stale event and can be skipped or handled differently.
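The staleness check above can be sketched as follows (an illustrative in-memory sketch, not ziggurat code; the `last-seen` atom and function names are made up for the example):

```clojure
;; Illustrative sketch: last-seen table as an atom mapping key -> last seen timestamp.
(def last-seen (atom {}))

(defn stale?
  "True when this message's timestamp is older than the latest
   timestamp already processed for the same key."
  [{:keys [key timestamp]}]
  (if-let [seen (get @last-seen key)]
    (< timestamp seen)
    false))

(defn record-message!
  "Upserts the key's timestamp if this message is the freshest seen so far."
  [{:keys [key timestamp]}]
  (swap! last-seen update key (fnil max 0) timestamp))

;; With key 123 last seen at 10:02 pm, the late-arriving M2 (10:01 pm) is stale:
;; (record-message! {:key 123 :timestamp 1002})
;; (stale? {:key 123 :timestamp 1001}) ;=> true
```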
Proposal
We are proposing an optional middleware through which a user can see, in the message-metadata passed to the mapper function, whether or not the message is stale (i.e. behind the latest processed message).
The function would take in a message, extract the key (userid/orderid/driverid) from it, and upsert into the store the id and the timestamp it carries. Every time a new message is processed, it would compare timestamps and add an `is-stale?` key to the message-metadata persistent map. If the key is not present in the store (i.e. the message is seen for the first time), or the message is fresh (i.e. its timestamp is greater than the timestamp currently in the store), it will simply upsert the entry (id, timestamp) into the store.
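A rough sketch of such a middleware, assuming carmine as the Redis client (the middleware shape, key prefix, and function names are illustrative assumptions, not ziggurat's actual implementation):

```clojure
(require '[taoensso.carmine :as car])

(def redis-conn {:pool {} :spec {:host "localhost" :port 6379}})
(def ttl-seconds 86400)

(defn wrap-ooo-check
  "Middleware sketch: compares the message's timestamp against the last
   seen timestamp for its id in Redis, attaches :is-stale? to the
   metadata, and upserts the entry when the message is fresh or unseen."
  [handler-fn id-path timestamp-path]
  (fn [message metadata]
    (let [id        (get-in message id-path)
          ts        (get-in message timestamp-path)
          redis-key (str "ziggurat:ooo:" id)
          seen      (car/wcar redis-conn (car/get redis-key))
          stale?    (boolean (and seen (< ts (Long/parseLong seen))))]
      (when-not stale?
        ;; fresh or first-seen: upsert (id, timestamp) with a TTL
        (car/wcar redis-conn (car/setex redis-key ttl-seconds ts)))
      (handler-fn message (assoc metadata :is-stale? stale?)))))
```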
We would be using Redis as the data store. We would need five configs from the user
Config | Purpose | Sample Value |
---|---|---|
ZIGGURAT_STREAM_OOO_ENABLED | Enable/Disable the feature | true |
ZIGGURAT_STREAM_OOO_UNIQUE_ID_PATH | How to extract key from your message in this stream | [:key :order :id] |
ZIGGURAT_STREAM_OOO_PRODUCED_TIMESTAMP_PATH | How to extract timestamp from your message in this stream | [:key :timestamp] |
ZIGGURAT_STREAM_OOO_REDIS_HOSTNAME | Hostname of Redis (port defaults to 6379) | 13.90.78.6 |
ZIGGURAT_STREAM_OOO_REDIS_TTL | TTL of each record in seconds | 86400 |
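Put together, these might map to a config fragment like the following (a hypothetical sketch; the nesting and key names are assumptions about how the feature could surface in a ziggurat `config.edn`, not a finalized shape):

```clojure
;; Hypothetical config.edn fragment for the proposed feature
{:ziggurat
 {:stream-router
  {:default {:ooo-enabled                 true
             :ooo-unique-id-path          [:key :order :id]
             :ooo-produced-timestamp-path [:key :timestamp]
             :ooo-redis-hostname          "13.90.78.6"
             :ooo-redis-ttl               86400}}}}
```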
For any issues, do reach out.
Thanks, Rijul
> However, developers do not always push the change events for the same resource to the same partition; sometimes they choose to generate a unique key for each Kafka record instead. The use case is to even out the partitions and avoid having one partition completely blocked when a single message for an order is causing a problem.
Won't other messages (not necessarily of the same order id) be blocked in the partition even if we use a unique key? Shouldn't we be offloading such poison messages into a DLQ (which ziggurat already offers) according to a timeout? This would ensure the blocking can be scoped to the expected SLA.
Also, what happens if there's a timeout or degradation on the Redis end (although not common)? Will the message be treated as a successfully processed message or not?
Also, what advantage does native ziggurat support for OOO offer, except for DRY?
> Also, what happens if there's a timeout or degradation on the Redis end (although not common)? Will the message be treated as a successfully processed message or not?
We are thinking of guaranteeing that `is-stale?` will deliver one of three statuses: true / false / failed. The third status implies that we couldn't determine the staleness of the message, in which case it is up to the user to choose the behavior.
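Handling the three statuses in a mapper function might look like this (a sketch; `process` and `handle-undetermined` are hypothetical user functions, and the `:skip`/`:success` return keywords follow ziggurat's mapper conventions):

```clojure
(defn mapper-fn
  "Branches on the proposed three-valued :is-stale? flag."
  [message metadata]
  (case (:is-stale? metadata)
    true    :skip                            ;; a later message was already processed
    false   (process message)                ;; fresh message: business logic as usual
    :failed (handle-undetermined message)))  ;; Redis unreachable: user chooses behavior
```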
> Also, what advantage does native ziggurat support for OOO offer, except for DRY?
DRY is definitely one of the motivations, and a big one. However, another is a use case I often see in our org.
A lot of services using ziggurat are written so that Kafka messages are treated as a notification-log/ticker: when a message arrives, the actor calls the API of the producing service to fetch the information it needs to process the message. We do this even if the message already contains everything the actor needs.
This causes a performance problem: the service first constructs the message and sends it to the Kafka event log, but the actor then calls it again, forcing it to reconstruct the message by recomputing the same information.
Why do developers do this ?
Mostly to protect against out-of-order processing when retrying messages or replaying messages from the DLQ. Calling the originating/producer service always ensures that you get the latest state of the resource. Two solutions to this are:
- Provide a way to distinguish retried/replayed messages (i.e. DLQ messages) from messages coming in from the Kafka topic, inside the mapper function. We can then write code in the mapper function to treat these messages separately (ignore them, or call the service only for them).
- OR the actor maintains a store of <ID, last_seen_timestamp>, and for messages older than last_seen_timestamp the message is ignored / the service is called. People don't implement this because it means integrating with a database and maintaining state in an otherwise stateless system; and if the ID of the message is the order id, you have to purge entries as well.
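The first option might look like this in a mapper function (a sketch; the `:replayed?` metadata flag, `fetch-latest-state`, and `process` are hypothetical names, since ziggurat would first need to expose such a flag):

```clojure
(defn mapper-fn
  "Hypothetical: treat replayed/DLQ messages differently from
   first-time deliveries."
  [message metadata]
  (if (:replayed? metadata)
    ;; replayed message may be stale: refetch the source of truth
    (process (fetch-latest-state (:order-id message)))
    ;; first-time delivery: trust the message payload as-is
    (process message)))
```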
The idea is for ziggurat to provide this small middleware so that the only thing devs need to do is create / maintain the infra as well as fill in the configs.
> We are thinking of guaranteeing that `is-stale?` will deliver one of three statuses: true / false / failed. The third status implies that we couldn't determine the staleness of the message, in which case it is up to the user to choose the behavior.
I meant while updating the timestamp in Redis. If the SET request times out, the timestamp in the remote store would be incorrect, and future messages won't know that the Redis entry is stale, unless we have unlimited retries, which in turn spikes mapper time and causes lag.
In principle, I think this arises because we are now keeping the state of truth in two places:
- The actual source of truth.
- The Redis timestamp for the is-stale feature.
And the two places don't have a transactional mechanism.
Which timestamp are we talking about here? Timestamps might not be strictly increasing (two events can land in the same millisecond) and are subject to clock skew. It could thus be better to use the offset number and the partition key as a combination.
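The suggested comparison could be sketched like this (illustrative only; note that offsets are only comparable within a single partition, which is why the partition must be part of the stored state):

```clojure
(defn stale-by-offset?
  "Offsets are strictly increasing within a partition, so for messages
   sharing a partition the (partition, offset) pair orders them
   unambiguously, unlike wall-clock timestamps."
  [{:keys [partition offset]} last-seen]
  (and (= partition (:partition last-seen))
       (< offset (:offset last-seen))))

;; A message at offset 41 is stale if offset 42 of the same partition
;; was already processed:
;; (stale-by-offset? {:partition 3 :offset 41} {:partition 3 :offset 42}) ;=> true
```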
Which timestamp are we talking about here?
Server time.
Yes, it is subject to the problem you are mentioning; a same-millisecond message for the same user is also a very real possibility.