Commit 45a2c1f
Author: Stjepan Glavina
Parent: 9f11ab3

Schedule function holds a reference to RawTask (#12)

* Schedule function holds a reference
* Rename DROP_D to DROP_T
* Don't rely on valgrind to catch errors
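
The problem being fixed: the schedule function receives a `Task` and may drop it before returning. If that `Task` is the last reference, the whole task allocation, including the schedule closure itself and anything it captured, could previously be freed while the closure was still executing. The raw `schedule` shim now takes out a temporary waker around the call so the closure outlives its own invocation. Below is a user-level sketch of the scenario, distilled from the `drop_inside_schedule` test added in this commit; the `main` wrapper and the comments are illustrative additions.

use std::sync::atomic::{AtomicUsize, Ordering};

// Bumps a counter when dropped, so we can observe when the schedule
// closure's captured state is destroyed.
struct DropGuard(AtomicUsize);

impl Drop for DropGuard {
    fn drop(&mut self) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

fn main() {
    let guard = DropGuard(AtomicUsize::new(0));

    // The join handle is discarded right away, so the `Task` handed to the
    // schedule closure is the only remaining reference to the allocation
    // that also owns the closure (and `guard`).
    let (task, _) = async_task::spawn(
        async {},
        move |task| {
            assert_eq!(guard.0.load(Ordering::SeqCst), 0);
            // Dropping the task inside the schedule call must not destroy
            // the closure (and `guard`) while this call is still on the stack.
            drop(task);
            assert_eq!(guard.0.load(Ordering::SeqCst), 0);
        },
        (),
    );

    // Invoke the schedule function once.
    task.schedule();
}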

8 files changed: +216 −180 lines

src/raw.rs (33 additions, 19 deletions)

@@ -16,7 +16,7 @@ use crate::Task;
 /// The vtable for a task.
 pub(crate) struct TaskVTable {
     /// The raw waker vtable.
-    pub(crate) raw_waker: RawWakerVTable,
+    pub(crate) raw_waker_vtable: RawWakerVTable,
 
     /// Schedules the task.
     pub(crate) schedule: unsafe fn(*const ()),
@@ -119,7 +119,7 @@ where
                 state: AtomicUsize::new(SCHEDULED | HANDLE | REFERENCE),
                 awaiter: Cell::new(None),
                 vtable: &TaskVTable {
-                    raw_waker: RawWakerVTable::new(
+                    raw_waker_vtable: RawWakerVTable::new(
                         Self::clone_waker,
                         Self::wake,
                         Self::wake_by_ref,
@@ -198,6 +198,14 @@ where
 
     /// Wakes a waker.
     unsafe fn wake(ptr: *const ()) {
+        // This is just an optimization. If the schedule function has captured variables, then
+        // we'll do less reference counting if we wake the waker by reference and then drop it.
+        if mem::size_of::<S>() > 0 {
+            Self::wake_by_ref(ptr);
+            Self::drop_waker(ptr);
+            return;
+        }
+
         let raw = Self::from_ptr(ptr);
 
         let mut state = (*raw.header).state.load(Ordering::Acquire);
@@ -238,13 +246,9 @@ where
                 Ok(_) => {
                     // If the task is not yet scheduled and isn't currently running, now is the
                     // time to schedule it.
-                    if state & (SCHEDULED | RUNNING) == 0 {
+                    if state & RUNNING == 0 {
                         // Schedule the task.
-                        let task = Task {
-                            raw_task: NonNull::new_unchecked(ptr as *mut ()),
-                            _marker: PhantomData,
-                        };
-                        (*raw.schedule)(task);
+                        Self::schedule(ptr);
                     } else {
                         // Drop the waker.
                         Self::drop_waker(ptr);
@@ -284,8 +288,8 @@ where
                 Err(s) => state = s,
             }
         } else {
-            // If the task is not scheduled nor running, we'll need to schedule after waking.
-            let new = if state & (SCHEDULED | RUNNING) == 0 {
+            // If the task is not running, we can schedule right away.
+            let new = if state & RUNNING == 0 {
                 (state | SCHEDULED) + REFERENCE
             } else {
                 state | SCHEDULED
@@ -299,14 +303,16 @@ where
                 Ordering::Acquire,
             ) {
                 Ok(_) => {
-                    // If the task is not scheduled nor running, now is the time to schedule.
-                    if state & (SCHEDULED | RUNNING) == 0 {
+                    // If the task is not running, now is the time to schedule.
+                    if state & RUNNING == 0 {
                         // If the reference count overflowed, abort.
                         if state > isize::max_value() as usize {
                             std::process::abort();
                         }
 
-                        // Schedule the task.
+                        // Schedule the task. There is no need to call `Self::schedule(ptr)`
+                        // because the schedule function cannot be destroyed while the waker is
+                        // still alive.
                         let task = Task {
                             raw_task: NonNull::new_unchecked(ptr as *mut ()),
                             _marker: PhantomData,
@@ -325,7 +331,7 @@ where
     /// Clones a waker.
     unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
         let raw = Self::from_ptr(ptr);
-        let raw_waker = &(*raw.header).vtable.raw_waker;
+        let raw_waker_vtable = &(*raw.header).vtable.raw_waker_vtable;
 
         // Increment the reference count. With any kind of reference-counted data structure,
         // relaxed ordering is appropriate when incrementing the counter.
@@ -336,7 +342,7 @@ where
             std::process::abort();
         }
 
-        RawWaker::new(ptr, raw_waker)
+        RawWaker::new(ptr, raw_waker_vtable)
     }
 
     /// Drops a waker.
@@ -360,7 +366,7 @@ where
                 (*raw.header)
                     .state
                     .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release);
-                ((*raw.header).vtable.schedule)(ptr);
+                Self::schedule(ptr);
             } else {
                 // Otherwise, destroy the task right away.
                 Self::destroy(ptr);
@@ -393,10 +399,18 @@ where
     unsafe fn schedule(ptr: *const ()) {
         let raw = Self::from_ptr(ptr);
 
-        (*raw.schedule)(Task {
+        // If the schedule function has captured variables, create a temporary waker that prevents
+        // the task from getting deallocated while the function is being invoked.
+        let _waker;
+        if mem::size_of::<S>() > 0 {
+            _waker = Waker::from_raw(Self::clone_waker(ptr));
+        }
+
+        let task = Task {
            raw_task: NonNull::new_unchecked(ptr as *mut ()),
            _marker: PhantomData,
-        });
+        };
+        (*raw.schedule)(task);
     }
 
     /// Drops the future inside a task.
@@ -448,7 +462,7 @@ where
         // Create a context from the raw task pointer and the vtable inside the its header.
         let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(
             ptr,
-            &(*raw.header).vtable.raw_waker,
+            &(*raw.header).vtable.raw_waker_vtable,
         )));
         let cx = &mut Context::from_waker(&waker);
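Both `wake` and `schedule` gate the extra work on `mem::size_of::<S>() > 0`: a schedule function with no captured variables is a zero-sized type, so there is no captured state that could be destroyed mid-call, and the temporary waker (or the wake-by-ref detour) can be skipped. A small standalone sketch of that distinction, with illustrative closures standing in for the schedule function `S`:

use std::mem;

fn main() {
    // A closure that captures nothing is a zero-sized type, so the
    // `mem::size_of::<S>() > 0` checks in this commit skip the extra work.
    let no_captures = |x: u32| x + 1;
    assert_eq!(mem::size_of_val(&no_captures), 0);

    // A closure that captures data is at least as big as its captures, so
    // the schedule path takes out a temporary waker before invoking it.
    let queue: Vec<u32> = Vec::new();
    let captures = move |x: u32| x + queue.len() as u32;
    assert!(mem::size_of_val(&captures) > 0);
}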

tests/basic.rs (41 additions, 19 deletions)

@@ -109,132 +109,132 @@ macro_rules! task {
 fn cancel_and_drop_handle() {
     future!(f, POLL, DROP_F);
     schedule!(s, SCHEDULE, DROP_S);
-    task!(task, handle, f, s, DROP_D);
+    task!(task, handle, f, s, DROP_T);
 
     assert_eq!(POLL.load(), 0);
     assert_eq!(SCHEDULE.load(), 0);
     assert_eq!(DROP_F.load(), 0);
     assert_eq!(DROP_S.load(), 0);
-    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(DROP_T.load(), 0);
 
     task.cancel();
     assert_eq!(POLL.load(), 0);
     assert_eq!(SCHEDULE.load(), 0);
     assert_eq!(DROP_F.load(), 0);
     assert_eq!(DROP_S.load(), 0);
-    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(DROP_T.load(), 0);
 
     drop(handle);
     assert_eq!(POLL.load(), 0);
     assert_eq!(SCHEDULE.load(), 0);
     assert_eq!(DROP_F.load(), 0);
     assert_eq!(DROP_S.load(), 0);
-    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(DROP_T.load(), 0);
 
     drop(task);
     assert_eq!(POLL.load(), 0);
     assert_eq!(SCHEDULE.load(), 0);
     assert_eq!(DROP_F.load(), 1);
     assert_eq!(DROP_S.load(), 1);
-    assert_eq!(DROP_D.load(), 1);
+    assert_eq!(DROP_T.load(), 1);
 }
 
 #[test]
 fn run_and_drop_handle() {
     future!(f, POLL, DROP_F);
     schedule!(s, SCHEDULE, DROP_S);
-    task!(task, handle, f, s, DROP_D);
+    task!(task, handle, f, s, DROP_T);
 
     drop(handle);
     assert_eq!(POLL.load(), 0);
     assert_eq!(SCHEDULE.load(), 0);
     assert_eq!(DROP_F.load(), 0);
     assert_eq!(DROP_S.load(), 0);
-    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(DROP_T.load(), 0);
 
     task.run();
     assert_eq!(POLL.load(), 1);
     assert_eq!(SCHEDULE.load(), 0);
     assert_eq!(DROP_F.load(), 1);
     assert_eq!(DROP_S.load(), 1);
-    assert_eq!(DROP_D.load(), 1);
+    assert_eq!(DROP_T.load(), 1);
 }
 
 #[test]
 fn drop_handle_and_run() {
     future!(f, POLL, DROP_F);
     schedule!(s, SCHEDULE, DROP_S);
-    task!(task, handle, f, s, DROP_D);
+    task!(task, handle, f, s, DROP_T);
 
     drop(handle);
     assert_eq!(POLL.load(), 0);
     assert_eq!(SCHEDULE.load(), 0);
     assert_eq!(DROP_F.load(), 0);
     assert_eq!(DROP_S.load(), 0);
-    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(DROP_T.load(), 0);
 
     task.run();
     assert_eq!(POLL.load(), 1);
     assert_eq!(SCHEDULE.load(), 0);
     assert_eq!(DROP_F.load(), 1);
     assert_eq!(DROP_S.load(), 1);
-    assert_eq!(DROP_D.load(), 1);
+    assert_eq!(DROP_T.load(), 1);
 }
 
 #[test]
 fn cancel_and_run() {
     future!(f, POLL, DROP_F);
     schedule!(s, SCHEDULE, DROP_S);
-    task!(task, handle, f, s, DROP_D);
+    task!(task, handle, f, s, DROP_T);
 
     handle.cancel();
     assert_eq!(POLL.load(), 0);
     assert_eq!(SCHEDULE.load(), 0);
     assert_eq!(DROP_F.load(), 0);
     assert_eq!(DROP_S.load(), 0);
-    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(DROP_T.load(), 0);
 
     drop(handle);
     assert_eq!(POLL.load(), 0);
     assert_eq!(SCHEDULE.load(), 0);
     assert_eq!(DROP_F.load(), 0);
     assert_eq!(DROP_S.load(), 0);
-    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(DROP_T.load(), 0);
 
     task.run();
     assert_eq!(POLL.load(), 0);
     assert_eq!(SCHEDULE.load(), 0);
     assert_eq!(DROP_F.load(), 1);
     assert_eq!(DROP_S.load(), 1);
-    assert_eq!(DROP_D.load(), 1);
+    assert_eq!(DROP_T.load(), 1);
 }
 
 #[test]
 fn run_and_cancel() {
     future!(f, POLL, DROP_F);
     schedule!(s, SCHEDULE, DROP_S);
-    task!(task, handle, f, s, DROP_D);
+    task!(task, handle, f, s, DROP_T);
 
     task.run();
     assert_eq!(POLL.load(), 1);
     assert_eq!(SCHEDULE.load(), 0);
     assert_eq!(DROP_F.load(), 1);
     assert_eq!(DROP_S.load(), 0);
-    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(DROP_T.load(), 0);
 
     handle.cancel();
     assert_eq!(POLL.load(), 1);
     assert_eq!(SCHEDULE.load(), 0);
     assert_eq!(DROP_F.load(), 1);
     assert_eq!(DROP_S.load(), 0);
-    assert_eq!(DROP_D.load(), 0);
+    assert_eq!(DROP_T.load(), 0);
 
     drop(handle);
     assert_eq!(POLL.load(), 1);
     assert_eq!(SCHEDULE.load(), 0);
     assert_eq!(DROP_F.load(), 1);
     assert_eq!(DROP_S.load(), 1);
-    assert_eq!(DROP_D.load(), 1);
+    assert_eq!(DROP_T.load(), 1);
 }
 
 #[test]
@@ -310,3 +310,25 @@ fn schedule_counter() {
     assert_eq!(handle.tag().load(Ordering::SeqCst), 3);
     r.recv().unwrap();
 }
+
+#[test]
+fn drop_inside_schedule() {
+    struct DropGuard(AtomicUsize);
+    impl Drop for DropGuard {
+        fn drop(&mut self) {
+            self.0.fetch_add(1, Ordering::SeqCst);
+        }
+    }
+    let guard = DropGuard(AtomicUsize::new(0));
+
+    let (task, _) = async_task::spawn(
+        async {},
+        move |task| {
+            assert_eq!(guard.0.load(Ordering::SeqCst), 0);
+            drop(task);
+            assert_eq!(guard.0.load(Ordering::SeqCst), 0);
+        },
+        (),
+    );
+    task.schedule();
+}
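
For context, the API these tests exercise is `async_task::spawn(future, schedule, tag)`, where the schedule function decides where a woken task goes, and `Task::schedule`/`Task::run` enqueue and poll it. Below is a minimal single-threaded usage sketch in the spirit of the `schedule_counter` test above; `crossbeam_channel` is an assumption here (any thread-safe queue would do), and the sketch is illustrative rather than part of the crate.

fn main() {
    // Tasks that get scheduled are pushed into this queue.
    let (s, r) = crossbeam_channel::unbounded();

    // The schedule function simply sends the task back onto the queue.
    let (task, _handle) = async_task::spawn(
        async {
            println!("future polled to completion");
        },
        move |t| s.send(t).unwrap(),
        (),
    );

    // Put the task into the queue, then drain the queue, running each task.
    // The trivial future above completes after a single poll.
    task.schedule();
    while let Ok(t) = r.try_recv() {
        t.run();
    }
}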
