wfqueue.c: threads stuck in spin() at wfqueue.c:30
Id Target Id Frame
16 Thread 0x7fffbf7fe700 (LWP 22384) "a.out" 0x0000000000401779 in help_enq (i=33724383, c=0x7fffcc05db40, th=0x7fffb4001000, q=0x605000)
at wfqueue.c:213
15 Thread 0x7fffbffff700 (LWP 22383) "a.out" 0x000000000040192b in help_enq (i=33705511, c=0x7fffc80aa3c0, th=0x7fffb0001000, q=0x605000)
at wfqueue.c:236
14 Thread 0x7fffdcff9700 (LWP 22382) "a.out" spin (p=0x7fffcc0f3940) at wfqueue.c:30
13 Thread 0x7fffdd7fa700 (LWP 22381) "a.out" spin (p=0x7fffcc05e140) at wfqueue.c:30
12 Thread 0x7fffddffb700 (LWP 22380) "a.out" spin (p=0x7fffcc05e2c0) at wfqueue.c:30
11 Thread 0x7fffde7fc700 (LWP 22379) "a.out" spin (p=0x7fffcc05e3c0) at wfqueue.c:30
10 Thread 0x7fffdeffd700 (LWP 22378) "a.out" spin (p=0x7fffcc05c140) at wfqueue.c:32
* 1 Thread 0x7ffff7fe0740 (LWP 22369) "a.out" 0x0000000000400def in running_wfq_test (arg_producer=<optimized out>, arg_consumer=<optimized out>,
This issue happens intermittently, and it always gets stuck with the last 5 items unconsumed. Total nproc = 16 cores, nProducerThread = 8, nConsumerThread = 7, nProducing = 8000000, nConsuming = 7999995.
Could you give me a small example that reproduces this?
Hi Chaoran, do you have a Reddit ID? May I add you to my chat list first?
Sorry, I don't have a Reddit account. If you post a small test example here, along with the compiler version you used, I can debug the issue and get back to you.
Hi Chaoran,
This is the whole test:
/*
* compile : g++ -std=c++11 -I./ OverallTest.cpp -pthread -Wall -o overalltest
* execute
* valgrind --fair-sched=yes ./overalltest
*/
// #include <iostream>
#include <stdio.h>
#include <time.h>
#include <pthread.h>
// #include <thread>
#include "queue.h"
#include <unistd.h>
#include <assert.h>
#include <sys/time.h>
#define MILLION 1000000
#define TEST_MAX_INPUT MILLION
/* Heap-allocated payload passed through the queue; v holds the sequence
 * number a producer drew from the shared nProducing counter. */
typedef struct MyVal MyVal;
struct MyVal {
    size_t v;
};
/* Shared state handed (by pointer) to every producer and consumer thread of
 * one running_wfq_test() run. */
typedef struct wfq_test_config {
    size_t nProducer, nConsumer; /* requested thread counts; workers read only nProducer */
    size_t nProducing;           /* next sequence number, bumped atomically by producers */
    size_t nConsuming;           /* items consumed so far, bumped atomically by consumers */
    queue_t *q;                  /* the queue under test */
    handle_t **hds;              /* per-thread handles, indexed by each worker's id */
} wfq_test_config_t;
/* Number of completed running_wfq_test() runs; only used to label output. */
int TEST_COUNT = 0;

/*
 * Allocate a payload holding `digit`. Ownership passes to the caller: the
 * item is enqueued by a producer and later free()d by a consumer.
 *
 * Aborts when the allocation fails. A NULL payload cannot be passed through
 * the queue, because consumers treat a NULL dequeue() result as "nothing to
 * take". Before this check, the NULL pointer was dereferenced.
 */
MyVal* newval(size_t digit) {
    MyVal *data = (MyVal*)malloc(sizeof *data);
    if (data == NULL) {
        fprintf(stderr, "newval: out of memory\n");
        abort();
    }
    data->v = digit;
    return data;
}
static int id = 0;
void * producing_fn(void *v) {
int z;
wfq_test_config_t* config = (wfq_test_config_t*)v;
queue_t *q = config->q;
handle_t **hds = config->hds;
int _id = __sync_fetch_and_add(&id, 1);
hds[_id] = align_malloc(PAGE_SIZE, sizeof(handle_t));
queue_register(q, hds[_id], _id);
for (z = 0; z < TEST_MAX_INPUT; z++) {
MyVal* s = newval(__sync_fetch_and_add(&config->nProducing, 1));
enqueue(q, hds[_id], s);
// wfq_sleep(1);
// if (xx % 100000 == 0)
// printf("%zu\n", xx);
}
queue_free(q, hds[_id]);
return NULL;
}
void * consuming_fn(void *v) {
wfq_test_config_t* config = (wfq_test_config_t*)v;
queue_t *q = config->q;
handle_t **hds = config->hds;
int _id = __sync_fetch_and_add(&id, 1);
hds[_id] = align_malloc(PAGE_SIZE, sizeof(handle_t));
queue_register(q, hds[_id], _id);
for (;;) {
MyVal* s;
while ( (s = (MyVal*)dequeue(q, hds[_id]) ) ) {
if (s->v % 100000 == 0) {
printf("t %zu\n", s->v);
}
free(s);
__sync_fetch_and_add(&config->nConsuming, 1);
}
if (__sync_fetch_and_add(&config->nConsuming, 0) >= TEST_MAX_INPUT * (config->nProducer)) {
break;
}
}
queue_free(q, hds[_id]);
return NULL;
}
int running_wfq_test(size_t arg_producer, size_t arg_consumer, size_t arg_producing, size_t arg_consuming, const size_t total_threads, const char * test_type) {
size_t i = 0;
struct timeval start_t, end_t;
double diff_t;
wfq_test_config_t config;
assert ((total_threads >= (arg_producer + arg_consumer)) && "not enough thread to test");
pthread_t testThreads[total_threads];
config.nProducer = arg_producer;
config.nProducing = arg_producing;
config.nConsumer = arg_consumer;
config.nConsuming = arg_consuming;
config.q = align_malloc(PAGE_SIZE, sizeof(queue_t));
queue_init(config.q, total_threads);
config.hds = align_malloc(PAGE_SIZE, sizeof(handle_t * [total_threads]));
char *testname = (char*)"Fixed size wfqueue test";
gettimeofday(&start_t, NULL);
for (i = 0; i < arg_producer ; i++) {
pthread_create(testThreads + i, 0, producing_fn, &config);
}
for (; i < total_threads ; i++) {
pthread_create(testThreads + i, 0, consuming_fn, &config);
}
while (__sync_fetch_and_add(&config.nConsuming, 0) < TEST_MAX_INPUT * (config.nProducer)) {
struct timeval curr;
gettimeofday(&curr, NULL);
if ((curr.tv_usec - start_t.tv_usec) >= (120 * 1000 * 1000)) { // 2 minute
assert(0 && " too long to consuming the queue ");
}
}
for (i = 0; i < total_threads; i++) {
void *ret;
pthread_join(testThreads[i], &ret);
// free(ret);
}
gettimeofday(&end_t, NULL);
diff_t = (double)(end_t.tv_usec - start_t.tv_usec) / 1000000 + (double)(end_t.tv_sec - start_t.tv_sec);
printf("===END Test= %d - %s, test type %s ===\n", ++TEST_COUNT, testname, test_type);
printf("======Total consuming = %zu\n", __sync_fetch_and_add(&config.nConsuming, 0));
// printf("======Left over = %zu\n", wfq_size(config.q));
printf("Execution time = %f\n", diff_t);
// assert(wfq_size(config.q) == 0 && " still left over queue inside ");
// wfq_destroy(config.q);
free(config.q);
free(config.hds);
sleep(1);
return 0;
}
/* Clamp a derived thread count so that every role gets at least one thread. */
static int at_least_one(long count) {
    return count < 1 ? 1 : (int)count;
}

/*
 * Run the MPMC, MPSC and single-producer/multi-consumer benchmarks, sized from
 * the number of online CPUs.
 *
 * Returns the last run's result, or -1 when fewer than two CPUs are reported.
 * sysconf() failing (it returns -1) also takes the -1 path, because the count
 * is kept in a signed long instead of being wrapped to a huge unsigned value.
 */
int main(void) {
    int ret = 0, i;
    long n = sysconf(_SC_NPROCESSORS_ONLN); // Linux / MAC OS
    if (n > 1) {
        /* n/2 - 1 and n - 2 reach 0 on 2- and 3-CPU machines. With zero
         * consumers nothing drains the queue and running_wfq_test() can never
         * finish, so every role is clamped to at least one thread. */
        int NUM_PRODUCER = at_least_one(n / 2);
        int NUM_CONSUMER = at_least_one(n / 2 - 1);
        int running_set = 1;
        for (i = 0; i < running_set; i++) {
            ret = running_wfq_test(NUM_PRODUCER, NUM_CONSUMER, 0, 0, NUM_PRODUCER + NUM_CONSUMER, "MPMC");
        }
        NUM_PRODUCER = at_least_one(n - 2);
        NUM_CONSUMER = 1;
        for (i = 0; i < running_set; i++) {
            ret = running_wfq_test(NUM_PRODUCER, NUM_CONSUMER, 0, 0, NUM_PRODUCER + NUM_CONSUMER, "MPSC");
        }
        NUM_PRODUCER = 1;
        NUM_CONSUMER = at_least_one(n - 2);
        for (i = 0; i < running_set; i++) {
            ret = running_wfq_test(NUM_PRODUCER, NUM_CONSUMER, 0, 0, NUM_PRODUCER + NUM_CONSUMER, "MCSP");
        }
    } else {
        ret = -1;
        printf("One thread is not enough for testing\n");
    }
    return ret;
}
What's your GCC version (the output of g++ --version)?
g++ (GCC) 4.8.5 20150623 (Red Hat 4.8.5-16) Copyright (C) 2015 Free Software Foundation, Inc. This is free software; see the source for copying conditions. There is NO warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
I compile with
gcc -DWFQUEUE -I./ -O3 main_test.c wfqueue.c -pthread -o test -g
Could you try your test after adding a pthread_barrier_wait after queue_register and another pthread_barrier_wait before queue_free in both consuming_fn and producing_fn?
It gets stuck at the barrier wait:
0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 15 Thread 0x7fffbffff700 (LWP 651) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 14 Thread 0x7fffdcff9700 (LWP 650) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 13 Thread 0x7fffdd7fa700 (LWP 649) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 12 Thread 0x7fffddffb700 (LWP 647) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 11 Thread 0x7fffde7fc700 (LWP 646) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 10 Thread 0x7fffdeffd700 (LWP 645) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 9 Thread 0x7fffdf7fe700 (LWP 644) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 8 Thread 0x7fffdffff700 (LWP 643) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 7 Thread 0x7ffff4fec700 (LWP 641) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 6 Thread 0x7ffff57ed700 (LWP 640) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 5 Thread 0x7ffff5fee700 (LWP 639) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 4 Thread 0x7ffff67ef700 (LWP 638) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 3 Thread 0x7ffff6ff0700 (LWP 637) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 2 Thread 0x7ffff77f1700 (LWP 633) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0
Did you initialize the barrier before you use it?
Sorry, I wasn't aware. After I added the initialization, it crashed:
===END Test= 2 - Fixed size wfqueue test, test type MPSC ===
======Total consuming = 14000000
Execution time = 2.729662
*** Error in `./test': free(): invalid next size (fast): 0x00000000011de000 ***
======= Backtrace: =========
/usr/lib64/libc.so.6(+0x81429)[0x7f237a348429]
./test[0x400ff4]
./test[0x400a92]
/usr/lib64/libc.so.6(__libc_start_main+0xf5)[0x7f237a2e93d5]
./test[0x400ad8]
======= Memory map: ========
00400000-00403000 r-xp 00000000 fd:05 4588452 /home/booking/cground/fast-wait-free-queue/test
00602000-00603000 r--p 00002000 fd:05 4588452 /home/booking/cground/fast-wait-free-queue/test
00603000-00604000 rw-p 00003000 fd:05 4588452 /home/booking/cground/fast-wait-free-queue/test
011cb000-011ed000 rw-p 00000000 00:00 0 [heap]
7f22f8000000-7f22f9592000 rw-p 00000000 00:00 0
7f22f9592000-7f22fc000000 ---p 00000000 00:00 0
7f2300000000-7f2301540000 rw-p 00000000 00:00 0
7f2301540000-7f2304000000 ---p 00000000 00:00 0
7f2304000000-7f2305577000 rw-p 00000000 00:00 0
7f2305577000-7f2308000000 ---p 00000000 00:00 0
7f2308000000-7f230968e000 rw-p 00000000 00:00 0
7f230968e000-7f230c000000 ---p 00000000 00:00 0
7f230c000000-7f230d76d000 rw-p 00000000 00:00 0
7f230d76d000-7f2310000000 ---p 00000000 00:00 0
7f2310000000-7f231143e000 rw-p 00000000 00:00 0
7f231143e000-7f2314000000 ---p 00000000 00:00 0
7f2314000000-7f23156bc000 rw-p 00000000 00:00 0
7f23156bc000-7f2318000000 ---p 00000000 00:00 0
7f2318000000-7f2319640000 rw-p 00000000 00:00 0
7f2319640000-7f231c000000 ---p 00000000 00:00 0
7f231c000000-7f231d34a000 rw-p 00000000 00:00 0
7f231d34a000-7f2320000000 ---p 00000000 00:00 0
7f2320000000-7f23214ad000 rw-p 00000000 00:00 0
Could you post your code after all the barrier related changes?
/*
* compile : g++ -std=c++11 -I./ OverallTest.cpp -pthread -Wall -o overalltest
* execute
* valgrind --fair-sched=yes ./overalltest
*/
// #include <iostream>
#include <stdio.h>
#include <time.h>
#include <pthread.h>
// #include <thread>
#include "queue.h"
#include <unistd.h>
#include <assert.h>
#include <sys/time.h>
#define MILLION 1000000
#define TEST_MAX_INPUT MILLION
/* Barrier shared by every producer and consumer thread. Each worker waits on
 * it after queue_register() and again before queue_free(); main() initializes
 * it with the number of online CPUs. */
static pthread_barrier_t barrier;
/* Heap-allocated payload passed through the queue; v holds the sequence
 * number a producer drew from the shared nProducing counter. */
typedef struct MyVal MyVal;
struct MyVal {
    size_t v;
};
/* Shared state handed (by pointer) to every producer and consumer thread of
 * one running_wfq_test() run. */
typedef struct wfq_test_config {
    size_t nProducer, nConsumer; /* requested thread counts; workers read only nProducer */
    size_t nProducing;           /* next sequence number, bumped atomically by producers */
    size_t nConsuming;           /* items consumed so far, bumped atomically by consumers */
    queue_t *q;                  /* the queue under test */
    handle_t **hds;              /* per-thread handles, indexed by each worker's id */
} wfq_test_config_t;
/* Number of completed running_wfq_test() runs; only used to label output. */
int TEST_COUNT = 0;

/*
 * Allocate a payload holding `digit`. Ownership passes to the caller: the
 * item is enqueued by a producer and later free()d by a consumer.
 *
 * Aborts when the allocation fails. A NULL payload cannot be passed through
 * the queue, because consumers treat a NULL dequeue() result as "nothing to
 * take". Before this check, the NULL pointer was dereferenced.
 */
MyVal* newval(size_t digit) {
    MyVal *data = (MyVal*)malloc(sizeof *data);
    if (data == NULL) {
        fprintf(stderr, "newval: out of memory\n");
        abort();
    }
    data->v = digit;
    return data;
}
static int id = 0;
void * producing_fn(void *v) {
int z;
wfq_test_config_t* config = (wfq_test_config_t*)v;
queue_t *q = config->q;
handle_t **hds = config->hds;
int _id = __sync_fetch_and_add(&id, 1);
hds[_id] = align_malloc(PAGE_SIZE, sizeof(handle_t));
queue_register(q, hds[_id], _id);
pthread_barrier_wait(&barrier);
for (z = 0; z < TEST_MAX_INPUT; z++) {
MyVal* s = newval(__sync_fetch_and_add(&config->nProducing, 1));
enqueue(q, hds[_id], s);
// wfq_sleep(1);
// if (xx % 100000 == 0)
// printf("%zu\n", xx);
}
pthread_barrier_wait(&barrier);
queue_free(q, hds[_id]);
return NULL;
}
void * consuming_fn(void *v) {
wfq_test_config_t* config = (wfq_test_config_t*)v;
queue_t *q = config->q;
handle_t **hds = config->hds;
int _id = __sync_fetch_and_add(&id, 1);
hds[_id] = align_malloc(PAGE_SIZE, sizeof(handle_t));
queue_register(q, hds[_id], _id);
pthread_barrier_wait(&barrier);
for (;;) {
MyVal* s;
while ( (s = (MyVal*)dequeue(q, hds[_id]) ) ) {
if (s->v % 100000 == 0) {
printf("t %zu\n", s->v);
}
free(s);
__sync_fetch_and_add(&config->nConsuming, 1);
}
if (__sync_fetch_and_add(&config->nConsuming, 0) >= TEST_MAX_INPUT * (config->nProducer)) {
break;
}
}
pthread_barrier_wait(&barrier);
queue_free(q, hds[_id]);
return NULL;
}
int running_wfq_test(size_t arg_producer, size_t arg_consumer, size_t arg_producing, size_t arg_consuming, const size_t total_threads, const char * test_type) {
size_t i = 0;
struct timeval start_t, end_t;
double diff_t;
wfq_test_config_t config;
assert ((total_threads >= (arg_producer + arg_consumer)) && "not enough thread to test");
pthread_t testThreads[total_threads];
config.nProducer = arg_producer;
config.nProducing = arg_producing;
config.nConsumer = arg_consumer;
config.nConsuming = arg_consuming;
config.q = align_malloc(PAGE_SIZE, sizeof(queue_t));
queue_init(config.q, total_threads);
config.hds = align_malloc(PAGE_SIZE, sizeof(handle_t * [total_threads]));
char *testname = (char*)"Fixed size wfqueue test";
gettimeofday(&start_t, NULL);
for (i = 0; i < arg_producer ; i++) {
pthread_create(testThreads + i, 0, producing_fn, &config);
}
for (; i < total_threads ; i++) {
pthread_create(testThreads + i, 0, consuming_fn, &config);
}
while (__sync_fetch_and_add(&config.nConsuming, 0) < TEST_MAX_INPUT * (config.nProducer)) {
struct timeval curr;
gettimeofday(&curr, NULL);
if ((curr.tv_usec - start_t.tv_usec) >= (120 * 1000 * 1000)) { // 2 minute
assert(0 && " too long to consuming the queue ");
}
}
for (i = 0; i < total_threads; i++) {
void *ret;
pthread_join(testThreads[i], &ret);
// free(ret);
}
gettimeofday(&end_t, NULL);
diff_t = (double)(end_t.tv_usec - start_t.tv_usec) / 1000000 + (double)(end_t.tv_sec - start_t.tv_sec);
printf("===END Test= %d - %s, test type %s ===\n", ++TEST_COUNT, testname, test_type);
printf("======Total consuming = %zu\n", __sync_fetch_and_add(&config.nConsuming, 0));
// printf("======Left over = %zu\n", wfq_size(config.q));
printf("Execution time = %f\n", diff_t);
// assert(wfq_size(config.q) == 0 && " still left over queue inside ");
// wfq_destroy(config.q);
free(config.q);
free(config.hds);
sleep(1);
return 0;
}
int main(void) {
int ret = 0, i;
unsigned int n = sysconf(_SC_NPROCESSORS_ONLN); // Linux / MAC OS
pthread_barrier_init(&barrier, NULL, n);
if (n > 1) {
int NUM_PRODUCER = n/2;
int NUM_CONSUMER = n/2;
int running_set = 1;
for (i = 0; i < running_set; i++) {
ret = running_wfq_test(NUM_PRODUCER, NUM_CONSUMER, 0, 0, NUM_PRODUCER + NUM_CONSUMER, "MPMC");
}
NUM_PRODUCER = n - 1;
NUM_CONSUMER = 1;
for (i = 0; i < running_set; i++) {
ret = running_wfq_test(NUM_PRODUCER, NUM_CONSUMER, 0, 0, NUM_PRODUCER + NUM_CONSUMER, "MPSC");
}
NUM_PRODUCER = 1;
NUM_CONSUMER = n-1;
for (i = 0; i < running_set; i++) {
ret = running_wfq_test(NUM_PRODUCER, NUM_CONSUMER, 0, 0, NUM_PRODUCER + NUM_CONSUMER, "MCSP");
}
} else {
ret = -1;
printf("One thread is not enough for testing\n");
}
return ret;