concurrentqueue

Inconsistent order of enqueue and try_dequeue

Open YeahhhhLi opened this issue 2 years ago • 13 comments

Two threads: one thread generates data roughly every 10 ms and calls enqueue(); the other thread continuously reads the data and processes it. If the queue is empty, the reader sleeps for 20 ms.

However, the data obtained by try_dequeue comes out in the wrong order.

The code looks like this:

// thread 1
void SendMessage(message) {
  // message 1: id = 1
  // message 2: id = 2
  // message 3: id = 3
  message_queue_.enqueue(message);
}

// thread 2
void ReceiveMessage() {
  while (is_running_ || message_queue_.size_approx() > 0) {
    if (message_queue_.size_approx() == 0) {
      std::this_thread::sleep_for(std::chrono::milliseconds(20));
      continue;
    }
    CHECK(message_queue_.try_dequeue(message));
    // message 1: id = 3
    // message 2: id = 2
    // message 3: id = 1
  }
}

So what is the reason for this phenomenon?

YeahhhhLi avatar Sep 21 '22 15:09 YeahhhhLi

That shouldn't happen. Please provide a more complete example program. Are you sure only one thread is enqueuing?

cameron314 avatar Sep 22 '22 01:09 cameron314

We use this queue in a time-driven module (10 ms interval), and the underlying scheduling uses coroutines. I don't know whether that is related. I added debug output and found that the thread ID (tid) of the sender does differ between messages, but only one thread is enqueuing at any given time.

YeahhhhLi avatar Sep 22 '22 05:09 YeahhhhLi

That explains it then -- different threads will be mapped to different sub-queues internally, with no guarantee of any ordering between them (see the README).

Since you seem to have a single logical thread of execution, you can use a producer token when enqueueing, which will guarantee that all elements end up in the same sub-queue.
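
To make the sub-queue explanation concrete, here is a toy, single-threaded model using only the standard library. It is a sketch, not the library's implementation: `ToySubQueues`, `drain_order`, and the "highest sub-queue first" scan are all hypothetical names and choices made for illustration. The only property it is meant to show is that each sub-queue is FIFO on its own, while nothing orders one sub-queue relative to another. With the real queue, the corresponding fix is to construct a `moodycamel::ProducerToken` from the queue and pass it as the first argument to `enqueue()`.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Toy model (NOT the library's implementation): each producer owns a FIFO
// sub-queue, and the consumer is free to drain the sub-queues in any order.
struct ToySubQueues {
    std::vector<std::deque<int>> subs;

    explicit ToySubQueues(std::size_t producers) : subs(producers) {}

    // `producer` stands in for "whichever thread or token enqueued the item".
    void enqueue(std::size_t producer, int id) {
        assert(producer < subs.size());
        subs[producer].push_back(id);
    }

    // Take from the highest-indexed non-empty sub-queue. The scan order is an
    // arbitrary assumption; any other order would be equally "legal".
    bool try_dequeue(int& out) {
        for (std::size_t i = subs.size(); i-- > 0;) {
            if (!subs[i].empty()) {
                out = subs[i].front();
                subs[i].pop_front();
                return true;
            }
        }
        return false;
    }
};

// Enqueue ids 1, 2, 3 and return the order in which they come back out.
// spread == true  models three different enqueuing threads;
// spread == false models one logical producer (a single token).
std::vector<int> drain_order(bool spread) {
    ToySubQueues q(3);
    for (int id = 1; id <= 3; ++id) {
        q.enqueue(spread ? static_cast<std::size_t>(id - 1) : 0, id);
    }
    std::vector<int> out;
    int id = 0;
    while (q.try_dequeue(id)) out.push_back(id);
    return out;
}
```

In this model, `drain_order(true)` returns 3, 2, 1 (the order reported above) and `drain_order(false)` returns 1, 2, 3. The real queue's cross-producer order is not deterministic like this; the model only shows why a single sub-queue restores FIFO.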

Also note that since this appears to be an SPSC use case, consider using my ReaderWriterQueue instead, which is far simpler and can result in less overhead.

cameron314 avatar Sep 22 '22 13:09 cameron314

OK, thx~

YeahhhhLi avatar Sep 23 '22 06:09 YeahhhhLi

@cameron314 I have a similar issue here. I am using BlockingConcurrentQueue as an MPSC queue. Multiple coroutines keep enqueuing elements, each with a unique id, and a single pthread consumer keeps calling wait_dequeue_bulk_timed to retrieve batches of elements and process them.

I found that sometimes, in a highly concurrent environment, I insert some items into the queue and never manage to pop them back out. So I wonder whether this could be caused by starvation: in such an environment, if FIFO order is not guaranteed, new items may keep arriving while old items never get consumed.

Or maybe the queue has lost some data.

I would really appreciate your help; this has confused me for many days.

ysj1173886760 avatar Apr 26 '23 06:04 ysj1173886760

Data loss should not be possible.

Interestingly, starvation is possible, as dequeue is not fair. However, using the version that accepts a consumer token should be fair in this scenario.

Please note that unfortunately, this queue's implementation has MPSC as its worst case for global ordering and performance.

cameron314 avatar Apr 26 '23 11:04 cameron314

Thank you for answering.

I'm not sure how to hand out tokens in an MPSC queue to ensure there is no starvation.

Would it be OK to give each producer a producer token, and the consumer a consumer token? Does the queue then guarantee that no producer is starved?

Also, since I am using coroutines, the set of producers is dynamic, i.e. I keep creating and destroying producers. In that case, is it OK to always allocate a new producer token for each new coroutine, or should I just allocate a producer token for every pthread worker under the hood?

ysj1173886760 avatar Apr 26 '23 11:04 ysj1173886760

Probably best to avoid short-lived producer tokens. But using a consumer token will cause the consumer to cycle between the inner producer sub-queues. Without the token, the consumer just picks the first sub-queue that looks "good enough" without checking them all.

cameron314 avatar Apr 26 '23 11:04 cameron314

It sounds like I don't need producer tokens, only a consumer token?

What happens to the producers if I add a producer token?

ysj1173886760 avatar Apr 26 '23 11:04 ysj1173886760

Right. You can use just producer tokens, just consumer tokens, neither, or both.

Creating/destroying a producer token has significant overhead. Instead of churning producer tokens, consider using a thread-local (dispatcher-local?) producer token, or none at all.
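
One way to read the thread-local suggestion is sketched below, using only the standard library. `ToyToken` is a hypothetical stand-in for an expensive per-producer handle (it only counts constructions), and `local_token` and `tokens_created` are illustrative names, not part of any library. With a real producer token the same pattern would also need the queue to outlive every thread that holds a token for it, which this sketch does not model.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical stand-in for an expensive per-producer handle such as a
// producer token. It does nothing except count how often it is constructed.
struct ToyToken {
    static std::atomic<int> constructed;
    ToyToken() { constructed.fetch_add(1, std::memory_order_relaxed); }
};
std::atomic<int> ToyToken::constructed{0};

// One handle per worker thread, created lazily on first use and then reused
// by every short-lived task (e.g. coroutine) that the thread runs.
ToyToken& local_token() {
    thread_local ToyToken token;
    return token;
}

// Run `tasks_per_worker` short-lived tasks on each of `workers` threads and
// return how many handles were constructed in total.
int tokens_created(int workers, int tasks_per_worker) {
    assert(workers >= 0 && tasks_per_worker >= 0);
    ToyToken::constructed.store(0);
    std::vector<std::thread> pool;
    for (int w = 0; w < workers; ++w) {
        pool.emplace_back([tasks_per_worker] {
            for (int t = 0; t < tasks_per_worker; ++t) {
                ToyToken& tok = local_token();  // a real task would enqueue with it here
                (void)tok;
            }
        });
    }
    for (auto& th : pool) th.join();
    return ToyToken::constructed.load();
}
```

Here `tokens_created(4, 1000)` constructs four handles rather than four thousand: the cost scales with the number of worker threads, not with the number of tasks.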

cameron314 avatar Apr 26 '23 11:04 cameron314

Cool, that helps a lot.

ysj1173886760 avatar Apr 26 '23 11:04 ysj1173886760

Much appreciated. I will run some experiments and see whether this solves my problem. Thanks a lot.

ysj1173886760 avatar Apr 26 '23 11:04 ysj1173886760

Adding just a consumer token solved my problem. No more starvation!

ysj1173886760 avatar Apr 27 '23 07:04 ysj1173886760