rust-rdkafka
How can I determine the partition assignment in the StreamConsumer case?
Hi, fede1024,
We have a scenario when using rust-rdkafka. We use tokio as our asynchronous framework, so we chose the high-level consumer in order to use stream() with async/await.
However, we need to get the consumer's partition assignment as early as possible. Usually this is done by calling poll and assignment repeatedly until the assignment is non-empty, but with a StreamConsumer we can't call poll() directly.
Is there an elegant way to get the assignment when using a StreamConsumer?
The construction of a StreamConsumer starts a background poll task. After you construct a StreamConsumer, you could probably just call assignment repeatedly until it's not empty. Not exactly elegant, but it'd probably work.
Thank you very much for your reply!
But there is a scenario where that breaks down. For example, if I use 4 consumers to consume a topic with 3 partitions, one consumer will usually not be assigned any partition, so it will loop forever.
Alternatively, is there a way to know that the consumer group is in a state where all partitions have been assigned?
Thank you very much!
Hrm, I'm afraid I just don't know! That state is tracked internally by librdkafka, but I'm not sure if it is exposed anywhere. You might try logging in the rebalance callback and seeing if that yields any insights.
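rdkafka's ConsumerContext trait has pre_rebalance and post_rebalance hooks that receive a Rebalance value (Assign, Revoke or Error). The sketch below models only the bookkeeping such a hook could do, using a hypothetical RebalanceEvent enum and AssignmentTracker struct so it runs without a broker. Assign extends the held set rather than replacing it, so the same logic covers eager rebalances (revoke everything, then assign the full set) and incremental cooperative ones. Whether librdkafka delivers an Assign with an empty list to a consumer that received no partitions is an assumption here; logging in the hook is a cheap way to confirm it.

```rust
use std::collections::BTreeSet;

/// Hypothetical stand-in for rdkafka's Rebalance value (Error variant omitted).
#[derive(Debug, Clone)]
enum RebalanceEvent {
    Assign(Vec<(String, i32)>),
    Revoke(Vec<(String, i32)>),
}

/// Tracks what this consumer currently holds, based only on rebalance events.
#[derive(Debug, Default)]
struct AssignmentTracker {
    current: BTreeSet<(String, i32)>,
    /// Set by the first Assign event, even if that event carried no partitions.
    seen_assign: bool,
}

impl AssignmentTracker {
    fn apply(&mut self, event: &RebalanceEvent) {
        match event {
            RebalanceEvent::Assign(tps) => {
                self.seen_assign = true;
                self.current.extend(tps.iter().cloned());
                eprintln!("assign {:?} -> holding {:?}", tps, self.current);
            }
            RebalanceEvent::Revoke(tps) => {
                for tp in tps {
                    self.current.remove(tp);
                }
                eprintln!("revoke {:?} -> holding {:?}", tps, self.current);
            }
        }
    }

    /// True when a rebalance has completed and left this consumer with nothing.
    fn is_idle(&self) -> bool {
        self.seen_assign && self.current.is_empty()
    }
}
```

If the assumption holds, is_idle() lets the fourth consumer in a 3-partition topic stop waiting as soon as its (empty) assignment arrives, instead of polling until a timeout.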
I just wanted to chime in here on the lack of access to BaseConsumer.poll() from StreamConsumer. I found that if I didn't have a simple loop that called BaseConsumer.poll, the stats callback would never be called by librdkafka. I had to add a tiny tokio task that called poll every 250ms to get reliable metrics reporting.
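A generic sketch of such a periodic driver, assuming the work to do on each tick is passed in as a closure (in the comment above, that work is a poll call). The spawn_ticker name is hypothetical, and it uses a std thread to stay self-contained; in a tokio application, tokio::time::interval inside a spawned task would be the more natural fit.

```rust
use std::sync::mpsc::{self, RecvTimeoutError, Sender};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical helper: runs `tick` every `period` on a background thread
/// until a value is sent on the returned Sender or the Sender is dropped.
/// The JoinHandle yields the number of ticks performed.
fn spawn_ticker<F>(period: Duration, mut tick: F) -> (Sender<()>, JoinHandle<u64>)
where
    F: FnMut() + Send + 'static,
{
    let (stop_tx, stop_rx) = mpsc::channel::<()>();
    let handle = thread::spawn(move || {
        let mut ticks: u64 = 0;
        loop {
            match stop_rx.recv_timeout(period) {
                // No stop signal within one period: do one unit of work.
                Err(RecvTimeoutError::Timeout) => {
                    tick();
                    ticks += 1;
                }
                // Stop requested, or the Sender was dropped.
                Ok(()) | Err(RecvTimeoutError::Disconnected) => return ticks,
            }
        }
    });
    (stop_tx, handle)
}
```

Waiting on the stop channel with a timeout doubles as the sleep, so shutdown takes effect within one period rather than after the next full sleep.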
Have you tried this with the implementation in the latest release? If you're using split partition queues, you'll need to launch a background task that drains the main message queue:
https://github.com/fede1024/rust-rdkafka/blob/bbd3417b02e354ebc194b315cbf959222a35f016/src/consumer/stream_consumer.rs#L310-L322
Exposing poll directly isn't safe because the StreamConsumer and BaseConsumer configure the underlying consumer queue directly. But the example in the code linked above should ensure that those stats callbacks get drained.
I had the same requirement, and I ended up creating a StreamConsumer<RebalanceContext> where the RebalanceContext has a field events: tokio::sync::broadcast::Sender<ControlEvent>. In the pre_rebalance hook, it sends a ControlEvent carrying the TopicPartitionList that the hook receives.
Then I created a stream of ControlEvents using tokio_stream::wrappers and merged the data and control event streams with futures::stream::select. On the merged stream I did a try_filter_map to record the currently assigned partitions and then filter out the control events.
Looking at it now, it seems rather complicated, but it works.
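Stripped of the tokio and futures machinery, the pattern is: one item type for both data and control events, a single merged source, and a filter step that updates the current assignment on control events and passes data through. A std-only sketch, assuming one mpsc channel (with a cloned Sender standing in for the rebalance hook) in place of the broadcast channel plus futures::stream::select. Event and data_only are hypothetical names, partitions are plain i32s for a single topic, and filter_map replaces try_filter_map because this sketch has no error type.

```rust
use std::collections::BTreeSet;
use std::sync::mpsc::{self, Receiver};

/// Hypothetical item type: data messages and control (rebalance) events
/// travel through the same channel, mimicking a merged stream.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Message { partition: i32, payload: String },
    Control(Vec<i32>),
}

/// Consumes the merged events, replaces `current` with the assignment
/// carried by each control event, and yields only the data messages.
fn data_only<'a>(
    events: Receiver<Event>,
    current: &'a mut BTreeSet<i32>,
) -> impl Iterator<Item = (i32, String)> + 'a {
    events.into_iter().filter_map(move |event| match event {
        Event::Control(partitions) => {
            *current = partitions.into_iter().collect();
            None
        }
        Event::Message { partition, payload } => Some((partition, payload)),
    })
}
```

Because both producers share one channel here, events arrive in the order they were sent; select, by contrast, interleaves two independent streams as their items become ready, so a message can be observed slightly before or after the control event that logically precedes it.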