nuttx/msgq: add kernel message queue support
Summary
Currently NuttX has two message queue implementations:
- POSIX Message Queue (mq_close/mq_getattr/mq_notify/mq_open/mq_receive/mq_send/mq_setattr/mq_timedreceive/mq_timedsend/mq_unlink)
- System V Message Queue (msgctl/msgget/msgrcv/msgsnd)
The POSIX and System V message queues follow their respective standards, but they impose several limitations on kernel developers:
- They depend on the file system: sending and receiving messages requires file descriptors as handles, which adds resource overhead and degrades performance.
- They do not support static memory pool configuration and instead use a global shared memory pool, which makes behavior less deterministic in some use cases.
- They cannot support additional capabilities such as "message peek".
This PR therefore introduces the "nxmsgq" implementation to simplify development in kernel space. The table below compares its interfaces with those of Zephyr and FreeRTOS:
------------------------------------------------------------------
| NuttX           | Zephyr              | FreeRTOS               |
|-----------------|---------------------|------------------------|
| nxmsgq_init     | k_msgq_init         | xQueueCreateStatic     |
|-----------------|---------------------|------------------------|
| nxmsgq_create   | k_msgq_alloc_init   | xQueueCreate           |
|-----------------|---------------------|------------------------|
| nxmsgq_destroy  | k_msgq_cleanup      | vQueueDelete           |
|-----------------|---------------------|------------------------|
| nxmsgq_used     | k_msgq_num_used_get | uxQueueMessagesWaiting |
|-----------------|---------------------|------------------------|
| nxmsgq_space    | k_msgq_num_free_get | uxQueueSpacesAvailable |
|-----------------|---------------------|------------------------|
| nxmsgq_purge    | k_msgq_purge        |                        |
|-----------------|---------------------|------------------------|
| nxmsgq_ticksend | k_msgq_put          | xQueueSend             |
| nxmsgq_trysend  |                     | xQueueSendFromISR      |
| nxmsgq_send     |                     | xQueueSend             |
|-----------------|---------------------|------------------------|
| nxmsgq_tickrecv | k_msgq_get          | xQueueReceive          |
| nxmsgq_tryrecv  |                     | xQueueReceiveFromISR   |
| nxmsgq_recv     |                     | xQueueReceive          |
|-----------------|---------------------|------------------------|
| nxmsgq_tickpeek | k_msgq_peek         | xQueuePeek             |
| nxmsgq_trypeek  |                     | xQueuePeekFromISR      |
| nxmsgq_peek     |                     | xQueuePeek             |
|-----------------|---------------------|------------------------|
| nxmsgq_is_empty |                     |                        |
| nxmsgq_is_full  |                     |                        |
------------------------------------------------------------------
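To make the "static memory pool" and "message peek" points more concrete, here is a minimal sketch of a fixed-size message queue backed by caller-provided storage. It is not the nxmsgq implementation: every name in it (sketch_msgq, sketch_init, sketch_trysend, sketch_trypeek, sketch_tryrecv) is hypothetical, and it deliberately leaves out the locking and blocking that a kernel queue would need.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Illustrative only: hypothetical names, not the nxmsgq implementation.
 * There is no locking and no blocking; a kernel queue would add both.
 */

struct sketch_msgq
{
  uint8_t *buf;     /* Caller-provided storage: msgsize * maxmsgs bytes */
  size_t msgsize;   /* Size of each message in bytes */
  size_t maxmsgs;   /* Capacity in messages */
  size_t head;      /* Slot index of the oldest message */
  size_t used;      /* Number of messages currently queued */
};

/* Bind the queue to a caller-owned buffer; no heap allocation is involved */

static void sketch_init(struct sketch_msgq *q, void *buf,
                        size_t msgsize, size_t maxmsgs)
{
  assert(q != NULL && buf != NULL && msgsize > 0 && maxmsgs > 0);

  q->buf     = buf;
  q->msgsize = msgsize;
  q->maxmsgs = maxmsgs;
  q->head    = 0;
  q->used    = 0;
}

/* Copy one message into the next free slot; returns -1 if the queue is full */

static int sketch_trysend(struct sketch_msgq *q, const void *msg)
{
  size_t tail;

  if (q->used == q->maxmsgs)
    {
      return -1;
    }

  tail = (q->head + q->used) % q->maxmsgs;
  memcpy(q->buf + tail * q->msgsize, msg, q->msgsize);
  q->used++;
  return 0;
}

/* Copy out the oldest message without removing it; -1 if the queue is empty */

static int sketch_trypeek(const struct sketch_msgq *q, void *msg)
{
  if (q->used == 0)
    {
      return -1;
    }

  memcpy(msg, q->buf + q->head * q->msgsize, q->msgsize);
  return 0;
}

/* Copy out the oldest message and release its slot; -1 if the queue is empty */

static int sketch_tryrecv(struct sketch_msgq *q, void *msg)
{
  if (sketch_trypeek(q, msg) < 0)
    {
      return -1;
    }

  q->head = (q->head + 1) % q->maxmsgs;
  q->used--;
  return 0;
}
```

Because the storage is passed in by the caller, it can be a static array sized at build time, and the queue needs neither a file descriptor nor a shared pool. Peek reuses the same read path as receive and simply does not advance the head index.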
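Benchmark results (1000 iterations per test; the fractional field in these logs is tv_nsec without zero padding, so 0.13305000s means 13305000 ns):
Posix Open Test(mq) : loop: 1001:spending 0.13305000s
Kernel Open Test(nxmsgq) : loop: 1001:spending 0.6345000s (-52%)
Posix Recv Test(mq) : loop: 1001:spending 0.7884000s
Kernel Recv Test(nxmsgq) : loop: 1001:spending 0.6837000s (-13%)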
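The percentages in parentheses can be reproduced from the logged values. The following is a small sketch of that arithmetic, assuming the digits after the decimal point in the logs are raw, unpadded nanoseconds. The helper names percent_change and print_changes are made up for illustration.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical helper: relative change of after_ns versus before_ns, in
 * percent.  A negative result means the second measurement is faster.
 */

static double percent_change(long before_ns, long after_ns)
{
  assert(before_ns > 0);
  return ((double)after_ns - (double)before_ns) * 100.0 /
         (double)before_ns;
}

/* Print both comparisons using the nanosecond values from the logs */

static void print_changes(void)
{
  printf("open: %.1f%%\n", percent_change(13305000L, 6345000L));
  printf("recv: %.1f%%\n", percent_change(7884000L, 6837000L));
}
```

Under that reading of the logs, both results come out negative and agree with the (-52%) and (-13%) annotations once the fractional part is dropped.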
Signed-off-by: chao an [email protected]
Impact
N/A
Testing
Tested on sim/nsh and Cortex-M55 with the test code below:
#include <nuttx/config.h>

#include <errno.h>
#include <fcntl.h>
#include <mqueue.h>
#include <pthread.h>
#include <sched.h>
#include <stdio.h>
#include <sys/stat.h>
#include <time.h>

#include <nuttx/irq.h>
#include <nuttx/msgq.h>

/****************************************************************************
 * Private Functions
 ****************************************************************************/

/* Compute dest = ts1 - ts2, keeping tv_nsec in the range [0, 1000000000) */

static void timespec_sub(struct timespec *dest,
                         struct timespec *ts1,
                         struct timespec *ts2)
{
  dest->tv_sec = ts1->tv_sec - ts2->tv_sec;
  dest->tv_nsec = ts1->tv_nsec - ts2->tv_nsec;
  if (dest->tv_nsec < 0)
    {
      dest->tv_nsec += 1000000000;
      dest->tv_sec -= 1;
    }
}

/****************************************************************************
 * Public Functions
 ****************************************************************************/

void mq_open_test(void)
{
  struct timespec result;
  struct timespec start;
  struct timespec end;
  nxmsgq_t *msg;
  mqd_t mq;
  irqstate_t flags;
  int loop = 0;

  /* Posix Message Queue Test */

  flags = enter_critical_section();
  clock_gettime(CLOCK_MONOTONIC, &start);
  while (loop++ < 1000)
    {
      mq = mq_open("test", O_RDWR | O_CREAT, 0644, NULL);
      if (mq < 0)
        {
          printf("mq_open fail: %d\n", mq);
          break;
        }

      mq_close(mq);
    }

  clock_gettime(CLOCK_MONOTONIC, &end);
  leave_critical_section(flags);
  timespec_sub(&result, &end, &start);

  /* Zero-pad the nanosecond field so the output reads as decimal seconds */

  printf("Posix Open Test : loop: %d:spending %lld.%09lds\n",
         loop, (long long)result.tv_sec, (long)result.tv_nsec);

  /* Kernel Message Queue Test */

  loop = 0;
  flags = enter_critical_section();
  clock_gettime(CLOCK_MONOTONIC, &start);
  while (loop++ < 1000)
    {
      msg = nxmsgq_create(64, 8);
      if (msg == NULL)
        {
          printf("nxmsgq_create fail: %p\n", msg);
          break;
        }

      nxmsgq_destroy(msg);
    }

  clock_gettime(CLOCK_MONOTONIC, &end);
  leave_critical_section(flags);
  timespec_sub(&result, &end, &start);
  printf("Kernel Open Test : loop: %d:spending %lld.%09lds\n",
         loop, (long long)result.tv_sec, (long)result.tv_nsec);
}

mqd_t g_mq;
nxmsgq_t *g_msg;

void *message_thread(void *arg)
{
  struct timespec result;
  struct timespec start;
  struct timespec end;
  irqstate_t flags;
  char tmp[64];
  int loop = 0;
  int ret;
  unsigned int prio;

  /* Posix Message Queue Test */

  flags = enter_critical_section();
  clock_gettime(CLOCK_MONOTONIC, &start);
  while (loop++ < 1000)
    {
      ret = mq_receive(g_mq, tmp, 64, &prio);
      if (ret < 0)
        {
          /* Do not return while still holding the critical section */

          leave_critical_section(flags);
          printf("mq_receive fail: %d, loop: %d, errno: %d\n",
                 ret, loop, errno);
          return NULL;
        }
    }

  clock_gettime(CLOCK_MONOTONIC, &end);
  leave_critical_section(flags);
  timespec_sub(&result, &end, &start);
  printf("Posix Recv Test : loop: %d:spending %lld.%09lds\n",
         loop, (long long)result.tv_sec, (long)result.tv_nsec);

  /* Kernel Message Queue Test */

  loop = 0;
  flags = enter_critical_section();
  clock_gettime(CLOCK_MONOTONIC, &start);
  while (loop++ < 1000)
    {
      ret = nxmsgq_recv(g_msg, tmp);
      if (ret < 0)
        {
          leave_critical_section(flags);
          printf("nxmsgq_recv fail: %d, loop: %d\n", ret, loop);
          return NULL;
        }
    }

  clock_gettime(CLOCK_MONOTONIC, &end);
  leave_critical_section(flags);
  timespec_sub(&result, &end, &start);
  printf("Kernel Recv Test : loop: %d:spending %lld.%09lds\n",
         loop, (long long)result.tv_sec, (long)result.tv_nsec);
  return NULL;
}

void mq_sendrecv_test(void)
{
  struct sched_param sparam;
  pthread_attr_t tattr;
  pthread_t pid;
  char tmp[64] = {0};
  int loop = 0;
  int ret;

  g_mq = mq_open("test", O_RDWR | O_CREAT, 0644, NULL);
  if (g_mq < 0)
    {
      printf("mq_open fail: %d\n", g_mq);
      return;
    }

  /* nxmsgq_create() returns a pointer, so failure is checked against NULL */

  g_msg = nxmsgq_create(64, 8);
  if (g_msg == NULL)
    {
      printf("nxmsgq_create fail: %p\n", g_msg);
      return;
    }

  /* Start the receiver thread at priority 200 */

  pthread_attr_init(&tattr);
  sparam.sched_priority = 200;
  pthread_attr_setschedparam(&tattr, &sparam);
  pthread_create(&pid, &tattr, message_thread, NULL);

  while (loop++ < 1000)
    {
      ret = mq_send(g_mq, tmp, 64, 0);
      if (ret < 0)
        {
          printf("mq_send fail: %d\n", ret);
          return;
        }
    }

  loop = 0;
  while (loop++ < 1000)
    {
      ret = nxmsgq_send(g_msg, tmp);
      if (ret < 0)
        {
          printf("nxmsgq_send fail: %d\n", ret);
          return;
        }
    }

  /* Wait for the receiver thread to finish before returning */

  pthread_join(pid, NULL);
}

int main(int argc, FAR char *argv[])
{
  mq_open_test();
  mq_sendrecv_test();
  return 0;
}
[Experimental Bot, please feedback here]
This PR does not fully meet the NuttX requirements. Here's why:
- Impact is understated: The addition of a new message queue implementation has a significant impact. The PR author marks impact as N/A, which is incorrect. At minimum, the documentation impact should be YES, as new APIs require documentation. Compatibility may also be affected if existing code uses other message queue implementations and wants to switch. The performance impact also needs to be documented and justified. While a benchmark is provided, a more detailed analysis of the trade-offs would be beneficial.
- Testing is incomplete: While tests are provided, they don't appear to cover all code paths. Edge cases such as queue full/empty conditions, timeout handling, and interrupt context usage (if supported) should be tested. More details about the test environment are also needed. "sim/nsh, Cortex-M55" isn't sufficient. Specific simulator details and the Cortex-M55 board/configuration used should be listed. The "before change" logs are missing entirely. These logs are crucial for demonstrating that the changes fix or improve an existing issue without introducing regressions.
- Missing information: The summary mentions Zephyr and FreeRTOS. While useful for context, it doesn't explain why the NuttX-specific implementation is needed beyond some general statements. A more concrete comparison (e.g., a table showing feature support and limitations of each) would be helpful in justifying the design choices. The summary also lacks a clear description of how the message queue is implemented (e.g., data structures used, synchronization mechanisms).
To fully meet the requirements, the PR author needs to:
- Expand the Impact section: Detail the documentation updates, potential compatibility issues, and the reasoning behind the performance trade-offs.
- Improve the Testing section: Provide "before change" logs, test edge cases and error conditions, and specify the exact testing environment. Consider adding tests for each of the new API functions.
- Clarify the Summary: Explain the implementation details of the message queue and provide a more structured comparison with existing NuttX message queue implementations as well as Zephyr and FreeRTOS. Justify the design choices and the need for a new implementation more convincingly. It's helpful to clarify whether this new implementation is meant to replace the existing POSIX and System V queues eventually, or if it's intended to coexist with them and serve different use cases.
I like the idea a lot. I have always wondered why NuttX did not already have a kernel-internal message system that does not depend on a file system. As stated in the summary, many RTOSes provide this, and I think it is an extremely useful and powerful synchronization mechanism.
Let me mark this PR as a draft first. I want to support priority queues and zero-copy in this pull request.