async-task
async-task copied to clipboard
Is it possible to avoid the additional scheduling of a task when the Task is cancelled or detached?
The following code is the logic from my dynamic-call-based project derived from async-task. Here, I do not reschedule the task to run when it is cancelled or detached, and this does not seem to cause any problems.
- task.rs
/// Cancels the task by setting the `CLOSED` bit in its state word.
///
/// Does nothing if the task is already `COMPLETED` or `CLOSED`. When the
/// task is neither `SCHEDULED` nor `RUNNING` at the moment it is closed, the
/// awaiter is notified, the future is dropped and one reference is released
/// right here instead of rescheduling the task. Otherwise only the bit is
/// set and the cleanup is left to whoever is scheduling/running the task.
///
/// Intended to be called at most once, because the handle is consumed by
/// either a cancel or a drop.
fn set_canceled(&mut self) {
    // SAFETY: relies on `self.raw` pointing at a live raw task for as long as
    // this handle exists -- NOTE(review): confirm against the handle's constructor.
    let raw = unsafe { self.raw.as_mut() };
    let mut state = raw.state.load(Ordering::Acquire);
    loop {
        // A completed or already closed task does not need to be cancelled.
        if state & (COMPLETED | CLOSED) != 0 {
            break;
        }
        match raw.state.compare_exchange_weak(
            state,
            state | CLOSED,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => {
                // The task was idle (pending): notify the awaiter, drop the
                // future and release one reference. The handle is still
                // attached at this point, so this is not expected to destroy
                // the raw task; real destruction happens on detach.
                if state & (SCHEDULED | RUNNING) == 0 {
                    raw.notify(None);
                    raw.drop_future();
                    raw.drop_ref();
                }
                // The transition succeeded: stop here instead of looping again
                // with a stale `state` and touching `raw.state` after the
                // reference has been released.
                break;
            }
            // Lost the race (or spurious failure): retry with the fresh state.
            Err(s) => state = s,
        }
    }
}
/// Detaches this handle from its raw task by clearing the `TASK` bit.
///
/// Returns `Some(output)` when the task had already completed but was not
/// closed at the moment of detaching, and `None` otherwise. If the state
/// observed while detaching holds no references, the raw task is destroyed.
///
/// # Safety
///
/// Dereferences `self.raw` and may read the stored output through a raw
/// pointer -- NOTE(review): presumably the caller must guarantee that
/// `self.raw` is still live and that the output is read at most once; confirm.
unsafe fn set_detached(&mut self) -> Option<T> {
    let raw = self.raw.as_mut();

    // Fast path: detaching right after creation, when the state is exactly
    // "scheduled, one reference, handle attached". Only the TASK bit has to go.
    let fast_path = raw.state.compare_exchange_weak(
        SCHEDULED | TASK | REFERENCE,
        SCHEDULED | REFERENCE,
        Ordering::AcqRel,
        Ordering::Acquire,
    );
    if fast_path.is_ok() {
        return None;
    }

    // Slow path: clear the TASK bit unconditionally and inspect the state
    // that was in place just before the bit was cleared.
    let previous = raw.state.fetch_and(!TASK, Ordering::AcqRel);

    // Completed but not closed: the result is still stored, so take it out.
    let output = if previous & (COMPLETED | CLOSED) == COMPLETED {
        Some((raw.output as *mut T).read())
    } else {
        None
    };

    // No reference was left when detaching: the raw task must be destroyed.
    if previous & !(REFERENCE - 1) == 0 {
        raw.destroy();
    }

    output
}
/// Polls the task for its result on behalf of the handle.
///
/// * `Poll::Ready(None)` -- the task is closed (cancelled) and is no longer
///   scheduled or running.
/// * `Poll::Ready(Some(output))` -- the task completed; it is marked `CLOSED`
///   and its output is read out.
/// * `Poll::Pending` -- the waker from `cx` has been registered and the task
///   is still in flight.
fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
    // SAFETY: relies on `self.raw` pointing at a live raw task -- NOTE(review):
    // confirm against the handle's constructor.
    unsafe {
        let raw = self.raw.as_mut();
        let snapshot = raw.state.load(Ordering::Acquire);

        // Cancelled task: no result will be produced. We only have to wait
        // until the future has been abandoned.
        if snapshot & CLOSED != 0 {
            if snapshot & (SCHEDULED | RUNNING) != 0 {
                // Install the current waker as the awaiter.
                raw.register(cx.waker());
                // The state may have changed just before the registration,
                // so look at it again.
                let reloaded = raw.state.load(Ordering::Acquire);
                if reloaded & (SCHEDULED | RUNNING) != 0 {
                    // Still scheduled or running: the future is not gone yet.
                    return Poll::Pending;
                }
                // Otherwise remove the waker that was just registered.
                let _rtn = raw.take();
            }
            return Poll::Ready(None);
        }

        // Not completed yet: register the current waker and re-check.
        if snapshot & COMPLETED == 0 {
            raw.register(cx.waker());
            // The task may have completed just before the registration.
            // The original author notes that CLOSED cannot show up here,
            // since only this handle operates on the state at this point.
            let reloaded = raw.state.load(Ordering::Acquire);
            if reloaded & COMPLETED == 0 {
                // Still not completed: we are blocked on the task.
                return Poll::Pending;
            }
            raw.take();
        }

        // Completed: mark the task closed and take the output out of it.
        raw.state.fetch_or(CLOSED, Ordering::AcqRel);
        let output = raw.output as *mut T;
        Poll::Ready(Some(output.read()))
    }
}
- raw.rs
/// Runs the task once: switches the state to `RUNNING`, polls the stored
/// future and then updates the state according to the poll result.
///
/// Returns `true` only when the poll returned `Pending`, the task was not
/// closed, and the `SCHEDULED` bit was set again while it was running -- in
/// that case `self.schedule()` is called before returning. Returns `false`
/// in every other case.
///
/// # Safety
///
/// Builds a `Waker` from a raw pointer to `self` -- NOTE(review): presumably
/// the caller must guarantee that `self` stays valid for the whole call and
/// that no other thread runs the same task concurrently; confirm.
pub(crate) unsafe fn run(&mut self) -> bool {
// NOTE(review): looks like a leftover debug print ("runing" is misspelled);
// consider removing it before release.
println!("runing");
let mut state = self.state.load(Ordering::Acquire);
// NOTE(review): CLOSED is not inspected before polling, so a task that was
// closed while still scheduled is polled once more here -- confirm intended.
loop {
// Switch the task to the RUNNING state, clearing SCHEDULED.
let new = (state & !SCHEDULED) | RUNNING;
match self
.state
.compare_exchange_weak(state, new, Ordering::AcqRel, Ordering::Acquire)
{
Ok(_) => {
// Keep the local copy in sync with what was just stored.
state = new;
break;
}
// Lost the race (or spurious failure): retry with the fresh state.
Err(s) => state = s,
}
}
// Start executing the future. The waker is built from a raw pointer to
// `self` and wrapped in ManuallyDrop, so it is not dropped when this
// function returns.
let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(
self as *const _ as *const (),
&Self::RAW_WAKER_VTABLE,
)));
let cx = &mut Context::from_waker(&waker);
let poll = self.dynfn.as_mut().poll(cx);
match poll {
// The value carried by `Ready` is ignored here -- presumably `dynfn`
// stores the output itself; verify against its definition.
Poll::Ready(_) => {
loop {
// Switch the task to the COMPLETED state. When no handle is attached
// (TASK bit clear) nobody can take the output, so CLOSED is set as well.
let new = if state & TASK == 0 {
(state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED
} else {
(state & !RUNNING & !SCHEDULED) | COMPLETED
};
match self.state.compare_exchange_weak(
state,
new,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// The output is dropped when the handle is gone or the task was
// closed in the meantime.
if state & TASK == 0 || state & CLOSED != 0 {
self.drop_output();
}
// Notify the awaiter, if one is registered.
if state & AWAITER != 0 {
self.notify();
}
// Release one reference now that this run is over.
self.drop_ref();
break;
}
Err(s) => {
state = s;
}
}
}
}
Poll::Pending => loop {
// A closed task leaves both RUNNING and SCHEDULED; an open task only
// leaves RUNNING and keeps SCHEDULED if it was set while running.
let new = if state & CLOSED != 0 {
state & !RUNNING & !SCHEDULED
} else {
state & !RUNNING
};
match self.state.compare_exchange_weak(
state,
new,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
if state & CLOSED != 0 {
// Closed while running: notify the awaiter, drop the future and
// release one reference.
if state & AWAITER != 0 {
self.notify();
}
self.drop_future();
self.drop_ref();
} else if state & SCHEDULED != 0 {
// SCHEDULED was set while the task was running: schedule it again
// and report that to the caller.
self.schedule();
return true;
}
break;
}
Err(s) => state = s,
}
},
}
false
}