rust-rdkafka icon indicating copy to clipboard operation
rust-rdkafka copied to clipboard

Getting the error: KafkaError (Seek error: Local: Unknown partition)' when trying to seek

Open ravieze opened this issue 1 year ago • 1 comments

Hi,

I am new to kafka on rust.

I used kafka-consumer-groups.sh command and discovered for the group id gp the offsets for all partitions are above 100. Now, using rdkafka i am trying to create a consumer, set the offsets to 100 for all the partitions of this topic. I get the error.

Please note that only my test case uses this consumer group gp and only 1 consumer was running during the test.

Kindly correct my code / give me pointers on the fix.

running 1 test
thread 'utils::utils::test::test_offset_assignment' panicked at 'called `Result::unwrap()` on an `Err` value: KafkaError (Seek error: Local: Unknown partition)', src/utils/utils.rs:244:18
stack backtrace:
   0: rust_begin_unwind
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/std/src/panicking.rs:575:5
   1: core::panicking::panic_fmt
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/panicking.rs:64:14
   2: core::result::unwrap_failed
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/result.rs:1791:5
   3: core::result::Result<T,E>::unwrap
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/result.rs:1113:23
   4: rust_fair_scheduler::utils::utils::KafkaSt::get_consumer_from_cache
             at ./src/utils/utils.rs:237:13
   5: rust_fair_scheduler::utils::utils::test::test_offset_assignment::{{closure}}
             at ./src/utils/utils.rs:255:24
   6: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/future/future.rs:124:9
   7: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/future/future.rs:124:9
   8: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}::{{closure}}
             at /Users/ravichandra.chitrapu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.26.0/src/runtime/scheduler/current_thread.rs:541:57
   9: tokio::runtime::coop::with_budget

code


mod test {
    use super::*;

     #[tokio::test]
    pub async fn test_offset_assignment() {
        let consumer = KafkaSt::get_consumer(
            "kafka04.aws.company.local:9092".to_string(),
            "fast_topic".to_string(),
            "gp".to_string(),
            vec![100, 100, 100, 100, 100, 100],
        );
        let data = consumer.recv().await.unwrap();
        print!("data ::: {:?}", data);
    }
}

The above code calls the below code. The topic has 6 partitions. I want to set all partition offsets to 100.

impl ClientContext for CustomContext2 {}
    pub fn get_consumer(
        boot_strap_servers: String,
        topic: String,
        group_id: String,
        partition_offsets: Vec<i64>,
    ) -> StreamConsumer<CustomContext2>{
        let context = CustomContext2 {
            topic: topic.clone(),
            partition_offsets: partition_offsets.clone(),
        };
        let consumer: StreamConsumer<CustomContext2> = ClientConfig::new()
            .set("group.id", group_id.to_owned())
            .set("bootstrap.servers", boot_strap_servers.to_owned())
            .set("enable.partition.eof", "false")
            .set("session.timeout.ms", "2000")
            .set("enable.auto.commit", "true")
            .set_log_level(RDKafkaLogLevel::Debug)
            .create_with_context(context)
            .expect("kafka consumer creation has failed");
       
        for (i, offset) in partition_offsets.iter().enumerate() {
            consumer
                .seek(
                    &topic.clone(),
                    i as i32,
                    Offset::from_raw(*offset),
                    Timeout::After(Duration::from_secs(5)),
                )
                .unwrap();
        }
        return consumer;
    }

The code is similar to the once presented here: https://github.com/fede1024/rust-rdkafka/blob/2a81c86359efaf901dc28c2c91c81b6134b30337/tests/test_low_consumers.rs#L19. I am not sure then why is consumer.seek not working.

Any help please?

ravieze avatar Apr 04 '23 13:04 ravieze

You've to subscribe to the topic first.

ladislavmacoun avatar Apr 13 '23 11:04 ladislavmacoun