rust-rdkafka
rust-rdkafka copied to clipboard
Discovering the partition count in the subscripted topic
I try to discovery the partitions inside the topics I subscribed. Even if it looks simple, I could not manage it with this code
#[test]
fn test_partition() -> KafkaResult<()>{
let mut client_config = ClientConfig::new();
client_config.set("group.id", "try_me")
.set("bootstrap.servers", "localhost:9092")
.set("enable.partition.eof", "false")
.set("enable.auto.commit", "true")
.set("auto.offset.reset", "smallest");
client_config.set_log_level(RDKafkaLogLevel::Debug);
let consumer = BaseConsumer::from_config(&client_config)?;
consumer.subscribe(&["quickstart-events"])?;
let topic_partition_list = consumer.subscription().unwrap();
println!("{:?}", topic_partition_list);
Ok(())
}
which prints
>> TPL {(quickstart-events, -1): Invalid, }
However,
kafka-topics --describe --bootstrap-server localhost:9092 --topic quickstart-events
gives the
Topic: quickstart-events TopicId: 3IXtzC7TRfOjxclDWklNFA PartitionCount: 6 ReplicationFactor: 1 Configs:
Topic: quickstart-events Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: quickstart-events Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: quickstart-events Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: quickstart-events Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic: quickstart-events Partition: 4 Leader: 1 Replicas: 1 Isr: 1
Topic: quickstart-events Partition: 5 Leader: 1 Replicas: 1 Isr: 1
I am not sure what I am doing wrong, any guess? I could not find anything on the documentation as well.
You need to use another API, as a subscription is only created asynchronously. When you call consumer.subscribe() it does not block until the subscription is successful. The signal you need to wait for here is the first rebalance call with a Rebalance::Assign(...) argument in the ConsumerContext: https://docs.rs/rdkafka/0.29.0/rdkafka/consumer/trait.ConsumerContext.html#method.post_rebalance If you received this, you usually know you are successfully subscribed. AFAIK, there is no other way to find this out. Also AFAIK the consumer.subscription() will not list all partitions, but only the topics. consumer.assignment() will only list the partitions assigned to this member in the consumer group, so you need to use the following instead:
You can ask the cluster about your topics via: consumer.client().fetch_metadata(Some("quickstart-events"), None) even without subscribing to the topic: https://docs.rs/rdkafka/0.29.0/rdkafka/client/struct.Client.html#method.fetch_metadata