s_task icon indicating copy to clipboard operation
s_task copied to clipboard

libuv未计算chan等待状态导致程序提前结束

Open playgood111 opened this issue 4 years ago • 5 comments

现象为多个send_task发送消息到recv_task,由于chan空间设置太小,导致部分send_task进入等待态,而recv_task在收到第一个消息时正常退出并设置全局变量控制所有send_task退出。此时,剩余send_task不在全局变量的active_task上且由于没有定时器、没有uv的handles和req,uv正常结束流程,导致剩余等待状态的send_task没有正常退出。

不知道这是个bug还是正常使用不会出现这种情况?

playgood111 avatar Dec 24 '20 03:12 playgood111

可否上代码看下?

xhawk18 avatar Dec 26 '20 13:12 xhawk18

源代码如下:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "s_task.h"

#include <inttypes.h>

typedef uint8_t u8;
typedef uint16_t u16;
typedef uint32_t u32;
typedef uint64_t u64;

typedef int8_t i8;
typedef int16_t i16;
typedef int32_t i32;
typedef int64_t i64;

typedef struct chan_info{
	u32		timeout;
	u32		count;
	void 	*stack[32*1024];
} chan_info_t;

#define PUT_TIME_OUT			3
#define CLOSE_DELAY_TIME		10
#define CHAN_SIZE               3

//s_event_t  g_event;
void      *g_stack_main[512*1024];
//void      *g_stack0[512*1024];
//void      *g_stack1[512*1024];
void* g_stack_action_chan[64 * 1024];
s_chan_declare(g_chan, chan_info_t, CHAN_SIZE);

static struct chan_info chan_table[] = {
	{.timeout = PUT_TIME_OUT, .count = 1},
	{.timeout = PUT_TIME_OUT, .count = 2},
	{.timeout = PUT_TIME_OUT, .count = 3},
	{.timeout = PUT_TIME_OUT, .count = 4},
	{.timeout = PUT_TIME_OUT, .count = 5},
	{.timeout = PUT_TIME_OUT, .count = 6},
	{.timeout = PUT_TIME_OUT, .count = 7},
};

bool  g_closed = false;

void put_task(__async__, void *arg) {
    chan_info_t *chan_entry = (chan_info_t *)arg;
	
    while (!g_closed) {
		printf("put %d\n", chan_entry->count);
		s_chan_put(__await__, g_chan, chan_entry);
		s_task_sleep(__await__, chan_entry->timeout);
    }
	printf("done.\n");
}

void get_chan_task(__async__, void* arg) {
    chan_info_t chan_entry;

    while(!g_closed) {
        s_chan_get(__await__, g_chan, &chan_entry);
		g_closed = true;

		printf("get %d\n", chan_entry.count);
		s_task_sleep(__await__, CLOSE_DELAY_TIME);
    }
}


void main_task(__async__, void *arg) {
    int i;

	s_chan_init(g_chan, chan_info_t, CHAN_SIZE);
    s_task_create(g_stack_action_chan, sizeof(g_stack_action_chan), get_chan_task, NULL);

	for(i = 0; i < sizeof(chan_table) / sizeof(struct chan_info); ++i) {
		s_task_create(chan_table[i].stack, sizeof(chan_table[i].stack), put_task, (void *)&chan_table[i]);
	}

	for(i = 0; i < sizeof(chan_table) / sizeof(struct chan_info); ++i) {
		s_task_join(__await__, chan_table[i].stack);
	}


}


int main(int argc, char *argv[]) {
    uv_loop_t *loop = uv_default_loop();
    s_task_init_system(loop);

    s_task_create(g_stack_main, sizeof(g_stack_main), main_task, (void *)(size_t)argc);
    
    uv_run(loop, UV_RUN_DEFAULT);
    return 0;
}

上面的代码运行结果为:

$ ./chan_test
put 1
put 2
put 3
put 4
put 5
put 6
put 7
get 1
done.
done.
done.
done.

修改chan大小为32(超过put_task的数量),运行结果为:

$ ./chan_test
put 1
put 2
put 3
put 4
put 5
put 6
put 7
get 1
done.
done.
done.
done.
done.
done.
done.

根据结果推断当chan大小小于put_task数量时,有一部分put_task没有正常退出,不知道该问题是不是bug?

playgood111 avatar Jan 05 '21 00:01 playgood111

补充一下,这个问题是因为 ”接收任务“ 设置 g_closed = true 先退出了, 如果 ”发送任务“ 的队列已满,并且系统也没有更多的任务在运行中(或sleep中任务的情况下), 队列不可能被消费了,这时有三种处理方式 ---

  1. 发送任务一直等着
  2. 发送任务直接取消
  3. 发送任务的发送函数返回失败

方式3比较合理,然而目前代码选的方式2,只要保障chan中的消息有人收就没问题。

xhawk18 avatar Jan 06 '21 15:01 xhawk18

是的,只要有收的就没有问题,所以不确定是不是bug,就来问一下。

playgood111 avatar Jan 12 '21 07:01 playgood111

更新了,没有其他任务可以唤醒时,s_chan_put返回失败,输出如下

put 1
put 2
put 3
put 4
put 5
put 6
put 7
get 1
done.
done.
done.
done.
cancelled at line 55
done.
cancelled at line 55
done.
cancelled at line 55
done.

测试代码

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "s_task.h"

#include <inttypes.h>

typedef uint8_t u8;
typedef uint16_t u16;
typedef uint32_t u32;
typedef uint64_t u64;

typedef int8_t i8;
typedef int16_t i16;
typedef int32_t i32;
typedef int64_t i64;

typedef struct chan_info{
	u32		timeout;
	u32		count;
	void 	*stack[32*1024];
} chan_info_t;

#define PUT_TIME_OUT			3
#define CLOSE_DELAY_TIME		10
#define CHAN_SIZE               3

//s_event_t  g_event;
void      *g_stack_main[512*1024];
//void      *g_stack0[512*1024];
//void      *g_stack1[512*1024];
void* g_stack_action_chan[64 * 1024];
s_chan_declare(g_chan, chan_info_t, CHAN_SIZE);

static struct chan_info chan_table[] = {
	{.timeout = PUT_TIME_OUT, .count = 1},
	{.timeout = PUT_TIME_OUT, .count = 2},
	{.timeout = PUT_TIME_OUT, .count = 3},
	{.timeout = PUT_TIME_OUT, .count = 4},
	{.timeout = PUT_TIME_OUT, .count = 5},
	{.timeout = PUT_TIME_OUT, .count = 6},
	{.timeout = PUT_TIME_OUT, .count = 7},
};

bool  g_closed = false;

void put_task(__async__, void *arg) {
    chan_info_t *chan_entry = (chan_info_t *)arg;
	
    while (!g_closed) {
		int ret;
        printf("put %d\n", chan_entry->count);
		ret = s_chan_put(__await__, g_chan, chan_entry);
        if(ret != 0) {
            printf("cancelled at line %d\n", __LINE__);
            break;
        }
		
        ret = s_task_sleep(__await__, chan_entry->timeout);
        if(ret != 0) {
            printf("cancelled at line %d\n", __LINE__);
            break;
        }
    }
	printf("done.\n");
}

void get_chan_task(__async__, void* arg) {
    chan_info_t chan_entry;

    while(!g_closed) {
        int ret;
        ret = s_chan_get(__await__, g_chan, &chan_entry);
        if(ret != 0) {
            printf("cancelled at line %d\n", __LINE__);
            break;
        }
		g_closed = true;

		printf("get %d\n", chan_entry.count);
		ret = s_task_sleep(__await__, CLOSE_DELAY_TIME);
        if(ret != 0) {
            printf("cancelled at line %d\n", __LINE__);
            break;
        }
    }
}


void main_task(__async__, void *arg) {
    int i;

	s_chan_init(g_chan, chan_info_t, CHAN_SIZE);
    s_task_create(g_stack_action_chan, sizeof(g_stack_action_chan), get_chan_task, NULL);

	for(i = 0; i < sizeof(chan_table) / sizeof(struct chan_info); ++i) {
		s_task_create(chan_table[i].stack, sizeof(chan_table[i].stack), put_task, (void *)&chan_table[i]);
	}

	for(i = 0; i < sizeof(chan_table) / sizeof(struct chan_info); ++i) {
		s_task_join(__await__, chan_table[i].stack);
	}


}


int main(int argc, char *argv[]) {
    uv_loop_t *loop = uv_default_loop();
    s_task_init_system(loop);

    s_task_create(g_stack_main, sizeof(g_stack_main), main_task, (void *)(size_t)argc);
    
    uv_run(loop, UV_RUN_DEFAULT);
    return 0;
}

xhawk18 avatar Jan 14 '21 11:01 xhawk18