Skip to content

Commit 1464fc3

Browse files
committed
Move thread parker to a separate module.
1 parent c9e5e6a commit 1464fc3

File tree

2 files changed

+142
-112
lines changed

2 files changed

+142
-112
lines changed

library/std/src/thread/mod.rs

Lines changed: 17 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@
149149
#[cfg(all(test, not(target_os = "emscripten")))]
150150
mod tests;
151151

152+
mod parker;
153+
152154
use crate::any::Any;
153155
use crate::cell::UnsafeCell;
154156
use crate::ffi::{CStr, CString};
@@ -159,15 +161,14 @@ use crate::num::NonZeroU64;
159161
use crate::panic;
160162
use crate::panicking;
161163
use crate::str;
162-
use crate::sync::atomic::AtomicUsize;
163-
use crate::sync::atomic::Ordering::SeqCst;
164-
use crate::sync::{Arc, Condvar, Mutex};
164+
use crate::sync::Arc;
165165
use crate::sys::thread as imp;
166166
use crate::sys_common::mutex;
167167
use crate::sys_common::thread;
168168
use crate::sys_common::thread_info;
169169
use crate::sys_common::{AsInner, IntoInner};
170170
use crate::time::Duration;
171+
use parker::Parker;
171172

172173
////////////////////////////////////////////////////////////////////////////////
173174
// Thread-local storage
@@ -667,6 +668,8 @@ pub fn current() -> Thread {
667668
///
668669
/// [`channel`]: crate::sync::mpsc
669670
/// [`join`]: JoinHandle::join
671+
/// [`Condvar`]: crate::sync::Condvar
672+
/// [`Mutex`]: crate::sync::Mutex
670673
#[stable(feature = "rust1", since = "1.0.0")]
671674
pub fn yield_now() {
672675
imp::Thread::yield_now()
@@ -712,6 +715,8 @@ pub fn yield_now() {
712715
/// panic!()
713716
/// }
714717
/// ```
718+
///
719+
/// [Mutex]: crate::sync::Mutex
715720
#[inline]
716721
#[stable(feature = "rust1", since = "1.0.0")]
717722
pub fn panicking() -> bool {
@@ -779,11 +784,6 @@ pub fn sleep(dur: Duration) {
779784
imp::Thread::sleep(dur)
780785
}
781786

782-
// constants for park/unpark
783-
const EMPTY: usize = 0;
784-
const PARKED: usize = 1;
785-
const NOTIFIED: usize = 2;
786-
787787
/// Blocks unless or until the current thread's token is made available.
788788
///
789789
/// A call to `park` does not guarantee that the thread will remain parked
@@ -870,45 +870,11 @@ const NOTIFIED: usize = 2;
870870
///
871871
/// [`unpark`]: Thread::unpark
872872
/// [`thread::park_timeout`]: park_timeout
873-
//
874-
// The implementation currently uses the trivial strategy of a Mutex+Condvar
875-
// with wakeup flag, which does not actually allow spurious wakeups. In the
876-
// future, this will be implemented in a more efficient way, perhaps along the lines of
877-
// http://cr.openjdk.java.net/~stefank/6989984.1/raw_files/new/src/os/linux/vm/os_linux.cpp
878-
// or futuxes, and in either case may allow spurious wakeups.
879873
#[stable(feature = "rust1", since = "1.0.0")]
880874
pub fn park() {
881-
let thread = current();
882-
883-
// If we were previously notified then we consume this notification and
884-
// return quickly.
885-
if thread.inner.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
886-
return;
887-
}
888-
889-
// Otherwise we need to coordinate going to sleep
890-
let mut m = thread.inner.lock.lock().unwrap();
891-
match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
892-
Ok(_) => {}
893-
Err(NOTIFIED) => {
894-
// We must read here, even though we know it will be `NOTIFIED`.
895-
// This is because `unpark` may have been called again since we read
896-
// `NOTIFIED` in the `compare_exchange` above. We must perform an
897-
// acquire operation that synchronizes with that `unpark` to observe
898-
// any writes it made before the call to unpark. To do that we must
899-
// read from the write it made to `state`.
900-
let old = thread.inner.state.swap(EMPTY, SeqCst);
901-
assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
902-
return;
903-
} // should consume this notification, so prohibit spurious wakeups in next park.
904-
Err(_) => panic!("inconsistent park state"),
905-
}
906-
loop {
907-
m = thread.inner.cvar.wait(m).unwrap();
908-
match thread.inner.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
909-
Ok(_) => return, // got a notification
910-
Err(_) => {} // spurious wakeup, go back to sleep
911-
}
875+
// SAFETY: park_timeout is called on the parker owned by this thread.
876+
unsafe {
877+
current().inner.parker.park();
912878
}
913879
}
914880

@@ -970,35 +936,9 @@ pub fn park_timeout_ms(ms: u32) {
970936
/// ```
971937
#[stable(feature = "park_timeout", since = "1.4.0")]
972938
pub fn park_timeout(dur: Duration) {
973-
let thread = current();
974-
975-
// Like `park` above we have a fast path for an already-notified thread, and
976-
// afterwards we start coordinating for a sleep.
977-
// return quickly.
978-
if thread.inner.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
979-
return;
980-
}
981-
let m = thread.inner.lock.lock().unwrap();
982-
match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
983-
Ok(_) => {}
984-
Err(NOTIFIED) => {
985-
// We must read again here, see `park`.
986-
let old = thread.inner.state.swap(EMPTY, SeqCst);
987-
assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
988-
return;
989-
} // should consume this notification, so prohibit spurious wakeups in next park.
990-
Err(_) => panic!("inconsistent park_timeout state"),
991-
}
992-
993-
// Wait with a timeout, and if we spuriously wake up or otherwise wake up
994-
// from a notification we just want to unconditionally set the state back to
995-
// empty, either consuming a notification or un-flagging ourselves as
996-
// parked.
997-
let (_m, _result) = thread.inner.cvar.wait_timeout(m, dur).unwrap();
998-
match thread.inner.state.swap(EMPTY, SeqCst) {
999-
NOTIFIED => {} // got a notification, hurray!
1000-
PARKED => {} // no notification, alas
1001-
n => panic!("inconsistent park_timeout state: {}", n),
939+
// SAFETY: park_timeout is called on the parker owned by this thread.
940+
unsafe {
941+
current().inner.parker.park_timeout(dur);
1002942
}
1003943
}
1004944

@@ -1077,11 +1017,7 @@ impl ThreadId {
10771017
struct Inner {
10781018
name: Option<CString>, // Guaranteed to be UTF-8
10791019
id: ThreadId,
1080-
1081-
// state for thread park/unpark
1082-
state: AtomicUsize,
1083-
lock: Mutex<()>,
1084-
cvar: Condvar,
1020+
parker: Parker,
10851021
}
10861022

10871023
#[derive(Clone)]
@@ -1115,13 +1051,7 @@ impl Thread {
11151051
let cname =
11161052
name.map(|n| CString::new(n).expect("thread name may not contain interior null bytes"));
11171053
Thread {
1118-
inner: Arc::new(Inner {
1119-
name: cname,
1120-
id: ThreadId::new(),
1121-
state: AtomicUsize::new(EMPTY),
1122-
lock: Mutex::new(()),
1123-
cvar: Condvar::new(),
1124-
}),
1054+
inner: Arc::new(Inner { name: cname, id: ThreadId::new(), parker: Parker::new() }),
11251055
}
11261056
}
11271057

@@ -1157,32 +1087,7 @@ impl Thread {
11571087
/// ```
11581088
#[stable(feature = "rust1", since = "1.0.0")]
11591089
pub fn unpark(&self) {
1160-
// To ensure the unparked thread will observe any writes we made
1161-
// before this call, we must perform a release operation that `park`
1162-
// can synchronize with. To do that we must write `NOTIFIED` even if
1163-
// `state` is already `NOTIFIED`. That is why this must be a swap
1164-
// rather than a compare-and-swap that returns if it reads `NOTIFIED`
1165-
// on failure.
1166-
match self.inner.state.swap(NOTIFIED, SeqCst) {
1167-
EMPTY => return, // no one was waiting
1168-
NOTIFIED => return, // already unparked
1169-
PARKED => {} // gotta go wake someone up
1170-
_ => panic!("inconsistent state in unpark"),
1171-
}
1172-
1173-
// There is a period between when the parked thread sets `state` to
1174-
// `PARKED` (or last checked `state` in the case of a spurious wake
1175-
// up) and when it actually waits on `cvar`. If we were to notify
1176-
// during this period it would be ignored and then when the parked
1177-
// thread went to sleep it would never wake up. Fortunately, it has
1178-
// `lock` locked at this stage so we can acquire `lock` to wait until
1179-
// it is ready to receive the notification.
1180-
//
1181-
// Releasing `lock` before the call to `notify_one` means that when the
1182-
// parked thread wakes it doesn't get woken only to have to wait for us
1183-
// to release `lock`.
1184-
drop(self.inner.lock.lock().unwrap());
1185-
self.inner.cvar.notify_one()
1090+
self.inner.parker.unpark();
11861091
}
11871092

11881093
/// Gets the thread's unique identifier.

library/std/src/thread/parker/mod.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
//! Parker implementaiton based on a Mutex and Condvar.
2+
//!
3+
//! The implementation currently uses the trivial strategy of a Mutex+Condvar
4+
//! with wakeup flag, which does not actually allow spurious wakeups. In the
5+
//! future, this will be implemented in a more efficient way, perhaps along the lines of
6+
//! http://cr.openjdk.java.net/~stefank/6989984.1/raw_files/new/src/os/linux/vm/os_linux.cpp
7+
//! or futuxes, and in either case may allow spurious wakeups.
8+
9+
use crate::sync::atomic::AtomicUsize;
10+
use crate::sync::atomic::Ordering::SeqCst;
11+
use crate::sync::{Condvar, Mutex};
12+
use crate::time::Duration;
13+
14+
const EMPTY: usize = 0;
15+
const PARKED: usize = 1;
16+
const NOTIFIED: usize = 2;
17+
18+
pub struct Parker {
19+
state: AtomicUsize,
20+
lock: Mutex<()>,
21+
cvar: Condvar,
22+
}
23+
24+
impl Parker {
25+
pub fn new() -> Self {
26+
Parker { state: AtomicUsize::new(EMPTY), lock: Mutex::new(()), cvar: Condvar::new() }
27+
}
28+
29+
// This implementaiton doesn't require `unsafe`, but other implementations
30+
// may assume this is only called by the thread that owns the Parker.
31+
pub unsafe fn park(&self) {
32+
// If we were previously notified then we consume this notification and
33+
// return quickly.
34+
if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
35+
return;
36+
}
37+
38+
// Otherwise we need to coordinate going to sleep
39+
let mut m = self.lock.lock().unwrap();
40+
match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
41+
Ok(_) => {}
42+
Err(NOTIFIED) => {
43+
// We must read here, even though we know it will be `NOTIFIED`.
44+
// This is because `unpark` may have been called again since we read
45+
// `NOTIFIED` in the `compare_exchange` above. We must perform an
46+
// acquire operation that synchronizes with that `unpark` to observe
47+
// any writes it made before the call to unpark. To do that we must
48+
// read from the write it made to `state`.
49+
let old = self.state.swap(EMPTY, SeqCst);
50+
assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
51+
return;
52+
} // should consume this notification, so prohibit spurious wakeups in next park.
53+
Err(_) => panic!("inconsistent park state"),
54+
}
55+
loop {
56+
m = self.cvar.wait(m).unwrap();
57+
match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
58+
Ok(_) => return, // got a notification
59+
Err(_) => {} // spurious wakeup, go back to sleep
60+
}
61+
}
62+
}
63+
64+
// This implementaiton doesn't require `unsafe`, but other implementations
65+
// may assume this is only called by the thread that owns the Parker.
66+
pub unsafe fn park_timeout(&self, dur: Duration) {
67+
// Like `park` above we have a fast path for an already-notified thread, and
68+
// afterwards we start coordinating for a sleep.
69+
// return quickly.
70+
if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
71+
return;
72+
}
73+
let m = self.lock.lock().unwrap();
74+
match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
75+
Ok(_) => {}
76+
Err(NOTIFIED) => {
77+
// We must read again here, see `park`.
78+
let old = self.state.swap(EMPTY, SeqCst);
79+
assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
80+
return;
81+
} // should consume this notification, so prohibit spurious wakeups in next park.
82+
Err(_) => panic!("inconsistent park_timeout state"),
83+
}
84+
85+
// Wait with a timeout, and if we spuriously wake up or otherwise wake up
86+
// from a notification we just want to unconditionally set the state back to
87+
// empty, either consuming a notification or un-flagging ourselves as
88+
// parked.
89+
let (_m, _result) = self.cvar.wait_timeout(m, dur).unwrap();
90+
match self.state.swap(EMPTY, SeqCst) {
91+
NOTIFIED => {} // got a notification, hurray!
92+
PARKED => {} // no notification, alas
93+
n => panic!("inconsistent park_timeout state: {}", n),
94+
}
95+
}
96+
97+
pub fn unpark(&self) {
98+
// To ensure the unparked thread will observe any writes we made
99+
// before this call, we must perform a release operation that `park`
100+
// can synchronize with. To do that we must write `NOTIFIED` even if
101+
// `state` is already `NOTIFIED`. That is why this must be a swap
102+
// rather than a compare-and-swap that returns if it reads `NOTIFIED`
103+
// on failure.
104+
match self.state.swap(NOTIFIED, SeqCst) {
105+
EMPTY => return, // no one was waiting
106+
NOTIFIED => return, // already unparked
107+
PARKED => {} // gotta go wake someone up
108+
_ => panic!("inconsistent state in unpark"),
109+
}
110+
111+
// There is a period between when the parked thread sets `state` to
112+
// `PARKED` (or last checked `state` in the case of a spurious wake
113+
// up) and when it actually waits on `cvar`. If we were to notify
114+
// during this period it would be ignored and then when the parked
115+
// thread went to sleep it would never wake up. Fortunately, it has
116+
// `lock` locked at this stage so we can acquire `lock` to wait until
117+
// it is ready to receive the notification.
118+
//
119+
// Releasing `lock` before the call to `notify_one` means that when the
120+
// parked thread wakes it doesn't get woken only to have to wait for us
121+
// to release `lock`.
122+
drop(self.lock.lock().unwrap());
123+
self.cvar.notify_one()
124+
}
125+
}

0 commit comments

Comments
 (0)