Acknowledged messages are not removed from topic backlog

Open ciuncan opened this issue 4 years ago • 3 comments

We are using this library to connect to our Pulsar topic from Rust, but noticed that the backlog never decreases. After some investigation, we found that even though we call ack, the messages are not actually acknowledged, which causes the backlog to pile up. If we restart the application, it receives the same messages again. This is pretty serious.

I noticed that if I add a short sleep (10 ms or so), the acks are sent properly (as far as my experiments show).

Could it be related to https://github.com/wyyerd/pulsar-rs/issues/162, @Geal ?

Also, the test consumer::tests::consumer_dropped_with_lingering_acks somehow doesn't catch this behavior. When I run this test, I do observe that the message is removed from the temp topic created by the test. But if I add a return after the first ack call, the issue reproduces consistently: the message is not removed from the topic.

running 1 test
2021-10-13 17:02:21.543558 UTC  DEBUG   pulsar::connection_manager      ConnectionManager::connect(BrokerAddress { url: Url { scheme: "pulsar", cannot_be_a_base: false, username: "", password: None, host: Some(Domain("127.0.0.1")), port: Some(6650), path: "", query: None, fragment: None }, broker_url: "127.0.0.1:6650", proxy: false })
2021-10-13 17:02:21.544077 UTC  DEBUG   pulsar::connection      Connecting to pulsar://127.0.0.1:6650: 127.0.0.1:6650
2021-10-13 17:02:21.551680 UTC  INFO    pulsar::connection_manager      Connected n°-4928389175985698705 to pulsar://127.0.0.1:6650 in 7ms
2021-10-13 17:02:21.564847 UTC  DEBUG   pulsar::connection_manager      ConnectionManager::connect(BrokerAddress { url: Url { scheme: "pulsar", cannot_be_a_base: false, username: "", password: None, host: Some(Domain("127.0.0.1")), port: Some(6650), path: "", query: None, fragment: None }, broker_url: "localhost:6650", proxy: true })
2021-10-13 17:02:21.565006 UTC  DEBUG   pulsar::connection      Connecting to pulsar://127.0.0.1:6650: 127.0.0.1:6650
2021-10-13 17:02:21.573705 UTC  INFO    pulsar::connection_manager      Connected n°7463148176148534536 to localhost:6650 via proxy pulsar://127.0.0.1:6650 in 8ms
producer sends done
creating consumer
created consumer
2021-10-13 17:02:21.643655 UTC  DEBUG   pulsar::consumer        starting the consumer engine for topic consumer_dropped_with_lingering_acks_23850
got message: Payload { metadata: MessageMetadata { producer_name: "standalone-1-4", sequence_id: 0, publish_time: 1634144541610, properties: [], replicated_from: None, partition_key: None, replicate_to: [], compression: None, uncompressed_size: None, num_messages_in_batch: None, event_time: None, encryption_keys: [], encryption_algo: None, encryption_param: None, schema_version: None, partition_key_b64_encoded: None, ordering_key: None, deliver_at_time: None, marker_type: None, txnid_least_bits: None, txnid_most_bits: None, highest_sequence_id: None, null_value: None, uuid: None, num_chunks_from_msg: None, total_chunk_msg_size: None, chunk_id: None, null_partition_key: None }, data: [123, 34, 116, 111, 112, 105, 99, 34, 58, 34, 97, 49, 55, 122, 89, 48, 111, 81, 34, 44, 34, 109, 115, 103, 34, 58, 49, 125] }
test consumer::tests::consumer_dropped_with_lingering_acks ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 11 filtered out; finished in 0.11s

Output of pulsar-admin topics peek-messages --count 1 --subscription dropped_ack

consumer_dropped_with_lingering_acks_23850
Message ID: 1437:0
Tenants:
"publish-time    2021-10-13T17:02:21.61Z"
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 74 6f 70 69 63 22 3a 22 61 31 37 7a 59 30 |{"topic":"a17zY0|
|00000010| 6f 51 22 2c 22 6d 73 67 22 3a 31 7d             |oQ","msg":1}    |
+--------+-------------------------------------------------+----------------+

Adding tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; right before the added return gets the message removed from the topic backlog:

17:18:44.935 [AsyncHttpClient-7-1] WARN  org.apache.pulsar.client.admin.internal.TopicsImpl - Exception 'Message not found' occurred while trying to peek Messages.
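The observations above (a short sleep helps, and an early return loses the ack) are consistent with ack only queuing the acknowledgement for a background task, so that returning too early ends the program before that task has sent it. The following is a minimal sketch of that idea and of one way to flush on drop, using only the standard library. It is not the pulsar-rs implementation: `AckQueue`, `ack_then_drop`, and the `delivered` vector standing in for the broker are all hypothetical names introduced for illustration.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for a consumer handle; not the pulsar-rs `Consumer`.
struct AckQueue {
    tx: Option<Sender<u64>>,
    worker: Option<JoinHandle<()>>,
}

impl AckQueue {
    /// `delivered` plays the role of the broker: ids pushed here count as acknowledged.
    fn new(delivered: Arc<Mutex<Vec<u64>>>) -> Self {
        let (tx, rx) = channel::<u64>();
        let worker = thread::spawn(move || {
            // Iterating the receiver yields every queued id and only ends
            // once all senders are dropped and the queue is empty.
            for id in rx {
                delivered.lock().unwrap().push(id);
            }
        });
        AckQueue { tx: Some(tx), worker: Some(worker) }
    }

    /// Only enqueues the ack; it returns before the worker has processed it.
    fn ack(&self, id: u64) {
        if let Some(tx) = &self.tx {
            let _ = tx.send(id);
        }
    }
}

impl Drop for AckQueue {
    fn drop(&mut self) {
        // Close the channel so the worker loop ends after draining it,
        // then wait for the worker so no queued ack is lost.
        drop(self.tx.take());
        if let Some(worker) = self.worker.take() {
            let _ = worker.join();
        }
    }
}

/// Acks three ids, drops the queue immediately, and returns what was delivered.
fn ack_then_drop() -> Vec<u64> {
    let delivered = Arc::new(Mutex::new(Vec::new()));
    {
        let queue = AckQueue::new(Arc::clone(&delivered));
        queue.ack(1);
        queue.ack(2);
        queue.ack(3);
    } // `queue` is dropped here, which drains and joins the worker.
    let result = delivered.lock().unwrap().clone();
    result
}
```

Without the join in `Drop`, whether the ids reach `delivered` before the caller moves on would depend on thread scheduling, which is the kind of race a 1 ms sleep can hide.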

ciuncan • Oct 13 '21 17:10

Dropping and recreating the client also ensures that the messages are actually acknowledged.

ciuncan • Oct 13 '21 17:10

Hi, I noticed the same problem in our environment, especially when there are many messages in the backlog (~300k in my case).

I saw that in this case messages from the engine_rx channel never get selected, so I tried rewriting the ConsumerEngine::<Exe>::engine function to use a supporting channel that merges the message_rx and engine_rx channels.

This is how I updated the method: https://github.com/dghilardi/pulsar-rs/blob/fix/ack-not-sent/src/consumer.rs#L855

In my case this solved the problem, and acknowledged messages are removed from the backlog.
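The starvation and the merged-channel idea described above can be illustrated with a small standard-library sketch. It assumes a loop that always checks the message channel first, which is one plausible way acks could be starved under a large backlog; it is not the actual pulsar-rs engine code, and `Event`, `biased_poll`, `merged_poll`, and `starvation_demo` are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};

/// Hypothetical event type for the merged channel; not the pulsar-rs API.
#[derive(Debug)]
enum Event {
    Message(u64),
    Ack(u64),
}

/// Biased loop: the ack channel is only checked when no message is ready.
/// Handles at most `budget` iterations and returns how many acks were handled.
fn biased_poll(messages: &Receiver<u64>, acks: &Receiver<u64>, budget: usize) -> usize {
    let mut acks_handled = 0;
    for _ in 0..budget {
        match messages.try_recv() {
            // A message was ready, so the ack channel is not looked at.
            Ok(_msg) => continue,
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => {}
        }
        if acks.try_recv().is_ok() {
            acks_handled += 1;
        }
    }
    acks_handled
}

/// Merged loop: both sources feed one channel, so events are handled in arrival order.
fn merged_poll(events: &Receiver<Event>, budget: usize) -> usize {
    let mut acks_handled = 0;
    for _ in 0..budget {
        match events.try_recv() {
            Ok(Event::Ack(_)) => acks_handled += 1,
            Ok(Event::Message(_)) => {}
            Err(_) => break,
        }
    }
    acks_handled
}

/// Returns (acks handled by the biased loop, acks handled by the merged loop).
fn starvation_demo() -> (usize, usize) {
    const BACKLOG: u64 = 1_000;
    const BUDGET: usize = 100;

    // Biased setup: a large backlog of messages plus ten pending acks.
    let (msg_tx, msg_rx) = channel();
    let (ack_tx, ack_rx) = channel();
    for id in 0..BACKLOG {
        msg_tx.send(id).unwrap();
    }
    for id in 0..10 {
        ack_tx.send(id).unwrap();
    }
    let biased = biased_poll(&msg_rx, &ack_rx, BUDGET);

    // Merged setup: the same traffic, with each ack queued right after its message.
    let (tx, rx) = channel();
    for id in 0..BACKLOG {
        tx.send(Event::Message(id)).unwrap();
        if id < 10 {
            tx.send(Event::Ack(id)).unwrap();
        }
    }
    let merged = merged_poll(&rx, BUDGET);

    (biased, merged)
}
```

With the same processing budget, the biased loop spends every iteration on backlog messages and handles no acks, while the merged loop handles all ten.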

Let me know if you want a pull request or prefer to handle the problem in a different way.

dghilardi • Sep 28 '22 12:09

Hello folks, I will take a look at this issue and pull request. Thank you for the help :)

FlorentinDUBOIS • Oct 10 '22 09:10