
Problem consuming all data when reading from the beginning.


Hello there,

I'm trying to read all the data in a Kafka topic from the beginning. I set auto.offset.reset to earliest and group.id to a random string. This almost works, but every time I run the consumer it returns a different number of messages. The topic holds around 13k messages, and the most I have ever read is 12k.
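For reference, the consumer is built roughly like this (a sketch; the broker address and group id literal are placeholders, and enable.partition.eof is what makes the PartitionEOF events below fire):

use rdkafka::config::ClientConfig;

// Sketch of the configuration described above; CustomContext and the
// LoggingConsumer alias are defined further down.
let consumer: LoggingConsumer = ClientConfig::new()
    .set("bootstrap.servers", "localhost:9092")  // placeholder broker
    .set("group.id", "random-group-per-run")     // a fresh random string each run
    .set("auto.offset.reset", "earliest")
    .set("enable.partition.eof", "true")         // emit KafkaError::PartitionEOF at end of a partition
    .create_with_context(CustomContext)
    .expect("consumer creation failed");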

I also tried setting the partition offsets to the beginning explicitly, but the result was always the same. After that, I used the assign method instead of subscribing:

let t: &str = &topic[..];
let mut partitions = TopicPartitionList::new();
partitions.add_partition_offset(t, 0, Offset::Beginning).unwrap();
partitions.add_partition_offset(t, 1, Offset::Beginning).unwrap();
partitions.add_partition_offset(t, 2, Offset::Beginning).unwrap();
// assign the partitions directly instead of subscribing
consumer.assign(&partitions).unwrap();

Here is my consuming function:

type LoggingConsumer = StreamConsumer<CustomContext, AsyncStdRuntime>;

async fn consume(consumer: &LoggingConsumer) -> Result<Vec<String>, Box<dyn Error>> {
    let mut result: Vec<String> = vec![];
    loop {
        info!("Consuming message ...");
        let mut stream = consumer.stream();
        let message = stream.next().await;
        match message {
            Some(Ok(message)) => println!(
                "Received message: {}",
                match message.payload_view::<str>() {
                    None => {
                        error!("Message payload is not a string");
                        ""
                    },
                    Some(Ok(mess)) => {
                        info!("Message received: {}", mess);
                        result.push(String::from(mess));
                        mess
                    },
                    Some(Err(e)) => {
                        error!("Message payload error: {}", e);
                        ""
                    },
                }
            ),
            Some(Err(KafkaError::PartitionEOF(_))) => {
                info!("Reached end of topic");
                break;
            }
            Some(Err(e)) => {
                eprintln!("Error receiving message: {}", e);
                if let RDKafkaErrorCode::TopicAuthorizationFailed = e.rdkafka_error_code().unwrap() {
                    info!("Topic authorization failed: {}", e);
                    break;
                }
                break;
            }
            None => {
                eprintln!("Consumer unexpectedly returned no messages");
            }
        }
    }

    Ok(result)
}

sectasy0 · Aug 31 '22

After a few hours of research, I suspect the problem is in this consuming loop. I don't know exactly why, but the logs show the offset is correctly set to the beginning, yet every run still returns a different number of messages.

sectasy0 · Sep 02 '22

You might need to move this line outside of the loop:

        let mut stream = consumer.stream();

As written, consumer.stream() constructs a new stream on every iteration of the loop; you probably want to create a single stream and consume all the messages through it.
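Something like this minimal sketch (same types and imports as your function, error handling trimmed):

async fn consume(consumer: &LoggingConsumer) -> Result<Vec<String>, Box<dyn Error>> {
    let mut result: Vec<String> = vec![];
    // create the stream once, outside the loop, and drive only this one
    let mut stream = consumer.stream();
    loop {
        match stream.next().await {
            Some(Ok(message)) => {
                if let Some(Ok(payload)) = message.payload_view::<str>() {
                    result.push(payload.to_string());
                }
            }
            Some(Err(KafkaError::PartitionEOF(_))) => break, // end of a partition
            Some(Err(e)) => return Err(Box::new(e)),
            None => unreachable!("the message stream does not end on its own"),
        }
    }
    Ok(result)
}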

jches · Sep 10 '22

> You might need to move this line outside of the loop:
>
>         let mut stream = consumer.stream();
>
> As written, consumer.stream() constructs a new stream on every iteration of the loop; you probably want to create a single stream and consume all the messages through it.

Unfortunately, moving that line outside the loop didn't change anything; it still won't return all the messages.

sectasy0 · Sep 12 '22

It looks like your data is split across 3 partitions? Are they evenly distributed, and have you tried recording which partition(s) you are consuming from? If you run this loop immediately after creating the consumer, it's possible the loop completes before the rebalance happens; I'm not sure of the default behavior in that case. For 13k messages, you may want to log additional metadata (offset, partition) to diagnose the issue.
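Something like this hypothetical helper, called from the Some(Ok(message)) arm, would show which partitions the loop actually reads before it exits:

use log::info;
use rdkafka::message::{BorrowedMessage, Message};

// Hypothetical helper: log the source of every received message.
fn log_message_source(message: &BorrowedMessage) {
    info!(
        "topic {} partition {} offset {}",
        message.topic(),
        message.partition(),
        message.offset()
    );
}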

uncommoncense · Oct 23 '22

If there is more than one partition, the break statement in the PartitionEOF branch of the match statement will cause this to exit the loop and stop consuming when the end of data is reached in one of the partitions, likely leaving the other partitions partially consumed.
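One way to handle that (a sketch; it assumes the partition count is known, 3 for this topic) is to track EOFs per partition and only break once all of them have been seen:

// before the loop:
//     let mut eof_partitions = std::collections::HashSet::new();
//     let partition_count = 3; // this topic's partition count

Some(Err(KafkaError::PartitionEOF(partition))) => {
    info!("Reached end of partition {}", partition);
    eof_partitions.insert(partition);
    if eof_partitions.len() == partition_count {
        info!("Reached end of all partitions");
        break;
    }
}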

jches · Oct 31 '22

☝🏽 that is almost surely the issue. Good spot, @jches!

benesch · Oct 31 '22