fast-wait-free-queue

wfqueue MCSP test gets stuck in spin() (wfqueue.c, lines 30 and 32)

Open Taheta opened this issue 6 years ago • 6 comments

Hi chaoran, I can confirm this gets stuck with MCSP on Nproc = 16 (consumers: 14, producer: 1).

```
0x0000000000400e5f in running_wfq_test (arg_producer=, arg_consumer=, arg_producing=, arg_consuming=, total_threads=15, test_type=0x401fbd "MCSP") at main_test.c:121
121     while (__sync_fetch_and_add(&config.nConsuming, 0) < TEST_MAX_INPUT * (config.nProducer)) {
Missing separate debuginfos, use: debuginfo-install glibc-2.17-222.el7.x86_64
(gdb) info threads
  Id   Target Id                                   Frame
  16   Thread 0x7fffbf7fe700 (LWP 25694) "test" spin (p=0x7fffc8031980) at wfqueue.c:32
  15   Thread 0x7fffbffff700 (LWP 25693) "test" spin (p=0x7fffc8032180) at wfqueue.c:32
  14   Thread 0x7fffdcff9700 (LWP 25692) "test" spin (p=0x7fffc80324c0) at wfqueue.c:30
  13   Thread 0x7fffdd7fa700 (LWP 25691) "test" 0x0000000000401788 in spin (p=0x7fffc8032a80) at wfqueue.c:30
  12   Thread 0x7fffddffb700 (LWP 25690) "test" 0x0000000000401788 in spin (p=0x7fffc8032d80) at wfqueue.c:30
  11   Thread 0x7fffde7fc700 (LWP 25689) "test" 0x0000000000401788 in spin (p=0x7fffc8033200) at wfqueue.c:30
  10   Thread 0x7fffdeffd700 (LWP 25688) "test" spin (p=0x7fffc8033540) at wfqueue.c:32
  9    Thread 0x7fffdf7fe700 (LWP 25687) "test" spin (p=0x7fffc80338c0) at wfqueue.c:32
  8    Thread 0x7fffdffff700 (LWP 25686) "test" 0x0000000000401788 in spin (p=0x7fffc8033cc0) at wfqueue.c:30
  7    Thread 0x7ffff4fec700 (LWP 25685) "test" 0x00000000004017f9 in help_enq (i=91476149, c=0x7fffc8033ec0, th=0x7fffd8001000, q=0x605000) at wfqueue.c:213
  6    Thread 0x7ffff57ed700 (LWP 25684) "test" 0x0000000000401788 in spin (p=0x7fffe4013080) at wfqueue.c:30
  5    Thread 0x7ffff5fee700 (LWP 25683) "test" 0x0000000000401788 in spin (p=0x7fffe4013200) at wfqueue.c:30
  4    Thread 0x7ffff67ef700 (LWP 25682) "test" 0x0000000000401788 in spin (p=0x7fffc802cd80) at wfqueue.c:30
  3    Thread 0x7ffff6ff0700 (LWP 25681) "test" 0x0000000000401788 in spin (p=0x7fffe4013380) at wfqueue.c:30
* 1    Thread 0x7ffff7fe0740 (LWP 25676) "test" 0x0000000000400e5f in running_wfq_test (arg_producer=, arg_consumer=, arg_producing=, arg_consuming=, total_threads=15, test_type=0x401fbd "MCSP") at main_test.c:121
```

Taheta avatar Jun 24 '19 06:06 Taheta

Is this the same issue as #10, or a different one?

chaoran avatar Jun 24 '19 06:06 chaoran

This is a different issue: it happens with MCSP, and I have confirmed it. I have posted the code; please take a look.

Taheta avatar Jun 24 '19 06:06 Taheta

Hi Chaoran,

This is the MCSP code. It still gets stuck after I added a barrier.

/*
 * compile : g++ -std=c++11 -I./ OverallTest.cpp -pthread -Wall -o overalltest
 * execute
 * valgrind --fair-sched=yes ./overalltest
 */
// #include <iostream>
// #include <thread>
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>

#include "queue.h"
#define MILLION 1000000
#define TEST_MAX_INPUT MILLION /* number of items each producer thread enqueues */

/* Start/finish rendezvous shared by every worker thread; initialised in main(). */
static pthread_barrier_t barrier;

/* Queue payload: a heap-allocated sequence number. */
typedef struct MyVal {
    size_t v;
} MyVal;

/* State shared by the driver thread and all workers of one test run. */
typedef struct wfq_test_config {
    size_t nProducer;   /* number of producer threads */
    size_t nConsumer;   /* number of consumer threads requested by the caller */
    size_t nProducing;  /* next sequence number to enqueue (updated with __sync builtins) */
    size_t nConsuming;  /* items dequeued so far (updated with __sync builtins) */
    queue_t *q;         /* queue under test */
    handle_t **hds;     /* per-thread queue handles, indexed by worker id */
} wfq_test_config_t;

/* Count of completed test runs, printed in each end-of-test banner. */
int TEST_COUNT = 0;


/*
 * Allocate a MyVal carrying `digit`.
 *
 * Ownership passes to the caller. In this test the item is enqueued and is
 * later released with free() by consuming_fn. Aborts if the allocation
 * fails: the result is dereferenced immediately, and consuming_fn treats a
 * NULL from dequeue() as "no item", so NULL can never be a valid payload.
 */
MyVal* newval(size_t digit) {
    MyVal *data = malloc(sizeof *data);
    if (data == NULL) {
        fprintf(stderr, "newval: out of memory\n");
        abort();
    }
    data->v = digit;
    return data;
}


static int id = 0;

/*
 * Producer thread body.
 *
 * Claims a handle slot by atomically incrementing the global `id`, stores a
 * freshly allocated handle in hds[slot], registers it with the queue and
 * waits at the start barrier. It then enqueues TEST_MAX_INPUT new MyVal
 * items, numbered from the shared nProducing counter. Finally it waits at
 * the barrier again and calls queue_free() on its handle.
 *
 * v: pointer to the shared wfq_test_config_t. Always returns NULL.
 */
void * producing_fn(void *v) {
    wfq_test_config_t *cfg = (wfq_test_config_t *)v;
    queue_t *queue = cfg->q;
    const int slot = __sync_fetch_and_add(&id, 1);

    handle_t *h = align_malloc(PAGE_SIZE, sizeof(handle_t));
    cfg->hds[slot] = h;
    queue_register(queue, h, slot);
    pthread_barrier_wait(&barrier);

    int produced = 0;
    while (produced < TEST_MAX_INPUT) {
        enqueue(queue, h, newval(__sync_fetch_and_add(&cfg->nProducing, 1)));
        ++produced;
    }

    pthread_barrier_wait(&barrier);
    queue_free(queue, h);
    return NULL;
}
void * consuming_fn(void *v) {
    wfq_test_config_t* config = (wfq_test_config_t*)v;
    queue_t *q = config->q;
    handle_t **hds = config->hds;
    int _id = __sync_fetch_and_add(&id, 1);
    hds[_id] = align_malloc(PAGE_SIZE, sizeof(handle_t));
    queue_register(q, hds[_id], _id);
    pthread_barrier_wait(&barrier);

    for (;;) {
        MyVal* s;
        while ( (s = (MyVal*)dequeue(q, hds[_id]) )  ) {
            if (s->v % 100000 == 0) {
                printf("t %zu\n", s->v);
            }
            free(s);
            __sync_fetch_and_add(&config->nConsuming, 1);
        }
        if (__sync_fetch_and_add(&config->nConsuming, 0) >= TEST_MAX_INPUT * (config->nProducer)) {
            break;
        }
    }

    pthread_barrier_wait(&barrier);
    queue_free(q, hds[_id]);
    return NULL;
}

int running_wfq_test(size_t arg_producer, size_t arg_consumer, size_t arg_producing, size_t arg_consuming, const size_t total_threads, const char * test_type) {

    size_t i = 0;
    struct timeval start_t, end_t;
    double diff_t;
    wfq_test_config_t config;

    assert ((total_threads >= (arg_producer + arg_consumer)) && "not enough thread to test");

    pthread_t testThreads[total_threads];


    config.nProducer = arg_producer;
    config.nProducing = arg_producing;
    config.nConsumer = arg_consumer;
    config.nConsuming = arg_consuming;
    config.q = align_malloc(PAGE_SIZE, sizeof(queue_t));
    queue_init(config.q, total_threads);
    config.hds = align_malloc(PAGE_SIZE, sizeof(handle_t * [total_threads]));


    char *testname = (char*)"Fixed size wfqueue test";

    gettimeofday(&start_t, NULL);
    for (i = 0; i < arg_producer ; i++) {
        pthread_create(testThreads + i, 0, producing_fn,  &config);
    }
    for (; i < total_threads ; i++) {
        pthread_create(testThreads + i, 0, consuming_fn,  &config);
    }

    while (__sync_fetch_and_add(&config.nConsuming, 0) < TEST_MAX_INPUT * (config.nProducer)) {
      struct timeval curr;
      gettimeofday(&curr, NULL);
      if ((curr.tv_usec - start_t.tv_usec) >= (120 * 1000 * 1000)) { // 2 minute
          assert(0 && " too long to consuming the queue ");
      }
    }

    for (i = 0; i < total_threads; i++) {
        void *ret;
        pthread_join(testThreads[i], &ret);
        // free(ret);
    }

    gettimeofday(&end_t, NULL);


    diff_t = (double)(end_t.tv_usec - start_t.tv_usec) / 1000000 + (double)(end_t.tv_sec - start_t.tv_sec);


    printf("===END Test= %d - %s, test type %s ===\n", ++TEST_COUNT, testname, test_type);
    printf("======Total consuming = %zu\n", __sync_fetch_and_add(&config.nConsuming, 0));
    // printf("======Left over = %zu\n", wfq_size(config.q));
    printf("Execution time = %f\n", diff_t);
    // assert(wfq_size(config.q) == 0 && " still left over queue inside ");

    // wfq_destroy(config.q);
    free(config.q);
    free(config.hds);
    sleep(1);
    return 0;
}

/*
 * Entry point: run the MCSP configuration, with one producer and one
 * consumer per remaining online CPU. Returns the test's result, or -1 when
 * fewer than two CPUs are reported online or the barrier cannot be set up.
 */
int main(void) {
    int ret = 0, i;
    long ncpu = sysconf(_SC_NPROCESSORS_ONLN); // Linux / MAC OS

    /* sysconf() reports failure as -1. Check the signed value before
     * narrowing; otherwise it wraps to a huge unsigned thread count. */
    if (ncpu <= 1) {
        printf("One thread is not enough for testing\n");
        return -1;
    }
    unsigned int n = (unsigned int)ncpu;

    /* Every worker thread waits on the barrier, so its count must equal the
     * number of threads each test starts (NUM_PRODUCER + NUM_CONSUMER == n). */
    if (pthread_barrier_init(&barrier, NULL, n) != 0) {
        fprintf(stderr, "pthread_barrier_init failed\n");
        return -1;
    }

    /* Other splits (e.g. MPMC, MPSC) can be run by passing different
     * producer/consumer counts; their sum must still equal the barrier count. */
    int NUM_PRODUCER = 1;
    int NUM_CONSUMER = n - 1;
    int running_set = 1;
    for (i = 0; i < running_set; i++) {
        ret = running_wfq_test(NUM_PRODUCER, NUM_CONSUMER, 0, 0, NUM_PRODUCER + NUM_CONSUMER, "MCSP");
    }

    pthread_barrier_destroy(&barrier);
    return ret;
}

Taheta avatar Jun 24 '19 07:06 Taheta

I thought it could be a gcc 4.8.5 issue, but I also tried clang on my Mac (Apple LLVM version 10.0.0, clang-1000.11.45.5).

It still has the same issue.

Taheta avatar Jun 26 '19 00:06 Taheta

It may be a bug in the wfqueue implementation. I don't have time to debug it right now. But I will leave this issue open until I fix it.

chaoran avatar Jun 26 '19 01:06 chaoran

Hi, I have opened #13. With it, the code posted above no longer gets stuck in my tests. Is anything still not right? Maybe you can test it again.

Pslydhh avatar Mar 12 '20 13:03 Pslydhh