
Add async_semaphore

Open • lewissbaker opened this issue • 14 comments

lewissbaker • Jul 21 '17

We happen to need a semaphore in cppcoro. I looked at it briefly. The main difference from async_mutex is that we need to store a counter instead of simply "unlocked". I see two possibilities: exploit the fact that the first 64K of address space is never mapped on Windows (other OSes?) and keep a lock-free implementation with the counter limited to 64K, or use a std::variant, which won't be lock-free. Any preference? Or something else entirely?
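A rough sketch of the first option, purely illustrative (it relies on the assumption that, as on Windows, no valid object lives in the first 64K of the address space):

#include <atomic>
#include <cstdint>

struct waiter; // node in an intrusive list of suspended awaiters

// One pointer-sized atomic holds either a count of free resources (< 64K)
// or a pointer to the head of the waiter list (any address >= 64K).
std::atomic<std::uintptr_t> m_state{10}; // start with 10 free resources

bool holds_count(std::uintptr_t s) { return s < 0x10000; }
waiter* as_waiters(std::uintptr_t s) { return reinterpret_cast<waiter*>(s); }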

schoedl • Feb 12 '19

The implementation would likely be closer to that of async_auto_reset_event, which is essentially a binary semaphore.

This implementation keeps track of the number of calls to 'set' and 'wait' by combining the two 32-bit counts into a single 64-bit integer. Its implementation currently ensures that the 'set' count can never get more than 1 above the 'wait' count. If this is generalised to avoid getting more than N above, where N is a constructor parameter of the semaphore, then a similar strategy should serve for an async_semaphore class.
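For illustration, the packed state might look like this (which half holds which count is an assumption here; the real helpers are local functions in cppcoro's async_auto_reset_event.cpp):

#include <atomic>
#include <cstdint>

// 'set' count in the high 32 bits, 'wait' count in the low 32 bits.
std::atomic<std::uint64_t> m_state{0};

constexpr std::uint64_t set_increment = std::uint64_t(1) << 32;
constexpr std::uint64_t waiter_increment = 1;

std::uint32_t get_set_count(std::uint64_t s) { return static_cast<std::uint32_t>(s >> 32); }
std::uint32_t get_waiter_count(std::uint64_t s) { return static_cast<std::uint32_t>(s); }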

lewissbaker • Feb 13 '19

I have trouble using either one as a template. Both exploit the fact that they guard a singleton resource, so releases of waiters are sequenced, and the whole list of waiters can thus be copied from the atomic list to a non-atomic list and worked on there. I don't see how this works for semaphores. Unlocks may arrive at any time, so I need to atomically dequeue a single waiter at a time. AFAIK, a lock-free stack pop already needs a double-width CAS to avoid ABA, and I would actually need a queue for fairness.

The most pedestrian way would be a regularly locked state: a variant of either a queue of waiters or a count of free resources. The mutex could in turn be an async_mutex.

Am I missing something? What do you recommend?

schoedl • Feb 13 '19

The implementation of async_auto_reset_event actually has an implicit mutex/lock built into it.

The first thread to increment either the 'set' or the 'wait' count such that both become non-zero acquires the lock and is responsible for dequeueing waiters and resuming N = min(setCount, waitCount) of them from the list. It then atomically decrements both the 'set' and the 'wait' count by N; if at least one of the two counts in the new state is zero, the lock is considered released. Otherwise, some other thread has called set or enqueued more waiters, and the current thread must go around the loop again and dequeue some more waiters.

Once it has successfully released the lock then it can go and resume the list of waiters that it dequeued.
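A minimal sketch of that protocol, reusing the packed-state layout sketched earlier; dequeue_waiters(), append(), and resume_all() are hypothetical helpers:

#include <algorithm>
#include <atomic>
#include <cstdint>

struct waiter;
waiter* dequeue_waiters(std::uint32_t n);   // hypothetical: pop n waiters
waiter* append(waiter* list, waiter* more); // hypothetical: concatenate lists
void resume_all(waiter* list);              // hypothetical: resume coroutines

// Called by the thread that just made both counts non-zero (the lock owner).
void drain(std::atomic<std::uint64_t>& state, std::uint64_t oldState)
{
	waiter* toResume = nullptr;
	for (;;)
	{
		const auto setCount = static_cast<std::uint32_t>(oldState >> 32);
		const auto waitCount = static_cast<std::uint32_t>(oldState);
		const std::uint32_t n = std::min(setCount, waitCount);
		toResume = append(toResume, dequeue_waiters(n));

		// Atomically decrement both the 'set' and the 'wait' count by n.
		const std::uint64_t delta = (std::uint64_t(n) << 32) | n;
		const std::uint64_t newState =
			state.fetch_sub(delta, std::memory_order_acq_rel) - delta;
		if (static_cast<std::uint32_t>(newState >> 32) == 0 ||
			static_cast<std::uint32_t>(newState) == 0)
			break;            // one count hit zero: the lock is released
		oldState = newState;  // more sets/waiters arrived; keep draining
	}
	resume_all(toResume);     // resume only after releasing the lock
}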

lewissbaker • Feb 13 '19

I think I see it.

Something else:

In async_auto_reset_event::set(), you get the first oldState with std::memory_order_relaxed, but on failure require std::memory_order_acquire. Is this correct?

	std::uint64_t oldState = m_state.load(std::memory_order_relaxed); // CORRECT?
	do
	{
		if (local::get_set_count(oldState) > local::get_waiter_count(oldState))
		{
			// Already set.
			return;
		}

		// Increment the set-count
	} while (!m_state.compare_exchange_weak(
		oldState,
		oldState + local::set_increment,
		std::memory_order_acq_rel,
		std::memory_order_acquire));

schoedl • Feb 14 '19

Yes. I think the compare_exchange failure case can be changed to relaxed too.
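That is, the loop quoted above would become:

	std::uint64_t oldState = m_state.load(std::memory_order_relaxed);
	do
	{
		if (local::get_set_count(oldState) > local::get_waiter_count(oldState))
		{
			// Already set.
			return;
		}

		// Increment the set-count
	} while (!m_state.compare_exchange_weak(
		oldState,
		oldState + local::set_increment,
		std::memory_order_acq_rel,
		// Relaxed is enough on failure: the re-loaded value is only
		// re-validated on the next loop iteration.
		std::memory_order_relaxed));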

lewissbaker • Feb 16 '19

I hope what I implemented is what you envisioned. It was very simple, more about leaving things out than adding anything:

https://github.com/think-cell/cppcoro/commits/semaphore

Could you review the code?

I wrote a simple test with 100 tasks on a thread pool, all trying to acquire a semaphore with 10 resources. AFAIK, the only suspension point is the acquisition of the semaphore. In the beginning, all 10 resources are acquired. But over time, the maximum number of simultaneously acquired resources goes down, eventually reaching 1 or 2. I think I understand why: whenever there is contention, the acquiring coroutine suspends and is later resumed not on its original thread but on the thread that just released the semaphore. The original thread that the acquiring coroutine suspended on is never used again and sits idle forever.

What are your thoughts on this?

cppcoro::async_auto_reset_event semaphore(10);
std::atomic<int> n(0);
cppcoro::static_thread_pool threadPool;

void test() noexcept {

	auto makeTask = []() -> cppcoro::task<>
	{
		co_await threadPool.schedule();
		for(;;) {
			co_await semaphore;
		int m = n.fetch_add(1); // number of holders before this acquisition
		OutputDebugString(tc::make_c_str<TCHAR>(tc::as_dec(m), _T("\n")));
		_ASSERT(m < 10);
			Sleep(1);
			--n;
			semaphore.set();
		}
	};

	std::vector<cppcoro::task<>> tasks;
	for (int i = 0; i < 100; ++i)
	{
		tasks.push_back(makeTask());
	}

	cppcoro::sync_wait(cppcoro::when_all(std::move(tasks)));
}

Thanks for all your help!

schoedl • Feb 22 '19

The diff looks to be taking the right approach. I've added a few comments to the commit.

With regards to the ever-reducing number of concurrent operations: the approach I've been taking with some of the similar types, like sequence_barrier, is having the 'wait' operation take a scheduler, which is then used to schedule the resumption of the coroutine if the operation does not complete synchronously, rather than resuming it inline.

For example, see this method: https://github.com/lewissbaker/cppcoro/blob/87605fabb3aabae36065d6e1289d42243ae51269/include/cppcoro/sequence_barrier.hpp#L94-L98

This would ensure that you don't start serialising the producer and consumer by having the consumer coroutine steal the thread of the producer coroutine when the producer calls semaphore.set().

lewissbaker • Feb 23 '19

This puts the responsibility for which scheduler to use into the releasing function, but the acquiring function likely knows better how CPU-intensive it is. For example, when acquiring three async locks in a row, you want to take all three locks on the releasing thread, but reschedule when starting the CPU-intensive part. The TLS could carry a flag indicating whether the thread is “borrowed”: set when resuming a coroutine on the current thread, and not set when the thread comes from a thread pool or a completion port. “reschedule” would then either be a no-op or actually reschedule.

Maybe that’s all a bit too crazy. I can pass a scheduler.


schoedl • Feb 23 '19

This puts the responsibility for which scheduler to use into the releasing function...

Actually, the responsibility for which scheduler to use is given to the caller of wait(), e.g.:

task<void> consumer(async_semaphore& sem, static_thread_pool& tp) {
  // Pass thread-pool into the wait().
  // If the wait() does not complete synchronously then it will be resumed on the thread-pool.
  co_await sem.wait(1, tp);
  do_work();

  // This release call will schedule resumption of any waiting coroutines sitting in a wait() call
  // onto the scheduler specified as a parameter to the wait() call.
  sem.release();
}

I think what you're getting at, with regards to acquiring multiple resources and only wanting to reschedule once all three are acquired, may be addressed by instead having the wait() operation return a bool indicating whether the operation completed synchronously or asynchronously, e.g.:

bool completedSynchronously = co_await sem1.wait();
completedSynchronously &= co_await sem2.wait();
completedSynchronously &= co_await sem3.wait();
if (!completedSynchronously) {
  // Reschedule onto the thread-pool once we are about to start work.
  co_await tp.schedule();
}

There are still cases where it makes sense for the coroutine to be rescheduled onto the thread-pool even if it is already executing on the thread-pool, as this can allow a greater degree of parallelism between coroutines. Otherwise you can end up with the situation you described, where the coroutines just run synchronously inside the .release() call that released the resources, thus blocking the producer that called .release().

lewissbaker • Feb 27 '19

I just checked http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2019/p1135r3.html and could not find a saturating semaphore. You can provide a maximum value for the semaphore count, but not exceeding it is a precondition of release(), with no saturation semantics. Do you really want saturation semantics in your library?

schoedl • Mar 01 '19

I have a use-case for the current saturating behaviour of async_auto_reset_event.

This is useful for cases where there may be many updates to a shared data structure, and after each update we want to set a dirty flag (in this case the event) such that the coroutine processing the changes suspends until there is at least one change, is then woken up, resets the dirty flag, and processes all pending changes before suspending again to wait for the dirty flag to be set once more.

But perhaps this is necessarily different from a binary_semaphore, which does not have saturating behaviour.
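As a sketch of that dirty-flag pattern (record_change() and process_all_pending() are hypothetical placeholders):

#include <cppcoro/async_auto_reset_event.hpp>
#include <cppcoro/task.hpp>

void record_change();        // hypothetical: publish one change
void process_all_pending();  // hypothetical: drain every pending change

cppcoro::async_auto_reset_event dirty; // the "dirty flag"

cppcoro::task<> change_processor()
{
	for (;;)
	{
		co_await dirty;        // suspend until at least one update happened
		process_all_pending();
		// set() calls that raced with the processing saturated at one,
		// so we wake up at most once more and never miss an update.
	}
}

void on_update()
{
	record_change();
	dirty.set(); // saturating: many calls collapse into a single wake-up
}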

lewissbaker • Mar 01 '19

I think this does not come up so often, because the dirtiness is often part of the data structure itself. As a common example (see the sketch below): when adding an item to a queue, you lock the data structure, add the item, remember whether you went from 0 to 1 items, and if so, wake up the consumer after unlocking. The consumer consumes items; it can lock and empty the queue in one go, or release the lock between items. Only if the queue is empty does it go to sleep and then require another wake-up.
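A minimal sketch of that pattern with plain threads and a condition variable (one consumer assumed; all names illustrative):

#include <condition_variable>
#include <deque>
#include <mutex>

std::mutex mtx;
std::condition_variable cv;
std::deque<int> queue;

void produce(int item)
{
	bool wasEmpty;
	{
		std::lock_guard<std::mutex> lock(mtx);
		wasEmpty = queue.empty();
		queue.push_back(item);
	}
	if (wasEmpty)       // wake the consumer only on the 0 -> 1 transition
		cv.notify_one();
}

void consume()
{
	for (;;)
	{
		std::unique_lock<std::mutex> lock(mtx);
		cv.wait(lock, [] { return !queue.empty(); }); // sleep only when empty
		while (!queue.empty()) // lock and empty the queue in one go
		{
			int item = queue.front();
			queue.pop_front();
			(void)item; // processing would go here
		}
	}
}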

This separation of dirtiness and wake-up has the advantage of being flexible about the wake-up mechanism; for example, a window message would work, too.

Also, releasing the consumer while the producer still holds the lock on the data structure may lead to unnecessary contention on the lock: the consumer wakes up, tries to lock, and blocks/suspends waiting for the producer to release the lock. Doing it outside the lock, on the other hand, may mean the producer's wake-up happens only after someone else has changed the data structure again and released the consumer, and the consumer has already consumed the item and is asleep again. That wake-up would then be unnecessary.

Just my two cents.

schoedl • Mar 01 '19

I checked in a new version, in small check-in steps for easier review. I am not sure whether async_auto_reset_event should stay related to async_semaphore, because the latter got a bit more complicated: I needed signed resource counts. I need a retire function that removes resources from the pool without waiting for the removal to happen, which is implemented by allowing the number of resources to become negative. I did not implement the scheduler extension because I did not need it, and it is something that could probably be done in many places in the library. For naming, I stuck to P1135.
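For illustration only (the actual code is on the branch linked above), the signed-count bookkeeping behind retire might look like this:

#include <atomic>
#include <cassert>
#include <cstddef>

std::atomic<std::ptrdiff_t> count{10}; // free resources; negative = deficit

bool try_acquire() // succeeds only while resources remain
{
	std::ptrdiff_t c = count.load(std::memory_order_relaxed);
	while (c > 0)
	{
		if (count.compare_exchange_weak(c, c - 1,
				std::memory_order_acquire, std::memory_order_relaxed))
			return true;
	}
	return false; // a real implementation would enqueue a waiter here
}

void release() // returns one resource; may merely repay a deficit
{
	count.fetch_add(1, std::memory_order_release);
}

void retire(std::ptrdiff_t n) // remove n resources without waiting for them
{
	count.fetch_sub(n, std::memory_order_relaxed); // count may go negative
}

int main()
{
	assert(try_acquire()); // 10 -> 9
	retire(12);            // pool shrinks immediately; count is now -3
	release();             // repays part of the deficit; nobody is woken yet
}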

Thanks for any suggestions.

schoedl • Mar 04 '19