
Upgrade from 0.34.0 to 0.36.0 causes consumer to stop consuming but keep running

Open zhrebicek opened this issue 6 months ago • 34 comments

As the title says, after upgrading from 0.34.0 to 0.36.0 we started getting Kafka consumers that would stop consuming messages, start leaking memory, and not leave the consumer group.

  • I do not have a clear reproducer, as it happened to us across multiple production environments in a random fashion.
    • For example, it never happened in our staging environment.
  • It happened more often in environments where Kafka was under higher load. Maybe it has to do with the change to use the event interface, but that is just a wild guess.
  • In the end I tried setting a low max.poll.interval.ms, but it happened again; it seems the consumer is still sending heartbeats, yet for whatever reason it stops pulling records, CPU drops, memory increases, and the consumer group does not rebalance.
  • After reverting to 0.34.0 the issue has not reproduced (see the screenshots below).
[Screenshots: 2023-12-12 14:40:59 and 14:40:33]

Disclaimer: we have been running on 0.34.0 for half a day without the issue reproducing, while it happened many times with 0.36.0. I will report back if it still happens on 0.34.0 and the origin turns out to be elsewhere.

zhrebicek avatar Dec 12 '23 14:12 zhrebicek

Thanks for reporting it, we'll investigate further. 0.35.0 does not contain the event API change, if you want to give it a try.

Can you provide a snippet of your code? Which consumer are you using, stream or base? Also the properties, any logs, etc.

scanterog avatar Dec 12 '23 15:12 scanterog

Confirming that I too encountered a similar issue, which resulted in significant increases in idle time being reported across the board, and some instances of what appeared to be an executor stall (almost as if some blocking calls were on the async path). I happened to be using a producer for this, with no consumers on my particular service, though I do use a long publish batching window of 1 second.

Downgrading from 0.36.0 to 0.35.0 did indeed resolve the issue.

neoeinstein avatar Dec 12 '23 18:12 neoeinstein

@zhrebicek @neoeinstein The event API does allow some transient errors to be propagated that the callback API did not. @scanterog brought this up upstream here: https://github.com/confluentinc/librdkafka/issues/4493 . Is it possible that your code might not have handled these explicitly, leading to either the consumer or producer becoming idle?

This may manifest with KafkaError (Message consumption error: BrokerTransportFailure (Local: Broker transport failure)).
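
For illustration only, a recv loop that treats that specific error as transient rather than fatal could look roughly like the sketch below; which error codes are actually safe to skip is an assumption here, not something the library defines for you.

use rdkafka::consumer::StreamConsumer;
use rdkafka::error::KafkaError;
use rdkafka::types::RDKafkaErrorCode;

// Sketch only: keep polling through BrokerTransportFailure instead of tearing
// the consumer down; any other error still breaks the loop.
async fn consume_loop(consumer: &StreamConsumer) {
    loop {
        match consumer.recv().await {
            Ok(_msg) => {
                // handle the message
            }
            Err(KafkaError::MessageConsumption(RDKafkaErrorCode::BrokerTransportFailure)) => {
                log::warn!("transient broker transport failure, continuing to poll");
            }
            Err(err) => {
                log::error!("consumer error: {:?}", err);
                break;
            }
        }
    }
}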

davidblewett avatar Dec 12 '23 19:12 davidblewett

Today we tested the BaseProducer and BaseConsumer internally and couldn't reproduce the issue. Tomorrow we'll try the FutureProducer and the StreamConsumer. Please provide the properties you're setting and any relevant logs. Are you statically linking librdkafka or linking it dynamically?

scanterog avatar Dec 12 '23 23:12 scanterog

On our side, we are using the FutureProducer. We also record the amount of time it takes to confirm the publish, but move that off the main execution path by tokio::spawning a future wrapping that .send. In general, each instance of this service is publishing about 100 messages per second into rdkafka, with an average of about 40 such spawned tasks pending concurrently (within a 1-second batching window).

I did also confirm that we did not see any errors bubble up out of the .send call.
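
For context, the spawned-send pattern described above looks roughly like the sketch below; the topic name, queue timeout, and latency recording are placeholders rather than our actual code.

use std::sync::Arc;
use std::time::{Duration, Instant};

use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;

// Sketch of the pattern: await the delivery confirmation on a separate task
// so it stays off the main execution path.
fn publish_detached(producer: Arc<FutureProducer>, key: String, payload: Vec<u8>) {
    tokio::spawn(async move {
        let started = Instant::now();
        let record = FutureRecord::to("some-topic").key(&key).payload(&payload);
        match producer.send(record, Timeout::After(Duration::from_secs(5))).await {
            Ok((partition, offset)) => {
                // record the publish-confirmation latency here
                tracing::debug!(partition, offset, elapsed_ms = started.elapsed().as_millis() as u64, "published");
            }
            Err((err, _msg)) => tracing::error!(?err, "publish failed"),
        }
    });
}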

Edit to confirm the properties we set:

bootstrap.servers=<broker>
client.id=<identifier>
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.username=<username>
sasl.password=<password>
request.required.acks=-1
queue.buffering.max.ms=1000

On a side note, I do have an internal strongly-typed convention for adding these to the ClientConfig, if that is of any interest to you.
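
As a rough illustration of what such a convention can look like (a hypothetical shape, not the internal one referred to above):

use rdkafka::ClientConfig;

// Hypothetical typed settings struct that flattens into a ClientConfig.
struct ProducerSettings {
    bootstrap_servers: String,
    client_id: String,
    queue_buffering_max_ms: u32,
}

impl From<ProducerSettings> for ClientConfig {
    fn from(s: ProducerSettings) -> Self {
        let mut cfg = ClientConfig::new();
        cfg.set("bootstrap.servers", s.bootstrap_servers)
            .set("client.id", s.client_id)
            .set("request.required.acks", "-1")
            .set("queue.buffering.max.ms", s.queue_buffering_max_ms.to_string());
        cfg
    }
}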

We have logging set to info, with the tracing feature enabled, but don't see any relevant logs being emitted. That isn't unusual, since we generally don't have any errors. We don't ask rdkafka to debug anything right now, though we could do something around that in the future.

Our rdkafka dependency is set up to statically link librdkafka and friends.

neoeinstein avatar Dec 12 '23 23:12 neoeinstein

We use rdkafka as-is (rdkafka = "0.34.0"), running on debian-bookworm-slim. We use StreamConsumer, with these settings:

client.id: <id>
enable.auto.commit: true
enable.auto.offset.store: false
enable.partition.eof: false
linger.ms: 100
enable.idempotence: true

@davidblewett We can look into explicitly handling the errors you speak about, but we basically construct a new consumer on any error.

loop {
    let consumer = Arc::new(try_create_consumer_from_config(
        kafka_config.consumer.clone(),
    )?);
    ...
    consumer.subscribe(&topics)?;

    match run(
        ... (consumer.stream ... inside -> anyhow::Result<()>)
    )
    .await
    {
        Ok(()) => break Ok(()),
        Err(error) => warn!(?error, "Processor failed"),
    }
}

But all we see on rebalance is this:

error:Store offset error: State (Local: Erroneous state)

Caused by:
State (Local: Erroneous state)

Edit: I saw one of these a couple of days back, but only one instance of it, and I am not even sure it happened on 0.36.0:

error:Meta data fetch error: BrokerTransportFailure (Local: Broker transport failure)

Caused by:
BrokerTransportFailure (Local: Broker transport failure)

zhrebicek avatar Dec 13 '23 07:12 zhrebicek

One addition: we had no issues with producers (FutureProducer), and we still have it running in a couple of services for a couple of days. So no need to test that, @scanterog.

Thanks a lot for looking into this!

zhrebicek avatar Dec 13 '23 07:12 zhrebicek

I went ahead and yanked the 0.36.0 release from crates.io until we figure out what's going on. The implementation is still present in git.

davidblewett avatar Dec 13 '23 18:12 davidblewett

I'm surprised we're seeing a working case of the producer with @zhrebicek and a non-working one with @neoeinstein. I wonder what the differences would be here.

@zhrebicek I see you're setting enable.idempotence: true, but that is only a producer setting (same for linger.ms). I see you set enable.auto.offset.store: false with auto commit set to true. Do you commit offsets via the StoreOffsets API?

@neoeinstein can you provide an exact snippet of what you're doing? I wasn't able to reproduce it with the FutureProducer either.

scanterog avatar Dec 13 '23 18:12 scanterog

@scanterog We do it like this

consumer
    .stream()
    .take_until(token.wait_for_shutdown())
    .err_into::<anyhow::Error>()
    // Rest of pipe working with messages.
    // Last step before storing offsets is publishing converted data to other topic.
    .try_buffered(converter_config.max_in_flight)
    .try_for_each(|message| {
        let span = message.span();
        let consumer = consumer.clone();
        async move {
            consumer.store_offset(message.topic(), message.partition(), message.offset())?;
            Ok(())
        }
        .instrument(span)
    })
    .await

zhrebicek avatar Dec 15 '23 08:12 zhrebicek

@zhrebicek yeah, that translates to stopping the stream entirely when an error is encountered. If a Kafka cluster is under high load, then most likely you'll have a lot of request retries that will be bubbled up as transient errors. That condition is usually persistent, which translates to the consumer not making progress at all, and you'll be recreating consumers constantly. The only thing that does not add up for me here is that I'd still expect the error to be logged every time the stream stops and the consumer is recreated. Are you logging these events?

One potential solution is to stop forwarding those transient errors to the app (as with the callback API), but sadly rdkafka does not provide any API to identify them, so it would require scanning the rdkafka code base and filtering those codes on our side.
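
As a sketch of the app-side variant of that idea (building on the pipeline shared earlier), error items could be logged and dropped before the rest of the processing so a single error does not abort the try_for_each; deciding which errors are genuinely transient is the hard part.

use futures::StreamExt;
use rdkafka::consumer::StreamConsumer;
use tracing::warn;

// Sketch only: drop error items instead of letting them terminate the stream.
async fn run_resilient(consumer: &StreamConsumer) {
    consumer
        .stream()
        .filter_map(|item| async move {
            match item {
                Ok(msg) => Some(msg),
                Err(err) => {
                    warn!(?err, "ignoring consumer error");
                    None
                }
            }
        })
        .for_each(|msg| async move {
            // the existing processing + store_offset pipeline would go here
            let _ = msg;
        })
        .await;
}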

scanterog avatar Dec 18 '23 13:12 scanterog

@scanterog we are logging all the errors, and there are no subsequent errors after the thing is stuck, so it should not be recreating in a loop.

From what I remember there were at most 3 consecutive errors like this that caused a full recreation per pod, and even then other pods did not get stuck.

zhrebicek avatar Dec 18 '23 13:12 zhrebicek

@zhrebicek I see. We haven't been able to reproduce this so far. We have tests running for over 5 days and we have tried several operations on the cluster (rolling restarts, broker replacements, ungraceful shutdown). I'll try to set up a service using the pattern you've shared and see whether we can get closer to understanding what's going on.

scanterog avatar Dec 18 '23 13:12 scanterog

@scanterog If I were able to reproduce this on staging I would deploy the version on purpose and enable rdkafka debug logs, but it never happened there, and it caused issues in production.

One note: maybe set a high replication factor, so it has more work to do with acks.
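
For reference, turning on librdkafka debug logs as mentioned above could look roughly like this sketch; the broker address, group id, and debug contexts are placeholder values.

use rdkafka::config::RDKafkaLogLevel;
use rdkafka::consumer::StreamConsumer;
use rdkafka::ClientConfig;

// Sketch: a consumer with librdkafka's own debug logging enabled.
fn create_debug_consumer() -> StreamConsumer {
    ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "shadow-debug")
        .set("debug", "consumer,cgrp,broker")
        .set_log_level(RDKafkaLogLevel::Debug)
        .create()
        .expect("consumer creation failed")
}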

zhrebicek avatar Dec 18 '23 14:12 zhrebicek

@zhrebicek no worries! Totally get that. What replication factor are you using, and what Kafka version?

scanterog avatar Dec 18 '23 14:12 scanterog

@scanterog Version: Confluent 6.1.0 (Kafka 2.7.0 equivalent). Replication factor 2; in one env it was by mistake 4, and it happened there a bit more than in the other envs.

zhrebicek avatar Dec 19 '23 10:12 zhrebicek

@zhrebicek would it be possible to run a shadow service (that sends all reads to /dev/null) with this version and debug level enabled (same for rdkafka logs) on the environment where you're getting this issue? Sadly we aren't able to reproduce this issue, so there must be something specific we're missing.

@neoeinstein this also applies to the FutureProducer. If you can provide that information it would be extremely useful for the project!

Thanks both!

scanterog avatar Dec 26 '23 16:12 scanterog

I will see what I can try as time allows.

zhrebicek avatar Jan 02 '24 13:01 zhrebicek

Thanks @zhrebicek! Appreciate that.

scanterog avatar Jan 02 '24 14:01 scanterog

Same problem here; it took a few hours to diagnose that this was the issue. We're also seeing a bunch of "Too many open files" errors coming from the consumer threads, until it finishes with a segfault after ~1h30min. It seems pretty likely that this is due to https://github.com/fede1024/rust-rdkafka/pull/617. Would you please consider yanking the affected releases (and maybe have them only as release candidates) until this is fixed? (Also, this deserves the bug tag.)

Ten0 avatar Jan 13 '24 14:01 Ten0

@Ten0 until we have a way of reproducing the issue, we can't work on a fix. That commit does change the underlying implementation, but aside from some errors bubbling up that didn't before, there shouldn't have been a change in behavior. It is also the same mechanism that the Confluent Kafka libraries use. This change has been in use and seeing a lot of traffic, and we haven't observed the reported issue.

Are you sure you aren't accidentally swallowing some exceptions? We need a self contained example that triggers the behavior to investigate further.

davidblewett avatar Jan 13 '24 19:01 davidblewett

We need a self contained example that triggers the behavior to investigate further.

Unfortunately that is going to be too hard to extract for us as well :(

Our code looks basically like this:

// stream() returns rdkafka::consumer::MessageStream; next() needs `mut`
// and yields Option<Result<BorrowedMessage, KafkaError>>.
let mut stream = stream_consumer.stream();
loop {
    let msg = stream.next().await.expect("stream ended")?;
    // do stuff with the message
}

The librdkafka parameters are:

session.timeout.ms = "6000"
group.id = "some_group_id"
enable.auto.commit = "true"
auto.commit.interval.ms = "1000"
enable.auto.offset.store = "false"
enable.partition.eof = "false"
statistics.interval.ms = "1000"

We are handling ~300k lines/s dispatched on several servers. The issue happens with both Kafka v2.0 and v2.1.

Are you sure you aren't accidentally swallowing some exceptions?

Yes, I am sure that we aren't accidentally dropping any error, either returned by this library or printed to the log (there is nothing of level Warn or greater in the log). There are no messages besides the ones related to too many open files, and those come from dedicated threads without going through log.

there shouldn't have been a change in behavior. This change has been in use and seeing a lot of traffic and we haven't observed the reported issue.

From what I read in this thread, three different people have encountered this precise issue (same blockage, same memory increase...) since this precise release: @zhrebicek, @neoeinstein, and myself. I still believe it is time to acknowledge that this issue does indeed exist and has a strong impact, despite a minimal reproduction not being established yet.

Would you please consider yanking the affected releases (and maybe have them only as release candidates) until this is fixed? (Also this deserves the bug tag)

Ten0 avatar Jan 13 '24 20:01 Ten0

I'm not sure it is related, but JFYI: upgrading to 0.36.0 makes topic deletion hang forever if you have uncommitted messages.

DoumanAsh avatar Jan 30 '24 09:01 DoumanAsh

I think I was able to reproduce the issue locally. I am running a high-level consumer with the following timing-related settings:

max.poll.interval.ms=20000
session.timeout.ms=10000
auto.commit.interval.ms=5000
statistics.interval.ms=5000

After calling subscribe I am polling the consumer in a loop using recv. If I add a tokio::time::sleep of 7s between polls, 0.36.2 silently fails and just stops consuming; with a sleep of 6s it keeps consuming. With 0.35.0 it keeps consuming even with larger sleep durations (10s+).

My guess is that there's something wrong with the underlying librdkafka consumer not sending heartbeats.

On a side note, another issue I noticed is that since 0.36.2 the consumer starts printing verbose log messages if the debug setting is not left empty: not only from the custom client context (as would be expected), but apparently also directly from librdkafka.

pi-xel avatar Feb 13 '24 09:02 pi-xel

Minimal example to reproduce the issue (can be included as-is in the crate's high-level consumer integration tests):

// All produced messages should be consumed.
#[tokio::test(flavor = "multi_thread")]
async fn test_produce_consume_delay() {
    let _r = env_logger::try_init();

    let start_time = current_time_millis();
    let topic_name = rand_test_topic("test_produce_consume_delay");
    let message_map = populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None).await;
    let consumer = create_stream_consumer(&rand_test_group(), None);
    consumer.subscribe(&[topic_name.as_str()]).unwrap();

    for i in 0..10 {
        println!("awaiting message: {}", i);

        let message = consumer.recv().await;

        // Simulate slow message processing: delay the next poll by 5 seconds.
        let timeout = tokio::time::sleep(std::time::Duration::from_millis(5000));

        println!("processing message: {}", i);

        match message {
            Ok(m) => {
                let id = message_map[&(m.partition(), m.offset())];
                match m.timestamp() {
                    Timestamp::CreateTime(timestamp) => assert!(timestamp >= start_time),
                    _ => panic!("Expected createtime for message timestamp"),
                };
                assert_eq!(m.payload_view::<str>().unwrap().unwrap(), value_fn(id));
                assert_eq!(m.key_view::<str>().unwrap().unwrap(), key_fn(id));
                assert_eq!(m.topic(), topic_name.as_str());
            }
            Err(e) => panic!("Error receiving message: {:?}", e),
        };

        timeout.await;
    }
}

On v0.35.0 this completes without issues, while on v0.36.2 it gets stuck after the first message without any error.

You can also try lowering the 5s timeout; on my machine it still fails on 0.36.2 with a timeout of 1s and works with 500ms.

pi-xel avatar Feb 13 '24 11:02 pi-xel

Interestingly, the issue seems to be directly related to statistics.interval.ms:

If I bump it from 500 to 5000ms, the consumer keeps working for any poll delay at or below 5000ms and fails for values above that.

pi-xel avatar Feb 13 '24 14:02 pi-xel

I just ran into this issue; might I suggest yanking all variants of the 0.36 branch until a proper patch lands? This will prevent others from unknowingly having issues.

jwilm avatar Apr 08 '24 23:04 jwilm

I don't think that's how yank is supposed to be used, and I am sure it will cause even more problems for existing users. One example is libraries that depend on 0.36.


untitaker avatar Apr 08 '24 23:04 untitaker

I don't think that's how yank is supposed to be used and I am sure it will cause even more problems for existing users. one example is libraries that depend on 0.36.

Libraries that depend on 0.36 might get a warning, but people starting a new project or upgrading their dependency stacks might introduce this new bug into their software.

As said above, I've noticed similar behaviour using 0.36.2 in two separate projects that use a StreamConsumer: memory leaks after a long period of time (the first leak started after 10 days of runtime).

Feel free to tell me if you want some information about my setup (configuration, topics setup, ...).


Will rollback to 0.35.0 to see if it fixes the issue.

scotow avatar Apr 09 '24 13:04 scotow

If I want to depend on a library that depends on 0.36, I cannot do that anymore, and some (if not most) libraries do not check in their lockfiles for CI. So I do think yanking will 100% break somebody's build when it doesn't have to. If you want to fix the onboarding experience you could re-publish 0.34 as 0.37.


untitaker avatar Apr 09 '24 14:04 untitaker