wfqueue MCSP test gets stuck in the spin function (wfqueue.c [30|32])
Hi Chaoran, I can confirm that it gets stuck in the MCSP case with Nproc = 16, consumers: 14, producers: 1.
0x0000000000400e5f in running_wfq_test (arg_producer=
- 1 Thread 0x7ffff7fe0740 (LWP 25676) "test" 0x0000000000400e5f in running_wfq_test (arg_producer=
, arg_consumer= , arg_producing= , arg_consuming= , total_threads=15, test_type=0x401fbd "MCSP") at main_test.c:121
Is this the same issue as #10 ? Or a different one?
This is a different one: it is the MCSP case, and the issue is confirmed. I have shared the code with you; please take a look.
Hi Chaoran,
This is the MCSP code; it still gets stuck after I added a barrier.
/*
* compile : g++ -std=c++11 -I./ OverallTest.cpp -pthread -Wall -o overalltest
* execute
* valgrind --fair-sched=yes ./overalltest
*/
// #include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <pthread.h>
// #include <thread>
#include "queue.h"
#include <unistd.h>
#include <assert.h>
#include <sys/time.h>
/* One million; used as the base unit for the test size. */
#define MILLION 1000000
/* Number of items each producer thread enqueues in one test run. */
#define TEST_MAX_INPUT MILLION
/*
 * Barrier shared by every worker thread. Workers wait on it once after
 * registering with the queue and once more before releasing their handle.
 * It is initialised in main().
 */
static pthread_barrier_t barrier;
/* Heap-allocated payload passed through the queue; wraps one number. */
typedef struct {
    size_t v; /* value given to newval() when the item was created */
} MyVal;
/*
 * State shared between the test driver and all worker threads.
 * The two counters are only touched through __sync atomic builtins.
 */
typedef struct {
    size_t     nProducer;  /* number of producer threads; scales the expected item total */
    size_t     nConsumer;  /* number of consumer threads requested by the caller */
    size_t     nProducing; /* running count of produced items; also supplies each item's value */
    size_t     nConsuming; /* running count of items dequeued and freed by consumers */
    queue_t   *q;          /* the queue under test */
    handle_t **hds;        /* per-thread queue handles, indexed by the worker's claimed id */
} wfq_test_config_t;
/* Number of finished test runs; pre-incremented when a run prints its end banner. */
int TEST_COUNT = 0;
/*
 * Allocate a MyVal on the heap and store `digit` in it.
 *
 * Returns a pointer that is never NULL: if the allocation fails the process
 * prints a diagnostic and aborts, instead of dereferencing a null pointer.
 * The caller owns the returned object (the consumer threads free() items
 * after dequeuing them).
 */
MyVal* newval(size_t digit) {
    /* The cast is kept because the build line in the file header uses g++. */
    MyVal *data = (MyVal*)malloc(sizeof *data);
    if (data == NULL) {
        perror("newval: malloc");
        abort();
    }
    data->v = digit;
    return data;
}
/*
 * Next unclaimed worker id. Each producer/consumer thread takes one with an
 * atomic fetch-and-add and uses it as its index into the handle array.
 * NOTE(review): nothing in this file resets it between test runs.
 */
static int id = 0;
void * producing_fn(void *v) {
int z;
wfq_test_config_t* config = (wfq_test_config_t*)v;
queue_t *q = config->q;
handle_t **hds = config->hds;
int _id = __sync_fetch_and_add(&id, 1);
hds[_id] = align_malloc(PAGE_SIZE, sizeof(handle_t));
queue_register(q, hds[_id], _id);
pthread_barrier_wait(&barrier);
for (z = 0; z < TEST_MAX_INPUT; z++) {
MyVal* s = newval(__sync_fetch_and_add(&config->nProducing, 1));
enqueue(q, hds[_id], s);
// wfq_sleep(1);
// if (xx % 100000 == 0)
// printf("%zu\n", xx);
}
pthread_barrier_wait(&barrier);
queue_free(q, hds[_id]);
return NULL;
}
void * consuming_fn(void *v) {
wfq_test_config_t* config = (wfq_test_config_t*)v;
queue_t *q = config->q;
handle_t **hds = config->hds;
int _id = __sync_fetch_and_add(&id, 1);
hds[_id] = align_malloc(PAGE_SIZE, sizeof(handle_t));
queue_register(q, hds[_id], _id);
pthread_barrier_wait(&barrier);
for (;;) {
MyVal* s;
while ( (s = (MyVal*)dequeue(q, hds[_id]) ) ) {
if (s->v % 100000 == 0) {
printf("t %zu\n", s->v);
}
free(s);
__sync_fetch_and_add(&config->nConsuming, 1);
}
if (__sync_fetch_and_add(&config->nConsuming, 0) >= TEST_MAX_INPUT * (config->nProducer)) {
break;
}
}
pthread_barrier_wait(&barrier);
queue_free(q, hds[_id]);
return NULL;
}
/* Microseconds elapsed from `from` to `to` (two gettimeofday() samples). */
static long long wfq_test_elapsed_usec(const struct timeval *from, const struct timeval *to) {
    return (long long)(to->tv_sec - from->tv_sec) * 1000000LL
         + (long long)(to->tv_usec - from->tv_usec);
}

/*
 * Run one queue stress test and print a summary.
 *
 * arg_producer  - number of producer threads to start.
 * arg_consumer  - number of consumer threads requested (stored in the config).
 * arg_producing - initial value of the shared produced-item counter.
 * arg_consuming - initial value of the shared consumed-item counter.
 * total_threads - total worker threads to start; must be at least
 *                 arg_producer + arg_consumer. The workers synchronise on the
 *                 file-scope barrier, so this has to agree with the count the
 *                 barrier was initialised with.
 * test_type     - label printed in the end banner.
 *
 * The calling thread spins until TEST_MAX_INPUT * arg_producer items have
 * been consumed, asserting if that takes two minutes or longer, then joins
 * all workers and reports the elapsed wall-clock time. Returns 0.
 */
int running_wfq_test(size_t arg_producer, size_t arg_consumer, size_t arg_producing, size_t arg_consuming, const size_t total_threads, const char * test_type) {
    const long long timeout_usec = 120LL * 1000 * 1000; /* 2 minutes */
    const size_t expected_items = TEST_MAX_INPUT * arg_producer;
    const char *testname = "Fixed size wfqueue test";
    size_t i = 0;
    int rc;
    struct timeval start_t, end_t;
    double diff_t;
    wfq_test_config_t config;

    assert ((total_threads >= (arg_producer + arg_consumer)) && "not enough thread to test");
    pthread_t testThreads[total_threads];

    config.nProducer = arg_producer;
    config.nProducing = arg_producing;
    config.nConsumer = arg_consumer;
    config.nConsuming = arg_consuming;
    config.q = align_malloc(PAGE_SIZE, sizeof(queue_t));
    queue_init(config.q, total_threads);
    config.hds = align_malloc(PAGE_SIZE, sizeof(handle_t * [total_threads]));

    gettimeofday(&start_t, NULL);

    for (i = 0; i < arg_producer ; i++) {
        rc = pthread_create(testThreads + i, 0, producing_fn, &config);
        if (rc != 0) {
            /* A missing worker would leave the others blocked on the barrier. */
            fprintf(stderr, "pthread_create (producer %zu) failed: %d\n", i, rc);
            abort();
        }
    }
    /*
     * NOTE(review): every remaining thread becomes a consumer, so the real
     * consumer count is total_threads - arg_producer; it equals arg_consumer
     * only when total_threads == arg_producer + arg_consumer.
     */
    for (; i < total_threads ; i++) {
        rc = pthread_create(testThreads + i, 0, consuming_fn, &config);
        if (rc != 0) {
            fprintf(stderr, "pthread_create (consumer %zu) failed: %d\n", i, rc);
            abort();
        }
    }

    /* Busy-wait for completion; fetch-and-add of 0 is an atomic read. */
    while (__sync_fetch_and_add(&config.nConsuming, 0) < expected_items) {
        struct timeval curr;
        gettimeofday(&curr, NULL);
        /*
         * Use the full seconds + microseconds difference: tv_usec alone is
         * always below one million, so comparing only tv_usec against the
         * timeout could never trigger.
         */
        if (wfq_test_elapsed_usec(&start_t, &curr) >= timeout_usec) { // 2 minute
            assert(0 && " too long to consuming the queue ");
        }
    }

    for (i = 0; i < total_threads; i++) {
        /* Workers always return NULL, so the exit value is not collected. */
        pthread_join(testThreads[i], NULL);
    }

    gettimeofday(&end_t, NULL);
    diff_t = (double)wfq_test_elapsed_usec(&start_t, &end_t) / 1000000;

    printf("===END Test= %d - %s, test type %s ===\n", ++TEST_COUNT, testname, test_type);
    printf("======Total consuming = %zu\n", __sync_fetch_and_add(&config.nConsuming, 0));
    printf("Execution time = %f\n", diff_t);

    free(config.q);
    free(config.hds);
    /* One-second pause before returning, kept from the original test. */
    sleep(1);
    return 0;
}
/*
 * Entry point: runs the MCSP configuration (one producer, every other online
 * processor as a consumer).
 *
 * Returns the result of the last test run, or -1 when fewer than two
 * processors are available or the barrier cannot be initialised.
 */
int main(void) {
    int ret = 0, i;
    /*
     * sysconf() returns a long and reports failure as -1; keep the value
     * signed so the failure is not mistaken for a huge processor count.
     */
    long n = sysconf(_SC_NPROCESSORS_ONLN); // Linux / MAC OS

    if (n < 0) {
        fprintf(stderr, "Cannot determine the number of online processors\n");
        return -1;
    }
    if (n <= 1) {
        printf("One thread is not enough for testing\n");
        return -1;
    }

    /*
     * Other configurations tried previously and currently disabled:
     *   MPMC: n/2 producers, n/2 - 1 consumers
     *   MPSC: n - 2 producers, 1 consumer
     * Both start n - 1 threads, which is why the barrier below is sized from
     * the thread count actually started rather than from n.
     */
    int NUM_PRODUCER = 1;
    int NUM_CONSUMER = (int)n - 1;
    int running_set = 1;
    int total_threads = NUM_PRODUCER + NUM_CONSUMER;

    int rc = pthread_barrier_init(&barrier, NULL, (unsigned int)total_threads);
    if (rc != 0) {
        fprintf(stderr, "pthread_barrier_init failed: %d\n", rc);
        return -1;
    }

    /*
     * NOTE(review): the worker id counter is never reset, so running_set > 1
     * would hand out ids beyond total_threads on the second run.
     */
    for (i = 0; i < running_set; i++) {
        ret = running_wfq_test(NUM_PRODUCER, NUM_CONSUMER, 0, 0, total_threads, "MCSP");
    }

    pthread_barrier_destroy(&barrier);
    return ret;
}
I thought it could be a gcc 4.8.5 issue, but I also tried clang on my Mac (Apple LLVM version 10.0.0, clang-1000.11.45.5)
and I still have the same issue.
It may be a bug in the wfqueue implementation. I don't have time to debug it right now. But I will leave this issue open until I fix it.
Hi, I have made #13. With it, the code you shared no longer seems to get stuck in my tests. Or is there something still not right? Maybe you can test it again.