rust-rdkafka
Unable to read messages with subscribe/assign
Hi,
The code below is copy-pasted from this project's test cases. The consumer is attached to Kafka either as a subscriber or via assign (in both cases auto.offset.reset is set to earliest). Whether I use consumer.poll() or consumer.iter().take(), I don't get any message.
Please note that the group is a new one; once the subscribe happens I can see the group id move its offsets forward on the Kafka broker, but I never receive the messages on the client side.
fn create_consumer(
    config_overrides: Option<HashMap<&str, &str>>,
) -> Result<BaseConsumer, KafkaError> {
    consumer_config("gp3", config_overrides).create()
}
pub fn consumer_config(
    group_id: &str,
    config_overrides: Option<HashMap<&str, &str>>,
) -> ClientConfig {
    let mut config = ClientConfig::new();
    config.set("group.id", group_id);
    config.set("client.id", "rdkafka");
    config.set("bootstrap.servers", "qa-company.aws.phenom.local:9092");
    config.set("enable.partition.eof", "false");
    config.set("session.timeout.ms", "6000");
    config.set("enable.auto.commit", "false");
    config.set("statistics.interval.ms", "5000");
    config.set("api.version.request", "true");
    config.set("debug", "all");
    config.set("auto.offset.reset", "earliest");
    if let Some(overrides) = config_overrides {
        for (key, value) in overrides {
            config.set(key, value);
        }
    }
    config
}
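As an aside, the overrides loop above is a plain "last write wins" merge. A std-only sketch of that merge, modeling the config as a plain map of the same key/value strings that ClientConfig::set takes, so it runs without the rdkafka crate:

```rust
use std::collections::HashMap;

/// Apply optional overrides on top of base settings (last write wins),
/// mirroring what consumer_config does with repeated ClientConfig::set calls.
fn merge_config<'a>(
    base: &[(&'a str, &'a str)],
    overrides: Option<HashMap<&'a str, &'a str>>,
) -> HashMap<&'a str, &'a str> {
    let mut config: HashMap<_, _> = base.iter().copied().collect();
    if let Some(overrides) = overrides {
        for (key, value) in overrides {
            config.insert(key, value); // override replaces the base value
        }
    }
    config
}

fn main() {
    let base = [("auto.offset.reset", "earliest"), ("group.id", "gp3")];
    let merged = merge_config(
        &base,
        Some(HashMap::from([("auto.offset.reset", "latest")])),
    );
    assert_eq!(merged["auto.offset.reset"], "latest"); // overridden
    assert_eq!(merged["group.id"], "gp3"); // untouched
    println!("ok");
}
```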
Test case below:
#[tokio::test]
pub async fn test_producer() {
    let consumer = create_consumer(None).unwrap();
    let mut tpl = TopicPartitionList::new();
    for partition in 0..6 {
        tpl.add_partition_offset("fast_topic", partition, Offset::Beginning)
            .unwrap();
    }
    consumer.assign(&tpl).unwrap();
    // poll() returns Option<KafkaResult<BorrowedMessage>>; the outer unwrap()
    // panics when the timeout elapses without a message.
    let msg = consumer.poll(Timeout::After(Duration::from_secs(4))).unwrap().unwrap();
    println!("msg is:: {:?}", msg.payload().unwrap());
    // for (i, message) in consumer.iter().take(3).enumerate() {
    //     match message {
    //         Ok(message) => {
    //             print!("msg is: {}", message.payload_view::<str>().unwrap().unwrap())
    //         }
    //         Err(e) => panic!("Error receiving message: {:?}", e),
    //     }
    // }
}
The above panics with:
stack backtrace:
0: rust_begin_unwind
at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/std/src/panicking.rs:575:5
1: core::panicking::panic_fmt
at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/panicking.rs:64:14
2: core::panicking::panic
at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/panicking.rs:111:5
3: core::option::Option<T>::unwrap
at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/option.rs:778:21
4: rust_fair_scheduler::utils::utils::test::test_producer::{{closure}}
at ./src/utils/utils.rs:329:20
5: <core::pin::Pin<P> as core::future::future::Future>::poll
at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/future/future.rs:124:9
6: <core::pin::Pin<P> as core::future::future::Future>::poll
at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/future/future.rs:124:9
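For what it's worth, the panic in the backtrace is the outer unwrap() on poll(): BaseConsumer::poll returns Option<KafkaResult<BorrowedMessage>>, and None simply means the timeout expired before any message (or the partition assignment) arrived. A minimal sketch that handles both layers instead of unwrapping; the consumer binding and 4-second timeout are taken from the snippet above, and this needs a live broker to actually receive anything:

```rust
use std::time::Duration;

use rdkafka::consumer::BaseConsumer;
use rdkafka::message::Message;

// Poll once and report each possible outcome instead of panicking.
fn poll_once(consumer: &BaseConsumer) {
    match consumer.poll(Duration::from_secs(4)) {
        // Timeout elapsed: not an error, just nothing arrived yet.
        None => eprintln!("no message within the timeout; keep polling"),
        // A Kafka-level error was delivered.
        Some(Err(e)) => eprintln!("kafka error: {:?}", e),
        // A message arrived; try to view the payload as UTF-8.
        Some(Ok(msg)) => match msg.payload_view::<str>() {
            Some(Ok(text)) => println!("msg is: {}", text),
            _ => println!("empty or non-UTF-8 payload"),
        },
    }
}
```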
Hi, I have the same issue. Did you find any workaround? I posted it in this Stack Overflow question.
Finally, this is the working code. I stitched it together from multiple places in the project.
pub fn create_consumer(
    broker_url: &str,
    group_id: &str,
    config_overrides: Option<HashMap<&str, &str>>,
) -> BaseConsumer<ConsumerTestContext> {
    consumer_config(broker_url, group_id, config_overrides)
        .create_with_context(ConsumerTestContext {})
        .expect("Consumer creation failed")
}

pub struct ConsumerTestContext {}

impl ClientContext for ConsumerTestContext {
    fn stats(&self, stats: Statistics) {
        let stats_str = format!("{:?}", stats);
        // println!("Stats received: {} bytes", stats_str.len());
    }
}

impl ConsumerContext for ConsumerTestContext {
    // fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) {
    //     println!("Committing offsets: {:?}", result);
    // }
}
Now, reading from the topic:
/* get consumer; start from the offsets mentioned in partition_offsets, or from Beginning */
let mut tpl = TopicPartitionList::new();
for i in 0..config.max_partitions {
    /* if an offset is not available, start from Beginning */
    let mut offset = rdkafka::Offset::Beginning;
    if let Some(&offset_num) = partition_offsets.get(&i) {
        offset = rdkafka::Offset::Offset(offset_num);
    }
    tpl.add_partition_offset(&config.topic, i as i32, offset)
        .unwrap();
}
kafka_consumer
    .assign(&tpl)
    .expect("couldn't assign consumer to topic");
/* done: got consumer assigned to topic, got partition_offsets */
loop {
    let polled_msg = kafka_consumer.poll(Duration::from_millis(config.poll_time_ms));
    if polled_msg.is_none() {
        break;
    }
    /* handle polled_msg here */
}
The above should work.
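As an aside, the "stored offset if present, else start from the beginning" choice in the loop above can be collapsed with Option combinators. A std-only sketch, using librdkafka's raw sentinel value (RD_KAFKA_OFFSET_BEGINNING = -2, which is what Offset::Beginning maps to) as a stand-in for rdkafka::Offset so it runs without a broker:

```rust
use std::collections::HashMap;

/// librdkafka's RD_KAFKA_OFFSET_BEGINNING sentinel (what Offset::Beginning maps to).
const OFFSET_BEGINNING: i64 = -2;

/// Pick the stored starting offset for a partition, falling back to "beginning".
fn starting_offset(partition_offsets: &HashMap<i32, i64>, partition: i32) -> i64 {
    partition_offsets
        .get(&partition)
        .copied()
        .unwrap_or(OFFSET_BEGINNING)
}

fn main() {
    let offsets = HashMap::from([(0, 42_i64)]);
    assert_eq!(starting_offset(&offsets, 0), 42); // stored offset wins
    assert_eq!(starting_offset(&offsets, 3), OFFSET_BEGINNING); // fallback
    println!("ok");
}
```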
@chandu-1101 Thanks, this solution works, but assign is different from subscribe in terms of functionality. I'm super new to Rust and can't do this heavy debugging and code reading to find solutions. Do you know a solution that works with subscribe?
You need to go through the tests here: https://github.com/fede1024/rust-rdkafka/tree/master/tests and figure it out yourself. It takes several experiments to get something working.
Sorry, I didn't realise you were asking about subscribe.
Here is the code that worked for me:
let consumer: StreamConsumer = ClientConfig::new()
    .set("group.id", "gp")
    .set("bootstrap.servers", "localhost:9092")
    .set("session.timeout.ms", "6000")
    .set("max.poll.interval.ms", "6000")
    .set("enable.auto.commit", "true")
    .set("auto.offset.reset", "earliest")
    .create()
    .expect("unable to create stream consumer");

consumer
    .subscribe(&["d_tmain"])
    .expect("couldn't subscribe to the topic");

for i in 0..9 {
    let msg = consumer.recv().await.unwrap();
    println!(
        "message 1st pass:::: i{} ... {:?}",
        i,
        std::str::from_utf8(msg.payload().unwrap())
    );
}
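For completeness, the same subscribe flow should also work with a poll-based BaseConsumer if you are not in async code. A hedged sketch reusing the broker address, group, and topic from above (these are assumptions from this thread, not defaults); note that early polls may return None while the group rebalance hands partitions to this member, so keep polling rather than giving up after the first timeout. This needs a live broker to run:

```rust
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::message::Message;

fn main() {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("group.id", "gp")
        .set("bootstrap.servers", "localhost:9092")
        .set("auto.offset.reset", "earliest")
        .create()
        .expect("unable to create base consumer");

    consumer
        .subscribe(&["d_tmain"])
        .expect("couldn't subscribe to the topic");

    // First polls often time out while the broker assigns partitions
    // to this new group member; keep polling.
    for _ in 0..10 {
        match consumer.poll(Duration::from_secs(1)) {
            Some(Ok(msg)) => println!("{:?}", msg.payload_view::<str>()),
            Some(Err(e)) => eprintln!("kafka error: {:?}", e),
            None => continue, // timeout: no message yet
        }
    }
}
```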