rust-rdkafka
Upgrade from 0.34.0 to 0.36.0 causes consumer to stop consuming but keep running
As the title says, after upgrading from 0.34.0 to 0.36.0 we started getting Kafka consumers that would stop consuming messages, start leaking memory, and not leave the consumer group.
- I do not have a clear reproducer, as it happened to us across multiple production environments in a random fashion.
- For example, it never happened in our staging environment.
- It happened more often in environments where Kafka was under higher load. Maybe it has to do with the change to the event interface, but that is just a wild guess.
- In the end I tried setting a low max.poll.interval.ms, but it happened again; the consumer seems to still be sending heartbeats but for whatever reason stops pulling records, CPU drops, memory increases, and the consumer group does not rebalance.
- After reverting to 0.34.0 it has not reproduced so far.
Disclaimer: we have only been running on 0.34.0 for half a day without it reproducing, while it happened many times with 0.36.0. I will report back if it still happens on 0.34.0 and the origin turns out to be elsewhere.
Thanks for reporting it, we'll investigate further. 0.35.0 does not contain the event API change, if you want to give it a try.
Can you provide a snippet of your code? What consumer are you using: stream or base? Also the properties, any logs, etc.
Confirming that I too encountered a similar issue, which resulted in significant increases in idle time being reported across the board, and some instances of what appeared to be an executor stall (almost as if some blocking calls were on the async path). I happened to be using a producer for this, with no consumers on my particular service, though I do use a long publish batching window of 1 second.
Downgrading from 0.36.0 to 0.35.0 did indeed resolve the issue.
@zhrebicek @neoeinstein The event API does allow some transient errors to be propagated that the callback API did not. @scanterog brought this up to upstream here: https://github.com/confluentinc/librdkafka/issues/4493 . Is it possible that your code might have not handled these explicitly, leading to either the consumer or producer becoming idle?
This may manifest as KafkaError (Message consumption error: BrokerTransportFailure (Local: Broker transport failure)).
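For illustration, a hedged sketch of classifying that error explicitly so it can be treated as retryable rather than fatal; the error variants are from rdkafka's public API, but which codes to treat as transient here is only an example, not a statement of the library's policy:

use rdkafka::error::KafkaError;
use rdkafka::types::RDKafkaErrorCode;

// Example classification: treat broker-transport problems surfaced by the
// event API as transient instead of tearing down the consumer.
fn is_transient(err: &KafkaError) -> bool {
    matches!(
        err,
        KafkaError::MessageConsumption(
            RDKafkaErrorCode::BrokerTransportFailure | RDKafkaErrorCode::AllBrokersDown
        )
    )
}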
We tested the BaseProducer and BaseConsumer internally today and we couldn't reproduce the issue. We'll try the FutureProducer and the StreamConsumer tomorrow. Please provide the properties you're setting and any relevant logs. Are you using the statically linked librdkafka or linking it dynamically?
On our side, we are using FutureProducer. We also record the amount of time it takes to confirm the publish, but move that off the main execution path by tokio::spawning a future wrapping that .send. In general, each instance of this service publishes about 100 messages per second into rdkafka, with an average of about 40 such spawned tasks pending concurrently (within a 1 second batching window).
I also confirmed that we did not see any errors bubble up out of the .send call.
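A minimal sketch of that pattern, assuming a hypothetical publish_detached helper around an existing FutureProducer and the tracing crate for the latency log; it illustrates moving the send confirmation off the main path and is not the service's actual code:

use std::time::{Duration, Instant};

use rdkafka::producer::{FutureProducer, FutureRecord};

// Fire-and-forget publish: the caller does not wait for the broker ack;
// the delivery latency is recorded when the spawned task completes.
fn publish_detached(producer: FutureProducer, topic: String, key: String, payload: Vec<u8>) {
    tokio::spawn(async move {
        let started = Instant::now();
        let result = producer
            .send(
                FutureRecord::to(&topic).key(&key).payload(&payload),
                Duration::from_secs(5), // queue timeout; illustrative value
            )
            .await;
        tracing::info!(
            elapsed_ms = started.elapsed().as_millis() as u64,
            ok = result.is_ok(),
            "publish confirmed"
        );
    });
}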
Edit to confirm the properties we set:
bootstrap.servers=<broker>
client.id=<identifier>
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.username=<username>
sasl.password=<password>
request.required.acks=-1
queue.buffering.max.ms=1000
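For context, a minimal sketch of how properties like the ones above might be applied through rdkafka's ClientConfig; the broker address, client id, and credentials are placeholders, and this is not the service's actual setup code:

use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;

// Build a FutureProducer from the listed properties.
fn build_producer() -> rdkafka::error::KafkaResult<FutureProducer> {
    ClientConfig::new()
        .set("bootstrap.servers", "broker:9092") // placeholder
        .set("client.id", "my-service")          // placeholder
        .set("security.protocol", "SASL_SSL")
        .set("sasl.mechanism", "PLAIN")
        .set("sasl.username", "username")        // placeholder
        .set("sasl.password", "password")        // placeholder
        .set("request.required.acks", "-1")
        .set("queue.buffering.max.ms", "1000")
        .create()
}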
On a side note, I do have an internal strongly-typed convention for adding these to the ClientConfig, if that is of any interest to you.
We have logging set to info, with the tracing feature enabled, but don't see any relevant logs published. That doesn't seem unusual, since we generally don't have any errors. We don't ask rdkafka to debug anything right now, though we could do something around that in the future.
Our rdkafka dependency is set up to statically link librdkafka and friends.
We use rdkafka as rdkafka = "0.34.0", running on debian-bookworm-slim.
We use StreamConsumer, with these settings:
client.id: <id>
enable.auto.commit: true
enable.auto.offset.store: false
enable.partition.eof: false
linger.ms: 100
enable.idempotence: true
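A minimal construction sketch for reference, not our actual code; bootstrap.servers, group.id, and client.id are placeholders, and (as noted further down in the thread) enable.idempotence and linger.ms are producer-only settings, so they are omitted here:

use rdkafka::config::ClientConfig;
use rdkafka::consumer::StreamConsumer;

// Build a StreamConsumer with auto-commit enabled but offset storing done
// manually (enable.auto.offset.store=false).
fn create_consumer() -> rdkafka::error::KafkaResult<StreamConsumer> {
    ClientConfig::new()
        .set("bootstrap.servers", "broker:9092") // placeholder
        .set("group.id", "my-group")             // placeholder
        .set("client.id", "my-client")           // placeholder
        .set("enable.auto.commit", "true")
        .set("enable.auto.offset.store", "false")
        .set("enable.partition.eof", "false")
        .create()
}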
@davidblewett We can look into handling the errors you mention explicitly, but we basically construct a new consumer on any error.
loop {
    let consumer = Arc::new(try_create_consumer_from_config(
        kafka_config.consumer.clone(),
    )?);
    ...
    consumer.subscribe(&topics)?;
    match run(
        ... (consumer.stream ... inside -> anyhow::Result<()>)
    )
    .await
    {
        Ok(()) => break Ok(()),
        Err(error) => warn!(?error, "Processor failed"),
    }
}
But all we have is this on rebalance:
error: Store offset error: State (Local: Erroneous state)
Caused by:
State (Local: Erroneous state)
Edit: I saw one of these a couple of days back, but only one instance of it, and I am not even sure it happened on 0.36.0:
error: Meta data fetch error: BrokerTransportFailure (Local: Broker transport failure)
Caused by:
BrokerTransportFailure (Local: Broker transport failure)
One addition: we had no issues with producers (FutureProducer), and we still have it in a couple of services that have been running for a couple of days. So no need to test that, @scanterog.
Thanks a lot for looking into this!
I went ahead and yanked the 0.36.0 release from crates.io until we figure out what's going on. The implementation is still present in git.
I'm surprised we're seeing a working case of the producer with @zhrebicek and a non-working one with @neoeinstein. I wonder what the differences would be here.
@zhrebicek I see you're setting enable.idempotence: true, but this is only a producer setting (same for linger.ms). I see you set enable.auto.offset.store: false with auto commit set to true. Do you commit offsets via the StoreOffsets API?
@neoeinstein can you provide an exact snippet of what you're doing? I wasn't able to reproduce it with the FutureProducer either.
@scanterog We do it like this
consumer
    .stream()
    .take_until(token.wait_for_shutdown())
    .err_into::<anyhow::Error>()
    // Rest of pipe working with messages.
    // Last step before storing offsets is publishing converted data to other topic.
    .try_buffered(converter_config.max_in_flight)
    .try_for_each(|message| {
        let span = message.span();
        let consumer = consumer.clone();
        async move {
            consumer.store_offset(message.topic(), message.partition(), message.offset())?;
            Ok(())
        }
        .instrument(span)
    })
    .await
@zhrebicek yeah, that translates to stopping the stream entirely when an error is encountered. If a Kafka cluster is under high load, then most likely you'll have a lot of request retries that will be bubbled up as transient errors. That's usually persistent, which translates to the consumer not making progress at all, and you'll be recreating the consumers constantly. The only thing that does not add up to me here is that I'd still expect the error to be logged every time the stream stops and the consumer is recreated. Are you logging these events?
One potential solution is to stop forwarding those transient errors to the app (as with the callback API), but sadly rdkafka does not provide any API to identify them, so it would require scanning the rdkafka code base and filtering those codes on our side.
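A minimal sketch of the alternative on the application side, assuming a configured StreamConsumer and the tracing crate: keep polling the same consumer and only log transient consumption errors, instead of tearing the stream down and recreating the consumer on every error. The classification of errors as transient here is illustrative only:

use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::message::Message;

async fn consume_forever(consumer: &StreamConsumer) {
    loop {
        match consumer.recv().await {
            Ok(message) => {
                // Process the message, then store its offset so the
                // auto-committer picks it up (enable.auto.offset.store=false).
                if let Err(error) = consumer.store_offset(
                    message.topic(),
                    message.partition(),
                    message.offset(),
                ) {
                    tracing::warn!(?error, "failed to store offset");
                }
            }
            // Transient consumption errors are logged and polling continues
            // on the same consumer instead of recreating it.
            Err(error @ KafkaError::MessageConsumption(_)) => {
                tracing::warn!(?error, "transient consumption error, continuing");
            }
            Err(error) => {
                tracing::error!(?error, "unexpected consumer error");
            }
        }
    }
}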
@scanterog we are logging all the errors, and there are no subsequent errors after the thing gets stuck, so it should not be recreating in a loop.
From what I remember there were at most 3 consecutive errors like this that would cause recreation of it all per pod, and even then the other pods did not get stuck.
@zhrebicek I see. We haven't been able to reproduce this so far. We have had tests running for over 5 days and we have tried several operations on the cluster (rolling restarts, broker replacements, ungraceful shutdowns). I'll try to set up a service using the pattern you've shared and see whether we can get closer to understanding what's going on.
@scanterog If I were able to reproduce it on staging, I would deploy the version on purpose and enable rdkafka debug logs, but it never happened there. And it caused issues in production.
One note: maybe set a high replication factor, so it has more work to do with acks.
@zhrebicek no worries! Totally get that. What RF are you using, and what Kafka version?
@scanterog
Version: Confluent 6.1.0 (Kafka 2.7.0 equivalent)
Replication factor 2; in one env it was by mistake 4, and it happened there a bit more than in the other envs.
@zhrebicek would it be possible to run a shadow service (that sends all reads to /dev/null) with this version and debug level enabled (same for rdkafka logs) in the environment where you're getting this issue? Sadly we aren't able to reproduce this issue, so there must be something specific we're missing.
@neoeinstein this also applies to the FutureProducer. If you can provide that information, it would be extremely useful for the project!
Thanks both!
I will see what time allows me to try.
Thanks @zhrebicek ! appreciate that
Same problem here, took a few hours to diagnose that this was the issue.
We're also seeing a bunch of "Too many open files" errors coming from the consumer threads, until it finishes with a segfault after ~1h30min.
It seems pretty likely that this is due to https://github.com/fede1024/rust-rdkafka/pull/617.
Would you please consider yanking the affected releases (and maybe have them only as release candidates) until this is fixed?
(Also, this deserves the bug tag.)
@Ten0 until we have a way of reproducing the issue, we can't work on a fix. That commit does change the underlying implementation, but aside from some errors bubbling up that didn't before, there shouldn't have been a change in behavior. It is also the same mechanism that the Confluent Kafka libraries use. This change has been in use and seeing a lot of traffic, and we haven't observed the reported issue.
Are you sure you aren't accidentally swallowing some exceptions? We need a self contained example that triggers the behavior to investigate further.
We need a self contained example that triggers the behavior to investigate further.
Unfortunately that is going to be too hard to extract for us as well :(
Our code basically looks like this:
let stream: rdkafka::MessageStream<_> = stream_consumer.stream();
loop {
    let msg = stream.next().await?;
    // do stuff with the message
}
The librdkafka parameters are:
session.timeout.ms = "6000"
group.id = "some_group_id"
enable.auto.commit = "true"
auto.commit.interval.ms = "1000"
enable.auto.offset.store = "false"
enable.partition.eof = "false"
statistics.interval.ms = "1000"
We are handling ~300k lines/s dispatched on several servers. The issue happens with both Kafka v2.0 and v2.1.
Are you sure you aren't accidentally swallowing some exceptions?
Yes, I am sure that we aren't accidentally dropping any error either returned by this library or printed to log (there is nothing of level Warn or greater in the log).
There are no messages besides the ones related to too many open files, and those come from the dedicated threads without going through log.
there shouldn't have been a change in behavior. This change has been in use and seeing a lot of traffic and we haven't observed the reported issue.
From what I read in this thread, there have been 3 different people encountering this precise issue (same blockage, same memory increase...) since this precise release: @zhrebicek, @neoeinstein, and myself. I still believe it is time to acknowledge that this issue does indeed exist and has a strong impact, despite a minimal reproduction not being established yet.
Would you please consider yanking the affected releases (and maybe have them only as release candidates) until this is fixed? (Also this deserves the bug tag)
I'm not sure it is related, but upgrading to 0.36.0 makes topic deletion hang forever if you have uncommitted messages, JFYI.
I think I was able to reproduce the issue locally, running the high-level consumer with the following timing-related settings:
max.poll.interval.ms=20000
session.timeout.ms=10000
auto.commit.interval.ms=5000
statistics.interval.ms=5000
After calling subscribe, I am polling the consumer in a loop using recv. If I add a tokio::time::sleep of 7s between polls, 0.36.2 silently fails and just stops consuming; with a sleep of 6s it keeps going. With 0.35.0 it keeps consuming even with larger sleep durations (10s+).
My guess is that there's something wrong with the underlying librdkafka consumer not sending heartbeats.
On a side note, another issue I noticed is that since 0.36.2 the consumer starts printing verbose log messages if the debug setting is not left empty, not only from the custom client context (as would be expected) but apparently also directly from librdkafka.
Minimal example to reproduce the issue (can be included as-is in the crate's high-level consumer integration tests):
// All produced messages should be consumed.
#[tokio::test(flavor = "multi_thread")]
async fn test_produce_consume_delay() {
    let _r = env_logger::try_init();
    let start_time = current_time_millis();
    let topic_name = rand_test_topic("test_produce_consume_delay");
    let message_map = populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None).await;
    let consumer = create_stream_consumer(&rand_test_group(), None);
    consumer.subscribe(&[topic_name.as_str()]).unwrap();
    for i in 0..10 {
        println!("awaiting message: {}", i);
        let message = consumer.recv().await;
        let timeout = tokio::time::sleep(std::time::Duration::from_millis(5000));
        println!("processing message: {}", i);
        match message {
            Ok(m) => {
                let id = message_map[&(m.partition(), m.offset())];
                match m.timestamp() {
                    Timestamp::CreateTime(timestamp) => assert!(timestamp >= start_time),
                    _ => panic!("Expected createtime for message timestamp"),
                };
                assert_eq!(m.payload_view::<str>().unwrap().unwrap(), value_fn(id));
                assert_eq!(m.key_view::<str>().unwrap().unwrap(), key_fn(id));
                assert_eq!(m.topic(), topic_name.as_str());
            }
            Err(e) => panic!("Error receiving message: {:?}", e),
        };
        timeout.await;
    }
}
On v0.35.0 this completes w/o issues, while on v0.36.2 it gets stuck after the first message w/o any error.
You can also try lowering the 5s timeout; on my machine it still fails on 0.36.2 with a timeout of 1s and works with 500ms.
Interestingly, the issue seems to be directly related to statistics.interval.ms: if I bump it from 500 to 5000ms, the consumer keeps working for any poll delays at or below 5000ms and fails for values above.
I just ran into this issue; might I suggest yanking all variants of the 0.36 branch until a proper patch lands? This will prevent others from unknowingly having issues.
I don't think that's how yank is supposed to be used, and I am sure it will cause even more problems for existing users. One example is libraries that depend on 0.36.
I don't think that's how yank is supposed to be used and I am sure it will cause even more problems for existing users. one example is libraries that depend on 0.36.
Libraries that depend on 0.36 might get a warning, but people starting a new project or upgrading their dependency stacks might introduce this new bug in their software.
As said above, I've noticed similar behaviour using 0.36.2 in two separate projects that use a StreamConsumer. I am seeing some memory leaks after a long period of time (the first leak started after 10 days of runtime).
Feel free to tell me if you want some information about my setup (configuration, topics setup, ...).
Will rollback to 0.35.0 to see if it fixes the issue.
If I want to depend on a library that depends on 0.36, I cannot do that anymore, and some (if not most) libraries' CI does not check in their lockfiles. So I do think yanking will 100% break somebody's build when it doesn't have to. If you want to fix the onboarding experience, you could re-publish 0.34 as 0.37.