rust-rdkafka
`offsets_for_times` for future time
When I call `offsets_for_times` for a future time, instead of getting the last offset I get `rdkafka::Offset::End`, which doesn't contain the latest offset. This forces me to call `fetch_watermarks` to get the latest offset.
This is actually a big problem for me: I want to get the latest offset for a big list of partitions at once, and calling `fetch_watermarks` for each partition is too slow. The solution recommended by librdkafka is to call `rd_kafka_offsets_for_times` with max int64, which I can't do with this wrapper because of this issue.
@tzachshabtay can you provide a self contained example of what you have tried?
@davidblewett do you have a template for a self contained example somewhere which I can fork?
@tzachshabtay I'm not aware of a template, no. However, there are several examples of different functionality in the repo. You could adapt one to illustrate the problem you're seeing (like https://github.com/fede1024/rust-rdkafka/blob/master/examples/simple_consumer.rs, maybe without the actual loop consuming from the topic).
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::TopicPartitionList;
use std::time::Duration;

fn main() {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "group_test")
        .set("enable.auto.commit", "false")
        .create()
        .unwrap();
    let topic = "__consumer_offsets";
    let timeout = Duration::from_secs(10);
    let mut assignment = TopicPartitionList::new();
    // Ask for the offset at a timestamp far in the future (max int64)
    assignment.add_partition_offset(topic, 0, rdkafka::Offset::Offset(i64::MAX)).unwrap();
    let offsets = consumer.offsets_for_times(assignment, timeout).unwrap();
    let partition_offsets = offsets.elements_for_topic(topic);
    let partition_offset = &partition_offsets[0];
    match partition_offset.offset() {
        rdkafka::Offset::Offset(_) => eprintln!("This is what I expect to happen (getting the latest offset)"),
        rdkafka::Offset::End => eprintln!("This is what actually happens (no specific offset is returned)"),
        _ => eprintln!("Unexpected"),
    }
}
@tzachshabtay thanks! I'll take a look at this during my next support rotation (in a few weeks).
Hey @davidblewett I was wondering if you had time to look at this yet. Thanks.
@tzachshabtay I investigated the behaviour of librdkafka's `offsets_for_times` and found that calling the function with `INT64_MAX` sets the offset to `-1` (equivalent to `rdkafka::Offset::End`). The same happens when using rust-rdkafka. Here's some brief discussion on the topic: https://github.com/confluentinc/librdkafka/issues/3737.
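For context, `-1` is one of librdkafka's reserved logical offsets, which is why it surfaces as `End` rather than as a concrete position. A minimal sketch of that mapping, using a local stand-in enum (`LogicalOffset` and `from_raw` are hypothetical names for illustration, not the rust-rdkafka types), with the sentinel values assumed from librdkafka's `rdkafka.h`:

```rust
// Stand-in for illustration only; this is NOT the rdkafka::Offset type.
#[derive(Debug, PartialEq)]
enum LogicalOffset {
    Beginning,
    End,
    Stored,
    Invalid,
    Offset(i64),
}

// Sentinel values assumed from librdkafka's rdkafka.h (RD_KAFKA_OFFSET_*).
const OFFSET_BEGINNING: i64 = -2;
const OFFSET_END: i64 = -1;
const OFFSET_STORED: i64 = -1000;
const OFFSET_INVALID: i64 = -1001;

// Interpret a raw offset value: reserved sentinels become logical offsets,
// anything else is treated as a concrete position in the partition.
fn from_raw(raw: i64) -> LogicalOffset {
    match raw {
        OFFSET_BEGINNING => LogicalOffset::Beginning,
        OFFSET_END => LogicalOffset::End,
        OFFSET_STORED => LogicalOffset::Stored,
        OFFSET_INVALID => LogicalOffset::Invalid,
        n => LogicalOffset::Offset(n),
    }
}

fn main() {
    // A raw -1 is the "end of partition" sentinel.
    println!("{:?}", from_raw(-1));
    // A non-sentinel value is a concrete offset.
    println!("{:?}", from_raw(90));
}
```

So a raw `-1` in the result reads as "end of partition" and carries no numeric position, while a non-sentinel value such as `90` is an actual offset.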
Alternatively, when calling librdkafka's `offsets_for_times` with `-1`, the returned offset is set to the latest. The same behavior can be seen with rust-rdkafka (when using `rdkafka::Offset::End`).
This is the code I used to test (in librdkafka): I added some lines to the consumer example code from the Confluent getting started guide (at around line 73).
The output using `INT64_MAX` as the offset:
$ ./consumer getting-started.ini
Topic: purchases, Partition: 0, Offset: -1
However, using `-1` as the offset, the output is:
$ ./consumer getting-started.ini
Topic: purchases, Partition: 0, Offset: 90
This is the expected behavior described in this issue.
To replicate it in rust-rdkafka, I made some changes to the template provided in this issue:
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::TopicPartitionList;
use std::time::Duration;

fn main() {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "group_test")
        .set("enable.auto.commit", "false")
        .create()
        .unwrap();
    let topic = "test"; // I used "test" as the topic
    let timeout = Duration::from_secs(10);
    let mut assignment = TopicPartitionList::new();
    // Set the offset to End
    assignment.add_partition_offset(topic, 0, rdkafka::Offset::End).unwrap();
    let offsets = consumer.offsets_for_times(assignment, timeout).unwrap();
    let partition_offsets = offsets.elements_for_topic(topic);
    let partition_offset = &partition_offsets[0];
    // Print the output
    println!(
        "Topic: {}, Partition: {}, Latest Offset: {:?}",
        partition_offset.topic(),
        partition_offset.partition(),
        partition_offset.offset()
    );
    match partition_offset.offset() {
        rdkafka::Offset::Offset(_) => eprintln!("This is what I expect to happen (getting the latest offset)"),
        rdkafka::Offset::End => eprintln!("This is what actually happens (no specific offset is returned)"),
        _ => eprintln!("Unexpected"),
    }
}
The output is:
Topic: test, Partition: 0, Latest Offset: Offset(47)
This is what I expect to happen (getting the latest offset)
As a quick note, while testing I used a local "test" topic, Kafka-ZooKeeper, rust-rdkafka version 0.30.0 and librdkafka version 1.8.0.
Hope this proves helpful.