fast-wait-free-queue icon indicating copy to clipboard operation
fast-wait-free-queue copied to clipboard

wfqueue.c threads stuck at spin wfqueue.c:30

Open Taheta opened this issue 6 years ago • 12 comments

  Id   Target Id         Frame
  16   Thread 0x7fffbf7fe700 (LWP 22384) "a.out" 0x0000000000401779 in help_enq (i=33724383, c=0x7fffcc05db40, th=0x7fffb4001000, q=0x605000)
    at wfqueue.c:213
  15   Thread 0x7fffbffff700 (LWP 22383) "a.out" 0x000000000040192b in help_enq (i=33705511, c=0x7fffc80aa3c0, th=0x7fffb0001000, q=0x605000)
    at wfqueue.c:236
  14   Thread 0x7fffdcff9700 (LWP 22382) "a.out" spin (p=0x7fffcc0f3940) at wfqueue.c:30
  13   Thread 0x7fffdd7fa700 (LWP 22381) "a.out" spin (p=0x7fffcc05e140) at wfqueue.c:30
  12   Thread 0x7fffddffb700 (LWP 22380) "a.out" spin (p=0x7fffcc05e2c0) at wfqueue.c:30
  11   Thread 0x7fffde7fc700 (LWP 22379) "a.out" spin (p=0x7fffcc05e3c0) at wfqueue.c:30
  10   Thread 0x7fffdeffd700 (LWP 22378) "a.out" spin (p=0x7fffcc05c140) at wfqueue.c:32
* 1    Thread 0x7ffff7fe0740 (LWP 22369) "a.out" 0x0000000000400def in running_wfq_test (arg_producer=<optimized out>, arg_consumer=<optimized out>,

This issue happened on and off, and it always stucking at last 5 count Total Nproc = 16 core nProducerThread = 8, nConsumerThread= 7 nProducing = 8000000, nConsuming = 7999995,

Taheta avatar Jun 20 '19 08:06 Taheta

could you give me a small example that reproduce this?

chaoran avatar Jun 20 '19 10:06 chaoran

Hi Chaoran, Do you have reddit id? May I add you in chat list first?

Taheta avatar Jun 24 '19 00:06 Taheta

Sorry, I don't have reddit account. If you post here a small test code example and the compiler version you used. I can debug the issue and get back to you.

chaoran avatar Jun 24 '19 06:06 chaoran

Hi Charan,

This is all the test

/*
 * compile : g++ -std=c++11 -I./ OverallTest.cpp -pthread -Wall -o overalltest
 * execute
 * valgrind --fair-sched=yes ./overalltest
 */
// #include <iostream>
#include <stdio.h>
#include <time.h>
#include <pthread.h>
// #include <thread>

#include "queue.h"
#include <unistd.h>
#include <assert.h>
#include <sys/time.h>

#define MILLION  1000000
#define TEST_MAX_INPUT  MILLION

typedef struct {
    size_t v;
} MyVal;

typedef struct {
    size_t nProducer;
    size_t nConsumer;
    size_t nProducing;
    size_t nConsuming;
    queue_t *q;
    handle_t **hds;
} wfq_test_config_t;

int TEST_COUNT = 0;


MyVal* newval(size_t digit) {
    MyVal *data = (MyVal*)malloc(sizeof(MyVal));
    data->v = digit;
    return  data;
}


static int id = 0;
void * producing_fn(void *v) {
    int z;
    wfq_test_config_t* config = (wfq_test_config_t*)v;
    queue_t *q = config->q;
    handle_t **hds = config->hds;
    int _id = __sync_fetch_and_add(&id, 1);
    hds[_id] = align_malloc(PAGE_SIZE, sizeof(handle_t));
    queue_register(q, hds[_id], _id);
    for (z = 0; z < TEST_MAX_INPUT; z++) {
        MyVal* s = newval(__sync_fetch_and_add(&config->nProducing, 1));

        enqueue(q, hds[_id], s);
        // wfq_sleep(1);
        // if (xx % 100000 == 0)
        //     printf("%zu\n", xx);
    }

    queue_free(q, hds[_id]);
    return NULL;
}
void * consuming_fn(void *v) {
    wfq_test_config_t* config = (wfq_test_config_t*)v;
    queue_t *q = config->q;
    handle_t **hds = config->hds;
    int _id = __sync_fetch_and_add(&id, 1);
    hds[_id] = align_malloc(PAGE_SIZE, sizeof(handle_t));
    queue_register(q, hds[_id], _id);

    for (;;) {
        MyVal* s;
        while ( (s = (MyVal*)dequeue(q, hds[_id]) )  ) {
            if (s->v % 100000 == 0) {
                printf("t %zu\n", s->v);
            }
            free(s);
            __sync_fetch_and_add(&config->nConsuming, 1);
        }
        if (__sync_fetch_and_add(&config->nConsuming, 0) >= TEST_MAX_INPUT * (config->nProducer)) {
            break;
        }
    }

    queue_free(q, hds[_id]);
    return NULL;
}

int running_wfq_test(size_t arg_producer, size_t arg_consumer, size_t arg_producing, size_t arg_consuming, const size_t total_threads, const char * test_type) {

    size_t i = 0;
    struct timeval start_t, end_t;
    double diff_t;
    wfq_test_config_t config;

    assert ((total_threads >= (arg_producer + arg_consumer)) && "not enough thread to test");

    pthread_t testThreads[total_threads];


    config.nProducer = arg_producer;
    config.nProducing = arg_producing;
    config.nConsumer = arg_consumer;
    config.nConsuming = arg_consuming;
    config.q = align_malloc(PAGE_SIZE, sizeof(queue_t));
    queue_init(config.q, total_threads);
    config.hds = align_malloc(PAGE_SIZE, sizeof(handle_t * [total_threads]));


    char *testname = (char*)"Fixed size wfqueue test";

    gettimeofday(&start_t, NULL);
    for (i = 0; i < arg_producer ; i++) {
        pthread_create(testThreads + i, 0, producing_fn,  &config);
    }
    for (; i < total_threads ; i++) {
        pthread_create(testThreads + i, 0, consuming_fn,  &config);
    }

    while (__sync_fetch_and_add(&config.nConsuming, 0) < TEST_MAX_INPUT * (config.nProducer)) {
      struct timeval curr;
      gettimeofday(&curr, NULL);
      if ((curr.tv_usec - start_t.tv_usec) >= (120 * 1000 * 1000)) { // 2 minute
          assert(0 && " too long to consuming the queue ");
      }
    }

    for (i = 0; i < total_threads; i++) {
        void *ret;
        pthread_join(testThreads[i], &ret);
        // free(ret);
    }

    gettimeofday(&end_t, NULL);


    diff_t = (double)(end_t.tv_usec - start_t.tv_usec) / 1000000 + (double)(end_t.tv_sec - start_t.tv_sec);


    printf("===END Test= %d - %s, test type %s ===\n", ++TEST_COUNT, testname, test_type);
    printf("======Total consuming = %zu\n", __sync_fetch_and_add(&config.nConsuming, 0));
    // printf("======Left over = %zu\n", wfq_size(config.q));
    printf("Execution time = %f\n", diff_t);
    // assert(wfq_size(config.q) == 0 && " still left over queue inside ");

    // wfq_destroy(config.q);
    free(config.q);
    free(config.hds);
    sleep(1);
    return 0;
}

int main(void) {
    int ret = 0, i;

    unsigned int n = sysconf(_SC_NPROCESSORS_ONLN); // Linux / MAC OS

    if (n > 1) {
        int NUM_PRODUCER = n/2;
        int NUM_CONSUMER = (n/2) - 1;
        int running_set = 1;

        for (i = 0; i < running_set; i++) {
            ret = running_wfq_test(NUM_PRODUCER, NUM_CONSUMER, 0, 0, NUM_PRODUCER + NUM_CONSUMER, "MPMC");
        }

        NUM_PRODUCER = n - 2;
        NUM_CONSUMER = 1;
        for (i = 0; i < running_set; i++) {
            ret = running_wfq_test(NUM_PRODUCER, NUM_CONSUMER, 0, 0, NUM_PRODUCER + NUM_CONSUMER, "MPSC");
        }

        NUM_PRODUCER = 1;
        NUM_CONSUMER = n - 2;
        for (i = 0; i < running_set; i++) {
            ret = running_wfq_test(NUM_PRODUCER, NUM_CONSUMER, 0, 0, NUM_PRODUCER + NUM_CONSUMER, "MCSP");
        }
    } else {
        ret = -1;
        printf("One thread is not enough for testing\n");
    }

    return ret;
}

Taheta avatar Jun 24 '19 06:06 Taheta

What's your gcc version: g++ --version?

chaoran avatar Jun 24 '19 06:06 chaoran

g++ (GCC) 4.8.5 20150623 (Red Hat 4.8.5-16) Copyright (C) 2015 Free Software Foundation, Inc. This is free software; see the source for copying conditions. There is NO warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

I compile with

gcc -DWFQUEUE -I./ -O3 main_test.c wfqueue.c -pthread -o test -g 

Taheta avatar Jun 24 '19 06:06 Taheta

Could you try your test after adding a pthread_barrier_wait after queue_register and another pthread_barrier_wait before queue_free in both consuming_fn and producing_fn?

chaoran avatar Jun 24 '19 06:06 chaoran

it stucking at barrier wait

0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 15 Thread 0x7fffbffff700 (LWP 651) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 14 Thread 0x7fffdcff9700 (LWP 650) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 13 Thread 0x7fffdd7fa700 (LWP 649) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 12 Thread 0x7fffddffb700 (LWP 647) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 11 Thread 0x7fffde7fc700 (LWP 646) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 10 Thread 0x7fffdeffd700 (LWP 645) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 9 Thread 0x7fffdf7fe700 (LWP 644) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 8 Thread 0x7fffdffff700 (LWP 643) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 7 Thread 0x7ffff4fec700 (LWP 641) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 6 Thread 0x7ffff57ed700 (LWP 640) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 5 Thread 0x7ffff5fee700 (LWP 639) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 4 Thread 0x7ffff67ef700 (LWP 638) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 3 Thread 0x7ffff6ff0700 (LWP 637) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0 2 Thread 0x7ffff77f1700 (LWP 633) "test" 0x00007ffff7bcb5dc in pthread_barrier_wait () from /usr/lib64/libpthread.so.0

Taheta avatar Jun 24 '19 07:06 Taheta

Did you initialize the barrier before you use it?

chaoran avatar Jun 24 '19 07:06 chaoran

Sorry I didnt aware, after I put. It crashed

===END Test= 2 - Fixed size wfqueue test, test type MPSC ===
======Total consuming = 14000000
Execution time = 2.729662
*** Error in `./test': free(): invalid next size (fast): 0x00000000011de000 ***
======= Backtrace: =========
/usr/lib64/libc.so.6(+0x81429)[0x7f237a348429]
./test[0x400ff4]
./test[0x400a92]
/usr/lib64/libc.so.6(__libc_start_main+0xf5)[0x7f237a2e93d5]
./test[0x400ad8]
======= Memory map: ========
00400000-00403000 r-xp 00000000 fd:05 4588452                            /home/booking/cground/fast-wait-free-queue/test
00602000-00603000 r--p 00002000 fd:05 4588452                            /home/booking/cground/fast-wait-free-queue/test
00603000-00604000 rw-p 00003000 fd:05 4588452                            /home/booking/cground/fast-wait-free-queue/test
011cb000-011ed000 rw-p 00000000 00:00 0                                  [heap]
7f22f8000000-7f22f9592000 rw-p 00000000 00:00 0
7f22f9592000-7f22fc000000 ---p 00000000 00:00 0
7f2300000000-7f2301540000 rw-p 00000000 00:00 0
7f2301540000-7f2304000000 ---p 00000000 00:00 0
7f2304000000-7f2305577000 rw-p 00000000 00:00 0
7f2305577000-7f2308000000 ---p 00000000 00:00 0
7f2308000000-7f230968e000 rw-p 00000000 00:00 0
7f230968e000-7f230c000000 ---p 00000000 00:00 0
7f230c000000-7f230d76d000 rw-p 00000000 00:00 0
7f230d76d000-7f2310000000 ---p 00000000 00:00 0
7f2310000000-7f231143e000 rw-p 00000000 00:00 0
7f231143e000-7f2314000000 ---p 00000000 00:00 0
7f2314000000-7f23156bc000 rw-p 00000000 00:00 0
7f23156bc000-7f2318000000 ---p 00000000 00:00 0
7f2318000000-7f2319640000 rw-p 00000000 00:00 0
7f2319640000-7f231c000000 ---p 00000000 00:00 0
7f231c000000-7f231d34a000 rw-p 00000000 00:00 0
7f231d34a000-7f2320000000 ---p 00000000 00:00 0
7f2320000000-7f23214ad000 rw-p 00000000 00:00 0

Taheta avatar Jun 24 '19 07:06 Taheta

Could you post your code after all the barrier related changes?

chaoran avatar Jun 24 '19 13:06 chaoran

/*
 * compile : g++ -std=c++11 -I./ OverallTest.cpp -pthread -Wall -o overalltest
 * execute
 * valgrind --fair-sched=yes ./overalltest
 */
// #include <iostream>
#include <stdio.h>
#include <time.h>
#include <pthread.h>
// #include <thread>

#include "queue.h"
#include <unistd.h>
#include <assert.h>
#include <sys/time.h>

#define MILLION  1000000
#define TEST_MAX_INPUT  MILLION

static pthread_barrier_t barrier;
typedef struct {
    size_t v;
} MyVal;

typedef struct {
    size_t nProducer;
    size_t nConsumer;
    size_t nProducing;
    size_t nConsuming;
    queue_t *q;
    handle_t **hds;
} wfq_test_config_t;

int TEST_COUNT = 0;


MyVal* newval(size_t digit) {
    MyVal *data = (MyVal*)malloc(sizeof(MyVal));
    data->v = digit;
    return  data;
}


static int id = 0;
void * producing_fn(void *v) {
    int z;
    wfq_test_config_t* config = (wfq_test_config_t*)v;
    queue_t *q = config->q;
    handle_t **hds = config->hds;
    int _id = __sync_fetch_and_add(&id, 1);
    hds[_id] = align_malloc(PAGE_SIZE, sizeof(handle_t));
    queue_register(q, hds[_id], _id);
    pthread_barrier_wait(&barrier);
    for (z = 0; z < TEST_MAX_INPUT; z++) {
        MyVal* s = newval(__sync_fetch_and_add(&config->nProducing, 1));

        enqueue(q, hds[_id], s);
        // wfq_sleep(1);
        // if (xx % 100000 == 0)
        //     printf("%zu\n", xx);
    }

    pthread_barrier_wait(&barrier);
    queue_free(q, hds[_id]);
    return NULL;
}
void * consuming_fn(void *v) {
    wfq_test_config_t* config = (wfq_test_config_t*)v;
    queue_t *q = config->q;
    handle_t **hds = config->hds;
    int _id = __sync_fetch_and_add(&id, 1);
    hds[_id] = align_malloc(PAGE_SIZE, sizeof(handle_t));
    queue_register(q, hds[_id], _id);
    pthread_barrier_wait(&barrier);

    for (;;) {
        MyVal* s;
        while ( (s = (MyVal*)dequeue(q, hds[_id]) )  ) {
            if (s->v % 100000 == 0) {
                printf("t %zu\n", s->v);
            }
            free(s);
            __sync_fetch_and_add(&config->nConsuming, 1);
        }
        if (__sync_fetch_and_add(&config->nConsuming, 0) >= TEST_MAX_INPUT * (config->nProducer)) {
            break;
        }
    }

    pthread_barrier_wait(&barrier);
    queue_free(q, hds[_id]);
    return NULL;
}

int running_wfq_test(size_t arg_producer, size_t arg_consumer, size_t arg_producing, size_t arg_consuming, const size_t total_threads, const char * test_type) {

    size_t i = 0;
    struct timeval start_t, end_t;
    double diff_t;
    wfq_test_config_t config;

    assert ((total_threads >= (arg_producer + arg_consumer)) && "not enough thread to test");

    pthread_t testThreads[total_threads];


    config.nProducer = arg_producer;
    config.nProducing = arg_producing;
    config.nConsumer = arg_consumer;
    config.nConsuming = arg_consuming;
    config.q = align_malloc(PAGE_SIZE, sizeof(queue_t));
    queue_init(config.q, total_threads);
    config.hds = align_malloc(PAGE_SIZE, sizeof(handle_t * [total_threads]));


    char *testname = (char*)"Fixed size wfqueue test";

    gettimeofday(&start_t, NULL);
    for (i = 0; i < arg_producer ; i++) {
        pthread_create(testThreads + i, 0, producing_fn,  &config);
    }
    for (; i < total_threads ; i++) {
        pthread_create(testThreads + i, 0, consuming_fn,  &config);
    }

    while (__sync_fetch_and_add(&config.nConsuming, 0) < TEST_MAX_INPUT * (config.nProducer)) {
      struct timeval curr;
      gettimeofday(&curr, NULL);
      if ((curr.tv_usec - start_t.tv_usec) >= (120 * 1000 * 1000)) { // 2 minute
          assert(0 && " too long to consuming the queue ");
      }
    }

    for (i = 0; i < total_threads; i++) {
        void *ret;
        pthread_join(testThreads[i], &ret);
        // free(ret);
    }

    gettimeofday(&end_t, NULL);


    diff_t = (double)(end_t.tv_usec - start_t.tv_usec) / 1000000 + (double)(end_t.tv_sec - start_t.tv_sec);


    printf("===END Test= %d - %s, test type %s ===\n", ++TEST_COUNT, testname, test_type);
    printf("======Total consuming = %zu\n", __sync_fetch_and_add(&config.nConsuming, 0));
    // printf("======Left over = %zu\n", wfq_size(config.q));
    printf("Execution time = %f\n", diff_t);
    // assert(wfq_size(config.q) == 0 && " still left over queue inside ");

    // wfq_destroy(config.q);
    free(config.q);
    free(config.hds);
    sleep(1);
    return 0;
}

int main(void) {
    int ret = 0, i;

    unsigned int n = sysconf(_SC_NPROCESSORS_ONLN); // Linux / MAC OS
    pthread_barrier_init(&barrier, NULL, n);
    if (n > 1) {
        int NUM_PRODUCER = n/2;
        int NUM_CONSUMER = n/2;
        int running_set = 1;
        
        for (i = 0; i < running_set; i++) {
          ret = running_wfq_test(NUM_PRODUCER, NUM_CONSUMER, 0, 0, NUM_PRODUCER + NUM_CONSUMER, "MPMC");
        }
        
        NUM_PRODUCER = n - 1;
        NUM_CONSUMER = 1;
        for (i = 0; i < running_set; i++) {
         ret = running_wfq_test(NUM_PRODUCER, NUM_CONSUMER, 0, 0, NUM_PRODUCER + NUM_CONSUMER, "MPSC");
        }

        NUM_PRODUCER = 1;
        NUM_CONSUMER = n-1;
        
        for (i = 0; i < running_set; i++) {
            ret = running_wfq_test(NUM_PRODUCER, NUM_CONSUMER, 0, 0, NUM_PRODUCER + NUM_CONSUMER, "MCSP");
        }
    } else {
        ret = -1;
        printf("One thread is not enough for testing\n");
    }

    return ret;

Taheta avatar Jun 24 '19 23:06 Taheta