rust-rdkafka
[Question] split_partition_queue behavior
The docs for `split_partition_queue` say:

> Note that calling `Consumer::assign` will deactivate any existing partition queues. You will need to call this method for every partition that should be split after every call to `assign`.
But I don't see this behavior.
For example, in this code snippet:
```rust
use std::sync::Arc;

use futures::StreamExt;
use rdkafka::{
    consumer::{Consumer, StreamConsumer},
    ClientConfig, Message, TopicPartitionList,
};

#[tokio::test]
async fn test_split_partition() {
    let config = ClientConfig::new()
        .set("group.id", "tests")
        .set("bootstrap.servers", "0.0.0.0:9092")
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "true")
        .set_log_level(rdkafka::config::RDKafkaLogLevel::Debug)
        .clone();
    let client: StreamConsumer = config.create().unwrap();
    // split_partition_queue is called on an Arc<StreamConsumer>.
    let client = Arc::new(client);
    let topic = "some_topic";

    // Split some_topic/0 into its own queue before it is assigned.
    let spq = client.split_partition_queue(topic, 0).unwrap();
    let mut tpl = TopicPartitionList::new();
    tpl.add_partition_offset(topic, 0, rdkafka::Offset::End)
        .expect("failed to add partition");
    client
        .incremental_assign(&tpl)
        .expect("failed to incremental_assign");

    let mut stream = spq.stream();
    println!("waiting msg");
    for _ in 0..20 {
        let msg = stream.next().await;
        println!("got msg: {msg:?}");
    }

    // Full (non-incremental) assign of some_topic/0 plus other_topic/0.
    // According to the docs, this should deactivate `spq`.
    tpl.add_partition_offset("other_topic", 0, rdkafka::Offset::End)
        .unwrap();
    client.assign(&tpl).unwrap();
    println!("reading new after assign");
    loop {
        let msg = stream.next().await;
        println!("got msg: {msg:?}");
    }
}
```
After calling `assign` with "other_topic" (in addition to the initial "some_topic"), I am still receiving new messages from "some_topic" through the split partition queue. (I checked that these are definitely new messages, produced after `assign`.)
The docs also say:

> Note that there may be buffered messages for the specified partition that will continue to be returned by `StreamConsumer::recv`. For best results, call `split_partition_queue` before the first call to `StreamConsumer::recv`. You must periodically await `StreamConsumer::recv`, even if no messages are expected, to serve events.
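To make the caveat concrete, here is a toy model (plain std Rust, not rdkafka or librdkafka) of the behavior the docs describe: messages already buffered in the consumer's main queue stay there after a split, and only messages fetched afterwards are routed to the new partition queue. `ToyConsumer`, `fetch`, and the `(topic, partition, offset)` tuples are hypothetical; the model only restates the quoted documentation and is not a claim about librdkafka internals.

```rust
use std::collections::{HashMap, VecDeque};

/// (topic, partition, offset) of a fetched message.
type Msg = (String, i32, u64);

/// Toy model: one main queue plus optional per-partition split queues.
#[derive(Default)]
struct ToyConsumer {
    main: VecDeque<Msg>,
    split: HashMap<(String, i32), VecDeque<u64>>,
}

impl ToyConsumer {
    /// A newly fetched message goes to its split queue if one exists,
    /// otherwise to the main queue.
    fn fetch(&mut self, topic: &str, partition: i32, offset: u64) {
        match self.split.get_mut(&(topic.to_string(), partition)) {
            Some(q) => q.push_back(offset),
            None => self.main.push_back((topic.to_string(), partition, offset)),
        }
    }

    /// Creates a split queue; nothing already buffered in `main` is moved.
    fn split_partition_queue(&mut self, topic: &str, partition: i32) {
        self.split.entry((topic.to_string(), partition)).or_default();
    }

    /// Stand-in for `StreamConsumer::recv`: pops from the main queue.
    fn recv(&mut self) -> Option<Msg> {
        self.main.pop_front()
    }
}

fn main() {
    let mut c = ToyConsumer::default();
    c.fetch("some_topic", 0, 0);
    c.fetch("some_topic", 0, 1);
    c.split_partition_queue("some_topic", 0);
    c.fetch("some_topic", 0, 2);

    // A background task that discards whatever `recv` returns...
    while let Some(msg) = c.recv() {
        println!("dropped by background task: {:?}", msg);
    }
    // ...means offsets 0 and 1 never reach the split queue's consumer.
    println!("split queue: {:?}", c.split[&("some_topic".to_string(), 0)]);
}
```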
Can I lose some messages in the `StreamPartitionQueue` if I call `split_partition_queue` after the first call to `StreamConsumer::recv`?

Should I call `StreamConsumer::pause` before `split_partition_queue` and `StreamConsumer::resume` afterwards, so that no messages are lost from the newly created queue?
I need to dynamically subscribe to and unsubscribe from some topics, and different topics have different handlers. If I spawn a background task that continuously calls `recv()` on the `StreamConsumer`, I won't be able to call `split_partition_queue` before the first call to `StreamConsumer::recv`.
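One possible mitigation, which is my own assumption and not something the rdkafka docs prescribe, is for the background `recv` task to hand anything it receives to the same per-topic handler a split queue would use, instead of ignoring it. The std-only sketch below shows only that routing idea; `Router`, `Handler`, `dispatch`, and `unrouted` are hypothetical names, and in real code the messages would come from `StreamConsumer::recv` and the `StreamPartitionQueue` streams.

```rust
use std::collections::HashMap;

/// Hypothetical handler signature: (partition, offset) -> result description.
type Handler = Box<dyn Fn(i32, u64) -> String>;

#[derive(Default)]
struct Router {
    handlers: HashMap<String, Handler>,
    /// Messages for topics with no registered handler (e.g. after unsubscribe).
    unrouted: Vec<(String, i32, u64)>,
}

impl Router {
    fn subscribe(&mut self, topic: &str, handler: Handler) {
        self.handlers.insert(topic.to_string(), handler);
    }

    fn unsubscribe(&mut self, topic: &str) -> bool {
        self.handlers.remove(topic).is_some()
    }

    /// Called for every message, whether it came from a split queue
    /// or from the background `recv` loop.
    fn dispatch(&mut self, topic: &str, partition: i32, offset: u64) -> Option<String> {
        match self.handlers.get(topic) {
            Some(h) => Some(h(partition, offset)),
            None => {
                self.unrouted.push((topic.to_string(), partition, offset));
                None
            }
        }
    }
}

fn main() {
    let mut router = Router::default();
    router.subscribe("some_topic", Box::new(|p: i32, o: u64| format!("some_topic p{} o{}", p, o)));
    router.subscribe("other_topic", Box::new(|p: i32, o: u64| format!("other_topic p{} o{}", p, o)));

    // A buffered message the background recv loop picked up for some_topic.
    println!("{:?}", router.dispatch("some_topic", 0, 1));

    // After unsubscribing, late messages are recorded instead of handled.
    router.unsubscribe("other_topic");
    println!("{:?}", router.dispatch("other_topic", 0, 5));
    println!("unrouted: {:?}", router.unrouted);
}
```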
Initially I wanted to use `incremental_assign` for subscribing, but I don't know whether the note about `assign` also applies to `incremental_assign`. And since I can't reproduce the documented `assign` behavior, I can't test the `incremental_assign` behavior either.
So, in short, my questions are:

- Can I use `incremental_assign`/`incremental_unassign` so that I don't have to recreate all `split_partition_queue` queues after each call?
- Is it safe (in terms of not losing messages in the created `StreamPartitionQueue`) to call `split_partition_queue` after the first call to `StreamConsumer::recv`, if I have a background task that just ignores any messages from `StreamConsumer::recv`? (My flow is: first call `split_partition_queue`, then `incremental_assign`.)
I would also like an example of receiving multiple topics as a `Stream` and handling rebalances when using `split_partition_queue`.