s_task
s_task copied to clipboard
libuv未计算chan等待状态导致程序提前结束
现象为多个send_task发送消息到recv_task,由于chan空间设置太小,导致部分send_task进入等待态,而recv_task在收到第一个消息时正常退出并设置全局变量控制所有send_task退出。此时,剩余send_task不在全局变量的active_task上且由于没有定时器、没有uv的handles和req,uv正常结束流程,导致剩余等待状态的send_task没有正常退出。
不知道这是个bug还是正常使用不会出现这种情况?
可否上代码看下?
源代码如下:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "s_task.h"
#include <inttypes.h>
typedef uint8_t u8;
typedef uint16_t u16;
typedef uint32_t u32;
typedef uint64_t u64;
typedef int8_t i8;
typedef int16_t i16;
typedef int32_t i32;
typedef int64_t i64;
typedef struct chan_info{
u32 timeout;
u32 count;
void *stack[32*1024];
} chan_info_t;
#define PUT_TIME_OUT 3
#define CLOSE_DELAY_TIME 10
#define CHAN_SIZE 3
//s_event_t g_event;
void *g_stack_main[512*1024];
//void *g_stack0[512*1024];
//void *g_stack1[512*1024];
void* g_stack_action_chan[64 * 1024];
s_chan_declare(g_chan, chan_info_t, CHAN_SIZE);
static struct chan_info chan_table[] = {
{.timeout = PUT_TIME_OUT, .count = 1},
{.timeout = PUT_TIME_OUT, .count = 2},
{.timeout = PUT_TIME_OUT, .count = 3},
{.timeout = PUT_TIME_OUT, .count = 4},
{.timeout = PUT_TIME_OUT, .count = 5},
{.timeout = PUT_TIME_OUT, .count = 6},
{.timeout = PUT_TIME_OUT, .count = 7},
};
bool g_closed = false;
void put_task(__async__, void *arg) {
chan_info_t *chan_entry = (chan_info_t *)arg;
while (!g_closed) {
printf("put %d\n", chan_entry->count);
s_chan_put(__await__, g_chan, chan_entry);
s_task_sleep(__await__, chan_entry->timeout);
}
printf("done.\n");
}
void get_chan_task(__async__, void* arg) {
chan_info_t chan_entry;
while(!g_closed) {
s_chan_get(__await__, g_chan, &chan_entry);
g_closed = true;
printf("get %d\n", chan_entry.count);
s_task_sleep(__await__, CLOSE_DELAY_TIME);
}
}
void main_task(__async__, void *arg) {
int i;
s_chan_init(g_chan, chan_info_t, CHAN_SIZE);
s_task_create(g_stack_action_chan, sizeof(g_stack_action_chan), get_chan_task, NULL);
for(i = 0; i < sizeof(chan_table) / sizeof(struct chan_info); ++i) {
s_task_create(chan_table[i].stack, sizeof(chan_table[i].stack), put_task, (void *)&chan_table[i]);
}
for(i = 0; i < sizeof(chan_table) / sizeof(struct chan_info); ++i) {
s_task_join(__await__, chan_table[i].stack);
}
}
int main(int argc, char *argv[]) {
uv_loop_t *loop = uv_default_loop();
s_task_init_system(loop);
s_task_create(g_stack_main, sizeof(g_stack_main), main_task, (void *)(size_t)argc);
uv_run(loop, UV_RUN_DEFAULT);
return 0;
}
上面的代码运行结果为:
$ ./chan_test
put 1
put 2
put 3
put 4
put 5
put 6
put 7
get 1
done.
done.
done.
done.
修改chan大小为32(超过put_task的数量),运行结果为:
$ ./chan_test
put 1
put 2
put 3
put 4
put 5
put 6
put 7
get 1
done.
done.
done.
done.
done.
done.
done.
根据结果推断当chan大小小于put_task数量时,有一部分put_task没有正常退出,不知道该问题是不是bug?
补充一下,这个问题是因为 ”接收任务“ 设置 g_closed = true 先退出了, 如果 ”发送任务“ 的队列已满,并且系统也没有更多的任务在运行中(或sleep中任务的情况下), 队列不可能被消费了,这时有三种处理方式 ---
- 发送任务一直等着
- 发送任务直接取消
- 发送任务的发送函数返回失败
方式3比较合理,然而目前代码选的方式2,只要保障chan中的消息有人收就没问题。
是的,只要有收的就没有问题,所以不确定是不是bug,就来问一下。
更新了,没有其他任务可以唤醒时,s_chan_put返回失败,输出如下
put 1
put 2
put 3
put 4
put 5
put 6
put 7
get 1
done.
done.
done.
done.
cancelled at line 55
done.
cancelled at line 55
done.
cancelled at line 55
done.
测试代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "s_task.h"
#include <inttypes.h>
typedef uint8_t u8;
typedef uint16_t u16;
typedef uint32_t u32;
typedef uint64_t u64;
typedef int8_t i8;
typedef int16_t i16;
typedef int32_t i32;
typedef int64_t i64;
typedef struct chan_info{
u32 timeout;
u32 count;
void *stack[32*1024];
} chan_info_t;
#define PUT_TIME_OUT 3
#define CLOSE_DELAY_TIME 10
#define CHAN_SIZE 3
//s_event_t g_event;
void *g_stack_main[512*1024];
//void *g_stack0[512*1024];
//void *g_stack1[512*1024];
void* g_stack_action_chan[64 * 1024];
s_chan_declare(g_chan, chan_info_t, CHAN_SIZE);
static struct chan_info chan_table[] = {
{.timeout = PUT_TIME_OUT, .count = 1},
{.timeout = PUT_TIME_OUT, .count = 2},
{.timeout = PUT_TIME_OUT, .count = 3},
{.timeout = PUT_TIME_OUT, .count = 4},
{.timeout = PUT_TIME_OUT, .count = 5},
{.timeout = PUT_TIME_OUT, .count = 6},
{.timeout = PUT_TIME_OUT, .count = 7},
};
bool g_closed = false;
void put_task(__async__, void *arg) {
chan_info_t *chan_entry = (chan_info_t *)arg;
while (!g_closed) {
int ret;
printf("put %d\n", chan_entry->count);
ret = s_chan_put(__await__, g_chan, chan_entry);
if(ret != 0) {
printf("cancelled at line %d\n", __LINE__);
break;
}
ret = s_task_sleep(__await__, chan_entry->timeout);
if(ret != 0) {
printf("cancelled at line %d\n", __LINE__);
break;
}
}
printf("done.\n");
}
void get_chan_task(__async__, void* arg) {
chan_info_t chan_entry;
while(!g_closed) {
int ret;
ret = s_chan_get(__await__, g_chan, &chan_entry);
if(ret != 0) {
printf("cancelled at line %d\n", __LINE__);
break;
}
g_closed = true;
printf("get %d\n", chan_entry.count);
ret = s_task_sleep(__await__, CLOSE_DELAY_TIME);
if(ret != 0) {
printf("cancelled at line %d\n", __LINE__);
break;
}
}
}
void main_task(__async__, void *arg) {
int i;
s_chan_init(g_chan, chan_info_t, CHAN_SIZE);
s_task_create(g_stack_action_chan, sizeof(g_stack_action_chan), get_chan_task, NULL);
for(i = 0; i < sizeof(chan_table) / sizeof(struct chan_info); ++i) {
s_task_create(chan_table[i].stack, sizeof(chan_table[i].stack), put_task, (void *)&chan_table[i]);
}
for(i = 0; i < sizeof(chan_table) / sizeof(struct chan_info); ++i) {
s_task_join(__await__, chan_table[i].stack);
}
}
int main(int argc, char *argv[]) {
uv_loop_t *loop = uv_default_loop();
s_task_init_system(loop);
s_task_create(g_stack_main, sizeof(g_stack_main), main_task, (void *)(size_t)argc);
uv_run(loop, UV_RUN_DEFAULT);
return 0;
}