concurrentqueue icon indicating copy to clipboard operation
concurrentqueue copied to clipboard

How to force fully remove the remaining data.

Open runmouse opened this issue 2 years ago • 4 comments

I did my own statistics when using concurrentqueue, and occasionally there was a problem that the remaining data could not be taken out of the queue. The detail is: After no more data enqueue, a small amount of data will remain in the concurrentqueue and cannot be dequeue. I use try_dequeue_bulk_from_producer() to do dequeue operation.

The reference log is as follows: enqueue_count:6131058 dequeue_count:6131047 left:11 size_approx : 11

Because enqueue_count and dequeue_count are two variables respectively, and they are both accumulated by a single thread respectively. So the data is accurate. And the num obtained by using size_approx() are also consistent.

Please tell me how to solve this situation. How to force fully remove the remaining data.

runmouse avatar Sep 26 '23 09:09 runmouse

This should not happen. Can you share example code that reproduces the issue?

cameron314 avatar Sep 26 '23 11:09 cameron314

The code is not allowed public. and tangled up with a lot of business logic。 I'am trying use try_dequeue_from_producer follow by example: https://github.com/cameron314/concurrentqueue/blob/master/samples.md#pump-until-empty

runmouse avatar Sep 26 '23 14:09 runmouse

This should not happen. Can you share example code that reproduces the issue?

The following code shows my use of concurrent queue. There is only one writing thread, there are NUM queues and NUM reading threads. Each reading thread only reads the corresponding queue. The writing thread writes data to all queues.

struct BiggerTraits : public moodycamel::ConcurrentQueueDefaultTraits
{
    static const size_t BLOCK_SIZE = 256;
};


int NUM = 5;

class DataLabel;
extern computerData(DataLabel data);

class Show {
public:
    Show() {
        // create concurrent queue and token
        for (int i = 0; i<NUM ; ++i) {
            mQueue.emplace_back(moodycamel::ConcurrentQueue<DataLabel, BiggerTraits >(1024*1024));
            mPtok.emplace_back(moodycamel::ProducerToken(mQueue.back()));
        }
        // consume threads
        for (int i = 0; i< NUM; ++i) {
            mThread.emplace_back(std::thread(&Show::dequeueData, this, i));
        }
        std::thread(&Show::enqueueData, this);
    }

    void dequeueData(int threadId) {
        while (mRuning) {
            //sleep(1);
            vector<DataLabel> items(32); 
            int num = mQueue[threadId].try_dequeue_bulk_from_producer(mPtok[threadId], items.begin(), 32);
            if (num == 0) {
                LOG(BaseLog, "fetch nothing from Queue %d" , threadId);
                usleep(1000000);
                continue;
            }
            LOG(BaseLog, "fetch %d data from Queue %d" , num, threadId);
            for (int pos=0; pos<num; ++pos) {
                dosomething(threadId, items[pos]);
            }
        }
    }

    void enqueueData() {
        // create and computer data
        int index = randvalue % NUM;
        DataLabel dataLabel(index);
        computerData(dataLabel);
        assert(mQueue[index].enqueue(mPtok[index], dataLabel));
    }


 
    bool mRuning; 
    std::vector<moodycamel::ConcurrentQueue<DataLabel, BiggerTraits > > mQueue;
    std::vector<moodycamel::ProducerToken> mPtok;
    ////thread
    std::vector<std::thread> mThread;

};

runmouse avatar Oct 08 '23 02:10 runmouse

Nothing stands out as problematic with this code (besides the call to enqueue within an assert). Can you add a main to this example that reproduces the issue?

cameron314 avatar Oct 13 '23 00:10 cameron314