From 37e9e78251ed1fda7acfe8a08c2736105f0f67c9 Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Fri, 18 Oct 2019 00:58:30 -0600 Subject: [PATCH 1/8] Implement async_std::sync::Condvar Part of #217 --- src/sync/condvar.rs | 380 ++++++++++++++++++++++++++++++++++++++++++++ src/sync/mod.rs | 2 + src/sync/mutex.rs | 4 + 3 files changed, 386 insertions(+) create mode 100644 src/sync/condvar.rs diff --git a/src/sync/condvar.rs b/src/sync/condvar.rs new file mode 100644 index 000000000..ca9f28a0a --- /dev/null +++ b/src/sync/condvar.rs @@ -0,0 +1,380 @@ +use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Duration; + +use futures_timer::Delay; +use slab::Slab; + +use super::mutex::{guard_lock, MutexGuard}; +use crate::future::Future; +use crate::task::{Context, Poll, Waker}; + +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub struct WaitTimeoutResult(bool); + +/// A type indicating whether a timed wait on a condition variable returned due to a time out or +/// not +impl WaitTimeoutResult { + /// Returns `true` if the wait was known to have timed out. + pub fn timed_out(&self) -> bool { + self.0 + } +} + +/// A Condition Variable +/// +/// This type is an async version of [`std::sync::Mutex`]. +/// +/// [`std::sync::Condvar`]: https://doc.rust-lang.org/std/sync/struct.Condvar.html +/// +/// # Examples +/// +/// ``` +/// # fn main() { async_std::task::block_on(async { +/// # +/// use std::sync::Arc; +/// +/// use async_std::sync::{Mutex, Condvar}; +/// use async_std::task; +/// +/// let pair = Arc::new((Mutex::new(false), Condvar::new())); +/// let pair2 = pair.clone(); +/// +/// // Inside of our lock, spawn a new thread, and then wait for it to start. +/// task::spawn(async move { +/// let (lock, cvar) = &*pair2; +/// let mut started = lock.lock().await; +/// *started = true; +/// // We notify the condvar that the value has changed. +/// cvar.notify_one(); +/// }); +/// +/// // Wait for the thread to start up. +/// let (lock, cvar) = &*pair; +/// let mut started = lock.lock().await; +/// while !*started { +/// started = cvar.wait(started).await; +/// } +/// +/// # }) } +/// ``` +#[derive(Debug)] +pub struct Condvar { + has_blocked: AtomicBool, + blocked: std::sync::Mutex>>, +} + +impl Condvar { + /// Creates a new condition variable + /// + /// # Examples + /// + /// ``` + /// use async_std::sync::Condvar; + /// + /// let cvar = Condvar::new(); + /// ``` + pub fn new() -> Self { + Condvar { + has_blocked: AtomicBool::new(false), + blocked: std::sync::Mutex::new(Slab::new()), + } + } + + /// Blocks the current task until this condition variable receives a notification. + /// + /// Unlike the std equivalent, this does not check that a single mutex is used at runtime. + /// However, as a best practice avoid using with multiple mutexes. + /// + /// # Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// use std::sync::Arc; + /// + /// use async_std::sync::{Mutex, Condvar}; + /// use async_std::task; + /// + /// let pair = Arc::new((Mutex::new(false), Condvar::new())); + /// let pair2 = pair.clone(); + /// + /// task::spawn(async move { + /// let (lock, cvar) = &*pair2; + /// let mut started = lock.lock().await; + /// *started = true; + /// // We notify the condvar that the value has changed. + /// cvar.notify_one(); + /// }); + /// + /// // Wait for the thread to start up. + /// let (lock, cvar) = &*pair; + /// let mut started = lock.lock().await; + /// while !*started { + /// started = cvar.wait(started).await; + /// } + /// # }) } + /// ``` + pub async fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> { + let mutex = guard_lock(&guard); + + self.await_notify(guard).await; + + mutex.lock().await + } + + fn await_notify<'a, T>(&self, guard: MutexGuard<'a, T>) -> AwaitNotify<'_, 'a, T> { + AwaitNotify { + cond: self, + guard: Some(guard), + key: None, + } + } + + /// Blocks the current taks until this condition variable receives a notification and the + /// required condition is met. Spurious wakeups are ignored and this function will only + /// return once the condition has been met. + /// + /// # Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::sync::Arc; + /// + /// use async_std::sync::{Mutex, Condvar}; + /// use async_std::task; + /// + /// let pair = Arc::new((Mutex::new(false), Condvar::new())); + /// let pair2 = pair.clone(); + /// + /// task::spawn(async move { + /// let (lock, cvar) = &*pair2; + /// let mut started = lock.lock().await; + /// *started = true; + /// // We notify the condvar that the value has changed. + /// cvar.notify_one(); + /// }); + /// + /// // Wait for the thread to start up. + /// let (lock, cvar) = &*pair; + /// // As long as the value inside the `Mutex` is `false`, we wait. + /// let _guard = cvar.wait_until(lock.lock().await, |started| { *started }).await; + /// # + /// # }) } + /// ``` + #[cfg(feature = "unstable")] + pub async fn wait_until<'a, T, F>( + &self, + mut guard: MutexGuard<'a, T>, + mut condition: F, + ) -> MutexGuard<'a, T> + where + F: FnMut(&mut T) -> bool, + { + while !condition(&mut *guard) { + guard = self.wait(guard).await; + } + guard + } + + /// Waits on this condition variable for a notification, timing out after a specified duration. + /// + /// # Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::sync::Arc; + /// use std::time::Duration; + /// + /// use async_std::sync::{Mutex, Condvar}; + /// use async_std::task; + /// + /// let pair = Arc::new((Mutex::new(false), Condvar::new())); + /// let pair2 = pair.clone(); + /// + /// task::spawn(async move { + /// let (lock, cvar) = &*pair2; + /// let mut started = lock.lock().await; + /// *started = true; + /// // We notify the condvar that the value has changed. + /// cvar.notify_one(); + /// }); + /// + /// // wait for the thread to start up + /// let (lock, cvar) = &*pair; + /// let mut started = lock.lock().await; + /// loop { + /// let result = cvar.wait_timeout(started, Duration::from_millis(10)).await; + /// started = result.0; + /// if *started == true { + /// // We received the notification and the value has been updated, we can leave. + /// break + /// } + /// } + /// # + /// # }) } + /// ``` + pub async fn wait_timeout<'a, T>( + &self, + guard: MutexGuard<'a, T>, + dur: Duration, + ) -> (MutexGuard<'a, T>, WaitTimeoutResult) { + let mutex = guard_lock(&guard); + let timeout_result = TimeoutWaitFuture { + await_notify: self.await_notify(guard), + delay: Delay::new(dur), + } + .await; + + (mutex.lock().await, timeout_result) + } + + /// Wakes up one blocked task on this condvar. + /// + /// # Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// use std::sync::Arc; + /// + /// use async_std::sync::{Mutex, Condvar}; + /// use async_std::task; + /// + /// let pair = Arc::new((Mutex::new(false), Condvar::new())); + /// let pair2 = pair.clone(); + /// + /// task::spawn(async move { + /// let (lock, cvar) = &*pair2; + /// let mut started = lock.lock().await; + /// *started = true; + /// // We notify the condvar that the value has changed. + /// cvar.notify_one(); + /// }); + /// + /// // Wait for the thread to start up. + /// let (lock, cvar) = &*pair; + /// let mut started = lock.lock().await; + /// while !*started { + /// started = cvar.wait(started).await; + /// } + /// # }) } + /// ``` + pub fn notify_one(&self) { + if self.has_blocked.load(Ordering::Acquire) { + let mut blocked = self.blocked.lock().unwrap(); + if let Some((_, opt_waker)) = blocked.iter_mut().next() { + if let Some(w) = opt_waker.take() { + w.wake(); + } + } + } + } + + /// Wakes up all blocked tasks on this condvar. + /// + /// # Examples + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::sync::Arc; + /// + /// use async_std::sync::{Mutex, Condvar}; + /// use async_std::task; + /// + /// let pair = Arc::new((Mutex::new(false), Condvar::new())); + /// let pair2 = pair.clone(); + /// + /// task::spawn(async move { + /// let (lock, cvar) = &*pair2; + /// let mut started = lock.lock().await; + /// *started = true; + /// // We notify the condvar that the value has changed. + /// cvar.notify_all(); + /// }); + /// + /// // Wait for the thread to start up. + /// let (lock, cvar) = &*pair; + /// let mut started = lock.lock().await; + /// // As long as the value inside the `Mutex` is `false`, we wait. + /// while !*started { + /// started = cvar.wait(started).await; + /// } + /// # + /// # }) } + /// ``` + pub fn notify_all(&self) { + if self.has_blocked.load(Ordering::Acquire) { + let mut blocked = self.blocked.lock().unwrap(); + for (_, opt_waker) in blocked.iter_mut() { + if let Some(w) = opt_waker.take() { + w.wake(); + } + } + } + } +} + +struct AwaitNotify<'a, 'b, T> { + cond: &'a Condvar, + guard: Option>, + key: Option, +} + +impl<'a, 'b, T> Future for AwaitNotify<'a, 'b, T> { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.guard.take() { + Some(_) => { + let mut blocked = self.cond.blocked.lock().unwrap(); + let w = cx.waker().clone(); + self.key = Some(blocked.insert(Some(w))); + + if blocked.len() == 1 { + self.cond.has_blocked.store(true, Ordering::Relaxed); + } + // the guard is dropped when we return, which frees the lock + Poll::Pending + } + None => Poll::Ready(()), + } + } +} + +impl<'a, 'b, T> Drop for AwaitNotify<'a, 'b, T> { + fn drop(&mut self) { + if let Some(key) = self.key { + let mut blocked = self.cond.blocked.lock().unwrap(); + blocked.remove(key); + + if blocked.is_empty() { + self.cond.has_blocked.store(false, Ordering::Relaxed); + } + } + } +} + +struct TimeoutWaitFuture<'a, 'b, T> { + await_notify: AwaitNotify<'a, 'b, T>, + delay: Delay, +} + +impl<'a, 'b, T> TimeoutWaitFuture<'a, 'b, T> { + pin_utils::unsafe_pinned!(await_notify: AwaitNotify<'a, 'b, T>); + pin_utils::unsafe_pinned!(delay: Delay); +} + +impl<'a, 'b, T> Future for TimeoutWaitFuture<'a, 'b, T> { + type Output = WaitTimeoutResult; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.as_mut().await_notify().poll(cx) { + Poll::Ready(_) => Poll::Ready(WaitTimeoutResult(false)), + Poll::Pending => match self.delay().poll(cx) { + Poll::Ready(_) => Poll::Ready(WaitTimeoutResult(true)), + Poll::Pending => Poll::Pending, + }, + } + } +} diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 82759fb6b..085e82499 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -176,9 +176,11 @@ #[doc(inline)] pub use std::sync::{Arc, Weak}; +pub use condvar::Condvar; pub use mutex::{Mutex, MutexGuard}; pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +mod condvar; mod mutex; mod rwlock; diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs index c62b5616a..65d47dee1 100644 --- a/src/sync/mutex.rs +++ b/src/sync/mutex.rs @@ -287,3 +287,7 @@ impl DerefMut for MutexGuard<'_, T> { unsafe { &mut *self.0.value.get() } } } + +pub fn guard_lock<'a, T>(guard: &MutexGuard<'a, T>) -> &'a Mutex { + guard.0 +} From fc3483d64e654373df4f5c91e5cabd37bf875350 Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Sat, 19 Oct 2019 15:57:43 -0600 Subject: [PATCH 2/8] More rigourous detection of notification for condvar --- src/sync/condvar.rs | 81 +++++++++++++++++++++++++++++---------------- tests/condvar.rs | 17 ++++++++++ 2 files changed, 69 insertions(+), 29 deletions(-) create mode 100644 tests/condvar.rs diff --git a/src/sync/condvar.rs b/src/sync/condvar.rs index ca9f28a0a..78fe697d8 100644 --- a/src/sync/condvar.rs +++ b/src/sync/condvar.rs @@ -1,5 +1,6 @@ use std::pin::Pin; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; use std::time::Duration; use futures_timer::Delay; @@ -60,8 +61,19 @@ impl WaitTimeoutResult { /// ``` #[derive(Debug)] pub struct Condvar { - has_blocked: AtomicBool, - blocked: std::sync::Mutex>>, + blocked: std::sync::Mutex>, +} + +/// Flag to mark if the task was notified +const NOTIFIED: usize = 1; +/// State if the task was notified with `notify_once` +/// so it should notify another task if the future is dropped without waking. +const NOTIFIED_ONCE: usize = 0b11; + +#[derive(Debug)] +struct WaitEntry { + state: Arc, + waker: Option, } impl Condvar { @@ -76,7 +88,6 @@ impl Condvar { /// ``` pub fn new() -> Self { Condvar { - has_blocked: AtomicBool::new(false), blocked: std::sync::Mutex::new(Slab::new()), } } @@ -126,6 +137,7 @@ impl Condvar { AwaitNotify { cond: self, guard: Some(guard), + state: Arc::new(AtomicUsize::new(0)), key: None, } } @@ -261,14 +273,8 @@ impl Condvar { /// # }) } /// ``` pub fn notify_one(&self) { - if self.has_blocked.load(Ordering::Acquire) { - let mut blocked = self.blocked.lock().unwrap(); - if let Some((_, opt_waker)) = blocked.iter_mut().next() { - if let Some(w) = opt_waker.take() { - w.wake(); - } - } - } + let blocked = self.blocked.lock().unwrap(); + notify(blocked, false); } /// Wakes up all blocked tasks on this condvar. @@ -304,12 +310,20 @@ impl Condvar { /// # }) } /// ``` pub fn notify_all(&self) { - if self.has_blocked.load(Ordering::Acquire) { - let mut blocked = self.blocked.lock().unwrap(); - for (_, opt_waker) in blocked.iter_mut() { - if let Some(w) = opt_waker.take() { - w.wake(); - } + let blocked = self.blocked.lock().unwrap(); + notify(blocked, true); + } +} + +#[inline] +fn notify(mut blocked: std::sync::MutexGuard<'_, Slab>, all: bool) { + let state = if all { NOTIFIED } else { NOTIFIED_ONCE }; + for (_, entry) in blocked.iter_mut() { + if let Some(w) = entry.waker.take() { + entry.state.store(state, Ordering::Release); + w.wake(); + if !all { + return; } } } @@ -318,6 +332,7 @@ impl Condvar { struct AwaitNotify<'a, 'b, T> { cond: &'a Condvar, guard: Option>, + state: Arc, key: Option, } @@ -329,15 +344,21 @@ impl<'a, 'b, T> Future for AwaitNotify<'a, 'b, T> { Some(_) => { let mut blocked = self.cond.blocked.lock().unwrap(); let w = cx.waker().clone(); - self.key = Some(blocked.insert(Some(w))); + self.key = Some(blocked.insert(WaitEntry { + state: self.state.clone(), + waker: Some(w), + })); - if blocked.len() == 1 { - self.cond.has_blocked.store(true, Ordering::Relaxed); - } // the guard is dropped when we return, which frees the lock Poll::Pending } - None => Poll::Ready(()), + None => { + if self.state.fetch_and(!NOTIFIED, Ordering::AcqRel) & NOTIFIED != 0 { + Poll::Ready(()) + } else { + Poll::Pending + } + } } } } @@ -348,8 +369,10 @@ impl<'a, 'b, T> Drop for AwaitNotify<'a, 'b, T> { let mut blocked = self.cond.blocked.lock().unwrap(); blocked.remove(key); - if blocked.is_empty() { - self.cond.has_blocked.store(false, Ordering::Relaxed); + if !blocked.is_empty() && self.state.load(Ordering::Acquire) == NOTIFIED_ONCE { + // we got a notification form notify_once but didn't handle it, + // so send it to a different task + notify(blocked, false); } } } @@ -369,12 +392,12 @@ impl<'a, 'b, T> Future for TimeoutWaitFuture<'a, 'b, T> { type Output = WaitTimeoutResult; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.as_mut().await_notify().poll(cx) { - Poll::Ready(_) => Poll::Ready(WaitTimeoutResult(false)), - Poll::Pending => match self.delay().poll(cx) { - Poll::Ready(_) => Poll::Ready(WaitTimeoutResult(true)), + match self.as_mut().delay().poll(cx) { + Poll::Pending => match self.await_notify().poll(cx) { + Poll::Ready(_) => Poll::Ready(WaitTimeoutResult(false)), Poll::Pending => Poll::Pending, }, + Poll::Ready(_) => Poll::Ready(WaitTimeoutResult(true)), } } } diff --git a/tests/condvar.rs b/tests/condvar.rs new file mode 100644 index 000000000..1d2a58948 --- /dev/null +++ b/tests/condvar.rs @@ -0,0 +1,17 @@ +use std::sync::Arc; +use std::time::Duration; + +use async_std::sync::{Condvar, Mutex}; +use async_std::task; + +#[test] +fn wait_timeout() { + task::block_on(async { + let m = Mutex::new(()); + let c = Condvar::new(); + let (_, wait_result) = c + .wait_timeout(m.lock().await, Duration::from_millis(10)) + .await; + assert!(wait_result.timed_out()); + }) +} From 157878922f1b8f28a91cc9331331c9afcdb4f672 Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Sun, 20 Oct 2019 21:19:22 -0600 Subject: [PATCH 3/8] Use state of Waker instead of AtomicUsize to keep track of if task was notified. --- src/sync/condvar.rs | 48 +++++++++++++-------------------------------- tests/condvar.rs | 14 +++++++++++-- 2 files changed, 26 insertions(+), 36 deletions(-) diff --git a/src/sync/condvar.rs b/src/sync/condvar.rs index 78fe697d8..7b44beb13 100644 --- a/src/sync/condvar.rs +++ b/src/sync/condvar.rs @@ -1,6 +1,4 @@ use std::pin::Pin; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; use std::time::Duration; use futures_timer::Delay; @@ -61,19 +59,7 @@ impl WaitTimeoutResult { /// ``` #[derive(Debug)] pub struct Condvar { - blocked: std::sync::Mutex>, -} - -/// Flag to mark if the task was notified -const NOTIFIED: usize = 1; -/// State if the task was notified with `notify_once` -/// so it should notify another task if the future is dropped without waking. -const NOTIFIED_ONCE: usize = 0b11; - -#[derive(Debug)] -struct WaitEntry { - state: Arc, - waker: Option, + blocked: std::sync::Mutex>>, } impl Condvar { @@ -137,8 +123,8 @@ impl Condvar { AwaitNotify { cond: self, guard: Some(guard), - state: Arc::new(AtomicUsize::new(0)), key: None, + notified: false, } } @@ -316,11 +302,9 @@ impl Condvar { } #[inline] -fn notify(mut blocked: std::sync::MutexGuard<'_, Slab>, all: bool) { - let state = if all { NOTIFIED } else { NOTIFIED_ONCE }; +fn notify(mut blocked: std::sync::MutexGuard<'_, Slab>>, all: bool) { for (_, entry) in blocked.iter_mut() { - if let Some(w) = entry.waker.take() { - entry.state.store(state, Ordering::Release); + if let Some(w) = entry.take() { w.wake(); if !all { return; @@ -332,8 +316,8 @@ fn notify(mut blocked: std::sync::MutexGuard<'_, Slab>, all: bool) { struct AwaitNotify<'a, 'b, T> { cond: &'a Condvar, guard: Option>, - state: Arc, key: Option, + notified: bool, } impl<'a, 'b, T> Future for AwaitNotify<'a, 'b, T> { @@ -344,20 +328,14 @@ impl<'a, 'b, T> Future for AwaitNotify<'a, 'b, T> { Some(_) => { let mut blocked = self.cond.blocked.lock().unwrap(); let w = cx.waker().clone(); - self.key = Some(blocked.insert(WaitEntry { - state: self.state.clone(), - waker: Some(w), - })); + self.key = Some(blocked.insert(Some(w))); // the guard is dropped when we return, which frees the lock Poll::Pending } None => { - if self.state.fetch_and(!NOTIFIED, Ordering::AcqRel) & NOTIFIED != 0 { - Poll::Ready(()) - } else { - Poll::Pending - } + self.notified = true; + Poll::Ready(()) } } } @@ -367,12 +345,14 @@ impl<'a, 'b, T> Drop for AwaitNotify<'a, 'b, T> { fn drop(&mut self) { if let Some(key) = self.key { let mut blocked = self.cond.blocked.lock().unwrap(); - blocked.remove(key); + let opt_waker = blocked.remove(key); - if !blocked.is_empty() && self.state.load(Ordering::Acquire) == NOTIFIED_ONCE { - // we got a notification form notify_once but didn't handle it, - // so send it to a different task + if opt_waker.is_none() && !self.notified { + // wake up the next task, because this task was notified, but + // we are dropping it before it can finished. + // This may result in a spurious wake-up, but that's ok. notify(blocked, false); + } } } diff --git a/tests/condvar.rs b/tests/condvar.rs index 1d2a58948..2188d6131 100644 --- a/tests/condvar.rs +++ b/tests/condvar.rs @@ -7,11 +7,21 @@ use async_std::task; #[test] fn wait_timeout() { task::block_on(async { - let m = Mutex::new(()); - let c = Condvar::new(); + let pair = Arc::new((Mutex::new(false), Condvar::new())); + let pair2 = pair.clone(); + + task::spawn(async move { + let (m, c) = &*pair2; + let _g = m.lock().await; + task::sleep(Duration::from_millis(20)).await; + c.notify_one(); + }); + + let (m, c) = &*pair; let (_, wait_result) = c .wait_timeout(m.lock().await, Duration::from_millis(10)) .await; assert!(wait_result.timed_out()); }) } + From cc9a1b8fb968b7f41da77de9d91cf4634853366c Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Mon, 28 Oct 2019 00:32:48 -0600 Subject: [PATCH 4/8] Add test for notify_all --- src/sync/condvar.rs | 12 ++++++++++-- tests/condvar.rs | 37 ++++++++++++++++++++++++++++++++++++- 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/src/sync/condvar.rs b/src/sync/condvar.rs index 7b44beb13..ba0a1770e 100644 --- a/src/sync/condvar.rs +++ b/src/sync/condvar.rs @@ -15,7 +15,7 @@ pub struct WaitTimeoutResult(bool); /// not impl WaitTimeoutResult { /// Returns `true` if the wait was known to have timed out. - pub fn timed_out(&self) -> bool { + pub fn timed_out(self) -> bool { self.0 } } @@ -62,6 +62,12 @@ pub struct Condvar { blocked: std::sync::Mutex>>, } +impl Default for Condvar { + fn default() -> Self { + Condvar::new() + } +} + impl Condvar { /// Creates a new condition variable /// @@ -111,6 +117,7 @@ impl Condvar { /// } /// # }) } /// ``` + #[allow(clippy::needless_lifetimes)] pub async fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> { let mutex = guard_lock(&guard); @@ -161,6 +168,7 @@ impl Condvar { /// # }) } /// ``` #[cfg(feature = "unstable")] + #[allow(clippy::needless_lifetimes)] pub async fn wait_until<'a, T, F>( &self, mut guard: MutexGuard<'a, T>, @@ -213,6 +221,7 @@ impl Condvar { /// # /// # }) } /// ``` + #[allow(clippy::needless_lifetimes)] pub async fn wait_timeout<'a, T>( &self, guard: MutexGuard<'a, T>, @@ -352,7 +361,6 @@ impl<'a, 'b, T> Drop for AwaitNotify<'a, 'b, T> { // we are dropping it before it can finished. // This may result in a spurious wake-up, but that's ok. notify(blocked, false); - } } } diff --git a/tests/condvar.rs b/tests/condvar.rs index 2188d6131..60378ee1e 100644 --- a/tests/condvar.rs +++ b/tests/condvar.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use std::time::Duration; use async_std::sync::{Condvar, Mutex}; -use async_std::task; +use async_std::task::{self, JoinHandle}; #[test] fn wait_timeout() { @@ -25,3 +25,38 @@ fn wait_timeout() { }) } +#[test] +fn notify_all() { + task::block_on(async { + let mut tasks: Vec> = Vec::new(); + let pair = Arc::new((Mutex::new(0u32), Condvar::new())); + + for _ in 0..10 { + let pair = pair.clone(); + tasks.push(task::spawn(async move { + let (m, c) = &*pair; + let mut count = m.lock().await; + while *count == 0 { + count = c.wait(count).await; + } + *count += 1; + })); + } + + // Give some time for tasks to start up + task::sleep(Duration::from_millis(5)).await; + + let (m, c) = &*pair; + { + let mut count = m.lock().await; + *count += 1; + c.notify_all(); + } + + for t in tasks { + t.await; + } + let count = m.lock().await; + assert_eq!(11, *count); + }) +} From 104188cce3f0607bd091666f461c4585770b25d8 Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Sun, 3 Nov 2019 00:59:37 -0600 Subject: [PATCH 5/8] Implement wait_timeout_until And add warnings about spurious wakeups to wait and wait_timeout --- src/sync/condvar.rs | 120 +++++++++++++++++++++++++++++--------------- tests/condvar.rs | 16 ++++++ 2 files changed, 96 insertions(+), 40 deletions(-) diff --git a/src/sync/condvar.rs b/src/sync/condvar.rs index ba0a1770e..3cdac3057 100644 --- a/src/sync/condvar.rs +++ b/src/sync/condvar.rs @@ -1,11 +1,10 @@ use std::pin::Pin; use std::time::Duration; -use futures_timer::Delay; use slab::Slab; use super::mutex::{guard_lock, MutexGuard}; -use crate::future::Future; +use crate::future::{timeout, Future}; use crate::task::{Context, Poll, Waker}; #[derive(Debug, PartialEq, Eq, Copy, Clone)] @@ -29,7 +28,7 @@ impl WaitTimeoutResult { /// # Examples /// /// ``` -/// # fn main() { async_std::task::block_on(async { +/// # async_std::task::block_on(async { /// # /// use std::sync::Arc; /// @@ -55,7 +54,7 @@ impl WaitTimeoutResult { /// started = cvar.wait(started).await; /// } /// -/// # }) } +/// # }) /// ``` #[derive(Debug)] pub struct Condvar { @@ -89,10 +88,16 @@ impl Condvar { /// Unlike the std equivalent, this does not check that a single mutex is used at runtime. /// However, as a best practice avoid using with multiple mutexes. /// + /// # Warning + /// Any attempt to poll this future before the notification is received will result in a + /// spurious wakeup. This allows the implementation to be efficient, and is technically valid + /// semantics for a condition variable. However, this may result in unexpected behaviour when this future is + /// used with future combinators. In most cases `Condvar::wait_until` is easier to use correctly. + /// /// # Examples /// /// ``` - /// # fn main() { async_std::task::block_on(async { + /// # async_std::task::block_on(async { /// use std::sync::Arc; /// /// use async_std::sync::{Mutex, Condvar}; @@ -115,7 +120,7 @@ impl Condvar { /// while !*started { /// started = cvar.wait(started).await; /// } - /// # }) } + /// # }) /// ``` #[allow(clippy::needless_lifetimes)] pub async fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> { @@ -142,7 +147,7 @@ impl Condvar { /// # Examples /// /// ``` - /// # fn main() { async_std::task::block_on(async { + /// # async_std::task::block_on(async { /// # /// use std::sync::Arc; /// @@ -165,9 +170,8 @@ impl Condvar { /// // As long as the value inside the `Mutex` is `false`, we wait. /// let _guard = cvar.wait_until(lock.lock().await, |started| { *started }).await; /// # - /// # }) } + /// # }) /// ``` - #[cfg(feature = "unstable")] #[allow(clippy::needless_lifetimes)] pub async fn wait_until<'a, T, F>( &self, @@ -185,10 +189,19 @@ impl Condvar { /// Waits on this condition variable for a notification, timing out after a specified duration. /// + /// # Warning + /// This has similar limitations to `Condvar::wait`, where polling before a notify is sent can + /// result in a spurious wakeup. In addition, the timeout may itself trigger a spurious wakeup, + /// if no other task is holding the mutex when the future is polled. Thus the + /// `WaitTimeoutResult` should not be trusted to determine if the condition variable was + /// actually notified. + /// + /// For these reasons `Condvar::wait_timeout_until` is recommended in most cases. + /// /// # Examples /// /// ``` - /// # fn main() { async_std::task::block_on(async { + /// # async_std::task::block_on(async { /// # /// use std::sync::Arc; /// use std::time::Duration; @@ -219,8 +232,9 @@ impl Condvar { /// } /// } /// # - /// # }) } + /// # }) /// ``` + #[cfg(feature = "unstable")] #[allow(clippy::needless_lifetimes)] pub async fn wait_timeout<'a, T>( &self, @@ -228,13 +242,63 @@ impl Condvar { dur: Duration, ) -> (MutexGuard<'a, T>, WaitTimeoutResult) { let mutex = guard_lock(&guard); - let timeout_result = TimeoutWaitFuture { - await_notify: self.await_notify(guard), - delay: Delay::new(dur), + match timeout(dur, self.wait(guard)).await { + Ok(guard) => (guard, WaitTimeoutResult(false)), + Err(_) => (mutex.lock().await, WaitTimeoutResult(true)), } - .await; + } - (mutex.lock().await, timeout_result) + /// Waits on this condition variable for a notification, timing out after a specified duration. + /// Spurious wakes will not cause this function to return. + /// + /// # Examples + /// ``` + /// # async_std::task::block_on(async { + /// use std::sync::Arc; + /// use std::time::Duration; + /// + /// use async_std::sync::{Mutex, Condvar}; + /// use async_std::task; + /// + /// let pair = Arc::new((Mutex::new(false), Condvar::new())); + /// let pair2 = pair.clone(); + /// + /// task::spawn(async move { + /// let (lock, cvar) = &*pair2; + /// let mut started = lock.lock().await; + /// *started = true; + /// // We notify the condvar that the value has changed. + /// cvar.notify_one(); + /// }); + /// + /// // wait for the thread to start up + /// let (lock, cvar) = &*pair; + /// let result = cvar.wait_timeout_until( + /// lock.lock().await, + /// Duration::from_millis(100), + /// |&mut started| started, + /// ).await; + /// if result.1.timed_out() { + /// // timed-out without the condition ever evaluating to true. + /// } + /// // access the locked mutex via result.0 + /// # }); + /// ``` + #[allow(clippy::needless_lifetimes)] + pub async fn wait_timeout_until<'a, T, F>( + &self, + guard: MutexGuard<'a, T>, + dur: Duration, + condition: F, + ) -> (MutexGuard<'a, T>, WaitTimeoutResult) + where + F: FnMut(&mut T) -> bool, + { + let mutex = guard_lock(&guard); + match timeout(dur, self.wait_until(guard, condition)).await { + Ok(guard) => (guard, WaitTimeoutResult(false)), + Err(_) => (mutex.lock().await, WaitTimeoutResult(true)), + } } /// Wakes up one blocked task on this condvar. @@ -365,27 +429,3 @@ impl<'a, 'b, T> Drop for AwaitNotify<'a, 'b, T> { } } } - -struct TimeoutWaitFuture<'a, 'b, T> { - await_notify: AwaitNotify<'a, 'b, T>, - delay: Delay, -} - -impl<'a, 'b, T> TimeoutWaitFuture<'a, 'b, T> { - pin_utils::unsafe_pinned!(await_notify: AwaitNotify<'a, 'b, T>); - pin_utils::unsafe_pinned!(delay: Delay); -} - -impl<'a, 'b, T> Future for TimeoutWaitFuture<'a, 'b, T> { - type Output = WaitTimeoutResult; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.as_mut().delay().poll(cx) { - Poll::Pending => match self.await_notify().poll(cx) { - Poll::Ready(_) => Poll::Ready(WaitTimeoutResult(false)), - Poll::Pending => Poll::Pending, - }, - Poll::Ready(_) => Poll::Ready(WaitTimeoutResult(true)), - } - } -} diff --git a/tests/condvar.rs b/tests/condvar.rs index 60378ee1e..639d986ce 100644 --- a/tests/condvar.rs +++ b/tests/condvar.rs @@ -4,6 +4,7 @@ use std::time::Duration; use async_std::sync::{Condvar, Mutex}; use async_std::task::{self, JoinHandle}; +#[cfg(feature = "unstable")] #[test] fn wait_timeout() { task::block_on(async { @@ -25,6 +26,21 @@ fn wait_timeout() { }) } +#[test] +fn wait_timeout_until_timed_out() { + task::block_on(async { + let m = Mutex::new(false); + let c = Condvar::new(); + + let (_, wait_result) = c + .wait_timeout_until(m.lock().await, Duration::from_millis(10), |&mut started| { + started + }) + .await; + assert!(wait_result.timed_out()); + }) +} + #[test] fn notify_all() { task::block_on(async { From c6cfd2c1f211575b6ddb9fc0f98d5950333d2f05 Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Mon, 4 Nov 2019 01:30:51 -0700 Subject: [PATCH 6/8] Use WakerSet for Condvar This should also address concerns about spurious wakeups. --- src/sync/condvar.rs | 77 +++++++++++++++---------------------------- src/sync/mod.rs | 4 +-- src/sync/mutex.rs | 1 + src/sync/waker_set.rs | 23 +++++++++++++ tests/condvar.rs | 2 +- 5 files changed, 54 insertions(+), 53 deletions(-) diff --git a/src/sync/condvar.rs b/src/sync/condvar.rs index 3cdac3057..1b35c43a7 100644 --- a/src/sync/condvar.rs +++ b/src/sync/condvar.rs @@ -1,11 +1,11 @@ +use std::fmt; use std::pin::Pin; use std::time::Duration; -use slab::Slab; - use super::mutex::{guard_lock, MutexGuard}; use crate::future::{timeout, Future}; -use crate::task::{Context, Poll, Waker}; +use crate::sync::WakerSet; +use crate::task::{Context, Poll}; #[derive(Debug, PartialEq, Eq, Copy, Clone)] pub struct WaitTimeoutResult(bool); @@ -56,11 +56,13 @@ impl WaitTimeoutResult { /// /// # }) /// ``` -#[derive(Debug)] pub struct Condvar { - blocked: std::sync::Mutex>>, + wakers: WakerSet, } +unsafe impl Send for Condvar {} +unsafe impl Sync for Condvar {} + impl Default for Condvar { fn default() -> Self { Condvar::new() @@ -79,7 +81,7 @@ impl Condvar { /// ``` pub fn new() -> Self { Condvar { - blocked: std::sync::Mutex::new(Slab::new()), + wakers: WakerSet::new(), } } @@ -88,12 +90,6 @@ impl Condvar { /// Unlike the std equivalent, this does not check that a single mutex is used at runtime. /// However, as a best practice avoid using with multiple mutexes. /// - /// # Warning - /// Any attempt to poll this future before the notification is received will result in a - /// spurious wakeup. This allows the implementation to be efficient, and is technically valid - /// semantics for a condition variable. However, this may result in unexpected behaviour when this future is - /// used with future combinators. In most cases `Condvar::wait_until` is easier to use correctly. - /// /// # Examples /// /// ``` @@ -136,7 +132,6 @@ impl Condvar { cond: self, guard: Some(guard), key: None, - notified: false, } } @@ -189,13 +184,6 @@ impl Condvar { /// Waits on this condition variable for a notification, timing out after a specified duration. /// - /// # Warning - /// This has similar limitations to `Condvar::wait`, where polling before a notify is sent can - /// result in a spurious wakeup. In addition, the timeout may itself trigger a spurious wakeup, - /// if no other task is holding the mutex when the future is polled. Thus the - /// `WaitTimeoutResult` should not be trusted to determine if the condition variable was - /// actually notified. - /// /// For these reasons `Condvar::wait_timeout_until` is recommended in most cases. /// /// # Examples @@ -234,7 +222,6 @@ impl Condvar { /// # /// # }) /// ``` - #[cfg(feature = "unstable")] #[allow(clippy::needless_lifetimes)] pub async fn wait_timeout<'a, T>( &self, @@ -332,8 +319,7 @@ impl Condvar { /// # }) } /// ``` pub fn notify_one(&self) { - let blocked = self.blocked.lock().unwrap(); - notify(blocked, false); + self.wakers.notify_one(); } /// Wakes up all blocked tasks on this condvar. @@ -369,20 +355,14 @@ impl Condvar { /// # }) } /// ``` pub fn notify_all(&self) { - let blocked = self.blocked.lock().unwrap(); - notify(blocked, true); + self.wakers.notify_all(); } } -#[inline] -fn notify(mut blocked: std::sync::MutexGuard<'_, Slab>>, all: bool) { - for (_, entry) in blocked.iter_mut() { - if let Some(w) = entry.take() { - w.wake(); - if !all { - return; - } - } +impl fmt::Debug for Condvar { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + //f.debug_struct("Condvar").finish() + f.pad("Condvar { .. }") } } @@ -390,7 +370,6 @@ struct AwaitNotify<'a, 'b, T> { cond: &'a Condvar, guard: Option>, key: Option, - notified: bool, } impl<'a, 'b, T> Future for AwaitNotify<'a, 'b, T> { @@ -399,16 +378,22 @@ impl<'a, 'b, T> Future for AwaitNotify<'a, 'b, T> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.guard.take() { Some(_) => { - let mut blocked = self.cond.blocked.lock().unwrap(); - let w = cx.waker().clone(); - self.key = Some(blocked.insert(Some(w))); - + self.key = Some(self.cond.wakers.insert(cx)); // the guard is dropped when we return, which frees the lock Poll::Pending } None => { - self.notified = true; - Poll::Ready(()) + if let Some(key) = self.key { + if self.cond.wakers.complete_if_notified(key, cx) { + self.key = None; + Poll::Ready(()) + } else { + Poll::Pending + } + } else { + // This should only happen if it is polled twice after receiving a notification + Poll::Ready(()) + } } } } @@ -417,15 +402,7 @@ impl<'a, 'b, T> Future for AwaitNotify<'a, 'b, T> { impl<'a, 'b, T> Drop for AwaitNotify<'a, 'b, T> { fn drop(&mut self) { if let Some(key) = self.key { - let mut blocked = self.cond.blocked.lock().unwrap(); - let opt_waker = blocked.remove(key); - - if opt_waker.is_none() && !self.notified { - // wake up the next task, because this task was notified, but - // we are dropping it before it can finished. - // This may result in a spurious wake-up, but that's ok. - notify(blocked, false); - } + self.cond.wakers.cancel(key); } } } diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 085e82499..1531f8c57 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -176,19 +176,19 @@ #[doc(inline)] pub use std::sync::{Arc, Weak}; -pub use condvar::Condvar; pub use mutex::{Mutex, MutexGuard}; pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard}; -mod condvar; mod mutex; mod rwlock; cfg_unstable! { pub use barrier::{Barrier, BarrierWaitResult}; pub use channel::{channel, Sender, Receiver, RecvError, TryRecvError, TrySendError}; + pub use condvar::Condvar; mod barrier; + mod condvar; mod channel; } diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs index 65d47dee1..ae953fd82 100644 --- a/src/sync/mutex.rs +++ b/src/sync/mutex.rs @@ -288,6 +288,7 @@ impl DerefMut for MutexGuard<'_, T> { } } +#[cfg(feature = "unstable")] pub fn guard_lock<'a, T>(guard: &MutexGuard<'a, T>) -> &'a Mutex { guard.0 } diff --git a/src/sync/waker_set.rs b/src/sync/waker_set.rs index 7e897af15..50c95c416 100644 --- a/src/sync/waker_set.rs +++ b/src/sync/waker_set.rs @@ -80,6 +80,29 @@ impl WakerSet { } } + /// If the waker for this key is still waiting for a notification, then update + /// the waker for the entry, and return false. If the waker has been notified, + /// treat the entry as completed and return true. + #[cfg(feature = "unstable")] + pub fn complete_if_notified(&self, key: usize, cx: &Context<'_>) -> bool { + let mut inner = self.lock(); + + match &mut inner.entries[key] { + None => { + inner.entries.remove(key); + inner.none_count -= 1; + true + } + Some(w) => { + // We were never woken, so update instead + if !w.will_wake(cx.waker()) { + *w = cx.waker().clone(); + } + false + } + } + } + /// Removes the waker of a cancelled operation. /// /// Returns `true` if another blocked operation from the set was notified. diff --git a/tests/condvar.rs b/tests/condvar.rs index 639d986ce..0acfb2f97 100644 --- a/tests/condvar.rs +++ b/tests/condvar.rs @@ -1,10 +1,10 @@ +#![cfg(feature = "unstable")] use std::sync::Arc; use std::time::Duration; use async_std::sync::{Condvar, Mutex}; use async_std::task::{self, JoinHandle}; -#[cfg(feature = "unstable")] #[test] fn wait_timeout() { task::block_on(async { From 552928d7c3f179570db3468df6cecf2b87719b31 Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Mon, 4 Nov 2019 01:33:52 -0700 Subject: [PATCH 7/8] Add test for wait_timeout with no lock held --- src/sync/condvar.rs | 2 +- src/sync/waker_set.rs | 3 +-- tests/condvar.rs | 15 ++++++++++++++- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/src/sync/condvar.rs b/src/sync/condvar.rs index 1b35c43a7..5921230ee 100644 --- a/src/sync/condvar.rs +++ b/src/sync/condvar.rs @@ -384,7 +384,7 @@ impl<'a, 'b, T> Future for AwaitNotify<'a, 'b, T> { } None => { if let Some(key) = self.key { - if self.cond.wakers.complete_if_notified(key, cx) { + if self.cond.wakers.remove_if_notified(key, cx) { self.key = None; Poll::Ready(()) } else { diff --git a/src/sync/waker_set.rs b/src/sync/waker_set.rs index 50c95c416..881304bac 100644 --- a/src/sync/waker_set.rs +++ b/src/sync/waker_set.rs @@ -84,13 +84,12 @@ impl WakerSet { /// the waker for the entry, and return false. If the waker has been notified, /// treat the entry as completed and return true. #[cfg(feature = "unstable")] - pub fn complete_if_notified(&self, key: usize, cx: &Context<'_>) -> bool { + pub fn remove_if_notified(&self, key: usize, cx: &Context<'_>) -> bool { let mut inner = self.lock(); match &mut inner.entries[key] { None => { inner.entries.remove(key); - inner.none_count -= 1; true } Some(w) => { diff --git a/tests/condvar.rs b/tests/condvar.rs index 0acfb2f97..c4d680fc9 100644 --- a/tests/condvar.rs +++ b/tests/condvar.rs @@ -6,7 +6,7 @@ use async_std::sync::{Condvar, Mutex}; use async_std::task::{self, JoinHandle}; #[test] -fn wait_timeout() { +fn wait_timeout_with_lock() { task::block_on(async { let pair = Arc::new((Mutex::new(false), Condvar::new())); let pair2 = pair.clone(); @@ -26,6 +26,19 @@ fn wait_timeout() { }) } +#[test] +fn wait_timeout_without_lock() { + task::block_on(async { + let m = Mutex::new(false); + let c = Condvar::new(); + + let (_, wait_result) = c + .wait_timeout(m.lock().await, Duration::from_millis(10)) + .await; + assert!(wait_result.timed_out()); + }) +} + #[test] fn wait_timeout_until_timed_out() { task::block_on(async { From e4b287062ec7a64405d80059a54243c6503d9363 Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Sat, 11 Apr 2020 00:54:18 -0600 Subject: [PATCH 8/8] Add comments describing AwaitNotify struct And remove an unnneded comment in a Debug implementation --- src/sync/condvar.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/sync/condvar.rs b/src/sync/condvar.rs index 5921230ee..67507f384 100644 --- a/src/sync/condvar.rs +++ b/src/sync/condvar.rs @@ -361,14 +361,23 @@ impl Condvar { impl fmt::Debug for Condvar { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - //f.debug_struct("Condvar").finish() f.pad("Condvar { .. }") } } +/// A future that waits for another task to notify the condition variable. +/// +/// This is an internal future that `wait` and `wait_until` await on. struct AwaitNotify<'a, 'b, T> { + /// The condition variable that we are waiting on cond: &'a Condvar, + /// The lock used with `cond`. + /// This will be released the first time the future is polled, + /// after registering the context to be notified. guard: Option>, + /// A key into the conditions variable's `WakerSet`. + /// This is set to the index of the `Waker` for the context each time + /// the future is polled and not completed. key: Option, }