
Discovering the partition count of a subscribed topic

Open metesynnada opened this issue 3 years ago • 1 comments

I am trying to discover the partitions of the topics I have subscribed to. Although it looks simple, I could not manage it with this code:

use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaResult;

#[test]
fn test_partition() -> KafkaResult<()> {
    let mut client_config = ClientConfig::new();
    client_config
        .set("group.id", "try_me")
        .set("bootstrap.servers", "localhost:9092")
        .set("enable.partition.eof", "false")
        .set("enable.auto.commit", "true")
        .set("auto.offset.reset", "smallest");
    client_config.set_log_level(RDKafkaLogLevel::Debug);
    let consumer = BaseConsumer::from_config(&client_config)?;
    consumer.subscribe(&["quickstart-events"])?;
    // Read back the subscription immediately after subscribing.
    let topic_partition_list = consumer.subscription().unwrap();
    println!("{:?}", topic_partition_list);
    Ok(())
}

which prints

>> TPL {(quickstart-events, -1): Invalid, }

However,

kafka-topics --describe --bootstrap-server localhost:9092 --topic quickstart-events

gives

Topic: quickstart-events	TopicId: 3IXtzC7TRfOjxclDWklNFA	PartitionCount: 6	ReplicationFactor: 1	Configs: 
	Topic: quickstart-events	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
	Topic: quickstart-events	Partition: 1	Leader: 1	Replicas: 1	Isr: 1
	Topic: quickstart-events	Partition: 2	Leader: 1	Replicas: 1	Isr: 1
	Topic: quickstart-events	Partition: 3	Leader: 1	Replicas: 1	Isr: 1
	Topic: quickstart-events	Partition: 4	Leader: 1	Replicas: 1	Isr: 1
	Topic: quickstart-events	Partition: 5	Leader: 1	Replicas: 1	Isr: 1

I am not sure what I am doing wrong. Any guesses? I could not find anything in the documentation either.

metesynnada avatar Nov 05 '22 20:11 metesynnada

You need to use another API, because a subscription is only established asynchronously: consumer.subscribe() does not block until the subscription succeeds. The signal to wait for is the first rebalance callback with a Rebalance::Assign(...) argument in the ConsumerContext: https://docs.rs/rdkafka/0.29.0/rdkafka/consumer/trait.ConsumerContext.html#method.post_rebalance Once you have received it, you usually know you are successfully subscribed. AFAIK, there is no other way to find this out.

Also AFAIK, consumer.subscription() will not list all partitions, only the topics, and consumer.assignment() will only list the partitions assigned to this member of the consumer group. Neither gives you the full partition count of the topic.

Instead, you can ask the cluster about your topics via consumer.client().fetch_metadata(Some("quickstart-events"), None), even without subscribing to the topic: https://docs.rs/rdkafka/0.29.0/rdkafka/client/struct.Client.html#method.fetch_metadata

mfelsche avatar Nov 05 '22 20:11 mfelsche