rust-rdkafka
Dropping `PartitionQueue` makes partition unavailable
Problem
After calling `split_partition_queue` on a consumer (both `StreamConsumer` and `BaseConsumer` have the same problem), once the resulting `PartitionQueue` is dropped it is impossible to retrieve messages from that partition (except, perhaps, by calling `assign` manually). Kafka also does not rebalance these partitions.
Environment
- OS: Linux fdr12.tcsbank.ru 6.0.8-300.fc37.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Nov 11 15:09:04 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux
- Rust: rustc 1.66.0 (stable)
- rdkafka: 0.28.0
- rdkafka-sys: 4.3.0+1.9.2
What I was trying to do
I was trying to handle each assigned partition in its own loop. When a new message arrives from the consumer itself, I call `split_partition_queue` with its topic and partition and `tokio::spawn` a handler, effectively spawning a new loop for each "unsplit" partition. To avoid keeping tasks alive for partitions that have been rebalanced away, I stop the loop and drop the `PartitionQueue` after a period of inactivity (10 seconds in the example).
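The per-partition loop with an inactivity timeout can be sketched without Kafka, using only the standard library. This is an analogy, not rdkafka code: `Msg` and `run_until_idle` are hypothetical names, a `std::sync::mpsc` channel stands in for the `PartitionQueue`, and `Receiver::recv_timeout` plays the role of `tokio::select!` over `recv()` plus a `sleep` that is reset after every message.

```rust
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for a Kafka message (not an rdkafka type).
#[derive(Debug, Clone, PartialEq)]
pub struct Msg {
    pub topic: String,
    pub partition: i32,
    pub offset: i64,
}

/// Handle messages from `rx` until none arrives for `idle`.
/// Every successful receive starts a fresh idle window, like the
/// `sleep.as_mut().reset(...)` call after each `recv()` in the issue's loop.
/// Returns the offsets that were handled, in arrival order.
pub fn run_until_idle(rx: Receiver<Msg>, idle: Duration) -> Vec<i64> {
    let mut handled = Vec::new();
    loop {
        match rx.recv_timeout(idle) {
            // A message arrived within the window: handle it and wait again.
            Ok(msg) => handled.push(msg.offset),
            // Nothing for `idle`: stop; `rx` is dropped when the function returns.
            Err(RecvTimeoutError::Timeout) => break,
            // Every sender is gone: nothing more can arrive.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    handled
}

fn main() {
    let (tx, rx) = channel();
    // Queue three messages before starting the worker so the run is deterministic.
    for offset in 0..3 {
        tx.send(Msg { topic: "kafka-test".into(), partition: 0, offset }).unwrap();
    }
    // `tx` stays alive, so the worker exits via the idle timeout, not disconnection.
    let worker = thread::spawn(move || run_until_idle(rx, Duration::from_millis(200)));
    let handled = worker.join().unwrap();
    println!("handled offsets {:?}, then stopped after 200 ms idle", handled);
    drop(tx);
}
```

Because `recv_timeout` starts a new wait on every call, each received message restarts the idle window, which has the same effect as resetting the pinned `sleep` in the handler.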
Example
```rust
use std::{sync::Arc, time::Duration};

use rdkafka::{
    consumer::{
        stream_consumer::StreamPartitionQueue, Consumer, DefaultConsumerContext, StreamConsumer,
    },
    message::OwnedMessage,
    ClientConfig, Message,
};
use tokio::time::Instant;
use tracing::{error, info, instrument};

fn client_config() -> ClientConfig {
    let mut config = ClientConfig::new();
    config.set("bootstrap.servers", "kafka01d.m1.ips.cloud:9092");
    config.set("group.id", "test-group");
    config.set(
        "client.id",
        &format!("test-group.{}@localhost", std::process::id()),
    );
    // Verbose librdkafka logging (syslog level 7 = debug).
    config.set("log_level", "7");
    config
}

fn build_consumer() -> StreamConsumer {
    let mut cc = client_config();
    // Static group membership: one instance id per process.
    cc.set(
        "group.instance.id",
        &format!("test-group.{}", std::process::id()),
    );
    cc.set("max.poll.interval.ms", "10000");
    cc.set("session.timeout.ms", "7000");
    cc.create().expect("Consumer created")
}

#[instrument(skip(partition_queue, initial_message))]
async fn handle_partition_stream(
    initial_message: OwnedMessage,
    topic: String,
    partition: i32,
    partition_queue: StreamPartitionQueue<DefaultConsumerContext>,
) {
    info!(
        offset = initial_message.offset(),
        "Starting partition queue loop"
    );
    // Handle the message that triggered the split first.
    log_next_kafka(Ok(initial_message));

    // Inactivity timer: the loop stops after 10 s without messages.
    let mut sleep = tokio::time::sleep(Duration::from_secs(10));
    tokio::pin!(sleep);
    loop {
        tokio::select! {
            res = partition_queue.recv() => {
                log_next_kafka(res);
                // Any message (or error) restarts the 10 s window.
                sleep.as_mut().reset(Instant::now() + Duration::from_secs(10))
            },
            _ = &mut sleep => {
                info!("No messages from partition, stopping handler");
                break;
            }
        }
    }
    // Explicitly drop the split queue (it would also be dropped on return).
    drop(partition_queue);
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();
    let consumer = Arc::new(build_consumer());
    consumer
        .subscribe(&["kafka-test", "kafka-test-2"])
        .expect("subscribed");
    loop {
        match consumer.recv().await {
            Ok(x) => {
                // Split the partition this message came from and give it its own task.
                let Some(pq) = consumer.split_partition_queue(x.topic(), x.partition()) else {
                    panic!("Failed to split partition queue");
                };
                let _ = tokio::spawn(handle_partition_stream(
                    x.detach(),
                    x.topic().into(),
                    x.partition(),
                    pq,
                ));
            }
            Err(err) => panic!("Consumer error: {}", err),
        }
    }
}

fn log_next_kafka<T: Message>(res: Result<T, rdkafka::error::KafkaError>) {
    match res {
        Ok(x) => {
            info!(
                "New message:
topic: {}
partition: {}
offset: {}
payload: {}
",
                x.topic(),
                x.partition(),
                x.offset(),
                x.payload()
                    .map(String::from_utf8_lossy)
                    .unwrap_or("<empty>".into())
            )
        }
        Err(e) => {
            error!("ERROR: {}", e)
        }
    }
}
```
What I was expecting
After `sleep` expires and `handle_partition_stream` finishes, new messages from that partition are received from `consumer.recv()` again.
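For comparison, the expected behavior can be modelled with plain `std::sync::mpsc` channels. This is a toy sketch, not rdkafka code: `Msg`, `Router`, `route` and the `spawn` callback are hypothetical names. The router keeps one sender per (topic, partition) that has a live worker; when a worker has exited and dropped its receiver, `Sender::send` fails and hands the message back, so the partition is treated as unsplit again and a new worker is started. The issue reports that rdkafka does not behave like this once a `PartitionQueue` has been dropped.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, SendError, Sender};
use std::thread;

/// Hypothetical stand-in for a Kafka message (not an rdkafka type).
#[derive(Debug, Clone, PartialEq)]
pub struct Msg {
    pub topic: String,
    pub partition: i32,
    pub offset: i64,
}

/// Toy router: one channel per (topic, partition) that has a live worker.
/// `spawn` receives the receiving end whenever a partition is "unsplit".
pub struct Router<F: FnMut(Receiver<Msg>)> {
    workers: HashMap<(String, i32), Sender<Msg>>,
    spawn: F,
    pub spawned: usize,
}

impl<F: FnMut(Receiver<Msg>)> Router<F> {
    pub fn new(spawn: F) -> Self {
        Router { workers: HashMap::new(), spawn, spawned: 0 }
    }

    pub fn route(&mut self, mut msg: Msg) {
        let key = (msg.topic.clone(), msg.partition);
        if let Some(tx) = self.workers.get(&key) {
            match tx.send(msg) {
                // A live worker accepted the message.
                Ok(()) => return,
                // The worker dropped its receiver; `send` hands the message back.
                Err(SendError(returned)) => msg = returned,
            }
            // Forget the finished worker so the partition counts as unsplit again.
            self.workers.remove(&key);
        }
        // Unsplit partition: open a channel, queue the message, start a worker.
        let (tx, rx) = channel();
        tx.send(msg).expect("receiver not dropped yet");
        (self.spawn)(rx);
        self.spawned += 1;
        self.workers.insert(key, tx);
    }
}

fn main() {
    let mut handles = Vec::new();
    let mut router = Router::new(|rx: Receiver<Msg>| {
        // Each worker drains its channel until every sender is dropped.
        handles.push(thread::spawn(move || {
            rx.iter().map(|m| m.offset).collect::<Vec<i64>>()
        }));
    });
    for offset in 0..3 {
        router.route(Msg { topic: "kafka-test".into(), partition: 0, offset });
        router.route(Msg { topic: "kafka-test-2".into(), partition: 0, offset });
    }
    println!("workers spawned: {}", router.spawned);
    drop(router); // drops all senders, so each worker's `rx.iter()` ends
    for h in handles {
        println!("worker handled offsets {:?}", h.join().unwrap());
    }
}
```

The design point of the model is that delivery to a finished worker fails and returns the message, so nothing is lost when a worker goes away and the next message simply starts a new one.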
What happened
No more messages are received from this partition, and the partition is not rebalanced either.
How to fix
I'm not sure whether this is a bug or intended behavior. If it is intended, I think it would be worth noting in the docs.