
Dropping `PartitionQueue` makes partition unavailable

Flowneee opened this issue on Jan 12, 2023 · 0 comments

Problem

After calling split_partition_queue on a consumer (both StreamConsumer and BaseConsumer have the same problem), once the returned PartitionQueue is dropped it becomes impossible to retrieve messages from that partition (unless, perhaps, I call assign manually). Kafka also won't rebalance this partition away.

Environment

OS: Linux fdr12.tcsbank.ru 6.0.8-300.fc37.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Nov 11 15:09:04 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux
Rust: rustc 1.66.0 (stable)
rdkafka: 0.28.0
rdkafka-sys: 4.3.0+1.9.2

What I was trying to do

I was trying to handle each assigned partition in its own loop. When a new message arrives from the consumer itself, I call split_partition_queue with the message's topic and partition and tokio::spawn a handler for it, effectively starting a new loop for each not-yet-split partition. To avoid holding tasks for partitions that have been rebalanced away, I stop the loop and drop the PartitionQueue after some period of inactivity.

Example

use std::{sync::Arc, time::Duration};

use rdkafka::{
    consumer::{
        stream_consumer::StreamPartitionQueue, Consumer, DefaultConsumerContext, StreamConsumer,
    },
    message::OwnedMessage,
    ClientConfig, Message,
};
use tokio::time::Instant;
use tracing::{error, info, instrument};

fn client_config() -> ClientConfig {
    let mut config = ClientConfig::new();
    config.set("bootstrap.servers", "kafka01d.m1.ips.cloud:9092");
    config.set("group.id", "test-group");
    config.set(
        "client.id",
        &format!("test-group.{}@localhost", std::process::id()),
    );
    config.set("log_level", "7");
    config
}

fn build_consumer() -> StreamConsumer {
    let mut cc = client_config();
    cc.set(
        "group.instance.id",
        &format!("test-group.{}", std::process::id()),
    );
    cc.set("max.poll.interval.ms", "10000");
    cc.set("session.timeout.ms", "7000");
    cc.create().expect("Consumer created")
}

#[instrument(skip(partition_queue, initial_message))]
async fn handle_partition_stream(
    initial_message: OwnedMessage,
    topic: String,
    partition: i32,
    partition_queue: StreamPartitionQueue<DefaultConsumerContext>,
) {
    info!(
        offset = initial_message.offset(),
        "Starting partition queue loop"
    );

    log_next_kafka(Ok(initial_message));

    // tokio::pin! re-binds the value itself, so `mut` here would only
    // trigger an unused_mut warning.
    let sleep = tokio::time::sleep(Duration::from_secs(10));
    tokio::pin!(sleep);

    loop {
        tokio::select! {
            res = partition_queue.recv() => {
                log_next_kafka(res);
                sleep.as_mut().reset(Instant::now() + Duration::from_secs(10))
            },
            _ = &mut sleep => {
                info!("No messages from partition, stopping handler");
                break;
            }
        }
    }
    drop(partition_queue);
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    let consumer = Arc::new(build_consumer());
    consumer
        .subscribe(&["kafka-test", "kafka-test-2"])
        .expect("subscribed");

    loop {
        match consumer.recv().await {
            Ok(x) => {
                let Some(pq) = consumer.split_partition_queue(x.topic(), x.partition()) else {
                    panic!("Failed to split partition queue");
                };
                let _ = tokio::spawn(handle_partition_stream(
                    x.detach(),
                    x.topic().into(),
                    x.partition(),
                    pq,
                ));
            }
            Err(err) => panic!("Consumer error: {}", err),
        }
    }
}

fn log_next_kafka<T: Message>(res: Result<T, rdkafka::error::KafkaError>) {
    match res {
        Ok(x) => {
            info!(
                "New message:
    topic: {}
    partition: {}
    offset: {}
    payload: {}
",
                x.topic(),
                x.partition(),
                x.offset(),
                x.payload()
                    .map(String::from_utf8_lossy)
                    .unwrap_or("<empty>".into())
            )
        }
        Err(e) => {
            error!("ERROR: {}", e)
        }
    }
}

What I was expecting

After the sleep expires and handle_partition_stream finishes, new messages from that partition would again be received via consumer.recv().

What happened

No more messages are received from this partition, and it is not rebalanced to another consumer.

How to fix

I'm not sure whether this is a bug or normal behavior. If it is normal, I believe it would be great to add a note about it in the docs.

Flowneee · Jan 12 '23 14:01