Schedule function holds a reference to RawTask #12
Merged: 3 commits, Dec 31, 2019
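In short, a summary inferred from the diff and the new test below: the user-provided schedule function `S` is stored inside the task allocation. If running `S` could drop the last reference to the task, for example when the schedule closure drops the `Task` it receives, the allocation could be freed, and `S`'s captured variables destroyed, while `S` was still executing. This change routes all scheduling through a single `RawTask::schedule` wrapper that, whenever `S` is not zero-sized, holds a temporary waker (one extra reference) for the duration of the call. Along the way, the vtable field `raw_waker` is renamed to `raw_waker_vtable`, and the tests' `DROP_D` counter to `DROP_T`.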
src/raw.rs: 52 changes (33 additions, 19 deletions)
@@ -16,7 +16,7 @@ use crate::Task
/// The vtable for a task.
pub(crate) struct TaskVTable {
/// The raw waker vtable.
-pub(crate) raw_waker: RawWakerVTable,
+pub(crate) raw_waker_vtable: RawWakerVTable,

/// Schedules the task.
pub(crate) schedule: unsafe fn(*const ()),
@@ -119,7 +119,7 @@ where
state: AtomicUsize::new(SCHEDULED | HANDLE | REFERENCE),
awaiter: Cell::new(None),
vtable: &TaskVTable {
-raw_waker: RawWakerVTable::new(
+raw_waker_vtable: RawWakerVTable::new(
Self::clone_waker,
Self::wake,
Self::wake_by_ref,
@@ -198,6 +198,14 @@ where

/// Wakes a waker.
unsafe fn wake(ptr: *const ()) {
+// This is just an optimization. If the schedule function has captured variables, then
+// we'll do less reference counting if we wake the waker by reference and then drop it.
+if mem::size_of::<S>() > 0 {
+    Self::wake_by_ref(ptr);
+    Self::drop_waker(ptr);
+    return;
+}
+
let raw = Self::from_ptr(ptr);

let mut state = (*raw.header).state.load(Ordering::Acquire);
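The fast path added above turns a by-value wake into a by-reference wake followed by a drop of the waker. The same delegation idiom can be written against the modern std `Wake` trait; this sketch is purely illustrative (the trait is not what this crate uses, and it postdates this PR):

    use std::sync::Arc;
    use std::task::Wake;

    struct QueueTask;

    impl Wake for QueueTask {
        fn wake(self: Arc<Self>) {
            // A by-value wake expressed as a by-ref wake; `self` is dropped on
            // return, mirroring the wake_by_ref + drop_waker pair above.
            self.wake_by_ref();
        }

        fn wake_by_ref(self: &Arc<Self>) {
            // A real executor would push the task onto its run queue here.
        }
    }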
@@ -238,13 +246,9 @@
Ok(_) => {
// If the task is not yet scheduled and isn't currently running, now is the
// time to schedule it.
-if state & (SCHEDULED | RUNNING) == 0 {
+if state & RUNNING == 0 {
// Schedule the task.
-let task = Task {
-    raw_task: NonNull::new_unchecked(ptr as *mut ()),
-    _marker: PhantomData,
-};
-(*raw.schedule)(task);
+Self::schedule(ptr);
} else {
// Drop the waker.
Self::drop_waker(ptr);
@@ -284,8 +288,8 @@ where
Err(s) => state = s,
}
} else {
-// If the task is not scheduled nor running, we'll need to schedule after waking.
-let new = if state & (SCHEDULED | RUNNING) == 0 {
+// If the task is not running, we can schedule right away.
+let new = if state & RUNNING == 0 {
(state | SCHEDULED) + REFERENCE
} else {
state | SCHEDULED
@@ -299,14 +303,16 @@
Ordering::Acquire,
) {
Ok(_) => {
-// If the task is not scheduled nor running, now is the time to schedule.
-if state & (SCHEDULED | RUNNING) == 0 {
+// If the task is not running, now is the time to schedule.
+if state & RUNNING == 0 {
// If the reference count overflowed, abort.
if state > isize::max_value() as usize {
std::process::abort();
}

-// Schedule the task.
+// Schedule the task. There is no need to call `Self::schedule(ptr)`
+// because the schedule function cannot be destroyed while the waker is
+// still alive.
let task = Task {
raw_task: NonNull::new_unchecked(ptr as *mut ()),
_marker: PhantomData,
@@ -325,7 +331,7 @@ where
/// Clones a waker.
unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
let raw = Self::from_ptr(ptr);
-let raw_waker = &(*raw.header).vtable.raw_waker;
+let raw_waker_vtable = &(*raw.header).vtable.raw_waker_vtable;

// Increment the reference count. With any kind of reference-counted data structure,
// relaxed ordering is appropriate when incrementing the counter.
@@ -336,7 +342,7 @@
std::process::abort();
}

-RawWaker::new(ptr, raw_waker)
+RawWaker::new(ptr, raw_waker_vtable)
}

/// Drops a waker.
@@ -360,7 +366,7 @@
(*raw.header)
.state
.store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release);
-((*raw.header).vtable.schedule)(ptr);
+Self::schedule(ptr);
} else {
// Otherwise, destroy the task right away.
Self::destroy(ptr);
@@ -393,10 +399,18 @@ where
unsafe fn schedule(ptr: *const ()) {
let raw = Self::from_ptr(ptr);

-(*raw.schedule)(Task {
+// If the schedule function has captured variables, create a temporary waker that prevents
+// the task from getting deallocated while the function is being invoked.
+let _waker;
+if mem::size_of::<S>() > 0 {
+    _waker = Waker::from_raw(Self::clone_waker(ptr));
+}
+
+let task = Task {
raw_task: NonNull::new_unchecked(ptr as *mut ()),
_marker: PhantomData,
-});
+};
+(*raw.schedule)(task);
}

/// Drops the future inside a task.
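This new `schedule` wrapper is the heart of the fix, so here it is once more, reassembled from the hunk above with explanatory comments added (same code as the diff; only the comments are new):

    /// Schedules the task.
    unsafe fn schedule(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // Keep-alive guard. A non-zero-sized schedule function has captured
        // state that lives inside the task allocation. Cloning a waker bumps
        // the reference count, so even if the schedule function drops the
        // `Task` it receives (possibly the last other reference), the
        // allocation, and with it the function's captured variables, stays
        // alive until this call returns and `_waker` is dropped.
        let _waker;
        if mem::size_of::<S>() > 0 {
            _waker = Waker::from_raw(Self::clone_waker(ptr));
        }

        let task = Task {
            raw_task: NonNull::new_unchecked(ptr as *mut ()),
            _marker: PhantomData,
        };
        (*raw.schedule)(task);
    }

By contrast, `wake_by_ref` above may call the schedule function directly without this guard: the waker it was invoked through is still alive, so its reference already keeps the schedule function from being destroyed.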
@@ -448,7 +462,7 @@ where
// Create a context from the raw task pointer and the vtable inside its header.
let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(
ptr,
-&(*raw.header).vtable.raw_waker,
+&(*raw.header).vtable.raw_waker_vtable,
)));
let cx = &mut Context::from_waker(&waker);

tests/basic.rs: 60 changes (41 additions, 19 deletions)
@@ -109,132 +109,132 @@ macro_rules! task {
fn cancel_and_drop_handle() {
future!(f, POLL, DROP_F);
schedule!(s, SCHEDULE, DROP_S);
-task!(task, handle, f, s, DROP_D);
+task!(task, handle, f, s, DROP_T);

assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
-assert_eq!(DROP_D.load(), 0);
+assert_eq!(DROP_T.load(), 0);

task.cancel();
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
-assert_eq!(DROP_D.load(), 0);
+assert_eq!(DROP_T.load(), 0);

drop(handle);
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
-assert_eq!(DROP_D.load(), 0);
+assert_eq!(DROP_T.load(), 0);

drop(task);
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
-assert_eq!(DROP_D.load(), 1);
+assert_eq!(DROP_T.load(), 1);
}

#[test]
fn run_and_drop_handle() {
future!(f, POLL, DROP_F);
schedule!(s, SCHEDULE, DROP_S);
-task!(task, handle, f, s, DROP_D);
+task!(task, handle, f, s, DROP_T);

drop(handle);
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
-assert_eq!(DROP_D.load(), 0);
+assert_eq!(DROP_T.load(), 0);

task.run();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
-assert_eq!(DROP_D.load(), 1);
+assert_eq!(DROP_T.load(), 1);
}

#[test]
fn drop_handle_and_run() {
future!(f, POLL, DROP_F);
schedule!(s, SCHEDULE, DROP_S);
-task!(task, handle, f, s, DROP_D);
+task!(task, handle, f, s, DROP_T);

drop(handle);
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
-assert_eq!(DROP_D.load(), 0);
+assert_eq!(DROP_T.load(), 0);

task.run();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
-assert_eq!(DROP_D.load(), 1);
+assert_eq!(DROP_T.load(), 1);
}

#[test]
fn cancel_and_run() {
future!(f, POLL, DROP_F);
schedule!(s, SCHEDULE, DROP_S);
-task!(task, handle, f, s, DROP_D);
+task!(task, handle, f, s, DROP_T);

handle.cancel();
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
-assert_eq!(DROP_D.load(), 0);
+assert_eq!(DROP_T.load(), 0);

drop(handle);
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
-assert_eq!(DROP_D.load(), 0);
+assert_eq!(DROP_T.load(), 0);

task.run();
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
-assert_eq!(DROP_D.load(), 1);
+assert_eq!(DROP_T.load(), 1);
}

#[test]
fn run_and_cancel() {
future!(f, POLL, DROP_F);
schedule!(s, SCHEDULE, DROP_S);
-task!(task, handle, f, s, DROP_D);
+task!(task, handle, f, s, DROP_T);

task.run();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 0);
-assert_eq!(DROP_D.load(), 0);
+assert_eq!(DROP_T.load(), 0);

handle.cancel();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 0);
-assert_eq!(DROP_D.load(), 0);
+assert_eq!(DROP_T.load(), 0);

drop(handle);
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
-assert_eq!(DROP_D.load(), 1);
+assert_eq!(DROP_T.load(), 1);
}

#[test]
@@ -310,3 +310,25 @@ fn schedule_counter() {
assert_eq!(handle.tag().load(Ordering::SeqCst), 3);
r.recv().unwrap();
}

+#[test]
+fn drop_inside_schedule() {
+    struct DropGuard(AtomicUsize);
+    impl Drop for DropGuard {
+        fn drop(&mut self) {
+            self.0.fetch_add(1, Ordering::SeqCst);
+        }
+    }
+    let guard = DropGuard(AtomicUsize::new(0));
+
+    let (task, _) = async_task::spawn(
+        async {},
+        move |task| {
+            assert_eq!(guard.0.load(Ordering::SeqCst), 0);
+            drop(task);
+            assert_eq!(guard.0.load(Ordering::SeqCst), 0);
+        },
+        (),
+    );
+    task.schedule();
+}
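The new `drop_inside_schedule` test pins the fix down: the schedule closure drops the `Task` it receives, and the `DropGuard` assertions check that the closure's captured state is still alive both before and after that drop. Real executors hit the same hazard when their run queue shuts down. A sketch of that pattern, assuming the crossbeam crate for a `Sync` channel sender (none of these names come from this PR):

    use crossbeam::channel;

    fn main() {
        let (sender, receiver) = channel::unbounded();

        let (task, _handle) = async_task::spawn(
            async {},
            move |task| {
                // Once the receiver is gone, `send` fails and `task` is dropped
                // right here, inside the schedule closure. Before this change,
                // that drop could free the closure (and `sender` with it) while
                // the closure was still on the stack.
                let _ = sender.send(task);
            },
            (),
        );

        drop(receiver); // shut down the "run queue" first
        task.schedule(); // the schedule closure now drops the task it was given
    }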