Add support for kafka indexing with consumer groups and exactly-once semantics
As suggested by @kstaken in #1625
Something else that could be considered here is that it should be possible to use the existing Kafka consumer group logic without storing the offsets in Kafka. This would allow using the group and rebalancing logic that already exists; you simply never call commit on the consumer itself.
For this to work while maintaining exactly-once semantics, you have to hook the rebalance events on the consumer and correctly handle the dynamic partition assignments when a rebalance occurs. I believe the Rust rdkafka lib exposes everything required for this.
The approach is something like this (a sketch in code follows the list):
- Create a consumer in a group for the source, with auto commit and offset storage disabled.
- Add hooks for the rebalance events.
- Subscribe to the topic.
- Rebalance events will provide partition assignments.
- Use the offsets from the metastore to seek the assigned partitions to their last checkpointed positions.
- Start reading using the standard Kafka APIs.
- At each checkpoint, store the offsets in the metastore.
- If a rebalance occurs while split creation is in progress and partition assignments change, the in-progress split will have to be invalidated and restarted on the latest assignments. In a correctly functioning system this should be very rare.
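A minimal sketch of that flow with the Rust rdkafka crate (the `ConsumerContext` callback signatures vary across rdkafka versions, and `fetch_offset_from_metastore`, the group id, and the topic name are hypothetical):

```rust
use std::time::Duration;

use rdkafka::client::ClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext, Rebalance};
use rdkafka::Offset;

struct RebalanceContext;

impl ClientContext for RebalanceContext {}

impl ConsumerContext for RebalanceContext {
    fn pre_rebalance(&self, _rebalance: &Rebalance) {
        // Partitions are about to move: invalidate any in-progress split here.
        println!("pre-rebalance: invalidating in-progress splits");
    }

    fn post_rebalance(&self, rebalance: &Rebalance) {
        if let Rebalance::Assign(assignment) = rebalance {
            // The new partition assignments; the reader seeks each of them
            // to its checkpointed offset from the metastore.
            for elem in assignment.elements() {
                println!("assigned {}:{}", elem.topic(), elem.partition());
            }
        }
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Consumer in a group for the source, with auto commit and offset
    // storage disabled: offsets live only in the metastore.
    let consumer: BaseConsumer<RebalanceContext> = ClientConfig::new()
        .set("group.id", "quickwit-my-source") // hypothetical group id
        .set("bootstrap.servers", "localhost:9092")
        .set("enable.auto.commit", "false")
        .set("enable.auto.offset.store", "false")
        .create_with_context(RebalanceContext)?;

    // Subscribe; the rebalance callbacks above deliver partition assignments.
    consumer.subscribe(&["my-topic"])?;

    // Seek each assigned partition to the offset stored in the metastore.
    // (In a real source this would be driven by the post-rebalance
    // assignment; shown inline here for brevity.)
    for elem in consumer.assignment()?.elements() {
        let offset = fetch_offset_from_metastore(elem.topic(), elem.partition());
        consumer.seek(elem.topic(), elem.partition(), Offset::Offset(offset), Duration::from_secs(5))?;
    }

    // Read with the standard Kafka APIs; at each checkpoint, store the
    // offsets in the metastore instead of committing them to Kafka.
    for message in consumer.iter() {
        let _msg = message?;
        // ... build splits; on checkpoint, persist offsets in the metastore ...
    }
    Ok(())
}

// Hypothetical helper: would query the metastore checkpoint table.
fn fetch_offset_from_metastore(_topic: &str, _partition: i32) -> i64 {
    0
}
```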
This should allow running N ingest nodes with exactly-once semantics, HA, and dynamic scalability, without having to implement the clustering manually.
In the metastore, this will require a separate table for the offsets, with one row per partition and the following fields (a sketch follows the list):
- source
- topic
- partition
- offset
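As a row type, that might look like this (field names are illustrative, not a committed schema):

```rust
/// One row per (source, topic, partition) in the metastore offsets table.
/// Names are illustrative, not a committed schema.
struct PartitionCheckpoint {
    /// The Quickwit source the checkpoint belongs to.
    source: String,
    /// Kafka topic name.
    topic: String,
    /// Kafka partition id.
    partition: i32,
    /// Last offset durably recorded in the metastore.
    offset: i64,
}
```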
See also https://github.com/kstaken/qw-kafka-test
From offline discussion with Kimbro: Kafka can be configured to trigger the rebalance only for the affected nodes/partitions, and to wait for the rebalance hooks to terminate on all targeted nodes.
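If this refers to Kafka's incremental cooperative rebalancing, librdkafka (>= 1.6) exposes it through the `partition.assignment.strategy` setting; a hedged sketch:

```rust
use rdkafka::config::ClientConfig;

// With the cooperative-sticky assignor, a rebalance only revokes and
// reassigns the partitions that actually move, instead of revoking every
// partition on every group change.
fn cooperative_config() -> ClientConfig {
    let mut config = ClientConfig::new();
    config
        .set("group.id", "quickwit-my-source") // hypothetical group id
        .set("partition.assignment.strategy", "cooperative-sticky");
    config
}
```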
Of course, we still need a neat solution to prevent splits already going through the pipeline from being published, in order to preserve exactly-once semantics. We cannot use our traditional kill-the-pipeline approach, as it would trigger a rebalance.
My 50c:
We probably want to have the equivalent of the killswitch on the split itself. We could then mark a generation of splits as obsolete, and abort upload/packaging etc. as soon as possible, without killing the pipeline. (The killswitch struct could be reused as is, maybe with a rename to avoid confusion.)
We could then flip the split-generation obsolescence switch in the pre-rebalance hook, and read the offsets in the post-rebalance hook, for instance. (Getting the offsets cannot be done in the pre-rebalance hook, because we have no guarantee that a peer won't publish after that point.)
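One possible shape for that switch (hypothetical types, not the existing KillSwitch API):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical per-generation kill switch: every split in flight carries
/// the generation it was started under; bumping the generation obsoletes
/// them all without killing the pipeline.
#[derive(Clone, Default)]
struct SplitGeneration {
    current: Arc<AtomicU64>,
}

impl SplitGeneration {
    /// Called from the pre-rebalance hook: obsolete all in-flight splits.
    fn bump(&self) {
        self.current.fetch_add(1, Ordering::SeqCst);
    }

    /// Snapshot taken when a split starts.
    fn snapshot(&self) -> u64 {
        self.current.load(Ordering::SeqCst)
    }

    /// Checked before packaging/upload/publish: abort this split if a
    /// rebalance obsoleted its generation in the meantime.
    fn is_obsolete(&self, split_generation: u64) -> bool {
        self.current.load(Ordering::SeqCst) != split_generation
    }
}
```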
We still need to take into account the case where a consumer is disconnected from the cluster, does not take part in the rebalance dance, and keeps publishing splits... The behavior will still be correct, but it will trigger a pipeline failure on the nodes with the affected partitions.
It would be nice to make sure we detect when we are disconnected from Kafka and kill the pipeline at that point, to minimize that risk.
@kstaken @guilload What solutions did you have in mind? (don't be afraid to be descriptive :) )
Something that I had added to the metastore checkpoint table schema, but backed out of #1798, was a per-partition serial number.
The idea was that as soon as a rebalance was triggered, the serial number would be bumped on the new partitions assigned to the node. Then, as a last guard, any node that had splits in flight would include the current serial number for all partitions in the update to the checkpoint, and the transaction would fail if any of them had changed.
So there would need to be two levels of protection here (see the sketch after this list):
- A killswitch to immediately stop any in-flight splits on the node that is receiving new partition assignments.
- The serial-number-based commit guard to protect against publication from nodes that may have had partitions move away but for some reason did not get their splits invalidated.
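A rough sketch of that commit guard against an in-memory stand-in for the checkpoint table (a real metastore would run this as a single SQL transaction; all names are hypothetical):

```rust
use std::collections::HashMap;

/// Hypothetical guard: the checkpoint update only succeeds if the serial
/// number recorded for every partition is still the one the split was built
/// against; a rebalance bumps the serial, so stale publishes fail.
struct PartitionOffsetUpdate {
    topic: String,
    partition: i32,
    offset: i64,
    /// Serial number observed when the split started.
    expected_serial: u64,
}

#[derive(Debug)]
struct StaleSerial;

/// (topic, partition) -> (offset, serial) stand-in for the checkpoint table.
fn publish_checkpoint(
    table: &mut HashMap<(String, i32), (i64, u64)>,
    updates: &[PartitionOffsetUpdate],
) -> Result<(), StaleSerial> {
    // First pass: fail the whole transaction if any serial changed.
    for u in updates {
        match table.get(&(u.topic.clone(), u.partition)) {
            Some(&(_, serial)) if serial == u.expected_serial => {}
            _ => return Err(StaleSerial),
        }
    }
    // Second pass: apply the new offsets.
    for u in updates {
        if let Some(entry) = table.get_mut(&(u.topic.clone(), u.partition)) {
            entry.0 = u.offset;
        }
    }
    Ok(())
}
```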
The serial number could also be implemented in the JSON index config.
I was looking at this on a per-partition basis in order to avoid having to invalidate splits on nodes where partition assignments do not actually change on rebalance. It would also be a simple optimization to kafka_source to not invalidate splits if the partition assignments do not actually change for that node.
I haven't dug into the actor stuff enough to understand how the killswitch might work. @guilload, do you have a clear picture of what that API will look like?