
WARN librdkafka: librdkafka: MAXPOLL [thrd:main]: Application maximum poll interval (300000ms) exceeded by 399ms

Open bbigras opened this issue 3 years ago • 6 comments

I got this warning and my program seems to have stopped consuming:

  Jan 18 00:47:06.807  WARN librdkafka: librdkafka: MAXPOLL [thrd:main]: Application maximum poll interval (300000ms) exceeded by 399ms (adjust max.poll.interval.ms for long-running message processing): leaving group
    at src/client.rs:65

I don't know if my code is blocking forever.

The warning seems to come from rust-rdkafka. Is there any way I can get that error from stream.next().await so I can panic or restart?

bbigras avatar Jan 18 '22 16:01 bbigras

Nope, don’t think so. I think it’s just not in the librdkafka design. But you can create a custom ClientContext and override the log method to install a handler. If all you’re trying to do is panic on this warning, that shouldn’t be too hard.

benesch avatar Jan 27 '22 07:01 benesch
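
For anyone looking for a concrete starting point, here is a minimal sketch of such a context. The type name and the panic-on-MAXPOLL policy are illustrative assumptions, not something rust-rdkafka ships:

use rdkafka::client::ClientContext;
use rdkafka::config::RDKafkaLogLevel;
use rdkafka::consumer::ConsumerContext;

struct MaxPollContext;

impl ClientContext for MaxPollContext {
    fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
        // librdkafka tags the "maximum poll interval exceeded" warning with the
        // MAXPOLL facility, so it can be singled out here.
        if fac == "MAXPOLL" {
            panic!("librdkafka MAXPOLL: {log_message}");
        }
        eprintln!("librdkafka [{level:?}] {fac}: {log_message}");
    }
}

// A consumer also needs a ConsumerContext; the defaults are fine here.
impl ConsumerContext for MaxPollContext {}

The context is then passed in with create_with_context(MaxPollContext) instead of create() when building the consumer.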

I've encountered this issue several times recently and I'm not quite sure how best to handle it at the moment. Our consumer emits a very similar log message that also indicates the consumer has left the consumer group:

librdkafka: MAXPOLL [thrd:main]: Application maximum poll interval (300000ms) exceeded by 346ms (adjust max.poll.interval.ms for long-running message processing): leaving group

The underlying cause in our case is still TBD as it occurs infrequently. When it does occur, the process running the consumer doesn't panic/exit and seems otherwise unaffected, except the consumer is no longer consuming any messages. My uneducated guess is that the future driving the consumer stops being scheduled on the event loop, but it really is just a guess at this point.

Thanks for your suggestion @benesch. I will look into what can be done with the ClientContext to avoid the consumer entering this "zombie" state. If anyone has other suggestions or ideas in the meantime, I'd be interested to hear them. Thanks!

la-mar avatar Jan 30 '22 23:01 la-mar

Encountering this as well. @la-mar, how did you fix it, if at all?

Zarathustra2 avatar Aug 23 '23 21:08 Zarathustra2

In my case it was an issue of having multiple consumers, each spawned in its own thread. One consumer in particular had 1000x more messages on its topic than the others and was doing some heavier processing, which did not allow the other consumers to run.

If someone else runs into this, it may make sense to make sure that nothing in your application/tokio runtime is blocking.

Zarathustra2 avatar Aug 27 '23 14:08 Zarathustra2
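
As a follow-up to the point about blocking: one way to keep heavy per-message work from starving the task that drives a consumer (and any other consumers on the same tokio runtime) is to push that work onto tokio's blocking pool. A rough sketch, assuming a StreamConsumer; the function shape and error handling here are illustrative:

use rdkafka::consumer::StreamConsumer;
use rdkafka::Message;

async fn consume(consumer: StreamConsumer) -> Result<(), Box<dyn std::error::Error>> {
    loop {
        let msg = consumer.recv().await?;
        // Copy out what the heavy work needs so it can run off the async task.
        let payload = msg.payload().map(|p| p.to_vec());
        // spawn_blocking moves CPU-bound or otherwise blocking work onto a
        // dedicated thread pool, so it cannot stall the runtime worker threads
        // that the other consumers' tasks are scheduled on.
        tokio::task::spawn_blocking(move || {
            if let Some(bytes) = payload {
                // ... expensive, blocking processing of `bytes` goes here ...
                let _ = bytes;
            }
        })
        .await?;
    }
}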

So, I also ran into this issue a while ago, but I was able to manage it by doing a couple of things:

1/ Store offsets manually, opt not to commit offsets yourself, and rely on the auto-commit mechanism to do the actual commit for you:

use rdkafka::config::ClientConfig;
use rdkafka::consumer::BaseConsumer;

let consumer: BaseConsumer = ClientConfig::new()
    // Required connection/group settings (placeholder values; use your own).
    .set("bootstrap.servers", "localhost:9092")
    .set("group.id", "my-consumer-group")
    // We are opting *NOT* to manually commit offsets to Kafka because we are processing
    // messages faster than we can commit them. This leads to a huge queue of offsets and
    // that will trigger the Kafka Group to be recreated and start the cycle all over again.
    // ref: https://github.com/confluentinc/librdkafka/issues/826#issuecomment-252901186
    .set("enable.auto.commit", "true")
    .set("enable.auto.offset.store", "false")
    .set("auto.offset.reset", "earliest")
    .set("fetch.wait.max.ms", "50")
    // Explicitly set this and interrupt the tokio loop to ensure we're always attempting
    // to get a message within this boundary.
    .set("session.timeout.ms", "60000")
    .set("max.poll.interval.ms", "60000")
    .create()
    .unwrap();

2/ After receiving and processing a Kafka message, be sure to store the offset:

// As per the note above, we are going to store the offset instead of committing
// the offset ourselves to reduce the chances of overloading the queue.
consumer.store_offset(msg.topic(), msg.partition(), msg.offset())?;

This doesn't prevent the issue entirely, but it definitely alleviates much of it. We've been using this to process real-time price data without much incident. We also have a custom context that we use to panic so we can restart our container should this error occur. The only reason this works is that our program is pretty much stateless.

helios2k6 avatar Aug 27 '23 15:08 helios2k6
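
Putting the two steps above together, a minimal poll loop might look like the following sketch (the topic name, timeout, and error handling are illustrative, not from the comment above):

use std::time::Duration;

use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::Message;

fn run(consumer: &BaseConsumer) -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder topic name.
    consumer.subscribe(&["prices"])?;
    loop {
        // Poll frequently; long gaps between poll() calls are exactly what
        // triggers the MAXPOLL warning and the consumer leaving the group.
        match consumer.poll(Duration::from_millis(100)) {
            Some(Ok(msg)) => {
                // ... process the message here, keeping it well under
                // max.poll.interval.ms ...
                // Store the offset so the auto-commit thread picks it up.
                consumer.store_offset(msg.topic(), msg.partition(), msg.offset())?;
            }
            Some(Err(e)) => eprintln!("kafka error: {e}"),
            None => {} // no message within the timeout
        }
    }
}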