Restarting pulsar brokers or topic load shedding can cause internal channel errors that break the Consumer
Description
We have seen inconsistent delays and "stuck" messages in a prototype system we're developing. After a significant amount of time debugging it, I was able to isolate what appears to be a bug in pulsar-rs that matches the logs I'm seeing in the real service.
During restarts of the pulsar cluster (due to Kubernetes node pool updates), we see logs similar to these (taken from the public reproduction repo linked below):
2025-12-11T16:39:38.575199Z ERROR pulsar::consumer::engine: could not close consumer Some("delivery-consumer")(4) for topic persistent://example/delivery/notifications-retries-partition-1: Disconnected
2025-12-11T16:39:38.576263Z WARN pulsar::retry_op: Retry #0 -> connecting consumer 4 using connection f60e9405-c90e-415e-ab9e-fcf612431a54 to broker pulsar://broker-1:6650 to topic persistent://example/delivery/notifications-retries-partition-1
2025-12-11T16:39:38.781142Z ERROR pulsar::consumer::engine: tx returned SendError { kind: Disconnected }
2025-12-11T16:39:38.781268Z ERROR pulsar::consumer::engine: cannot send a message from the consumer engine to the consumer(9), stopping the engine
2025-12-11T16:39:38.782559Z WARN pulsar::consumer::engine: rx terminated
2025-12-11T16:39:38.800551Z ERROR pulsar::consumer::engine: tx returned SendError { kind: Disconnected }
2025-12-11T16:39:38.800602Z ERROR pulsar::consumer::engine: cannot send a message from the consumer engine to the consumer(6), stopping the engine
2025-12-11T16:39:38.802778Z WARN pulsar::consumer::engine: rx terminated
2025-12-11T16:39:39.074912Z ERROR pulsar::consumer::engine: Error sending event to channel - send failed because receiver is gone
2025-12-11T16:39:39.074918Z ERROR pulsar::consumer::engine: Error sending event to channel - send failed because receiver is gone
2025-12-11T16:39:39.074961Z WARN pulsar::consumer::engine: rx terminated
2025-12-11T16:39:39.074974Z WARN pulsar::consumer::engine: rx terminated
It seems the internal channel used in pulsar-rs gets closed when topics are moved between pulsar brokers. After that point, we don't seem to get any new messages on subscribed topics until we restart our consumer.
In our case, this went undetected in our staging environment for two weeks, and once we restarted the consumer we received two weeks of traffic all at once.
From what I can tell, this error is never surfaced through the public API; it can only be seen in the tracing logs included above.
The logs come from here:
https://github.com/streamnative/pulsar-rs/blob/4043d2284afe3f41e6876818d1a2fbe4857aa475/src/consumer/engine.rs#L262-L268
potentially from here:
https://github.com/streamnative/pulsar-rs/blob/4043d2284afe3f41e6876818d1a2fbe4857aa475/src/consumer/engine.rs#L572-L575
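For context on the mechanics: `SendError { kind: Disconnected }` is what a futures mpsc channel returns once its receiving half has been dropped. A minimal standalone sketch of that failure mode (this is not pulsar-rs code, just an illustration of the channel behavior the engine appears to hit; it only assumes the `futures` crate):

```rust
use futures::channel::mpsc;

fn main() {
    let (tx, rx) = mpsc::unbounded::<String>();

    // Simulate the consumer half being torn down (e.g. during a broker
    // restart) while the engine side still holds the sender.
    drop(rx);

    // Every later send now fails with a "disconnected" error. In the engine
    // this appears to only be logged ("tx returned SendError { kind:
    // Disconnected }") before the engine stops, so the application never
    // receives an error it could react to.
    let err = tx.unbounded_send("message".to_string()).unwrap_err();
    assert!(err.is_disconnected());
    println!("tx returned {:?}", err);
}
```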
Steps to Reproduce
I was able to reproduce what appears to be the same issue in two ways:
- Restarting one or more brokers while a consumer is attached
- Overloading a broker so that the partitioned topic is load shed to another broker
As the steps to reproduce are non-trivial and our system is not open source, I have written up a sample project showing the issue.
Clone https://github.com/chamons/pulsar-load-shed-repro and follow the instructions in the readme and the source code.
FWIW, we are experiencing a similar error upon broker rolls/restarts where certain partitions (of a partitioned topic, obviously) will stop consuming. The error does not appear to bubble up such that it can be handled, and we are left with partitions that remain unconsumed and shut down, thus accumulating backlog. The open issue here is discussing channel errors, but it all feels related (and perhaps we may run into these channel issues as well).
pulsar::consumer::multi: Unexpected error consuming from pulsar topic app/ns/my_topic-partition-0: service discovery error: Connection error: fatal error when connecting to the Pulsar server
@darinspivey - I'd have to see the full logs, but given my current understanding of the issue I was seeing, I pretty strongly suspect they are the same. That connection error is likely bubbling up to the internal error task and killing it (?).
Can you look to see if you can find something like:
cannot send a message from the consumer engine to the consumer(6), stopping the engine
Hey @chamons, thanks for chiming in. Yes, I do see 3 of those errors during the times when I was restarting our brokers. It doesn't correspond with the time when I saw my consumer getting removed, though. Honestly, there are probably a few issues around making sure partitioned topics stay correctly connected, so I may have to open one for my case.
I traced my error back to the consumer::multi::poll_next function, which calls remove_consumers when it encounters an error. It doesn't bubble anything up, and it seems that even check_connections would then only look at the remaining connections in the BTreeMap. These things need to bubble up and/or offer retries.
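To make that concrete, here is a heavily simplified sketch of the pattern I'm describing (names are illustrative; this is not the actual pulsar-rs implementation):

```rust
use std::collections::BTreeMap;

// Stand-in for a per-topic consumer.
struct TopicConsumer;

struct MultiConsumer {
    consumers: BTreeMap<String, TopicConsumer>,
}

impl MultiConsumer {
    // When polling a topic errors out, the topic is simply dropped from the
    // map. Nothing is returned to the caller and no retry is scheduled.
    fn on_topic_error(&mut self, topic: &str) {
        self.consumers.remove(topic);
    }

    // A later connection check only iterates what is still in the map, so a
    // topic removed above is silently never consumed again.
    fn check_connections(&mut self) {
        for (_topic, _consumer) in self.consumers.iter_mut() {
            // reconnect logic for the consumers that survived
        }
    }
}

fn main() {
    let mut mc = MultiConsumer { consumers: BTreeMap::new() };
    mc.consumers.insert("my_topic-partition-0".to_string(), TopicConsumer);

    mc.on_topic_error("my_topic-partition-0"); // e.g. a connection error during a broker roll
    mc.check_connections(); // partition-0 is gone and nothing will ever re-add it
    assert!(mc.consumers.is_empty());
}
```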
It doesn't correspond with the time when I saw my consumer getting removed, though. Honestly, there are probably a few issues around making sure partitioned topics stay correctly connected, so I may have to open one for my case.
The problem is that once that log happens, from what I can tell the associated consumer is a dead man walking: it is broken in such a way that it no longer works consistently, or possibly at all. This is hard to figure out when it is inside a MultiConsumer, for example.
These things need to bubble up and/or offer retries.
Absolutely agree. I looked into how difficult it would be to fix and have given up for now; the multi consumer case seemed non-trivial.
I am looking into how difficult this would be to fix, and I have a unit test that I think shows the same behavior.
I don't see all of the same logs, ~possibly because it is a standalone single-node Pulsar rather than a cluster~ (I tested this on a cluster, same behavior), but it still shows a "this should consume easily" case that breaks when you unload the topic (and works when you comment out the reqwest put).
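For reference, the "reqwest put" is just forcing a topic unload through the Pulsar admin REST API, roughly this shape (the URL and topic here are placeholders, and the exact call in my test may differ slightly):

```rust
// Force a topic unload via the Pulsar admin REST API
// (PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}/unload).
async fn unload_topic(admin_base: &str, topic: &str) -> Result<(), reqwest::Error> {
    // e.g. admin_base = "http://localhost:8080",
    //      topic = "example/delivery/notifications-retries"
    let url = format!("{admin_base}/admin/v2/persistent/{topic}/unload");
    reqwest::Client::new()
        .put(url)
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}
```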
I've also determined via this diff that it isn't just "reconnect is broken"; as expected, that part seems to work just fine.
Ok, it looks like my original potential unit test from yesterday doesn't fail for the reason I thought it did. It fails due to another bug, this one in the producer:
https://github.com/streamnative/pulsar-rs/issues/378
I will get a new test case up tomorrow and keep looking.
Ok, after looking into this more, it seems like my load shed example itself ran into the "Topic load shedding causes Producers (including the built-in one in the connection object) to fail until you manually close them after an error" issue that I found when looking into creating a unit test.
When I patched the sample project to unconditionally close producers after any error, it started working.
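Conceptually the patch is just "never reuse a producer after an error". A sketch of that shape (the producer type and its methods below are hypothetical stand-ins so the sketch is self-contained, not the actual pulsar-rs API):

```rust
// Illustrative only: `FlakyProducer` is a hypothetical stand-in,
// not the pulsar-rs Producer.
struct FlakyProducer;

#[derive(Debug)]
struct SendError;

impl FlakyProducer {
    async fn create() -> FlakyProducer {
        FlakyProducer
    }
    async fn send(&mut self, _payload: Vec<u8>) -> Result<(), SendError> {
        Ok(())
    }
    async fn close(self) {}
}

async fn send_with_reset(
    producer: &mut FlakyProducer,
    payload: Vec<u8>,
) -> Result<(), SendError> {
    match producer.send(payload).await {
        Ok(()) => Ok(()),
        Err(e) => {
            // Workaround for #378: the producer can be left unusable after an
            // error, so unconditionally close it and swap in a fresh one
            // before the next attempt.
            let broken = std::mem::replace(producer, FlakyProducer::create().await);
            broken.close().await;
            Err(e)
        }
    }
}
```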
Now that doesn't match what I'm seeing in production, where a consumer still got stuck. Going to look into it more to see if I can reproduce it with the stress tester again, or if it is something else, like cancel safety.
So I'm not convinced yet that there isn't a consumer hang, since I saw it in prod, but right now I don't have a sample project that reproduces it.
I've updated the title and description, as I am no longer certain at all that load shedding can trigger this issue. It can absolutely cause https://github.com/streamnative/pulsar-rs/issues/378, which is what my test case was hitting when I unloaded topics. However, when I worked around that issue by unconditionally closing the busted producers, that use case seems to work.
However, if I chain-restart the brokers (one after another), I can still get into the case described above. I really doubt I'll be able to make a unit test that restarts brokers (that would be a disaster to run in parallel with any other test).
Update on my update from an hour ago: I have it pinned down to the specific lines of code. It appears that under some error conditions your MultiTopicConsumer gets topics removed from it, causing you to no longer listen to them.
See "Technical Details" section @darinspivey