async-oneshot
Waker update synchronization
Hi, I could not fully figure out how waker updates are synchronized.
If a waker is already registered (be it send or recv) and is later updated, the SEND (or RECV) bit stays set during the update, so it would seem that nothing prevents a waker from being read while it is being updated? Sorry if I missed something here.
looks like you're right.
i have a PR on this repo open for a rewrite that makes it usable more than once. i don't think that version suffers from the same problem, but not many people have reviewed it.
i'm also struggling for enthusiasm to write rust recently. i spent several months failing to get a rust job, so motivation to work on complicated low level things is not high right now :/
Sorry to hear about that, indeed rust jobs are not plentiful yet sadly...
I decided to bite the bullet and write my own reusable one-shot channel. Guaranteeing truly lock-free send, receive, and recycle operations while totally preventing reallocations of the waker turned out to be much harder than for a plain one-shot channel, though. I think I have found an efficient way to achieve that, but I need to run it through Loom before I am totally confident it works. I will probably have a look later at the PR you mentioned.
The PR turns it into a capacity one reusable channel. it pays some price for this relative to the platonic ideal of a oneshot channel, but i think there may be ways to overcome at least some of this, it just requires motivation.
it's been a while since i looked at the actual oneshot code until yesterday, all my focus has been on that PR. strongly recommend you look at that. i am aware it oversynchronises in places, but i don't think it's far off the platonic ideal.
my suspicion is that if you want the thing that makes this library interesting, waiting for receiver, you might not be able to get everything totally lockfree. there is also oneshot you may want to take a look at, although it would be nice if you could bring your updates over to this repo so that anyone using it can have an easy upgrade path. happy to hand out permission bits.
TBH I actually don't need the wait-for-receiver feature; my initial review of this crate was because I was curious whether it could be turned into a channel whose receiver can recycle senders.
The PR does appear to offer just that with the recover method, but from a cursory glance at the code my suspicion is that recover may fail spuriously if called just after receiving a message because the sender may still hold the lock. This is because (if I understand well) the message is "sent" before the lock is released, which I think is indeed correct and necessary to prevent the receiver from registering a new waker when the sender is about to notify the receiver task. If the above analysis is correct, however, it means that getting a new sender after receiving a message is actually a blocking operation, since one would need to spin until the sender releases the lock so that recover can succeed.
As you see my needs are pretty specific so I am afraid what I am implementing would not be of general interest and would further add complexity to what is already a fairly meaty PR (my implementation uses a redundant waker slot to achieve lock-freedom + a fairly non-trivial state machine).
So I would suggest closing this issue since the PR seems to address the particular point I raised, though perhaps it would be good to put a warning in the README in the meantime, until the PR lands.
oh, that's an interesting approach. and quite an interesting problem, heh.
Would the issue be addressed in oneshot/non-hatch-branch with the below change? It just zeros the send/recv bits during the move. Assuming that does what I think it does, it's also an extra atomic operation to guard against a situation that is impossible in mainline, but it might still be useful. I'm also not seeing any changes in a quick little cargo bench, so the perf impact is probably not too bad. Is this right and should I open a PR, or did I make a massive oversight anywhere?
diff --git a/src/inner.rs b/src/inner.rs
index 262189f..c24bc47 100644
--- a/src/inner.rs
+++ b/src/inner.rs
@@ -47,6 +47,7 @@ impl<T> Inner<T> {
     #[inline(always)]
     pub(crate) fn set_recv(&self, waker: Waker) -> State {
         let recv = self.recv.get();
+        self.state.fetch_and(!RECV, AcqRel);
         unsafe { (*recv).as_mut_ptr().write(waker) } // !
         State(self.state.fetch_or(RECV, AcqRel))
     }
@@ -63,6 +64,7 @@ impl<T> Inner<T> {
     #[inline(always)]
     pub(crate) fn set_send(&self, waker: Waker) -> State {
         let send = self.send.get();
+        self.state.fetch_and(!SEND, AcqRel);
         unsafe { (*send).as_mut_ptr().write(waker) } // !
         State(self.state.fetch_or(SEND, AcqRel))
     }
Would the issue be addressed in oneshot/non-hatch-branch with the below change?
Sorry, couldn't find that branch, do you have a link?
So, based only on the main branch and on my very faint memory of how this library works, this code fragment suggests that this does not solve the race:
https://github.com/irrustible/async-oneshot/blob/9d2f61ddd10f6304fb6194a04174c3ae38c4e33e/src/sender.rs#L59-L60
Say:
- the sender reads state.recv() == true
- the RECV bit is cleared by the receiver
- the sender and receiver now may both access the waker concurrently (data race and thus UB)
Or is this scenario avoided in the non-hatch-branch you mention?
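The interleaving in the list above can be replayed sequentially to see why clearing the bit before the write does not close the window. This is only a sketch: the `RECV` value and the function name `stale_snapshot_demo` are made up for illustration and are not taken from the crate, and no waker is actually touched, so the example itself contains no data race.

```rust
use std::sync::atomic::{AtomicU8, Ordering};

// Hypothetical bit value, chosen only for this sketch.
const RECV: u8 = 1 << 1;

/// Replays the three steps in order on a single thread and returns
/// (what the sender believes, what the state actually says).
pub fn stale_snapshot_demo() -> (bool, bool) {
    // A receiver waker is already registered.
    let state = AtomicU8::new(RECV);

    // Step 1 (sender): take a snapshot of the state and see RECV set.
    let snapshot = state.load(Ordering::Acquire);

    // Step 2 (receiver): clear RECV before overwriting its waker,
    // as the proposed patch does.
    state.fetch_and(!RECV, Ordering::AcqRel);

    // Step 3 (sender): still acts on the old snapshot, so it would go on
    // to read the waker while the receiver is in the middle of writing it.
    let sender_believes_waker_present = snapshot & RECV != 0;
    let waker_actually_published = state.load(Ordering::Acquire) & RECV != 0;
    (sender_believes_waker_present, waker_actually_published)
}
```

The sender's decision is based on a value that can go stale between the load and the access, which is why the check and the access would need to be made exclusive rather than merely ordered.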
Oh, the non-hatch branch is just mainline. The PR with the rewrite @jjl referred to was apparently called async-hatch; I can see how that was confusing, sorry :)
Good catch on that fragment, I'll keep looking at this.
correctly solving it would require introducing a LOCK flag which is taken before touching the wakers.
i'm sure i've written it with such a flag before, but maybe i'm just remembering async-hatch...
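A minimal sketch of what such a lock flag could look like, assuming a single waker slot and a try-lock that never spins. The names `WakerSlot`, `Busy`, `CountingWake` and the bit values are hypothetical and only serve the illustration; this is not the code from this crate or from async-hatch.

```rust
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

// Hypothetical bit layout, chosen only for this sketch.
const LOCK: u8 = 1 << 0;
const WAKER: u8 = 1 << 1;

/// Returned when the other side currently holds the lock.
#[derive(Debug, PartialEq)]
pub struct Busy;

pub struct WakerSlot {
    state: AtomicU8,
    waker: UnsafeCell<Option<Waker>>,
}

// Safety: `waker` is only ever accessed while LOCK is held.
unsafe impl Sync for WakerSlot {}

impl WakerSlot {
    pub fn new() -> Self {
        WakerSlot { state: AtomicU8::new(0), waker: UnsafeCell::new(None) }
    }

    /// Try to take the lock once, without spinning.
    pub fn try_lock(&self) -> Result<(), Busy> {
        let prev = self.state.fetch_or(LOCK, Ordering::Acquire);
        if prev & LOCK == 0 { Ok(()) } else { Err(Busy) }
    }

    /// Store or replace the waker, then unlock.
    pub fn register(&self, waker: Waker) -> Result<(), Busy> {
        self.try_lock()?;
        // Safety: we hold LOCK, so nobody else touches the slot.
        unsafe { *self.waker.get() = Some(waker) };
        // Only the lock holder writes the WAKER bit, so one store can
        // both publish WAKER and release LOCK.
        self.state.store(WAKER, Ordering::Release);
        Ok(())
    }

    /// Take the waker out (if any), then unlock.
    pub fn take(&self) -> Result<Option<Waker>, Busy> {
        self.try_lock()?;
        // Safety: we hold LOCK, so nobody else touches the slot.
        let waker = unsafe { (*self.waker.get()).take() };
        self.state.store(0, Ordering::Release);
        Ok(waker)
    }
}

/// Helper for trying the slot out: counts how often it was woken.
pub struct CountingWake(pub AtomicUsize);

impl Wake for CountingWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

With this shape, a caller that gets `Busy` has to retry or spin, so the operation is not lock-free; that is the same trade-off raised earlier in the thread about a sender still holding the lock.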
i had a look at the code again yesterday while procrastinating doing my paperwork and here are the flags i've got in my current working copy (which alas is very broken because at some point i started refactoring it apparently...)
#[repr(u8)]
/// The flags are split into two halves - shared and local.
pub(crate) enum Flags {
    /// The receiver promises not to touch the inner again.
    ReceiverClosed = 1,
    /// The sender promises not to touch the inner again.
    SenderClosed = 1 << 1,
    /// Lock bit for the wakers.
    Lock = 1 << 2,
    /// The receiver's waker is present
    ReceiverWaker = 1 << 3,
    /// The sender's waker is present
    SenderWaker = 1 << 4,
    /// The Sender has sent a value.
    Value = 1 << 5,
}
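For readers unfamiliar with this style of flag enum, here is a rough sketch of how such flags might be set and tested on a shared `AtomicU8`. The enum values are copied from the comment above (made `pub` so the snippet stands alone); the `State` wrapper and the `set` and `clear` helpers are assumptions made up for the example, not the working copy's actual API.

```rust
use std::sync::atomic::{AtomicU8, Ordering};

#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Flags {
    ReceiverClosed = 1,
    SenderClosed = 1 << 1,
    Lock = 1 << 2,
    ReceiverWaker = 1 << 3,
    SenderWaker = 1 << 4,
    Value = 1 << 5,
}

/// Hypothetical snapshot of the flag byte at one point in time.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct State(pub u8);

impl State {
    /// True if `flag` was set in this snapshot.
    pub fn has(self, flag: Flags) -> bool {
        self.0 & (flag as u8) != 0
    }
}

/// Set `flag` and return the state as it was before the update.
pub fn set(state: &AtomicU8, flag: Flags) -> State {
    State(state.fetch_or(flag as u8, Ordering::AcqRel))
}

/// Clear `flag` and return the state as it was before the update.
pub fn clear(state: &AtomicU8, flag: Flags) -> State {
    State(state.fetch_and(!(flag as u8), Ordering::AcqRel))
}
```

Returning the previous state is what lets a caller tell whether it was the one that took `Lock` (the bit was clear before its `set`) or whether someone else already held it.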