concurrentqueue

How to drain a multi-producer queue properly?

Open mooskagh opened this issue 6 months ago • 1 comments

I struggle to figure out how to drain a queue properly. tl;dr Sentinel-based approach fails because sending the sentinel last doesn't guarantee it's read last.

I have a multi-producer queue with multiple consumers. However, the consumers dequeue (blockingly) under a mutex, so from the queue's perspective it is single-consumer:

(This is simplified: I actually use wait_dequeue_bulk with multiple iterations under a single mutex lock, but the core logic is the same.)

void consumer() {
  while (true) {
    Item item;
    {
      const std::lock_guard<std::mutex> lock(gMutex);
      wait_dequeue(&item);
    }
    process(item);
  }
}

Now, I want to drain the queue and exit gracefully.

My initial idea was to:

  1. Make sure producers are stopped.
  2. Add the sentinel to the queue (from the "main" thread rather than from a producer).
  3. When a consumer reads the sentinel, it sets the global "exit_now" flag.

bool gExitNow = false;

void consumer() {
  while (true) {
    Item item;
    {
      const std::lock_guard<std::mutex> lock(gMutex);
      if (gExitNow) return;
      wait_dequeue(&item);
      if (item == kSentinel) {
        gExitNow = true;
        return;
      }
    }
    process(item);
  }
}

However, the documentation says that ordering is not guaranteed across producers in the multiple-producer use case. So if there are still items in the queue when I add the sentinel, the sentinel may be dequeued ahead of them, and the consumers would exit while those items are still in the queue.

So far I have a few options to work around it, but they don't look elegant.

  1. After stopping all producers and before sending the sentinel, busy-wait until size_approx() == 0. That doesn't sound exciting, and I'm also not sure whether I can rely on size_approx() never returning 0 while there's still something in the queue.

  2. Before sending the sentinel, set another bool, shutdown_mode, to true. In shutdown_mode, consumers read non-blockingly rather than blockingly, and exit if there's nothing to read. When a consumer reads the sentinel, it skips it but doesn't return (it will return on a later iteration, when a non-blocking read finds nothing left).
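
To make option 2 concrete, here is a minimal sketch of how it could look. It does not use the real queue: StandInQueue is a hypothetical mutex-and-condition-variable stand-in that only borrows the method names enqueue, wait_dequeue and try_dequeue, and it is strictly FIFO, so it does not reproduce the reordering described above. The names gQueue, gShutdownMode, gProcessed and run_demo, as well as Item being an int and kSentinel being -1, are assumptions made for illustration. The logic does not depend on where the sentinel ends up in the queue; the sentinel only serves to wake a consumer that is blocked in wait_dequeue. The sketch does assume that, once all producers have stopped, a failed try_dequeue means the queue is really empty, which would need to be checked against the real library's documentation.

```cpp
#include <atomic>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

using Item = int;               // assumption: real items are some other type
constexpr Item kSentinel = -1;  // assumption: a value producers never enqueue

// Hypothetical stand-in for the real queue, NOT the library's implementation.
class StandInQueue {
 public:
  void enqueue(Item item) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      items_.push_back(item);
    }
    cv_.notify_one();
  }
  // Blocks until an item is available.
  void wait_dequeue(Item& item) {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return !items_.empty(); });
    item = items_.front();
    items_.pop_front();
  }
  // Returns false immediately if the queue is empty.
  bool try_dequeue(Item& item) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (items_.empty()) return false;
    item = items_.front();
    items_.pop_front();
    return true;
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<Item> items_;
};

StandInQueue gQueue;
std::mutex gMutex;
std::atomic<bool> gShutdownMode{false};
std::atomic<int> gProcessed{0};

void process(Item) { gProcessed.fetch_add(1); }

void consumer() {
  while (true) {
    Item item;
    {
      const std::lock_guard<std::mutex> lock(gMutex);
      if (gShutdownMode.load()) {
        // Shutdown mode: never block; an empty queue means we are done.
        if (!gQueue.try_dequeue(item)) return;
      } else {
        gQueue.wait_dequeue(item);
      }
    }
    if (item == kSentinel) continue;  // skip the sentinel, but keep draining
    process(item);
  }
}

// Runs producers and consumers, shuts down, and returns the processed count.
int run_demo(int producers, int items_each, int consumers) {
  gShutdownMode.store(false);
  gProcessed.store(0);
  std::vector<std::thread> consumer_threads, producer_threads;
  for (int c = 0; c < consumers; ++c) consumer_threads.emplace_back(consumer);
  for (int p = 0; p < producers; ++p) {
    producer_threads.emplace_back([items_each] {
      for (int i = 0; i < items_each; ++i) gQueue.enqueue(i);
    });
  }
  for (auto& t : producer_threads) t.join();  // 1. producers are stopped
  gShutdownMode.store(true);                  // 2. switch to shutdown mode
  gQueue.enqueue(kSentinel);                  // 3. wake a blocked consumer
  for (auto& t : consumer_threads) t.join();
  return gProcessed.load();
}
```

In this sketch the flag is set before the sentinel is enqueued, so by the time any consumer has dequeued the sentinel, every later check of gShutdownMode sees true and no consumer blocks again. With the stand-in queue, run_demo(4, 1000, 3) returns 4000, i.e. nothing is left behind.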

mooskagh, Jun 28 '25 11:06