rust-rdkafka

`offsets_for_times` for future time

Open tzachshabtay opened this issue 2 years ago • 5 comments

When I call offsets_for_times with a future time, I get rdkafka::Offset::End instead of the latest offset. Offset::End doesn't contain the latest offset, which forces me to call fetch_watermarks to get it.

This is actually a big problem for me: I want to get the latest offset for a big list of partitions at once, and calling fetch_watermarks for each partition is too slow. The solution recommended by librdkafka is to call rd_kafka_offsets_for_times with max int64, which I can't do with this wrapper because of this issue.

tzachshabtay, Apr 07 '22 14:04

@tzachshabtay can you provide a self-contained example of what you have tried?

davidblewett, Apr 07 '22 14:04

@davidblewett do you have a template for a self-contained example somewhere that I can fork?

tzachshabtay, Apr 08 '22 18:04

@tzachshabtay I'm not aware of a template, no. However, there are several examples of different functionality in the repo. You could adapt one to illustrate the problem you're seeing (like https://github.com/fede1024/rust-rdkafka/blob/master/examples/simple_consumer.rs , maybe without the actual loop consuming from the topic).

davidblewett, Apr 08 '22 19:04

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::TopicPartitionList;
use std::time::Duration;

fn main() {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "group_test")
        .set("enable.auto.commit", "false")
        .create().unwrap();

    let topic = "__consumer_offsets";
    let timeout = Duration::from_secs(10);
    let mut assignment = TopicPartitionList::new();
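    // offsets_for_times reads the timestamp to look up from the offset field;
    // i64::MAX stands for a time far in the future.
    assignment.add_partition_offset(topic, 0, rdkafka::Offset::Offset(i64::MAX)).unwrap();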
    let offsets = consumer.offsets_for_times(assignment, timeout).unwrap();
    let partition_offsets = offsets.elements_for_topic(topic);
    let partition_offset = &partition_offsets[0];
    match partition_offset.offset() {
        rdkafka::Offset::Offset(_) => eprintln!("This is what I expect to happen (getting the latest offset)"),
        rdkafka::Offset::End => eprintln!("This is what actually happens (no specific offset is returned)"),
        _ => eprintln!("Unexpected")
    }
}

tzachshabtay, Apr 11 '22 12:04

@tzachshabtay thanks! I'll take a look at this during my next support rotation (in a few weeks).

davidblewett, Apr 14 '22 15:04

Hey @davidblewett, I was wondering if you've had time to look at this yet. Thanks.

tzachshabtay, Nov 15 '22 17:11

@tzachshabtay I investigated the behaviour of librdkafka's offsets_for_times and found that calling the function with INT64_MAX sets the returned offset to -1 (equivalent to rdkafka::Offset::End). The same happens when using rust-rdkafka. There is some brief discussion on the topic at https://github.com/confluentinc/librdkafka/issues/3737.
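
One way to picture why the caller ends up with Offset::End: librdkafka reserves a few negative values as logical offsets (RD_KAFKA_OFFSET_END is -1), so a wrapper that receives a raw -1 has to report a named variant rather than a position in the log. The sketch below is a standalone model of that translation, not rust-rdkafka's actual code. ModelOffset, model_from_raw and concrete_offset are hypothetical names introduced for illustration, and tail offsets are left out for brevity.

```rust
// Local model only: the constants mirror librdkafka's logical offset values,
// and `ModelOffset` is a hypothetical stand-in for `rdkafka::Offset`.
const RD_KAFKA_OFFSET_BEGINNING: i64 = -2;
const RD_KAFKA_OFFSET_END: i64 = -1;
const RD_KAFKA_OFFSET_STORED: i64 = -1000;
const RD_KAFKA_OFFSET_INVALID: i64 = -1001;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ModelOffset {
    Beginning,
    End,
    Stored,
    Invalid,
    Offset(i64),
}

/// Translate a raw offset into a named variant (simplified: tail offsets
/// are not modelled and fall through to `Offset`).
fn model_from_raw(raw: i64) -> ModelOffset {
    match raw {
        RD_KAFKA_OFFSET_BEGINNING => ModelOffset::Beginning,
        RD_KAFKA_OFFSET_END => ModelOffset::End,
        RD_KAFKA_OFFSET_STORED => ModelOffset::Stored,
        RD_KAFKA_OFFSET_INVALID => ModelOffset::Invalid,
        n => ModelOffset::Offset(n),
    }
}

/// Return the concrete position, or `None` when the value is only a
/// logical marker and the caller needs another lookup.
fn concrete_offset(offset: ModelOffset) -> Option<i64> {
    match offset {
        ModelOffset::Offset(n) => Some(n),
        _ => None,
    }
}

fn main() {
    // Raw values reported in this thread: -1 when querying with INT64_MAX,
    // 90 when querying with -1.
    for raw in [-1_i64, 90] {
        let offset = model_from_raw(raw);
        println!("raw {} -> {:?} -> concrete: {:?}", raw, offset, concrete_offset(offset));
    }
}
```

In this model, a None from concrete_offset corresponds to the case where a caller would need a second lookup, such as the fetch_watermarks call mentioned at the top of the issue.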

Alternatively, when librdkafka's offsets_for_times is called with -1, the returned offset is set to the latest offset. The same behavior can be seen with rust-rdkafka (when using rdkafka::Offset::End). To test this in librdkafka, I added some lines to the consumer example code of the Confluent getting started guide (at around line 73). The output using INT64_MAX as the offset:

$ ./consumer getting-started.ini 
Topic: purchases, Partition: 0, Offset: -1

However, using -1 as the offset, the output is:

$ ./consumer getting-started.ini 
Topic: purchases, Partition: 0, Offset: 90

This is the expected behavior described in this issue. To replicate it in rust-rdkafka, I made some changes to the example provided in this issue:

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::TopicPartitionList;
use std::time::Duration;

fn main() {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "group_test")
        .set("enable.auto.commit", "false")
        .create().unwrap();

    let topic = "test"; // I used "test" as the topic
    let timeout = Duration::from_secs(10);
    let mut assignment = TopicPartitionList::new();
    // Set the offset to End
    assignment.add_partition_offset(topic, 0, rdkafka::Offset::End).unwrap();
    let offsets = consumer.offsets_for_times(assignment, timeout).unwrap();
    let partition_offsets = offsets.elements_for_topic(topic);
    let partition_offset = &partition_offsets[0];
    // Print the output
    println!("Topic: {}, Partition: {}, Latest Offset: {:?}", partition_offset.topic(), partition_offset.partition(), partition_offset.offset());
    match partition_offset.offset() {
        rdkafka::Offset::Offset(_) => eprintln!("This is what I expect to happen (getting the latest offset)"),
        rdkafka::Offset::End => eprintln!("This is what actually happens (no specific offset is returned)"),
        _ => eprintln!("Unexpected")
    }
}

The output is:

Topic: test, Partition: 0, Latest Offset: Offset(47)
This is what I expect to happen (getting the latest offset)

As a quick note, while testing I used a local "test" topic, Kafka with ZooKeeper, rust-rdkafka version 0.30.0 and librdkafka version 1.8.0.

Hope this proves helpful.

Tortol27, May 27 '23 19:05