rust-rdkafka
Problem consuming data always from beginning.
Hello there,
I'm trying to read all the data in a Kafka topic from the beginning. I set the auto.offset.reset
flag to earliest and group.id
to a random string. This almost works, but every time I run the consumer it reads a different number of messages: the topic holds around 13k messages, and the most I have been able to read is 12k.
I also tried setting the partition offsets to the beginning explicitly, and used the assign method instead of subscribe, but the result was always the same:
let t: &str = &topic[..];
partitions.add_partition_offset(t, 0, Offset::Beginning).unwrap();
partitions.add_partition_offset(t, 1, Offset::Beginning).unwrap();
partitions.add_partition_offset(t, 2, Offset::Beginning).unwrap();
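For context, this is roughly the consumer setup the assign approach implies. The broker address and group id below are placeholders, not values from the issue; note also that librdkafka only surfaces KafkaError::PartitionEOF when enable.partition.eof is set to "true", so that setting is needed for the end-of-topic branch discussed later to fire at all:

```rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::{Offset, TopicPartitionList};

fn build_consumer(topic: &str) -> StreamConsumer {
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092") // placeholder
        .set("group.id", "replay-group")            // placeholder
        .set("auto.offset.reset", "earliest")
        .set("enable.partition.eof", "true") // required for PartitionEOF events
        .create()
        .expect("consumer creation failed");

    // Pin all three partitions to the beginning, as in the snippet above.
    let mut partitions = TopicPartitionList::new();
    for p in 0..3 {
        partitions
            .add_partition_offset(topic, p, Offset::Beginning)
            .unwrap();
    }
    consumer.assign(&partitions).expect("assign failed");
    consumer
}
```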
Here is my consuming function:
type LoggingConsumer = StreamConsumer<CustomContext, AsyncStdRuntime>;

async fn consume(consumer: &LoggingConsumer) -> Result<Vec<String>, Box<dyn Error>> {
    let mut result: Vec<String> = vec![];
    loop {
        info!("Consuming message ...");
        let mut stream = consumer.stream();
        let message = stream.next().await;
        match message {
            Some(Ok(message)) => println!(
                "Received message: {}",
                match message.payload_view::<str>() {
                    None => {
                        error!("Message payload is not a string");
                        ""
                    }
                    Some(Ok(mess)) => {
                        info!("Message received: {}", mess);
                        result.push(String::from(mess));
                        mess
                    }
                    Some(Err(e)) => {
                        error!("Message payload error: {}", e);
                        ""
                    }
                }
            ),
            Some(Err(KafkaError::PartitionEOF(_))) => {
                info!("Reached end of topic");
                break;
            }
            Some(Err(e)) => {
                eprintln!("Error receiving message: {}", e);
                // Avoid the panic-prone .unwrap(): rdkafka_error_code() returns an Option.
                if let Some(RDKafkaErrorCode::TopicAuthorizationFailed) = e.rdkafka_error_code() {
                    info!("Topic authorization failed: {}", e);
                }
                break;
            }
            None => {
                eprintln!("Consumer unexpectedly returned no messages");
            }
        }
    }
    Ok(result)
}
After a few hours of research, I suspect the problem is in this consuming loop. I don't know exactly why, but the logs show the offset is correctly set to the beginning, and every run still returns a different count of messages.
You might need to move this line outside of the loop:
let mut stream = consumer.stream();
As written in your example, consumer.stream()
constructs a new stream on each iteration, while you probably only need to create one stream and consume all the messages through it.
Unfortunately, nothing changed after moving that line outside the loop; it still doesn't return all the messages.
It looks like your data is split across 3 partitions? Are they evenly distributed and have you tried recording which partition(s) you are consuming from? If you are running this loop immediately after creating the consumer, there's a possibility that the loop completes before the rebalance happens; I'm not sure of the default behavior in that case. For 13K messages, you may want to log out additional metadata (offset, partition), to diagnose the issue.
If there is more than one partition, the break
statement in the PartitionEOF
branch of the match will exit the loop and stop consuming as soon as the end of data is reached in any one partition, likely leaving the other partitions partially consumed.
☝🏽 that is almost surely the issue. Good spot, @jches!
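One way to apply this fix is to keep consuming until a PartitionEOF has been seen for every assigned partition, rather than breaking on the first one. The sketch below (using hypothetical names, not code from the issue) isolates just that bookkeeping in a plain function, so the PartitionEOF branch would call it and only break once it returns true:

```rust
use std::collections::HashSet;

/// Records that `partition` hit EOF and reports whether every
/// assigned partition of the topic has now been fully drained.
fn all_partitions_at_eof(seen: &mut HashSet<i32>, partition: i32, total_partitions: usize) -> bool {
    seen.insert(partition);
    seen.len() >= total_partitions
}

fn main() {
    let mut seen = HashSet::new();
    // Simulated order in which PartitionEOF events might arrive
    // for the 3-partition topic from the issue.
    assert!(!all_partitions_at_eof(&mut seen, 0, 3));
    assert!(!all_partitions_at_eof(&mut seen, 2, 3));
    // A repeated EOF for the same partition must not count twice.
    assert!(!all_partitions_at_eof(&mut seen, 0, 3));
    assert!(all_partitions_at_eof(&mut seen, 1, 3));
    println!("all partitions drained");
}
```

In the consuming loop, the `Some(Err(KafkaError::PartitionEOF(p)))` arm would record `p` this way and `break` only when all partitions are done, so partitions that reach EOF later are still fully read.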