rust-amqp icon indicating copy to clipboard operation
rust-amqp copied to clipboard

Silent failure to ack some messages

Open Keruspe opened this issue 7 years ago • 2 comments

I need to handle several message in parallel so I couldn't use a consumer (see #38), I'm thus using the GetIterator approach. I have some code that looks like this:

        loop {
            for get_result in channel.basic_get(&queue, false) {
                /* aquire a semaphore */
                thread::spawn(move || {
                  /* handle the message */
                  /* ack the message */
                  /* release the semapore */
                })
            }

            println!("No more messages, waiting for 60 seconds");
            thread::sleep(Duration::from_secs(60));
        }

My problem is quite simple, let's say that:

  • there's 100 messages in the queue atm
  • 5 threads are allowed to run in parallel

While there are messages in the queue, there are 6 unacked messages (the 5 in the active threads and one waiting for a free slot to start a new thread). When each thread has finished its jobs, the message is ack'ed properly and a new thread is started, a new messages is poped and is waiting for ack.

The problem appears when there are no more messages in the queue:

  • the five last messages are being handled in their threads, so they are in a "noack" state, waitin for their ack
  • the "for" loop exists (I guess that could be the cause of the problem), we enter the sleep, then reenter the "for" loop, etc
  • the ack() for the 5 messages are basically noops.

Now if some messages are published to the queue, we'll have 11 unacked instead of 6 as the 5 from earlier are never acked effectively.

Keruspe avatar Oct 28 '16 09:10 Keruspe

Ok, I slightly reworked this to have one channel per thread, now I have something that looks like

    loop {
       /* acquire a semaphore */

        let mut channel = session.open_channel(chan).expect("Failed to create channel");
        chan += 1;

        thread::spawn(move || {
            for get_result in channel.basic_get(&handler.conf.backup_queue, false) {
                /* handle message */
                /* ack message */
            }
            println!("No more messages, waiting for 60 seconds");
            thread::sleep(Duration::from_secs(60));
           /* release semaphore */
        })
    }

What I'm seeing now is just so awkward I cannot think of any reasonable explanation. Here is what the scenario I wrote above becomes:

  • there's 100 messages in the queue atm
  • 5 threads are allowed to run in parallel

While no new message get in the queue, there are 6 unacked messages (the 5 in the active threads and one waiting for a free slot to start a new thread). I thus see 94 ready and 6 unacked. The 100 are processed and I still see 94 ready 6 unacked in the rabbitmq cluster (if I stop the software it gets back to 100 ready), but no message is consumed anymore as the 100 have already been handled. Now, if 10 more messages gets in the queue, when they'll all have been handled, I get 94 ready 16 unacked.

I will try to provide a full code example for the two versions.

Keruspe avatar Nov 05 '16 18:11 Keruspe

I just pushed a test project here https://github.com/Keruspe/rust-amqp-bugs

You need to export AMQP_URL then run cargo run --bin publish-data to post some initial data. I did not really manage to reproduce anyting with get-then-thread, but in production there are lots of other factors (like the message handling can take up to several hours sometimes). You can run cargo run --bin thread-then-get though, and republish some data as above when there aren't any more. You'll see that some messages definitely don't get ack'ed. Sample output, after having pusblished data several times, then waited for the process to finish dealing with those:

keruspe@Lou ~/Projects/rust-amqp-bugs (git)-[master] % cargo run --bin get-then-thread    
    Finished debug [unoptimized + debuginfo] target(s) in 0.0 secs                        
     Running `target/debug/get-then-thread`                                               
New message: test 98                                                                      
New message: test 99                                                                      
New message: test 3                                                                       
New message: test 4                                                                       
New message: test 95                                                                      
New message: test 96                                                                      
New message: test 97                                                                      
New message: test 98                                                                      
New message: test 99                                                                      
New message: test 3                                                                       
New message: test 4                                                                       
New message: test 5                                                                       
New message: test 10                                                                      
New message: test 95                                                                      
New message: test 96                                                                      
New message: test 97                                                                      
New message: test 98                                                                      
New message: test 99                                                                      
New message: test 3                                                                       
New message: test 4                                                                       
New message: test 5                                                                       
New message: test 10                                                                      
New message: test 95                                                                      
New message: test 96                                                                      
New message: test 97                                                                      
New message: test 98                                                                      
New message: test 99                                                                      
No more messages, waiting for 6 seconds                                                   
^C                                                                                        
keruspe@Lou ~/Projects/rust-amqp-bugs (git)-[master] % cargo run --bin get-then-thread    
    Finished debug [unoptimized + debuginfo] target(s) in 0.0 secs                        
     Running `target/debug/get-then-thread`                                               
New message: test 3                                                                       
New message: test 4                                                                       
New message: test 96                                                                      
New message: test 97                                                                      
New message: test 98                                                                      
New message: test 99                                                                      
No more messages, waiting for 6 seconds                                                   
^C                                                                                        
keruspe@Lou ~/Projects/rust-amqp-bugs (git)-[master] % cargo run --bin get-then-thread    
    Finished debug [unoptimized + debuginfo] target(s) in 0.0 secs                        
     Running `target/debug/get-then-thread`                                               
New message: test 96                                                                      
New message: test 97                                                                      
New message: test 98                                                                      
New message: test 99                                                                      
No more messages, waiting for 6 seconds                                                   
^C                                                                                        
keruspe@Lou ~/Projects/rust-amqp-bugs (git)-[master] % cargo run --bin get-then-thread    
    Finished debug [unoptimized + debuginfo] target(s) in 0.0 secs                        
     Running `target/debug/get-then-thread`                                               
New message: test 96                                                                      
New message: test 97                                                                      
New message: test 98                                                                      
New message: test 99                                                                      
No more messages, waiting for 6 seconds                                                   
^C                                                                                        
keruspe@Lou ~/Projects/rust-amqp-bugs (git)-[master] % cargo run --bin get-then-thread    
    Finished debug [unoptimized + debuginfo] target(s) in 0.0 secs                        
     Running `target/debug/get-then-thread`                                               
New message: test 96                                                                      
New message: test 97                                                                      
New message: test 98                                                                      
New message: test 99                                                                      
No more messages, waiting for 6 seconds                                                   
^C                                                                                        
keruspe@Lou ~/Projects/rust-amqp-bugs (git)-[master] % cargo run --bin get-then-thread    
    Finished debug [unoptimized + debuginfo] target(s) in 0.0 secs                        
     Running `target/debug/get-then-thread`                                               
New message: test 96                                                                      
New message: test 97                                                                      
New message: test 98                                                                      
New message: test 99                                                                      
No more messages, waiting for 6 seconds                                                   

You can see that some random messages don't get ack'ed and the last 4 never get ack'ed

Keruspe avatar Nov 05 '16 20:11 Keruspe