[💡 FEATURE REQUEST]: Support partition ordering
Plugin: Jobs driver
Partitions in Kafka are the key mechanism for guaranteeing message ordering. Some topic processing is sensitive to this ordering, but unfortunately a couple of implementation details in RoadRunner are at odds with it.
1. Kafka clients push multiple messages into the jobs pipeline prior to the current offset being handled
My recommendation is to push events into the jobs pipeline serially, per partition. This should be independent of whether the job is acked or nacked, for two reasons: a client may wish to skip over an event, or handle events from multiple drivers, without blocking progress on the partition; and if they wish to halt, they can always pause the pipeline.
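A minimal Go sketch of what "serially, per partition" could look like, assuming a shared pipeline that still fans out to a pool of workers. `TopicPartition`, `Message`, `partitionGate`, and `dispatch` are hypothetical names for illustration, not types from the RoadRunner Kafka driver. Each partition gets a one-slot gate: the next offset is pushed only after the previous one has been handled (acked or nacked alike), while different partitions proceed concurrently.

```go
package main

import (
	"fmt"
	"sync"
)

// TopicPartition and Message are hypothetical stand-ins for a consumed Kafka
// record; they are not types from the RoadRunner Kafka driver.
type TopicPartition struct {
	Topic     string
	Partition int32
}

type Message struct {
	TP     TopicPartition
	Offset int64
}

// partitionGate allows at most one in-flight message per partition.
// Release is called once the job has been handled, whether acked or nacked.
type partitionGate struct {
	mu    sync.Mutex
	slots map[TopicPartition]chan struct{}
}

func (g *partitionGate) slot(tp TopicPartition) chan struct{} {
	g.mu.Lock()
	defer g.mu.Unlock()
	ch, ok := g.slots[tp]
	if !ok {
		ch = make(chan struct{}, 1) // capacity 1 == one in-flight message
		g.slots[tp] = ch
	}
	return ch
}

func (g *partitionGate) Acquire(tp TopicPartition) { g.slot(tp) <- struct{}{} }
func (g *partitionGate) Release(tp TopicPartition) { <-g.slot(tp) }

// dispatch pushes each partition's messages serially into one shared pipeline
// that fans out to a pool of workers, and returns the handling order per partition.
func dispatch(msgs []Message, workers int) map[TopicPartition][]int64 {
	gate := &partitionGate{slots: make(map[TopicPartition]chan struct{})}
	pipeline := make(chan Message, workers)

	byTP := make(map[TopicPartition][]Message) // offset order kept within each partition
	for _, m := range msgs {
		byTP[m.TP] = append(byTP[m.TP], m)
	}

	var mu sync.Mutex
	handled := make(map[TopicPartition][]int64)
	var workersWG sync.WaitGroup
	for i := 0; i < workers; i++ {
		workersWG.Add(1)
		go func() {
			defer workersWG.Done()
			for m := range pipeline {
				mu.Lock()
				handled[m.TP] = append(handled[m.TP], m.Offset) // "process" the job
				mu.Unlock()
				gate.Release(m.TP) // handled (ack or nack): the partition may advance
			}
		}()
	}

	// One producer per partition, so a slow partition never blocks the others.
	var producersWG sync.WaitGroup
	for tp, ms := range byTP {
		producersWG.Add(1)
		go func(tp TopicPartition, ms []Message) {
			defer producersWG.Done()
			for _, m := range ms {
				gate.Acquire(tp) // wait until the previous offset was handled
				pipeline <- m
			}
		}(tp, ms)
	}
	producersWG.Wait()
	close(pipeline)
	workersWG.Wait()
	return handled
}

func main() {
	a, b := TopicPartition{"orders", 0}, TopicPartition{"orders", 1}
	var msgs []Message
	for off := int64(0); off < 3; off++ {
		msgs = append(msgs, Message{a, off}, Message{b, off})
	}
	for tp, offsets := range dispatch(msgs, 4) {
		fmt.Println(tp.Topic, tp.Partition, offsets) // offsets are always [0 1 2]
	}
}
```

Because the gate is released on "handled" rather than on "acked", a client can nack or skip an event without stalling the partition, which matches the two reasons given above.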
2. Pollers fan out tasks from the pipeline to workers
This is not an issue if the clients push one topic message at a time. I'm noting it because `num_pollers`, `pipeline_size`, and `pool.num_workers` currently must all be 1 to achieve ordering (at the cost of throughput).
My initial thought is that changing this behaviour is not ideal, as fan-out is a generally useful characteristic of these pollers.
3. The priority queue implementation is not stable.
This can be patched with minimal changes and negligible performance overhead. Even with the first gap (in the clients) addressed, this could remain problematic if a high-throughput topic is being replayed and many of its messages are queued with equal priority. FIFO ordering within each priority level solves this, which is a slightly different ask from the CFS enhancement.
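One common way to make a binary-heap priority queue stable is to stamp every insert with a monotonically increasing sequence number and use it as a tie-breaker. The Go sketch below shows that technique with the standard library's `container/heap`; `item`, `StablePQ`, `Insert`, and `ExtractMin` are hypothetical names, not RoadRunner's PQ API, and it assumes a lower priority value pops first.

```go
package main

import (
	"container/heap"
	"fmt"
)

// item is a hypothetical queued job; in this sketch a lower priority value pops first.
type item struct {
	name     string
	priority int64
	seq      uint64 // insertion counter, used only to break ties
}

type itemHeap []item

func (h itemHeap) Len() int { return len(h) }

// Less orders by priority first, then by insertion order, which makes pops stable.
func (h itemHeap) Less(i, j int) bool {
	if h[i].priority != h[j].priority {
		return h[i].priority < h[j].priority
	}
	return h[i].seq < h[j].seq
}

func (h itemHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *itemHeap) Push(x any)   { *h = append(*h, x.(item)) }
func (h *itemHeap) Pop() any {
	old := *h
	last := old[len(old)-1]
	*h = old[:len(old)-1]
	return last
}

// StablePQ stamps every insert with a monotonically increasing sequence number.
type StablePQ struct {
	h   itemHeap
	seq uint64
}

func (q *StablePQ) Insert(name string, priority int64) {
	heap.Push(&q.h, item{name: name, priority: priority, seq: q.seq})
	q.seq++
}

func (q *StablePQ) ExtractMin() string { return heap.Pop(&q.h).(item).name }
func (q *StablePQ) Len() int           { return q.h.Len() }

func main() {
	var q StablePQ
	for _, name := range []string{"a", "b", "c"} {
		q.Insert(name, 1)
	}
	q.Insert("urgent", 0)
	for q.Len() > 0 {
		fmt.Print(q.ExtractMin(), " ") // urgent a b c
	}
	fmt.Println()
}
```

The extra cost is one counter per item and one additional comparison on ties, which is in line with the "negligible overhead" claim; operations stay O(log n).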
Worth noting: this enhancement parallels the SQS FIFO support nicely, in particular `max_messages_in_flight` (rather than prefetch, as described in https://github.com/roadrunner-server/roadrunner/issues/1380), but it would ideally apply at the partition level, not at the topic or pipeline level.
Hey @adamsnoah98 👋🏻 Thank you for the proposal 👍🏻
Yeah, I thought about most of these things. The problem at the moment is the current architecture of the Jobs plugin with attachable drivers. My initial decision was to unify the ack/nack/send/receive/etc. mechanisms for the different brokers (e.g. Kafka, Bolt, AMQP implementations (RMQ, 0.9.0, 1.0, etc.)).
That worked fine for simple use cases, until I started seeing more complicated scenarios in the PHP world (which I can't foresee, since I'm not a PHP dev at all 😃).
The second problem here is, let's say... the nature of PHP. We have to use a pool of PHP workers, and we cannot guarantee that worker2 returns its response after worker1. We have scenarios where people use 50, 100, or 200 workers... So if you set `max_message_in_flight` to 1, there is zero reason to use RR, since all messages are processed sequentially in that case, and there is no need to overcomplicate the setup by introducing RR into it. Because of that, a priority queue was introduced.
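A small deterministic Go simulation illustrates why in-order dispatch does not imply in-order completion in a worker pool. It assumes all messages are available at time 0, that dispatch is strictly FIFO (each message goes to the worker that frees up first), and hypothetical processing costs; `completionOrder` is an illustrative helper, not RoadRunner code.

```go
package main

import (
	"fmt"
	"sort"
)

// completionOrder simulates a pool of identical workers that take messages
// strictly in dispatch order and returns the order in which messages finish.
// cost[i] is the (hypothetical) processing time of message i.
func completionOrder(cost []int, workers int) []int {
	free := make([]int, workers) // time at which each worker becomes free
	type done struct{ msg, at int }
	finished := make([]done, 0, len(cost))
	for msg, c := range cost {
		// The next message goes to the worker that frees up first (lowest index on ties).
		w := 0
		for i := range free {
			if free[i] < free[w] {
				w = i
			}
		}
		free[w] += c
		finished = append(finished, done{msg, free[w]})
	}
	// Order by finish time; equal finish times keep dispatch order.
	sort.SliceStable(finished, func(i, j int) bool { return finished[i].at < finished[j].at })
	order := make([]int, len(finished))
	for i, d := range finished {
		order[i] = d.msg
	}
	return order
}

func main() {
	cost := []int{30, 10, 10}
	fmt.Println(completionOrder(cost, 2)) // [1 2 0]
	fmt.Println(completionOrder(cost, 1)) // [0 1 2]
}
```

With two workers, message 0 is dispatched first but finishes last; only a single worker preserves the dispatch order.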
A small remark: pollers do not fan out messages from the pipelines, but from the priority queue. At the moment, you're not able to configure the number of pollers, because it is always set internally based on the number of PHP workers.
One solution might be a worker pool per pipeline, but that is a huge memory waste: every worker consumes 100-150+ MB of RAM, so 10 workers cost about 1.5 GB, and 2 pipelines about 3 GB. Thus, I decided to develop a unified worker pool plus a priority queue. But that introduces another problem, fairness... So it is very hard to generalize this, and now I'm thinking that separate (independent) Jobs plugins, with all features configurable per broker, would be a better solution: not a Jobs plugin plus a Kafka driver, but full-featured, independent Kafka, RMQ, NATS, etc. plugins.
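Spelling out the memory arithmetic, taking the upper figure of 150 MB per worker and 10 workers per pool (rough figures from the comment above, not measurements):

$$
\underbrace{2}_{\text{pipelines}} \times \underbrace{10}_{\text{workers}} \times 150\ \text{MB} = 3000\ \text{MB} \approx 3\ \text{GB}
\qquad\text{vs.}\qquad
10 \times 150\ \text{MB} = 1500\ \text{MB} \approx 1.5\ \text{GB}
$$

for one pool shared by both pipelines. Per-pipeline pools grow linearly with the number of pipelines, while a shared pool does not, which is the trade-off behind the unified pool plus priority queue.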
> The second problem here is, let's say... the nature of PHP. We have to use a pool of PHP workers, and we cannot guarantee that worker2 returns its response after worker1. We have scenarios where people use 50, 100, or 200 workers... So if you set `max_message_in_flight` to 1, there is zero reason to use RR, since all messages are processed sequentially in that case, and there is no need to overcomplicate the setup by introducing RR into it. Because of that, a priority queue was introduced.
Yes, it's generally more useful when ordering is not important. For the fct topics we handle, it's great to offer fan-out on the consumer side. For some CDC topics, sequential processing is necessary, and it would be more convenient to just use RR for both cases rather than splitting out another strategy with ext-rdkafka or something similar.
> A small remark: pollers do not fan out messages from the pipelines, but from the priority queue. At the moment, you're not able to configure the number of pollers, because it is always set internally based on the number of PHP workers.
Does this imply that the `num_pollers` configuration option is ignored? It does make intuitive sense that these numbers would match for the plugin 👍
> One solution might be a worker pool per pipeline, but that is a huge memory waste: every worker consumes 100-150+ MB of RAM, so 10 workers cost about 1.5 GB, and 2 pipelines about 3 GB. Thus, I decided to develop a unified worker pool plus a priority queue. But that introduces another problem, fairness... So it is very hard to generalize this, and now I'm thinking that separate (independent) Jobs plugins, with all features configurable per broker, would be a better solution: not a Jobs plugin plus a Kafka driver, but full-featured, independent Kafka, RMQ, NATS, etc. plugins.
Yes, I see the coupling/cohesion difficulty here. For this particular request, I believe we can stay out of the jobs package and still service the request by focusing on the first and third points:
- Update the PQ implementation to be stable.
- Enhance the consumer's configurability so that it can be set to wait for a response per topic/partition before pushing more messages from that partition to the pipeline. This allows the driver to extend the ordering guarantee that Kafka partitions promise, maintains the pipeline's ability to fan out globally, and isolates the change to the driver.
Are you open to either of those changes in their respective packages? I'm happy to submit PRs for either.
> Update the PQ implementation to be stable.
I'm still not sure why you call it not stable. The PQ is implemented based on the binary heaps paper. It does what it is supposed to do: sort the messages based on their priority. A completely different question is whether we should apply it to the Kafka driver or not. And specifically for that, I've mentioned that my idea is not to introduce more and more workarounds, but to reimplement the whole concept from an architectural standpoint: let drivers be more broker-specific, without generalization. FIFO is not the answer here, since you can't guarantee that everything will be in order on the workers' side. In the SQS driver, you can achieve true FIFO by limiting the number of in-flight messages, but not in the Kafka driver.
> Enhance the consumer's configurability so that it can be set to wait for a response per topic/partition before pushing more messages from that partition to the pipeline. This allows the driver to extend the ordering guarantee that Kafka partitions promise, maintains the pipeline's ability to fan out globally, and isolates the change to the driver.
>
> Are you open to either of those changes in their respective packages? I'm happy to submit PRs for either.
Yeah, sure, would be great if you'd be able to send me a PR, so we can discuss your actual idea in code 👍🏻
> I'm still not sure why you call it not stable. The PQ is implemented based on the binary heaps paper. It does what it is supposed to do: sort the messages based on their priority.

I mean stable in the sense of a stable sort: elements of the same priority are popped in the order they were inserted. This is only a slightly stricter guarantee than the current PQ provides, and I think it is a good enhancement regardless of any applicability to the Kafka driver. It is another type of fairness under back pressure, beyond the one expressed in the CFS issue I linked, and it is agnostic of the driver(s) at play.
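To make "not stable" concrete, here is a minimal Go sketch of a binary heap ordered by priority alone, using the standard library's `container/heap`. It is an assumption about the shape of the problem, not RoadRunner's actual PQ code; `job`, `jobHeap`, and `popOrder` are hypothetical names. Pushing `a`, `b`, `c` with equal priority pops them as `a`, `c`, `b`: `heap.Pop` moves the last element to the root, and sift-down only swaps with a child that is strictly smaller, so an equal-priority element can overtake earlier inserts.

```go
package main

import (
	"container/heap"
	"fmt"
)

// job is ordered by priority only; equal priorities have no tie-breaker.
type job struct {
	name     string
	priority int
}

type jobHeap []job

func (h jobHeap) Len() int           { return len(h) }
func (h jobHeap) Less(i, j int) bool { return h[i].priority < h[j].priority }
func (h jobHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *jobHeap) Push(x any)        { *h = append(*h, x.(job)) }
func (h *jobHeap) Pop() any {
	old := *h
	last := old[len(old)-1]
	*h = old[:len(old)-1]
	return last
}

// popOrder inserts names with equal priority and returns the order they are popped in.
func popOrder(names []string) []string {
	h := &jobHeap{}
	for _, n := range names {
		heap.Push(h, job{name: n, priority: 1})
	}
	var out []string
	for h.Len() > 0 {
		out = append(out, heap.Pop(h).(job).name)
	}
	return out
}

func main() {
	fmt.Println(popOrder([]string{"a", "b", "c"})) // [a c b]
}
```

Priorities are still respected; only the relative order of equal-priority items is lost, which is exactly the gap a stable variant closes.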
> Yeah, sure, would be great if you'd be able to send me a PR, so we can discuss your actual idea in code 👍🏻
I'll try to prepare something this week to orient the discussion around. Hopefully this also brings a bit of context to my FIFO comment... It's similar to the SQS in-flight limit, but not identical, due to the way Kafka clients poll multiple topics and the partitions within them.
> I mean stable in the sense of a stable sort: elements of the same priority are popped in the order they were inserted.

Yeah, while the PQ guarantees that you won't get priority 3 before priority 2, items within priority 2 are not ordered by when they arrived. But this does not matter in an architecture with a pool of PHP workers, where there is no ordering on the pool side. So even if you sort the messages or implement some kind of FIFO, you'd still have an unordered distribution of messages across the workers (e.g., worker1 gets message5). You can solve that only with 1 worker and the PQ size set to 1.
> Yeah, sure, would be great if you'd be able to send me a PR, so we can discuss your actual idea in code 👍🏻

Here's the draft PR for the Kafka changes: https://github.com/roadrunner-server/kafka/pull/523