
[Question] split_partition_queue behavior

Open Malefaro opened this issue 5 months ago • 1 comment

The docs for split_partition_queue say:

Note that calling Consumer::assign will deactivate any existing partition queues. You will need to call this method for every partition that should be split after every call to assign.

But I don't see this behavior.

For example, in this code snippet:

    use std::sync::Arc;

    use futures::StreamExt;
    use rdkafka::{
        consumer::{Consumer, StreamConsumer},
        ClientConfig, Message, TopicPartitionList,
    };

    #[tokio::test]
    async fn test_split_partition() {
        let config = ClientConfig::new()
            .set("group.id", "tests")
            .set("bootstrap.servers", "0.0.0.0:9092")
            .set("enable.partition.eof", "false")
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "true")
            .set_log_level(rdkafka::config::RDKafkaLogLevel::Debug)
            .clone();

        let client: StreamConsumer = config.create().unwrap();
        let client = Arc::new(client);
        let topic = "some_topic";
        // Split partition 0 of the topic off the main consumer queue
        // before anything is assigned.
        let spq = client.split_partition_queue(topic, 0).unwrap();
        let mut tpl = TopicPartitionList::new();
        tpl.add_partition_offset(topic, 0, rdkafka::Offset::End)
            .expect("failed to add partition");

        client
            .incremental_assign(&tpl)
            .expect("failed to incremental_assign");

        let mut stream = spq.stream();
        println!("waiting msg");
        // Read a handful of messages from the split partition queue.
        for _ in 0..20 {
            let msg = stream.next().await;
            println!("got msg: {msg:?}");
        }

        // Add a second topic to the same TopicPartitionList
        // (the original topic is still in it).
        tpl.add_partition_offset(
            "other_topic",
            0,
            rdkafka::Offset::End,
        )
        .unwrap();

        // Replace the whole assignment; per the docs this should
        // deactivate the existing partition queue.
        client.assign(&tpl).unwrap();

        println!("reading new after assign");
        loop {
            let msg = stream.next().await;
            println!("got msg: {msg:?}");
        }
    }

After the assign with "other_topic" (in addition to the initial "some_topic"), I am still receiving new messages from the split partition queue's stream (I checked that these are definitely new messages, produced after the assign).

The docs also say:

Note that there may be buffered messages for the specified partition that will continue to be returned by StreamConsumer::recv. For best results, call split_partition_queue before the first call to StreamConsumer::recv. You must periodically await StreamConsumer::recv, even if no messages are expected, to serve events.

Could I lose messages in the StreamPartitionQueue if I call split_partition_queue after the first call to StreamConsumer::recv?

Should I call StreamConsumer::pause before split_partition_queue and StreamConsumer::resume afterwards, so that no messages are lost in the newly created queue?
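
What I mean is roughly this (an untested sketch; the helper function name, topic, and partition are placeholders for my setup):

    use std::sync::Arc;

    use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer, StreamPartitionQueue};
    use rdkafka::error::KafkaResult;

    // Pause the current assignment, split one partition off the main queue,
    // then resume, so that nothing is delivered while the split happens.
    fn split_with_pause(
        consumer: &Arc<StreamConsumer>,
        topic: &str,
        partition: i32,
    ) -> KafkaResult<StreamPartitionQueue<DefaultConsumerContext>> {
        let assignment = consumer.assignment()?;
        consumer.pause(&assignment)?;

        // While delivery is paused, route this partition into its own queue.
        let spq = consumer
            .split_partition_queue(topic, partition)
            .expect("consumer was closed");

        consumer.resume(&assignment)?;
        Ok(spq)
    }

The idea is that nothing can end up buffered on the main queue between the split and the first poll of the new queue, but I don't know whether that is actually guaranteed.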

I need to dynamically subscribe to and unsubscribe from topics, and different topics have different handlers. If I spawn a background task that continuously calls recv() on the StreamConsumer, I won't be able to call split_partition_queue before the first call to StreamConsumer::recv.
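
What I'm picturing is roughly this (an untested sketch; the topic name and the handler are placeholders):

    use std::sync::Arc;

    use futures::StreamExt;
    use rdkafka::consumer::StreamConsumer;
    use rdkafka::Message;

    async fn run(consumer: Arc<StreamConsumer>) {
        // Background task that drives the consumer: it keeps calling recv()
        // to serve events, but ignores whatever messages it yields, because
        // the real messages are expected on the split partition queues.
        let drain = consumer.clone();
        tokio::spawn(async move {
            loop {
                let _ = drain.recv().await;
            }
        });

        // Later, when a new topic needs a handler, split its partition queue
        // and spawn a dedicated task for it. The question is whether this is
        // still safe after recv() has already been called above.
        let spq = consumer
            .split_partition_queue("some_topic", 0)
            .expect("consumer was closed");
        tokio::spawn(async move {
            let mut stream = spq.stream();
            while let Some(msg) = stream.next().await {
                // placeholder handler
                println!("some_topic/0: {:?}", msg.map(|m| m.offset()));
            }
        });
    }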

Initially I wanted to use incremental_assign for subscribing, but I don't know whether the note about assign also applies to incremental_assign. And since I can't even reproduce the documented assign behavior, I can't test the incremental_assign behavior either.

So, in short, my questions are:

  1. Can I use incremental_assign/incremental_unassign so that I don't have to recreate every split_partition_queue queue after each call? (A sketch of what I mean follows this list.)
  2. Is it safe (in terms of not losing messages in the created StreamPartitionQueue) to call split_partition_queue after the first call to StreamConsumer::recv, given a background task that simply ignores every message from StreamConsumer::recv? (My flow is: first call split_partition_queue, then incremental_assign.)
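
For question 1, the flow I have in mind looks roughly like this (an untested sketch; the topic, partition, and starting offset are placeholders, and in the real code I keep the returned queue alive in the handler task):

    use std::sync::Arc;

    use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer, StreamPartitionQueue};
    use rdkafka::error::KafkaResult;
    use rdkafka::{Offset, TopicPartitionList};

    // Add one partition dynamically: split its queue first, then incrementally
    // add it to the assignment, without touching the queues already split for
    // other partitions.
    fn subscribe(
        consumer: &Arc<StreamConsumer>,
        topic: &str,
        partition: i32,
    ) -> KafkaResult<StreamPartitionQueue<DefaultConsumerContext>> {
        let spq = consumer
            .split_partition_queue(topic, partition)
            .expect("consumer was closed");

        let mut tpl = TopicPartitionList::new();
        tpl.add_partition_offset(topic, partition, Offset::End)?;
        consumer.incremental_assign(&tpl)?;
        Ok(spq)
    }

    // Remove just this partition again; the hope is that the split queues of
    // the remaining partitions keep working (question 1).
    fn unsubscribe(consumer: &StreamConsumer, topic: &str, partition: i32) -> KafkaResult<()> {
        let mut tpl = TopicPartitionList::new();
        tpl.add_partition(topic, partition);
        consumer.incremental_unassign(&tpl)
    }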

Malefaro · Jan 23 '24 17:01

I would also like to see an example where multiple topics are consumed as streams and rebalancing is handled while using split_partition_queue.

plasticbox · Mar 08 '24 04:03