pulsar-rs
Acknowledged messages are not removed from topic backlog
We are using this library to connect to our Pulsar topic from Rust, but noticed that the backlog never decreases. After some investigation, we found that even though we call ack, the messages don't actually get acknowledged, so the backlog keeps piling up. If we restart the application, it receives the same messages again. This is pretty serious.
I noticed that if I add a short sleep (around 10 ms), the acks are sent properly, at least in my experiments.
Could it be related to https://github.com/wyyerd/pulsar-rs/issues/162, @Geal ?
Also, the test consumer::tests::consumer_dropped_with_lingering_acks somehow doesn't catch this behavior. When I run this test, I do observe that the message is removed from the temporary topic the test creates. But if I add a return right after the first ack call, the issue reproduces consistently: the message is not removed from the queue.
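The symptoms (a short sleep, or dropping the client, makes the acks go through) are consistent with ack only enqueuing a request that a background engine sends later. The std-only sketch below models that assumption; it is not pulsar-rs code, and `AckRequest` and `ack_and_flush` are hypothetical names. The fake broker is only guaranteed to see the acks once the engine has been allowed to drain its queue, which is roughly what returning early from the test prevents.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for what user code hands to the engine on ack().
struct AckRequest {
    message_id: u64,
}

/// Acks each id through a background "engine" thread and waits for it to
/// drain the queue. Returns what the fake broker (a shared Vec) received.
fn ack_and_flush(ids: &[u64]) -> Vec<u64> {
    let broker_acks = Arc::new(Mutex::new(Vec::<u64>::new()));
    let (tx, rx) = mpsc::channel::<AckRequest>();

    // The engine forwards every queued request to the "broker" and exits
    // once all senders are dropped and the channel is empty.
    let sink = Arc::clone(&broker_acks);
    let engine = thread::spawn(move || {
        for req in rx {
            sink.lock().unwrap().push(req.message_id);
        }
    });

    for &id in ids {
        // send() only enqueues; it returns before the engine has run.
        tx.send(AckRequest { message_id: id }).unwrap();
    }

    // Returning (or exiting) right here could leave requests unsent.
    // Dropping the sender and joining the engine guarantees a full flush.
    drop(tx);
    engine.join().unwrap();

    let acks = broker_acks.lock().unwrap().clone();
    acks
}

fn main() {
    // prints "broker received acks: [1, 2, 3]"
    println!("broker received acks: {:?}", ack_and_flush(&[1, 2, 3]));
}
```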
running 1 test
2021-10-13 17:02:21.543558 UTC DEBUG pulsar::connection_manager ConnectionManager::connect(BrokerAddress { url: Url { scheme: "pulsar", cannot_be_a_base: false, username: "", password: None, host: Some(Domain("127.0.0.1")), port: Some(6650), path: "", query: None, fragment: None }, broker_url: "127.0.0.1:6650", proxy: false })
2021-10-13 17:02:21.544077 UTC DEBUG pulsar::connection Connecting to pulsar://127.0.0.1:6650: 127.0.0.1:6650
2021-10-13 17:02:21.551680 UTC INFO pulsar::connection_manager Connected n°-4928389175985698705 to pulsar://127.0.0.1:6650 in 7ms
2021-10-13 17:02:21.564847 UTC DEBUG pulsar::connection_manager ConnectionManager::connect(BrokerAddress { url: Url { scheme: "pulsar", cannot_be_a_base: false, username: "", password: None, host: Some(Domain("127.0.0.1")), port: Some(6650), path: "", query: None, fragment: None }, broker_url: "localhost:6650", proxy: true })
2021-10-13 17:02:21.565006 UTC DEBUG pulsar::connection Connecting to pulsar://127.0.0.1:6650: 127.0.0.1:6650
2021-10-13 17:02:21.573705 UTC INFO pulsar::connection_manager Connected n°7463148176148534536 to localhost:6650 via proxy pulsar://127.0.0.1:6650 in 8ms
producer sends done
creating consumer
created consumer
2021-10-13 17:02:21.643655 UTC DEBUG pulsar::consumer starting the consumer engine for topic consumer_dropped_with_lingering_acks_23850
got message: Payload { metadata: MessageMetadata { producer_name: "standalone-1-4", sequence_id: 0, publish_time: 1634144541610, properties: [], replicated_from: None, partition_key: None, replicate_to: [], compression: None, uncompressed_size: None, num_messages_in_batch: None, event_time: None, encryption_keys: [], encryption_algo: None, encryption_param: None, schema_version: None, partition_key_b64_encoded: None, ordering_key: None, deliver_at_time: None, marker_type: None, txnid_least_bits: None, txnid_most_bits: None, highest_sequence_id: None, null_value: None, uuid: None, num_chunks_from_msg: None, total_chunk_msg_size: None, chunk_id: None, null_partition_key: None }, data: [123, 34, 116, 111, 112, 105, 99, 34, 58, 34, 97, 49, 55, 122, 89, 48, 111, 81, 34, 44, 34, 109, 115, 103, 34, 58, 49, 125] }
test consumer::tests::consumer_dropped_with_lingering_acks ... ok
test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 11 filtered out; finished in 0.11s
Output of pulsar-admin topics peek-messages --count 1 --subscription dropped_ack consumer_dropped_with_lingering_acks_23850:
Message ID: 1437:0
Tenants:
"publish-time 2021-10-13T17:02:21.61Z"
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 74 6f 70 69 63 22 3a 22 61 31 37 7a 59 30 |{"topic":"a17zY0|
|00000010| 6f 51 22 2c 22 6d 73 67 22 3a 31 7d             |oQ","msg":1}    |
+--------+-------------------------------------------------+----------------+
Adding tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; just before the return causes the message to be removed from the topic backlog, and peek-messages then reports:
17:18:44.935 [AsyncHttpClient-7-1] WARN org.apache.pulsar.client.admin.internal.TopicsImpl - Exception 'Message not found' occurred while trying to peek Messages.
Dropping and recreating the client also ensures that the messages are actually acknowledged.
Hi, I noticed the same problem in our environment, especially when there are many messages in the backlog (~300k in my case).
I saw that in this case messages from the engine_rx channel never get selected, so I tried rewriting the ConsumerEngine::<Exe>::engine function to use a supporting channel that merges the message_rx and engine_rx channels.
This is how I updated the method: https://github.com/dghilardi/pulsar-rs/blob/fix/ack-not-sent/src/consumer.rs#L855
In my case this solved the problem, and acknowledged messages are removed from the backlog.
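The starvation described in this comment can be illustrated with a toy, single-threaded model. This is not the pulsar-rs engine code: it assumes the original loop effectively favors message_rx whenever a message is ready, and the names `Event` and `simulate` are hypothetical. With separate queues and a biased choice, an ack sent while the backlog is deep is never handled; with one merged FIFO, in the spirit of the supporting channel in the linked branch, the ack is handled right after the message that preceded it.

```rust
use std::collections::VecDeque;

/// The two kinds of work the toy engine handles.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Event {
    Message, // delivered by the broker (stands in for message_rx)
    Ack,     // requested by user code (stands in for engine_rx)
}

/// Runs `ticks` steps. Every tick the broker delivers one message (a deep
/// backlog that never runs dry) and, at `ack_tick`, user code sends one ack.
/// The engine handles one event per tick. Returns the tick at which the ack
/// was handled, or None if it was starved for the whole run.
fn simulate(ticks: u64, ack_tick: u64, merged: bool) -> Option<u64> {
    let mut message_q = VecDeque::new(); // separate queue for messages
    let mut engine_q = VecDeque::new(); // separate queue for acks
    let mut merged_q = VecDeque::new(); // single FIFO fed by both producers

    for tick in 0..ticks {
        // Producers: one message per tick, plus the ack at `ack_tick`.
        let ack_now = tick == ack_tick;
        if merged {
            merged_q.push_back(Event::Message);
            if ack_now {
                merged_q.push_back(Event::Ack);
            }
        } else {
            message_q.push_back(Event::Message);
            if ack_now {
                engine_q.push_back(Event::Ack);
            }
        }

        // Engine: handle exactly one event this tick.
        let next = if merged {
            merged_q.pop_front()
        } else {
            // Biased choice: the ack queue is only consulted when no message
            // is waiting, which never happens while the backlog is deep.
            message_q.pop_front().or_else(|| engine_q.pop_front())
        };
        if next == Some(Event::Ack) {
            return Some(tick);
        }
    }
    None
}

fn main() {
    println!("biased: {:?}", simulate(1_000, 3, false)); // prints "biased: None"
    println!("merged: {:?}", simulate(1_000, 3, true)); // prints "merged: Some(4)"
}
```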
Let me know if you want a pull request or prefer to handle the problem in a different way.
Hello folks, I will take a look at this issue and pull request. Thank you for the help :)