
How to properly use a queue

Open WebFreak001 opened this issue 2 years ago • 7 comments

I'm trying to understand the API by writing an example that processes a list of jobs known ahead of time, but I've got a few questions:

// uncomment this line:
//version = Alternative;

struct Job
{
	int workData;

	// implementation requirement: needed for MPSCQueue to work
	shared Job* next;
}

void main(string[] args) @safe
{
	// number of spawned OS threads
	size_t numWorkers = 16;

	import concurrency.thread : stdTaskPool;
	auto taskPool = stdTaskPool(numWorkers);

	import concurrency.data.queue.mpsc : MPSCQueue;
	auto queue = new MPSCQueue!Job;

	Job[] jobs = new Job[10000];
	foreach (i, ref job; jobs)
	{
		job.workData = cast(int)i;
		queue.push(&jobs[i]);
	}

	import concurrency.operations : then, via, on;
	import concurrency.sender : just;

	import std.array : array;
	import std.range : repeat;

	shared int i = -1;
	shared int[] calledTimes = new int[numWorkers];

	auto workerTasks =
		just(queue) // binding (i, queue) into the callback
		.then(
			// why isn't the queue forced to be shared? I think it should be, no?
			(MPSCQueue!Job queue) shared @safe {
				import core.atomic : atomicOp;
				int thisTask = i.atomicOp!"+="(1);

				// on each thread, pop queue until empty and run performJob
				while (auto job = queue.pop())
				{
					performJob(thisTask, *job);
					calledTimes[thisTask].atomicOp!"+="(1);
				}
			})
		.on(taskPool.getScheduler()) // when run, run on new thread
		.repeat(numWorkers)
		.array;

	import concurrency.operations : whenAll;
	import concurrency.syncwait : syncWait;
	whenAll(workerTasks).syncWait();

	import std.stdio;

	writeln("distribution: ", (() @trusted => cast(int[])calledTimes)());
}

void performJob(int runnerId, ref Job job) @safe
{
	version (Alternative)
	{
		import core.thread;

		Thread.sleep(1.msecs);
	}
	else
	{
		import std.stdio;

		writeln(runnerId, ": running job #", job.workData);
	}
}

MPSCQueue.pop does not seem to be thread-safe? It isn't marked shared, but the API neither forces me to make the queue shared nor forbids this kind of usage. In this code, if you uncomment the version statement at the top and increase the job count / let it run long enough, it eventually deadlocks / races into a lock. (writeln has some odd internal locks that cause a non-uniform load distribution, which doesn't seem to deadlock.)

Q: What type should I use instead for a queue that I can call pop on in parallel? (use-case: one thread generates jobs, lots of worker threads each take items to work on)

Q2: Calling whenAll with the same task passed as multiple arguments to run it multiple times feels quite foreign to me, but I saw this usage in the unittests, so I adapted it here. Is this really intended behavior?

Q3: how do I properly pass the queue into my job processor? (I would expect to make the queue argument shared ref in the callback, assuming the queue is a struct that can't be copied, but at least with the class version this doesn't compile)

Overarching Q: should I even be writing my code like this in the first place? Is there a better way?

WebFreak001 avatar Mar 08 '23 23:03 WebFreak001

I saw streams existed as well, but this doesn't do what I expect:

struct Job
{
	int workData;
}

void main(string[] args) @safe
{
	// number of spawned OS threads
	size_t numWorkers = 16;

	import concurrency.thread : stdTaskPool;
	auto taskPool = stdTaskPool(numWorkers);

	Job[] jobs = new Job[8000];
	foreach (i, ref job; jobs)
		job.workData = cast(int)i;

	import concurrency.operations : then, via, on;
	import concurrency.stream : arrayStream;
	import concurrency.syncwait : syncWait;

	jobs
		.arrayStream
		.collect((Job job) shared @safe => performJob(0, job))
		.on(taskPool.getScheduler())
		.syncWait;

}

void performJob(int runnerId, ref Job job) @trusted
{
	import core.thread;

	Thread.sleep(1.msecs);
}

(it doesn't run the jobs in parallel: this takes 8000 msecs instead of 8000/16 msecs)

WebFreak001 avatar Mar 08 '23 23:03 WebFreak001

There is no need to do manual queuing:

#!/usr/bin/env dub
/+ dub.sdl:
 name "jobs"
 dependency "concurrency" version="*"
 dflags "-dip1000"
 +/

import concurrency;
import std.range : iota;
import std.algorithm : map;

struct Job
{
  int workData;
}

void main(string[] args) @safe
{
  import concurrency.thread : stdTaskPool;
  import concurrency.operations : then, on, whenAll;
  import std.array : array;

  // number of spawned OS threads
  size_t numWorkers = 16;

  auto taskPool = stdTaskPool(numWorkers);
  scope scheduler = taskPool.getScheduler();

  iota(10000)
    .map!(i => just(Job(i)).then(&performJob).on(scheduler))
    .array
    .whenAll
    .syncWait;
}

void performJob(ref Job job) @trusted
{
  import core.thread;

  Thread.sleep(1.msecs);
}

skoppe avatar Mar 09 '23 22:03 skoppe

MPSCQueue.pop does not seem to be thread-safe? It isn't marked shared, but the API neither forces me to make the queue shared nor forbids this kind of usage. In this code, if you uncomment the version statement at the top and increase the job count / let it run long enough, it eventually deadlocks / races into a lock. (writeln has some odd internal locks that cause a non-uniform load distribution, which doesn't seem to deadlock.)

Yes. just is breaking the law here: it should only accept non-aliased data. I have a branch with a fix in progress, but it needs to stay backwards compatible so that users (myself included) can migrate.

Also, this is a multi-producer-single-consumer queue, and you were using it the opposite way around (single-producer-multi-consumer). And yes, had just been implemented correctly, it would have screamed at you, forcing you to make the queue shared, at which point pop would stop working (it's not shared).

In fact, the queue was designed to work correctly with shared; see the producer method, which returns a wrapper that is shared but can only push.
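Based on that description, the intended split might look roughly like this. Only the `producer` method is named in the comment above; the wrapper's exact type and the `pop` signature are my assumptions and may differ from the actual API:

```d
import concurrency.data.queue.mpsc : MPSCQueue;

struct Job
{
    int workData;
    shared Job* next; // intrusive link required by MPSCQueue
}

void sketch(Job[] jobs) @safe
{
    auto queue = new MPSCQueue!Job;

    // Many producers: `producer` hands out a shared wrapper that can only
    // push, so it is safe to give to multiple threads.
    auto producer = queue.producer;
    foreach (ref job; jobs)
        producer.push(&job);

    // Exactly one consumer: `pop` is deliberately non-shared, so only the
    // thread that owns the queue may drain it.
    while (auto job = queue.pop())
    {
        // process *job here
    }
}
```

That split is what the MPSC name encodes: pushing is the concurrent side, popping is single-threaded by construction.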

skoppe avatar Mar 09 '23 22:03 skoppe

it eventually dead-locks / races into a lock.

This is because you are popping from multiple threads. Again, I need to fix just so that this becomes impossible.

skoppe avatar Mar 09 '23 22:03 skoppe

Also, this is a multi-producer-single-consumer queue, and you were using it the opposite way around (single-producer-multi-consumer). And yes, had just been implemented correctly, it would have screamed at you, forcing you to make the queue shared, at which point pop would stop working (it's not shared).

ah, that explains it. I was wondering what else MPSCQueue could stand for; otherwise I would have just cast away the shared, knowing that it would probably break because of this. (a ddoc comment would be nice here)

WebFreak001 avatar Mar 10 '23 13:03 WebFreak001

I have also very roughly benchmarked the memory and time overhead of concurrency for your example:

[image: benchmark screenshot of memory and time overhead]

nice and constant per-job overhead (so RAM and time grow linearly with the number of jobs when queuing everything at once)

now it would also be quite interesting to see whether it's possible to queue the tasks dynamically (e.g. by calling whenAll with a range instead of an array), and whether that gives constant instead of linear memory use

WebFreak001 avatar Mar 10 '23 17:03 WebFreak001

now it would also be quite interesting to see whether it's possible to queue the tasks dynamically (e.g. by calling whenAll with a range instead of an array), and whether that gives constant instead of linear memory use

whenAll currently doesn't accept a range. Supporting ranges with a known length would be easy, but the others are more difficult.

You can switch to using a Nursery, which is intended for dynamic job sets. There are allocations involved though, likely more than with a static list.
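For reference, a rough sketch of what the Nursery route might look like; the import path and the `run`/`syncWait` calls are taken from memory of the library's tests and should be treated as assumptions rather than the definitive API:

```d
import concurrency;
import concurrency.nursery : Nursery;
import concurrency.thread : stdTaskPool;
import concurrency.operations : then, on;

struct Job { int workData; }

void main() @safe
{
    auto taskPool = stdTaskPool(16);
    auto nursery = new shared Nursery();

    // Jobs can be added while the nursery is running, so there is no need
    // to materialize the full array of senders up front as with whenAll.
    foreach (i; 0 .. 10_000)
        nursery.run(just(Job(i))
            .then((Job job) shared @safe { /* do the work */ })
            .on(taskPool.getScheduler()));

    // Completes once every sender started via run has finished.
    nursery.syncWait();
}
```

Because senders are handed to the nursery one at a time, whether this actually achieves constant memory depends on how quickly finished tasks are released relative to how fast new ones are enqueued.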

skoppe avatar Mar 10 '23 23:03 skoppe