atomic queue not always atomic
https://github.com/roboterclubaachen/xpcc/blob/develop/src/xpcc/architecture/driver/atomic/queue_impl.hpp#L132 I assume the queue is single-producer, single-consumer (the docs don't say so, but the implementation suggests it). Even so, there is a potential problem in the improbable case that a user pushes to the queue at low priority, pops at high priority, and triggers the high-priority interrupt manually: it is not guaranteed that the high-priority handler will actually see all the bytes in the queue.
Here is some pseudocode:
```cpp
// queue is empty here
MyUartTxQueue::push(3); // this gets inlined
setInterrupt(UartTx);   // the optimizer may reorder this into the middle of the body of push
// the UART ISR is called, but the new value of head has not yet been
// flushed to RAM, so the last byte will not be sent until the next time
// something is sent over the UART
```
Sorry for being pedantic; you do seem to have put a lot more effort into concurrency than many prominent libs out there.
Yes, single producer, single consumer. It's good to be pedantic when it comes to things like this!
That is a very good bug report. I have to refresh my memory about all the atomic stuff in xpcc. This was written a long time ago for the AVR and it does not have interrupt priorities, so this might have to be re-written.
The easiest fix would be to just hit it with a big hammer and make all the loads and stores of head and tail volatile. This is probably not going to cost you anything in a typical situation, because the working registers are going to be flushed anyway. There is a potential for needlessly restricting the optimizer, though, in the case where the user pops multiple values back to back. We are currently working on a range library which would potentially solve this inefficiency by providing a range-based pop:
```cpp
template<typename TOutputRange>
TOutputRange pop(TOutputRange out);
```
which would pop until either the output range is full or the queue is empty, and return the modified output range. This would allow us to do the most efficient thing possible in all cases. It also makes for super-slick-looking syntax like:
```cpp
Kvasir::copy(myUart, myQueue);
```
Before working on the queue, here are a few questions of taste / suggestions. Feel free to shoot them down for any reason ;)
- `isEmpty` is named wrong in my opinion, because the queue can change size at any time (why else make it atomic). If `isEmpty` is interrupted just before return, then I get an old status: the queue is no longer empty, but the return value is true. `wasEmpty` is in my opinion a better name, since you can never know what the status *is*, but you can know what it *was*.
- `isNearlyEmpty` is in my opinion not flexible enough. What if I want to know if there are 4 or 5 items left? I would suggest implementing a `sizeWas` function and leaving it up to the user. One could also give `isNearlyEmpty` a parameter with a default of 3.
- Separating `get` and `pop` is really only needed when exceptions can be thrown. Since no one in their right mind is going to turn them on in an embedded project, I think this is overkill. I would make `pop` either return the popped value, or return a bool (success or not) and make the popped value available via an out parameter. Since `get` cannot fail, I currently have to call `isEmpty`, then `get`, then `pop`. Right now that is not going to cost me much, because only `isEmpty` loads using volatile; however, this is actually potentially another bug. Think of:
```cpp
void foo() {
    while (!myQueue.isEmpty()) {
        MyElement element = myQueue.get(); // do something with element in real code
        myQueue.pop();
    }
    // queue is empty, I think
    waitForInterruptOrDoSomethingThatTakesForever();
} // changes to tail could be flushed to RAM here, or even later if foo is inlined
```
If we call foo with a full queue, we may be surprised that during waitForInterruptOrDoSomethingThatTakesForever ISRs still see the queue as full, even though we emptied it. To fix this we need more uses of volatile; however, these come at a cost that the optimizer cannot remove unless we merge the three functions into one. I would suggest:
```cpp
void foo() {
    MyElement element;
    while (myQueue.pop(element)) {
        // do something with element in real code
    }
    // queue is truly empty here and everyone sees it
    waitForInterruptOrDoSomethingThatTakesForever();
}
```
- I would introduce a type wrapper called `Atomic::Integral` which wraps the index type and only allows access via `load` and `store` member functions. This should force good coding behavior with regard to atomic semantics (and would have avoided the bug that this issue concerns in the first place).
- My tests on GCC generating code for Cortex-M show that
```cpp
if (tmphead >= (N+1)) {
    tmphead = 0;
}
```
turns into more code than the equivalent
```cpp
tmphead = tmphead % (N+1);
```
especially if N+1 is a power of 2. I would expect the optimizer to generate the same code if decltype(tmphead) is unsigned, but if it is a signed type then it is clear why the code generation differs.
I recommend that everyone read the following papers by Hans Boehm, who was involved in the new memory model for C++11:
- *Threads cannot be implemented as a library* (acm, pdf, slides)
- *Foundations of the C++ concurrency memory model* (acm, pdf)
We might have to rethink all our concurrency primitives in light of these new C++11 capabilities.
Does anyone (@salkinium, @dergraaf or @odinthenerd) know how well the C++11 memory model is implemented in the Cortex-M and AVR ports of GCC?
Is the `<atomic>` header available?