rust-rdkafka

When calling Consumer::subscribe, there is no way to know which topic subscription has failed

Open edbird opened this issue 11 months ago • 0 comments

When calling the subscribe function with a vec of &str to subscribe to multiple topics, there is no way to know which topic caused the failure when one or more topics do not exist on a particular Kafka cluster.

In the implementation of base_consumer.rs I can see this function:

    fn subscribe(&self, topics: &[&str]) -> KafkaResult<()> {
        let mut tpl = TopicPartitionList::new();
        for topic in topics {
            tpl.add_topic_unassigned(topic);
        }
        let ret_code = unsafe { rdsys::rd_kafka_subscribe(self.client.native_ptr(), tpl.ptr()) };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

The important line is:

    let ret_code = unsafe { rdsys::rd_kafka_subscribe(self.client.native_ptr(), tpl.ptr()) };

This line returns an error code, but in doing so the information about which topic subscription has failed is lost.

Clearly, this calls an external C function, so it may not be an easy job to get this information back.

However, from a user point of view this information is quite important.

For my particular use case, I have processes which typically subscribe to between 5 and 10 topics. If one name is misspelled, or a topic has not been created yet, then of course the subscribe call will fail, but it isn't necessarily obvious why. (Which topic doesn't exist?)
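As a caller-side stopgap, the requested names could be compared against the topics the cluster reports before subscribing. The sketch below covers only that comparison. The helper name `missing_topics` is hypothetical, and `existing` is assumed to have been filled from a metadata request made beforehand (for example through the consumer's `fetch_metadata`), which is not shown here.

```rust
use std::collections::HashSet;

/// Hypothetical helper, not part of rust-rdkafka.
/// Returns the requested topics that are absent from `existing`,
/// preserving the order in which they were requested.
fn missing_topics<'a>(requested: &[&'a str], existing: &HashSet<String>) -> Vec<&'a str> {
    requested
        .iter()
        .copied()
        .filter(|topic| !existing.contains(*topic))
        .collect()
}

/// Builds a message naming every missing topic, or None if all were found.
fn missing_topics_message(requested: &[&str], existing: &HashSet<String>) -> Option<String> {
    let missing = missing_topics(requested, existing);
    if missing.is_empty() {
        None
    } else {
        Some(format!("topics not found on the cluster: {}", missing.join(", ")))
    }
}
```

A check like this is inherently racy (a topic can be created or deleted between the check and the subscribe call), so it only helps with reporting and does not replace proper error information from the library.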

May I kindly ask that such feedback be provided via an error message channel, so that the client can figure out, or report back to the user, which topic name is missing or has failed?
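To make the request concrete, here is a rough sketch of the kind of error value that would be useful on the caller side. The type `TopicSubscriptionError` and its fields are hypothetical and do not exist in rust-rdkafka. Whether the underlying C call can supply per-topic reasons at all is an open question.

```rust
use std::fmt;

/// Hypothetical error type, NOT part of rust-rdkafka: it records which
/// topics failed and why, so the caller can report them by name.
#[derive(Debug, Clone, PartialEq, Eq)]
struct TopicSubscriptionError {
    /// (topic name, human-readable reason) pairs, one per failed topic.
    failures: Vec<(String, String)>,
}

impl fmt::Display for TopicSubscriptionError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "subscription failed for {} topic(s): ", self.failures.len())?;
        for (i, (topic, reason)) in self.failures.iter().enumerate() {
            if i > 0 {
                write!(f, "; ")?;
            }
            write!(f, "{} ({})", topic, reason)?;
        }
        Ok(())
    }
}

impl std::error::Error for TopicSubscriptionError {}
```

With something along these lines, a process subscribing to 5 to 10 topics could log the exact misspelled or not-yet-created name instead of a generic subscription failure.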

edbird, Jul 17 '23 20:07