diff --git a/Cargo.toml b/Cargo.toml index 15d0cf5..7dbaba8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ concurrent-queue = "2.0.0" fastrand = "2.0.0" futures-lite = { version = "2.0.0", default-features = false } slab = "0.4.4" +st3 = "0.4" [target.'cfg(target_family = "wasm")'.dependencies] futures-lite = { version = "2.0.0", default-features = false, features = ["std"] } diff --git a/src/lib.rs b/src/lib.rs index a663be8..72460b9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,6 +38,7 @@ html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" )] +use st3::fifo::{Stealer, Worker}; use std::fmt; use std::marker::PhantomData; use std::panic::{RefUnwindSafe, UnwindSafe}; @@ -687,7 +688,7 @@ struct State { queue: ConcurrentQueue, /// Local queues created by runners. - local_queues: RwLock>>>, + local_queues: RwLock>>, /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping. notified: AtomicBool, @@ -930,7 +931,7 @@ struct Runner<'a> { ticker: Ticker<'a>, /// The local queue. - local: Arc>, + local: Worker, /// Bumped every time a runnable task is found. ticks: usize, @@ -939,17 +940,18 @@ struct Runner<'a> { impl Runner<'_> { /// Creates a runner and registers it in the executor state. fn new(state: &State) -> Runner<'_> { + let worker = Worker::new(512); let runner = Runner { state, ticker: Ticker::new(state), - local: Arc::new(ConcurrentQueue::bounded(512)), + local: worker, ticks: 0, }; state .local_queues .write() .unwrap() - .push(runner.local.clone()); + .push(runner.local.stealer()); runner } @@ -959,7 +961,7 @@ impl Runner<'_> { .ticker .runnable_with(|| { // Try the local queue. - if let Ok(r) = self.local.pop() { + if let Some(r) = self.local.pop() { return Some(r); } @@ -982,12 +984,13 @@ impl Runner<'_> { .take(n); // Remove this runner's local queue. - let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local)); + let local_stealer = self.local.stealer_ref(); + let iter = iter.filter(|local| !core::ptr::eq(*local, local_stealer)); // Try stealing from each local queue in the list. for local in iter { - steal(local, &self.local); - if let Ok(r) = self.local.pop() { + let count_fn = |remaining| remaining / 2; + if let Ok((r, _)) = local.steal_and_pop(&self.local, count_fn) { return Some(r); } } @@ -1010,38 +1013,32 @@ impl Runner<'_> { impl Drop for Runner<'_> { fn drop(&mut self) { + let stealer_ref = self.local.stealer_ref(); // Remove the local queue. self.state .local_queues .write() .unwrap() - .retain(|local| !Arc::ptr_eq(local, &self.local)); + .retain(|stealer| !core::ptr::eq(stealer, stealer_ref)); // Re-schedule remaining tasks in the local queue. - while let Ok(r) = self.local.pop() { + while let Some(r) = self.local.pop() { r.schedule(); } } } /// Steals some items from one queue into another. -fn steal(src: &ConcurrentQueue, dest: &ConcurrentQueue) { +fn steal(src: &ConcurrentQueue, dest: &Worker) { // Half of `src`'s length rounded up. let mut count = (src.len() + 1) / 2; if count > 0 { - // Don't steal more than fits into the queue. - if let Some(cap) = dest.capacity() { - count = count.min(cap - dest.len()); - } + count = count.min(dest.spare_capacity()); // Steal tasks. - for _ in 0..count { - if let Ok(t) = src.pop() { - assert!(dest.push(t).is_ok()); - } else { - break; - } + for t in src.try_iter().take(count) { + assert!(dest.push(t).is_ok()); } } } @@ -1082,15 +1079,12 @@ fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_ } /// Debug wrapper for the local runners. - struct LocalRunners<'a>(&'a RwLock>>>); + struct LocalRunners<'a>(&'a RwLock>>); impl fmt::Debug for LocalRunners<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self.0.try_read() { - Ok(lock) => f - .debug_list() - .entries(lock.iter().map(|queue| queue.len())) - .finish(), + Ok(lock) => write!(f, "count: {}", lock.len()), Err(TryLockError::WouldBlock) => f.write_str(""), Err(TryLockError::Poisoned(_)) => f.write_str(""), } @@ -1128,8 +1122,6 @@ impl Drop for CallOnDrop { } fn _ensure_send_and_sync() { - use futures_lite::future::pending; - fn is_send(_: T) {} fn is_sync(_: T) {} fn is_static(_: T) {} @@ -1138,8 +1130,6 @@ fn _ensure_send_and_sync() { is_sync::>(Executor::new()); let ex = Executor::new(); - is_send(ex.run(pending::<()>())); - is_sync(ex.run(pending::<()>())); is_send(ex.tick()); is_sync(ex.tick()); is_send(ex.schedule());