How can I determine the partition assignment in the StreamConsumer case?

Status: open. shanicky opened this issue 2 years ago (6 comments).

Hi, fede1024

We have a scenario when using rust-rdkafka. We use tokio as our asynchronous runtime, so we chose the high-level consumer (StreamConsumer), which lets us use stream() with async/await.

However, we need to learn the consumer's partition assignment as early as possible. Usually we would call poll() and assignment() repeatedly until the assignment is non-empty, but with StreamConsumer we can't call poll() directly.

So is there an elegant way to get the assignment when using StreamConsumer?

shanicky, Nov 8, 2021

Constructing a StreamConsumer starts a background poll task. After you construct a StreamConsumer, you could probably just call assignment() repeatedly until it's not empty. Not exactly elegant, but it would probably work.

benesch, Nov 10, 2021

Thank you very much for your reply!

But there is a scenario where this breaks down. For example, if I use 4 consumers to consume a topic with 3 partitions, one consumer will usually not be assigned any partition, so it will loop forever.

Or is there a way to know that the current consumer group is in a state where all the partitions have been assigned?

Thank you very much!

shanicky, Nov 10, 2021
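The arithmetic behind this scenario: within one consumer group each partition goes to at most one member, so 3 partitions can occupy at most 3 of the 4 members and at least one member is left idle. The std-only sketch below is a simplified, hypothetical model of range-style assignment for a single topic; it illustrates the outcome and is not librdkafka's actual assignor code, which also handles multiple topics and member ordering.

```rust
/// Simplified, hypothetical model of range assignment for ONE topic:
/// member i (in sorted member order) gets a contiguous block of partitions,
/// and the first `partitions % consumers` members get one extra partition.
fn range_assign(partitions: u32, consumers: usize) -> Vec<Vec<u32>> {
    if consumers == 0 {
        return Vec::new();
    }
    let members = consumers as u32;
    let per_member = partitions / members;
    let extra = partitions % members;
    let mut next = 0;
    (0..members)
        .map(|i| {
            // One extra partition for the first `extra` members.
            let count = per_member + u32::from(i < extra);
            let block: Vec<u32> = (next..next + count).collect();
            next += count;
            block
        })
        .collect()
}

fn main() {
    // Scenario from the thread: one topic with 3 partitions, 4 consumers in one group.
    for (i, parts) in range_assign(3, 4).iter().enumerate() {
        println!("consumer {i}: {parts:?}");
    }
    // Prints:
    // consumer 0: [0]
    // consumer 1: [1]
    // consumer 2: [2]
    // consumer 3: []
}
```

A loop on consumer 3 that waits for a non-empty assignment will not finish unless another member leaves the group, which is why a bounded wait, or reacting to rebalance events, is safer than an unbounded loop.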

Hrm, I'm afraid I just don't know! That state is tracked internally by librdkafka, but I'm not sure if it is exposed anywhere. You might try logging in the rebalance callback and seeing if that yields any insights.

benesch, Nov 21, 2021

I just wanted to chime in here on the lack of access to BaseConsumer.poll() from StreamConsumer. I found that without a simple loop calling BaseConsumer.poll(), librdkafka never invoked the stats callback. I had to add a tiny tokio task that called poll every 250 ms to get reliable metrics reporting.

davidblewett, Dec 6, 2021

Have you tried this with the implementation in the latest release? If you're using split partition queues, you'll need to launch a background task that drains the main message queue:

https://github.com/fede1024/rust-rdkafka/blob/bbd3417b02e354ebc194b315cbf959222a35f016/src/consumer/stream_consumer.rs#L310-L322

Exposing poll directly isn't safe because the StreamConsumer and BaseConsumer configure the underlying consumer queue directly. But the example in the code linked above should ensure that those stats callbacks get drained.

benesch, Dec 6, 2021

I had the same requirement, and I ended up creating a StreamConsumer<RebalanceContext>, where RebalanceContext has a field events: tokio::sync::broadcast::Sender<ControlEvent>. Whenever the pre_rebalance hook fires, it sends a ControlEvent carrying the TopicPartitionList it was given.

Then I created a stream of ControlEvents using tokio_stream::wrappers and merged the data and control event streams with futures::stream::select. On the merged stream I used try_filter_map to record the currently assigned partitions and then filtered the control events out of the stream.

Looking at it now, it seems rather complicated, but it works.

mindreader, Jan 1, 2022