confluent-kafka-dotnet

Do `IConsumer.Pause` and `IConsumer.Resume` not take immediate effect?

Open · Aaronontheweb opened this issue 9 months ago · 2 comments

I'm troubleshooting some partition rebalancing bugs in https://github.com/akkadotnet/Akka.Streams.Kafka and noticed the following:

```csharp
var currentAssignment = _consumer.Assignment.ToImmutableList();
var initialRebalanceInProcess = _rebalanceInProgress.Value;

var partitionsToFetch = _requests.Values.SelectMany(v => v.Topics)
    .Where(p => currentAssignment.Contains(p))
    .ToImmutableHashSet();

if (partitionsToFetch.IsEmpty || _requests.IsEmpty())
{
    if (_log.IsDebugEnabled)
        _log.Debug("Requests are empty or no partitions to fetch - partitionsToFetch.IsEmpty={0}, _requests.IsEmpty={1}. Attempting to consume with paused partitions.",
            partitionsToFetch.IsEmpty,
            _requests.IsEmpty());
    PausePartitions(currentAssignment);
    try
    {
        // BUG: we are expecting a `null` value here - namely we're waiting for partition rebalancing events to complete
        var consumed = _consumer.Consume(0); // will still return events even though all partitions are paused
        // ... (excerpt ends here)
```

When we call `Consume(0)` here, the `IConsumer` still returns messages even though we just paused every assigned partition. Is this expected behavior or a bug? I couldn't find any documentation on it.

Aaronontheweb · Mar 06 '25 19:03

It looks like the underlying librdkafka pause/resume operations were asynchronous at one point but are now synchronous? https://github.com/confluentinc/librdkafka/issues/2455#issuecomment-521165132

Aaronontheweb · Mar 06 '25 20:03

Yeah, it looks like that behavior was introduced in this PR? https://github.com/confluentinc/librdkafka/pull/2473

Aaronontheweb · Mar 06 '25 20:03