obqueue

A simple fast MPMC obstruction-free linearizable concurrent queue for C (based on linux[futex]/gcc/x86_64)

obqueue

obqueue.h is an awesome, fast and simple concurrent queue whose dequeue blocks while the queue has no elements. obqueue_no_blocking.h is the non-blocking counterpart of obqueue.h: its dequeue simply returns NULL when the queue has no elements.

Prohibition

The value 0 (NULL) cannot be enqueued.

Notes

1: The range of the long type is -2^63 to (2^63-1). Even at 100 million calls per second (far more than any real environment would ever reach), we could keep using it for (2^63-1) / 100,000,000 / 3600 (seconds per hour) / 24 (hours per day) / 365 (days per year) == 2924.71209 years!

2: If one thread (most likely a dequeuer) is particularly slow, it may slow down memory reclamation, so the consumption load should be spread evenly across all consumers.

test_obqueue_

gcc -pthread -g -o test_obq test_obq.c

./test_obq 2500000 8

use case(obqueue.h):test_obq.c


#include "obqueue.h"

#define THREAD_NUM 4
long COUNTS_PER_THREAD = 2500000;
int threshold = 8;
obqueue_t qq;

void* producer(void* index) {
	obqueue_t* q = &qq;		
	handle_t* th = (handle_t*) malloc(sizeof(handle_t));
	memset(th, 0, sizeof(handle_t));
	// register as enqueuer.
	ob_queue_register(q, th, ENQ);
	
	for(int i = 0; i = 3) {
		COUNTS_PER_THREAD = atol(argv[1]);
		threshold = atoi(argv[2]);	
	}
	
	printf("take %ld ops\n", THREAD_NUM * COUNTS_PER_THREAD);
	fflush(stdout);

	array = (int*) malloc((1 + THREAD_NUM * COUNTS_PER_THREAD) * sizeof(int));
	memset(array, 0, (1 + THREAD_NUM * COUNTS_PER_THREAD) * sizeof(int));
	ob_init_queue(&qq, THREAD_NUM, THREAD_NUM, threshold);

	struct timeval start;
	gettimeofday(&start, NULL);
	
	pthread_t pids[THREAD_NUM];
	
	for(int i = 0; i 

use case(obqueue_no_blocking.h):test_obq_no_blocking.c


#include "obqueue_no_blocking.h"

long COUNTS_PER_THREAD = 2500000;
int threshold = 8;
obqueue_t qq;

int* array;
void* produce_and_consume(void* index) {
	obqueue_t* q = &qq;		
	handle_t* th = (handle_t*) malloc(sizeof(handle_t));
	memset(th, 0, sizeof(handle_t));
	ob_queue_register(q, th, ENQ | DEQ);
	
	for(int i = 0; i = 3) {
		COUNTS_PER_THREAD = atol(argv[1]);
		threshold = atoi(argv[2]);	
	}
	
	printf("here %ld\n", THREAD_NUM * COUNTS_PER_THREAD);
	array = (int*) malloc((1 + THREAD_NUM * COUNTS_PER_THREAD) * sizeof(int));
	memset(array, 0, (1 + THREAD_NUM * COUNTS_PER_THREAD) * sizeof(int));
	fflush(stdout);
	ob_init_queue(&qq, THREAD_NUM, THREAD_NUM, threshold);

	struct timeval start;
	gettimeofday(&start, NULL);
	
	pthread_t pids[THREAD_NUM];
	for(int i = 0; i