rust-rdkafka
rust-rdkafka copied to clipboard
Getting the error: KafkaError (Seek error: Local: Unknown partition)' when trying to seek
Hi,
I am new to kafka on rust.
I used kafka-consumer-groups.sh
command and discovered for the group id gp
the offsets for all partitions are above 100
. Now, using rdkafka
i am trying to create a consumer
, set the offsets to 100
for all the partitions of this topic. I get the error.
Please note that only my test case uses this consumer group gp
and only 1 consumer was running during the test.
Kindly correct my code / give me pointers on the fix.
running 1 test
thread 'utils::utils::test::test_offset_assignment' panicked at 'called `Result::unwrap()` on an `Err` value: KafkaError (Seek error: Local: Unknown partition)', src/utils/utils.rs:244:18
stack backtrace:
0: rust_begin_unwind
at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/std/src/panicking.rs:575:5
1: core::panicking::panic_fmt
at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/panicking.rs:64:14
2: core::result::unwrap_failed
at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/result.rs:1791:5
3: core::result::Result<T,E>::unwrap
at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/result.rs:1113:23
4: rust_fair_scheduler::utils::utils::KafkaSt::get_consumer_from_cache
at ./src/utils/utils.rs:237:13
5: rust_fair_scheduler::utils::utils::test::test_offset_assignment::{{closure}}
at ./src/utils/utils.rs:255:24
6: <core::pin::Pin<P> as core::future::future::Future>::poll
at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/future/future.rs:124:9
7: <core::pin::Pin<P> as core::future::future::Future>::poll
at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/future/future.rs:124:9
8: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}::{{closure}}
at /Users/ravichandra.chitrapu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.26.0/src/runtime/scheduler/current_thread.rs:541:57
9: tokio::runtime::coop::with_budget
code
mod test {
use super::*;
#[tokio::test]
pub async fn test_offset_assignment() {
let consumer = KafkaSt::get_consumer(
"kafka04.aws.company.local:9092".to_string(),
"fast_topic".to_string(),
"gp".to_string(),
vec![100, 100, 100, 100, 100, 100],
);
let data = consumer.recv().await.unwrap();
print!("data ::: {:?}", data);
}
}
The above code calls the below code. The topic has 6 partitions. I want to set all partition offsets to 100.
impl ClientContext for CustomContext2 {}
pub fn get_consumer(
boot_strap_servers: String,
topic: String,
group_id: String,
partition_offsets: Vec<i64>,
) -> StreamConsumer<CustomContext2>{
let context = CustomContext2 {
topic: topic.clone(),
partition_offsets: partition_offsets.clone(),
};
let consumer: StreamConsumer<CustomContext2> = ClientConfig::new()
.set("group.id", group_id.to_owned())
.set("bootstrap.servers", boot_strap_servers.to_owned())
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "2000")
.set("enable.auto.commit", "true")
.set_log_level(RDKafkaLogLevel::Debug)
.create_with_context(context)
.expect("kafka consumer creation has failed");
for (i, offset) in partition_offsets.iter().enumerate() {
consumer
.seek(
&topic.clone(),
i as i32,
Offset::from_raw(*offset),
Timeout::After(Duration::from_secs(5)),
)
.unwrap();
}
return consumer;
}
The code is similar to the once presented here: https://github.com/fede1024/rust-rdkafka/blob/2a81c86359efaf901dc28c2c91c81b6134b30337/tests/test_low_consumers.rs#L19. I am not sure then why is consumer.seek
not working.
Any help please?
You've to subscribe to the topic first.