Offer a way to select a subset of Kafka partitions in a Kafka source
The objective would be to allow for larger indexing throughput by running K indexing pipelines for a single index.
The selector could be k % N, or maybe a list of partitions?
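A minimal sketch of what such a selector could compute, assuming the modulo rule (`k % N`); the function and parameter names here are illustrative, not an existing Quickwit API:

```rust
/// Given all partitions of a topic, pipeline `pipeline_ord` out of `num_pipelines`
/// keeps the partitions whose id satisfies `partition % num_pipelines == pipeline_ord`.
/// An explicit allow-list of partitions could be supported the same way.
fn select_partitions(all_partitions: &[i32], pipeline_ord: i32, num_pipelines: i32) -> Vec<i32> {
    all_partitions
        .iter()
        .copied()
        .filter(|partition| partition % num_pipelines == pipeline_ord)
        .collect()
}

fn main() {
    // With 6 partitions and 2 pipelines: pipeline 0 reads [0, 2, 4], pipeline 1 reads [1, 3, 5].
    assert_eq!(select_partitions(&[0, 1, 2, 3, 4, 5], 0, 2), vec![0, 2, 4]);
    assert_eq!(select_partitions(&[0, 1, 2, 3, 4, 5], 1, 2), vec![1, 3, 5]);
}
```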
Why don't we just remove the manual partition assignment? Set group.id to the same ID across multiple indexers, so that consumer-group-based partition assignment can do the same thing for us.
Good question. I think it might be surprising that we do not rely on consumer groups at all today, and we do not rely on Kafka to store checkpoint information.
There are two reasons for this. The first one is that we want to handle more sources than just Kafka, and some of these sources do not offer the concept of a consumer group. Kinesis for instance... but also files, etc.
The second reason, which is more about why we don't use Kafka transactions and have Kafka deal with checkpoints, is that we want to offer exactly-once semantics atomically for the entire pipeline, up to publishing.
Publishing a split requires interacting with a metastore (today, writing a file on S3 or a row in PostgreSQL). Building exactly-once processing with a contraption that atomically publishes AND commits a Kafka transaction together would be much more complex than what is done today.
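To illustrate the idea (a hypothetical sketch, not Quickwit's actual metastore API): the split publish and the source checkpoint advance form a single atomic metastore operation, so no coordination with Kafka transactions is needed:

```rust
use std::collections::BTreeMap;

/// Hypothetical checkpoint delta: per partition, the range of offsets covered by the split.
struct CheckpointDelta {
    offset_ranges: BTreeMap<i32, (i64, i64)>, // partition -> (from offset, to offset)
}

/// Hypothetical metastore interface: publishing the split and advancing the checkpoint
/// happen in one transaction. If the delta does not line up with the stored checkpoint,
/// the publish is rejected, which prevents duplicates without Kafka transactions.
trait Metastore {
    fn publish_split(
        &self,
        index_id: &str,
        split_id: &str,
        checkpoint_delta: CheckpointDelta,
    ) -> anyhow::Result<()>;
}
```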
> The first one is that we want to handle more sources than just Kafka, and some of these sources do not offer the concept of a consumer group. Kinesis for instance... but also files, etc.
That's great, but every source has its own pros; we should take advantage of them while we integrate them, rather than falling back to a lowest-common-denominator integration. WDYT?
> The second reason, which is more about why we don't use Kafka transactions and have Kafka deal with checkpoints, is that we want to offer exactly-once semantics atomically for the entire pipeline, up to publishing.
> Publishing a split requires interacting with a metastore (today, writing a file on S3 or a row in PostgreSQL). Building exactly-once processing with a contraption that atomically publishes AND commits a Kafka transaction together would be much more complex than what is done today.
IMO, once a message is consumed from the source and acked back to the source, the source has done its duty. Exactly-once semantics should be handled within the pipeline itself or at the split level; the source shouldn't be accessed anymore.
> That's great, but every source has its own pros; we should take advantage of them while we integrate them, rather than falling back to a lowest-common-denominator integration. WDYT?
This would be a more interesting debate if there were more benefits to using Kafka consumer groups in Quickwit.
@mxlxm
> IMO, once a message is consumed from the source and acked back to the source, the source has done its duty. Exactly-once semantics should be handled within the pipeline itself or at the split level; the source shouldn't be accessed anymore.
Anything can happen between the moment when the source delivered its message and the moment a split is actually published. From the Quickwit user's point of view, the solution you are describing is "at most once" delivery, which is rarely what people want (usually people want at least once).
> The selector could be k % N

Looks good enough to select a subset of Kafka partitions.
> Publishing a split requires interacting with a metastore (today, writing a file on S3 or a row in PostgreSQL).

How do we plan to maintain the partition:offset mapping across the K indexing pipelines? Store the information in the metastore?
It is stored in the metastore.
> The selector could be k % N

Can we use something like consistent hashing? If we use k % N, then when there are infra changes, such as removing or adding an indexer node in the cluster, a lot of partitions will be relocated to other nodes.
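For reference, one way to get stickier assignments is rendezvous (highest-random-weight) hashing; this is only a sketch of the idea, not something Quickwit implements:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Assign a partition to the indexer with the highest hash(indexer, partition) score.
/// Adding or removing an indexer only moves the partitions that scored highest for it,
/// instead of reshuffling almost everything the way a plain `k % N` scheme does.
fn assign_partition(partition: i32, indexers: &[&str]) -> String {
    indexers
        .iter()
        .max_by_key(|indexer| {
            let mut hasher = DefaultHasher::new();
            (indexer, partition).hash(&mut hasher);
            hasher.finish()
        })
        .expect("at least one indexer")
        .to_string()
}

fn main() {
    let indexers = ["indexer-1", "indexer-2", "indexer-3"];
    for partition in 0..6 {
        println!("partition {partition} -> {}", assign_partition(partition, &indexers));
    }
}
```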
Something else that could be considered here is that it should be possible to use the existing Kafka consumer group logic without storing the offsets in Kafka. This would allow reusing the group and rebalancing logic that already exists, while simply never calling commit on the consumer itself.
For this to work and maintain exactly-once semantics, you have to hook the rebalance events on the consumer and correctly handle the dynamic partition assignments when a rebalance occurs. I believe the Rust rdkafka lib exposes everything required for this.
The approach is something like this.
- Create a consumer in a group for the source with auto commit and offset storage disabled.
- Add hooks for the rebalance events
- Subscribe to the topic
- Rebalance events will provide partition assignments
- Use the metastore offsets to seek the assigned partitions to the latest published offset.
- Start reading using the standard Kafka APIs
- At each checkpoint store the offsets in the metastore
- If a rebalance occurs while split creation is in progress and partition assignments change, the in-progress split will have to be invalidated and restarted on the latest assignments.
- In a correctly functioning system this should be very rare
This should allow running N ingest nodes with exactly-once semantics, HA and dynamic scalability without having to implement the clustering manually.
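A minimal sketch of this setup with the Rust rdkafka crate; the exact hook signatures vary slightly between rdkafka versions, and the group id, topic, and broker address below are placeholders:

```rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext, Rebalance};
use rdkafka::ClientContext;

/// Consumer context that observes group rebalances. Offsets are never committed to
/// Kafka: they are read from and written to the Quickwit metastore instead.
struct MetastoreCheckpointContext;

impl ClientContext for MetastoreCheckpointContext {}

impl ConsumerContext for MetastoreCheckpointContext {
    fn pre_rebalance(&self, _rebalance: &Rebalance) {
        // A rebalance is starting: signal the indexing pipeline so that the
        // in-progress split can be invalidated (e.g. over a channel).
        println!("kafka group rebalance starting");
    }

    fn post_rebalance(&self, rebalance: &Rebalance) {
        if let Rebalance::Assign(assignment) = rebalance {
            // New assignment received: look up the last published offsets in the
            // metastore and seek each assigned partition before resuming reads.
            println!("assigned {} partitions", assignment.count());
        }
    }
}

fn main() {
    let consumer: BaseConsumer<MetastoreCheckpointContext> = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "quickwit-my-index") // placeholder group id
        .set("enable.auto.commit", "false") // offsets live in the metastore
        .set("enable.auto.offset.store", "false")
        .create_with_context(MetastoreCheckpointContext)
        .expect("consumer creation failed");

    consumer.subscribe(&["my-topic"]).expect("subscribe failed");
    // Polling the consumer drives the rebalance callbacks and message delivery.
}
```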
In the metastore, it will require a separate table for the offsets, with 1 row per partition and the following fields:
- source
- topic
- partition
- offset
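As a sketch, one row of that table could map to a struct like this (field names follow the list above; the actual schema may differ):

```rust
/// One checkpoint row per (source, topic, partition).
struct PartitionCheckpoint {
    source: String,  // id of the Quickwit source
    topic: String,   // Kafka topic name
    partition: i32,  // Kafka partition id
    offset: i64,     // last offset covered by a published split
}
```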
I've created a prototype for the concept that I describe above: https://github.com/kstaken/qw-kafka-test
It's actually very simple, but one question I have: in Quickwit, if you wanted to abort the in-progress split, what would be the correct way to do that? Is this what ActorContext.kill_switch is for?
The best approach is to return an anyhow::Error; the framework will kill the other actors for you and log it properly.
@guilload, is that on your radar?
Yes, I'm keeping an eye on this. I want to complete my ongoing work on k8s and retention first.
I've started looking at integrating this. I think I have it mostly mapped out but we'll see how far I get.
> If a rebalance occurs while split creation is in progress and partition assignments change, the in-progress split will have to be invalidated and restarted on the latest assignments. In a correctly functioning system this should be very rare.

Just curious, how would Quickwit handle this condition? How would the indexing pipeline know that there's an assignment change on the partitions? Pass a change message to the pipeline, then let the actors monitor that message and return an anyhow::Error if a partition is reassigned? @kstaken, do you have any idea?
I'll most likely use a messaging channel between the rebalance callback and the Kafka actor.
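A minimal sketch of such a channel, with hypothetical event names (the real implementation would likely differ):

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical events sent from the rdkafka rebalance callbacks to the Kafka source actor.
enum RebalanceEvent {
    /// Partitions are being revoked: the in-progress split must be invalidated.
    Revoked,
    /// New partition assignment (partition ids) received after the rebalance.
    Assigned(Vec<i32>),
}

fn main() {
    let (tx, rx): (Sender<RebalanceEvent>, Receiver<RebalanceEvent>) = channel();

    // The consumer context would hold `tx` and send from pre_rebalance / post_rebalance.
    tx.send(RebalanceEvent::Revoked).unwrap();
    tx.send(RebalanceEvent::Assigned(vec![0, 2, 4])).unwrap();

    // The Kafka source actor drains `rx` between message batches and reacts.
    while let Ok(event) = rx.try_recv() {
        match event {
            RebalanceEvent::Revoked => println!("invalidate the in-progress split"),
            RebalanceEvent::Assigned(partitions) => {
                println!("seek partitions {:?} to their metastore offsets", partitions);
            }
        }
    }
}
```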
I created a specific ticket for your solution, @kstaken.
I am closing this generic issue in favor of more specific ones.