
Offer a way to select a subset of Kafka partitions in a Kafka source

fulmicoton opened this issue 3 years ago · 16 comments

The objective would be to allow higher indexing throughput by running K indexing pipelines for a single index.

The selector could be k % N, or maybe a list of partitions?
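
A minimal sketch of what the two selector flavors could look like; the `pipeline_ord` / `num_pipelines` names are made up for illustration, not an existing Quickwit config:

```rust
/// Returns true if this pipeline (index `pipeline_ord` out of `num_pipelines`)
/// should consume the given Kafka partition (the "k % N" selector).
fn is_partition_assigned(partition_id: i32, pipeline_ord: usize, num_pipelines: usize) -> bool {
    (partition_id as usize) % num_pipelines == pipeline_ord
}

/// Alternative: an explicit partition allow-list per pipeline.
fn is_partition_listed(partition_id: i32, allowed_partitions: &[i32]) -> bool {
    allowed_partitions.contains(&partition_id)
}
```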

fulmicoton · Jun 09 '22

Why don't we just remove the manual partition assignment? Set group.id to the same ID across multiple indexers, so consumer-group-based partition assignment can do the same thing for us.

mxlxm · Jun 13 '22

Good question. I think it might be surprising that we do not rely on consumer groups at all today, and we do not rely on Kafka to store checkpoint information.

There are two reasons for this. The first one is that we want to handle more sources than just Kafka, and some of these sources do not offer the concept of a consumer group. Kinesis, for instance... but also files, etc.

The second reason - which is more about why we don't use Kafka transactions and have Kafka deal with checkpoints - is that we want to offer exactly-once semantics atomically and for the entire pipeline up to publishing.

Publishing a split requires interacting with a metastore (today, writing a file on S3, or PostgreSQL). Building exactly-once processing with a contraption that atomically publishes AND commits a Kafka transaction together would be much more complex than what is done today.
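
To make that concrete, here is a rough sketch of the idea; the type and method names below are hypothetical, not Quickwit's actual metastore API:

```rust
use anyhow::Result;

/// Hypothetical split metadata, for illustration only.
struct SplitMetadata {
    split_id: String,
    num_docs: u64,
}

/// Advances the source checkpoint for one Kafka partition.
struct CheckpointDelta {
    partition: i32,
    from_offset: i64,
    to_offset: i64,
}

/// The key property: the split metadata and the checkpoint delta are written
/// in a single metastore operation (one PostgreSQL transaction, or one atomic
/// rewrite of the metastore file on S3), so either both land or neither does.
trait Metastore {
    fn publish_split(
        &self,
        split: SplitMetadata,
        checkpoint_delta: Vec<CheckpointDelta>,
    ) -> Result<()>;
}
```

Because the checkpoint only advances when a split is durably published, a crash anywhere earlier in the pipeline simply causes the affected offsets to be re-read and re-indexed, while the checkpoint prevents already-published offsets from being indexed twice.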

fulmicoton · Jun 13 '22

> The first one is that we want to handle more sources than just Kafka, and some of these sources do not offer the concept of a consumer group. Kinesis, for instance... but also files, etc.

That's great, but every source has its own pros; we should take advantage of them while we integrate them, rather than falling back to their version 1.0. WDYT?

> The second reason - which is more about why we don't use Kafka transactions and have Kafka deal with checkpoints - is that we want to offer exactly-once semantics atomically and for the entire pipeline up to publishing.

> Publishing a split requires interacting with a metastore (today, writing a file on S3, or PostgreSQL). Building exactly-once processing with a contraption that atomically publishes AND commits a Kafka transaction together would be much more complex than what is done today.

IMO, once a message is consumed from the source and acked to the source, the source has done its duty. Exactly-once semantics should be handled within the pipeline itself or at the split level; the source shouldn't be accessed anymore.

mxlxm · Jun 13 '22

> That's great, but every source has its own pros; we should take advantage of them while we integrate them, rather than falling back to their version 1.0. WDYT?

This would be a more interesting debate if there were more benefits to using Kafka consumer groups in Quickwit.

> @mxlxm: IMO, once a message is consumed from the source and acked to the source, the source has done its duty. Exactly-once semantics should be handled within the pipeline itself or at the split level; the source shouldn't be accessed anymore.

Anything can happen between the moment the source delivers its message and the moment a split is actually published. From the Quickwit user's point of view, the solution you are describing is "at most once" delivery, which is rarely what people want (usually people want at least once).

fulmicoton · Jun 13 '22

> The selector could be k % N

Looks good enough to select a subset of Kafka partitions.

> Publishing a split requires interacting with a metastore (today, writing a file on S3, or PostgreSQL).

How do we plan to maintain the partition:offset mapping across the K indexing pipelines? Store the information in the metastore?

sunisdown · Jun 24 '22

It is stored in the metastore.

fulmicoton · Jun 24 '22

> The selector could be k % N

Can we use something like consistent hashing? If we use k % N, then when there are infra changes like removing/adding an indexer node in the cluster, a lot of partitions will be relocated to other nodes.
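
For what it's worth, a rendezvous (highest-random-weight) hash is one generic way to get that stability: adding or removing a node only moves the partitions that node wins or loses. This is just a sketch of the technique, not something Quickwit implements for its Kafka source:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Rendezvous hashing: each (partition, node) pair gets a score and the
/// partition goes to the node with the highest score. Adding or removing a
/// node only relocates the partitions that node wins or loses.
fn assign_partition(partition_id: i32, nodes: &[&str]) -> Option<String> {
    nodes
        .iter()
        .max_by_key(|node| {
            let mut hasher = DefaultHasher::new();
            (partition_id, **node).hash(&mut hasher);
            hasher.finish()
        })
        .map(|node| node.to_string())
}

fn main() {
    let nodes = ["indexer-1", "indexer-2", "indexer-3"];
    for partition in 0..6 {
        println!("partition {partition} -> {:?}", assign_partition(partition, &nodes));
    }
}
```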

sunisdown · Jun 24 '22

Something else that could be considered here: it should be possible to use the existing Kafka consumer group logic without storing the offsets in Kafka. This would allow using the group and rebalancing logic that already exists; you simply never call commit on the consumer itself.

For this to work and maintain exactly-once semantics, you have to hook the rebalance events on the consumer and correctly handle the dynamic partition assignments when a rebalance occurs. I believe the Rust rdkafka lib exposes everything required for this.

The approach is something like this.

  • Create a consumer in a group for the source, with auto-commit and offset storage disabled.
  • Add hooks for the rebalance events.
  • Subscribe to the topic.
  • Rebalance events will provide partition assignments.
    • Use the metastore offsets to seek the assigned partitions to the latest offsets.
  • Start reading using the standard Kafka APIs.
  • At each checkpoint, store the offsets in the metastore.
  • If a rebalance occurs while split creation is in progress and the partition assignments change, the in-progress split will have to be invalidated and restarted on the latest assignments.
    • In a correctly functioning system this should be very rare.

This should allow running N ingest nodes with exactly-once semantics, HA and dynamic scalability without having to implement the clustering manually.
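
For illustration, here is roughly what the rebalance-hook part could look like with the Rust rdkafka crate. This is a sketch against the callback API as it stood around rdkafka 0.28 (the callback signatures have changed in later releases), and the event type and channel wiring are made up, not existing Quickwit code:

```rust
use rdkafka::consumer::{ConsumerContext, Rebalance};
use rdkafka::ClientContext;
use tokio::sync::mpsc::UnboundedSender;

/// Rebalance notifications forwarded to the Kafka source actor.
/// (Illustrative names, not existing Quickwit types.)
#[derive(Debug)]
enum RebalanceEvent {
    /// A rebalance started: the current assignment is about to change, so the
    /// in-progress split should be invalidated.
    RebalanceStarted,
    /// New assignment: seek these partitions to the offsets stored in the
    /// metastore before resuming reads.
    Assigned(Vec<i32>),
}

/// Consumer context that never commits offsets to Kafka and only relays
/// rebalance events over a channel.
struct QuickwitConsumerContext {
    events_tx: UnboundedSender<RebalanceEvent>,
}

impl ClientContext for QuickwitConsumerContext {}

impl ConsumerContext for QuickwitConsumerContext {
    fn pre_rebalance(&self, _rebalance: &Rebalance<'_>) {
        // Whatever the outcome, the current partition assignment may change.
        let _ = self.events_tx.send(RebalanceEvent::RebalanceStarted);
    }

    fn post_rebalance(&self, rebalance: &Rebalance<'_>) {
        if let Rebalance::Assign(tpl) = rebalance {
            let partitions: Vec<i32> =
                tpl.elements().iter().map(|elem| elem.partition()).collect();
            let _ = self.events_tx.send(RebalanceEvent::Assigned(partitions));
        }
    }
}
```

The consumer itself would be created with `enable.auto.commit=false` and `enable.auto.offset.store=false` via `create_with_context`, subscribed to the topic, and on each `Assigned` event the source would `seek` the assigned partitions to the offsets read from the metastore before resuming reads.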

In the metastore, it will require a separate table for the offsets, with one row per partition and the following fields:

  • source
  • topic
  • partition
  • offset

kstaken · Jul 02 '22

I've created a prototype for the concept that I describe above: https://github.com/kstaken/qw-kafka-test

It's actually very simple, but one question I have: in Quickwit, if you wanted to abort the in-progress split, what would be the correct way to do that? Is this what ActorContext.kill_switch is for?

kstaken · Jul 13 '22

The best is to return an anyhow::Error; the framework will kill the other actors for you and log properly.

fulmicoton · Jul 13 '22

@guilload is that on your radar?

fulmicoton · Jul 18 '22

Yes, I'm keeping an eye on this. I want to complete my ongoing work on k8s and retention first.

guilload · Jul 18 '22

I've started looking at integrating this. I think I have it mostly mapped out but we'll see how far I get.

kstaken · Jul 18 '22

> If a rebalance occurs while split creation is in progress and the partition assignments change, the in-progress split will have to be invalidated and restarted on the latest assignments. In a correctly functioning system this should be very rare.

Just curious, how would Quickwit handle this condition? How does the indexing pipeline know there's an assignment change on the partition? Pass the change message to the pipeline, then let actors monitor the change message and, if the partition is reassigned, return an anyhow::Error? @kstaken do you have any idea?

sunisdown · Jul 21 '22

I'll most likely use a messaging channel between the rebalance callback and the Kafka actor.
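
Sketching that wiring (reusing the hypothetical RebalanceEvent from the earlier sketch, with a made-up check_rebalance helper the source would call between record batches): draining the channel and returning an anyhow::Error on an assignment change is enough for the actor framework to tear the pipeline down and restart it, as suggested above.

```rust
use anyhow::bail;
use tokio::sync::mpsc::UnboundedReceiver;

/// Same hypothetical event type as in the earlier sketch.
#[derive(Debug)]
enum RebalanceEvent {
    RebalanceStarted,
    Assigned(Vec<i32>),
}

/// Called by the Kafka source actor between record batches.
/// Returning an error lets the actor framework kill and restart the pipeline,
/// which discards the in-progress split.
fn check_rebalance(events_rx: &mut UnboundedReceiver<RebalanceEvent>) -> anyhow::Result<()> {
    while let Ok(event) = events_rx.try_recv() {
        match event {
            RebalanceEvent::RebalanceStarted => {
                bail!("kafka partition assignment changed; restarting the indexing pipeline")
            }
            RebalanceEvent::Assigned(partitions) => {
                // Seek the newly assigned partitions to the offsets stored in
                // the metastore before emitting further batches (not shown).
                let _ = partitions;
            }
        }
    }
    Ok(())
}
```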

kstaken · Jul 21 '22

I created a specific ticket for your solution @kstaken

fulmicoton · Jul 25 '22

I am closing this generic issue in favor of more specific ones.

guilload · Sep 09 '22