rust-rdkafka icon indicating copy to clipboard operation
rust-rdkafka copied to clipboard

unable to read messages in subscribe/ assign

Open ravieze opened this issue 1 year ago • 5 comments

Hi,

Below code is copy pasted from the test cases of this project. Either the consumer is attached to kafka as subscriber or assigned (in both cases auto.offset.reset is set to earliest) on consumer.poll() ; or consumer.take() i don't get any message.

Please note that the grouped is a new one; and once subscribed happened i see that the groupid moved its offsets on the kafka broker; however i don't get them on to the client side at all.


fn create_consumer(
    config_overrides: Option<HashMap<&str, &str>>,
) -> Result<BaseConsumer, KafkaError> {
    consumer_config("gp3", config_overrides).create()
}
pub fn consumer_config(
    group_id: &str,
    config_overrides: Option<HashMap<&str, &str>>,
) -> ClientConfig {
    let mut config = ClientConfig::new();

    config.set("group.id", group_id);
    config.set("client.id", "rdkafka");
    config.set("bootstrap.servers", "qa-company.aws.phenom.local:9092");
    config.set("enable.partition.eof", "false");
    config.set("session.timeout.ms", "6000");
    config.set("enable.auto.commit", "false");
    config.set("statistics.interval.ms", "5000");
    config.set("api.version.request", "true");
    config.set("debug", "all");
    config.set("auto.offset.reset", "earliest");

    if let Some(overrides) = config_overrides {
        for (key, value) in overrides {
            config.set(key, value);
        }
    }
    config
}

test case below

  #[tokio::test]
    pub async fn test_producer(){
        let consumer = create_base_consumer("gp3", None);
        let mut tpl = TopicPartitionList::new();
        tpl.add_partition_offset("fast_topic",0, Offset::Beginning).unwrap();
        tpl.add_partition_offset("fast_topic",1, Offset::Beginning).unwrap();
        tpl.add_partition_offset("fast_topic",2, Offset::Beginning).unwrap();
        tpl.add_partition_offset("fast_topic",3, Offset::Beginning).unwrap();
        tpl.add_partition_offset("fast_topic",4, Offset::Beginning).unwrap();
        tpl.add_partition_offset("fast_topic",5, Offset::Beginning).unwrap();
        consumer.assign(&tpl).unwrap();
        
        let msgs = consumer.poll(Timeout::After(Duration::from_secs(4))).unwrap().unwrap();
        println!("msg is:: {:?} ",msgs.payload().unwrap());
        // for (i, message) in consumer.iter().take(3).enumerate() {
        //     match message {
        //         Ok(message) => {
        //             print!("msg is: {}",message.payload_view::<str>().unwrap().unwrap()  )
        //         },
        //         Err(e) => panic!("Error receiving message: {:?}", e),
        //     }
        // }
    }

The above throws error

stack backtrace:
   0: rust_begin_unwind
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/std/src/panicking.rs:575:5
   1: core::panicking::panic_fmt
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/panicking.rs:64:14
   2: core::panicking::panic
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/panicking.rs:111:5
   3: core::option::Option<T>::unwrap
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/option.rs:778:21
   4: rust_fair_scheduler::utils::utils::test::test_producer::{{closure}}
             at ./src/utils/utils.rs:329:20
   5: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/future/future.rs:124:9
   6: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/future/future.rs:124:9

ravieze avatar Apr 05 '23 08:04 ravieze

Hi. i have the same issue. found any work around? i posted it in this stack overflow question

miladamery avatar Aug 06 '23 08:08 miladamery

Finally this is the working code. I am trying to stitch code from multiple places in the project.


pub fn create_consumer(
    broker_url: &str,
    group_id: &str,
    config_overrides: Option<HashMap<&str, &str>>,
) -> BaseConsumer<ConsumerTestContext> {
    consumer_config(broker_url, group_id, config_overrides)
        .create_with_context(ConsumerTestContext {})
        .expect("Consumer creation failed")
}

pub struct ConsumerTestContext {}

impl ClientContext for ConsumerTestContext {
    fn stats(&self, stats: Statistics) {
        let stats_str = format!("{:?}", stats);
        // println!("Stats received: {} bytes", stats_str.len());
    }
}

impl ConsumerContext for ConsumerTestContext {
    // fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) {
    //     println!("Committing offsets: {:?}", result);
    // }
}

now reading from the topic

        /* get consumer;  start from the offsets mentioned in partition_offsets; or from Beginning */
        let mut tpl = TopicPartitionList::new();
        for i in 0..config.max_partitions {
            let mut offset = rdkafka::Offset::Beginning;
            if partition_offsets.contains_key(&i) {
                /* if offsets are not available start from Beginning */
                let offset_num = *partition_offsets.get(&i).unwrap();
                offset = rdkafka::Offset::Offset(offset_num);
            }
            tpl.add_partition_offset(&config.topic, i as i32, offset)
                .unwrap();
        }
        kafka_consumer
            .assign(&tpl)
            .expect("couldnt assign consumer to topic");
        /* done: got consumer assigned to topic, got partition_offsets */

        loop {
            let polled_msg = kafka_consumer.poll(Duration::from_millis(config.poll_time_ms));
            if polled_msg.is_none() {
                break;
            }
       }

the above should work.

chandu-1101 avatar Aug 06 '23 12:08 chandu-1101

@chandu-1101 Thanks . this solutions works. but assign is different from subscribe in terms of functionality. I'm super new to rust and can't do this heavy debugging, reading codes etc and finding solutions. do you know a solution that works with subscribe?

miladamery avatar Aug 06 '23 22:08 miladamery

You need to go through these: https://github.com/fede1024/rust-rdkafka/tree/master/tests and figure out yourself. It requires several experiments to get something working.

chandu-1101 avatar Aug 07 '23 04:08 chandu-1101

Sorry, i didn't realise you were asking about subscribe here is the code that worked for me.

        let consumer: StreamConsumer = ClientConfig::new()
            .set("group.id", "gp")
            .set("bootstrap.servers", "localhost:9092")
            .set("session.timeout.ms", "6000")
            .set("max.poll.interval.ms", "6000")
            .set("enable.auto.commit", "true")
            .set("auto.offset.reset", "earliest")
            .create()
            .expect("unable to create stream consumer ");
        consumer
            .subscribe(&["d_tmain"])
            .expect("couldnt subscribe to the topic ");
        for i in 0..9 {
            let msg = consumer.recv().await.unwrap();
            println!(
                "message 1st pass:::: i{} ... {:?}",
                i,
                std::str::from_utf8(msg.payload().unwrap())
            )
        }

chandu-1101 avatar Aug 07 '23 04:08 chandu-1101