Skip to content

Commit 0f7756e

Browse files
committed
De-bloat Mutexes by moving with RawMutex
All `Mutex`es now internally use `RawMutex` (which is similar to a `RawMutex<()>`, only providing locking semantics but not data), therefore instantiating `Mutex`es on different types do not duplicate code. This patch does not otherwise change the algorithm used by `Mutex`. Signed-off-by: Gary Guo <gary@garyguo.net>
1 parent 1dd0478 commit 0f7756e

File tree

1 file changed

+130
-100
lines changed

1 file changed

+130
-100
lines changed

src/sync/mutex.rs

Lines changed: 130 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,130 @@ const LOCK: usize = 1;
1515
/// Set if there are tasks blocked on the mutex.
1616
const BLOCKED: usize = 1 << 1;
1717

18+
struct RawMutex {
19+
state: AtomicUsize,
20+
blocked: std::sync::Mutex<Slab<Option<Waker>>>,
21+
}
22+
23+
unsafe impl Send for RawMutex {}
24+
unsafe impl Sync for RawMutex {}
25+
26+
impl RawMutex {
27+
/// Creates a new raw mutex.
28+
pub fn new() -> RawMutex {
29+
RawMutex {
30+
state: AtomicUsize::new(0),
31+
blocked: std::sync::Mutex::new(Slab::new()),
32+
}
33+
}
34+
35+
/// Acquires the lock.
36+
///
37+
/// We don't use `async` signature here for performance concern.
38+
pub fn lock(&self) -> RawLockFuture<'_> {
39+
RawLockFuture {
40+
mutex: self,
41+
opt_key: None,
42+
acquired: false,
43+
}
44+
}
45+
46+
/// Attempts to acquire the lock.
47+
pub fn try_lock(&self) -> bool {
48+
self.state.fetch_or(LOCK, Ordering::Acquire) & LOCK == 0
49+
}
50+
51+
/// Unlock this mutex.
52+
pub fn unlock(&self) {
53+
let state = self.state.fetch_and(!LOCK, Ordering::AcqRel);
54+
55+
// If there are any blocked tasks, wake one of them up.
56+
if state & BLOCKED != 0 {
57+
let mut blocked = self.blocked.lock().unwrap();
58+
59+
if let Some((_, opt_waker)) = blocked.iter_mut().next() {
60+
// If there is no waker in this entry, that means it was already woken.
61+
if let Some(w) = opt_waker.take() {
62+
w.wake();
63+
}
64+
}
65+
}
66+
}
67+
}
68+
69+
struct RawLockFuture<'a> {
70+
mutex: &'a RawMutex,
71+
opt_key: Option<usize>,
72+
acquired: bool,
73+
}
74+
75+
impl<'a> Future for RawLockFuture<'a> {
76+
type Output = ();
77+
78+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
79+
if self.mutex.try_lock() {
80+
self.acquired = true;
81+
Poll::Ready(())
82+
} else {
83+
let mut blocked = self.mutex.blocked.lock().unwrap();
84+
85+
// Register the current task.
86+
match self.opt_key {
87+
None => {
88+
// Insert a new entry into the list of blocked tasks.
89+
let w = cx.waker().clone();
90+
let key = blocked.insert(Some(w));
91+
self.opt_key = Some(key);
92+
93+
if blocked.len() == 1 {
94+
self.mutex.state.fetch_or(BLOCKED, Ordering::Relaxed);
95+
}
96+
}
97+
Some(key) => {
98+
// There is already an entry in the list of blocked tasks. Just
99+
// reset the waker if it was removed.
100+
if blocked[key].is_none() {
101+
let w = cx.waker().clone();
102+
blocked[key] = Some(w);
103+
}
104+
}
105+
}
106+
107+
// Try locking again because it's possible the mutex got unlocked just
108+
// before the current task was registered as a blocked task.
109+
if self.mutex.try_lock() {
110+
self.acquired = true;
111+
Poll::Ready(())
112+
} else {
113+
Poll::Pending
114+
}
115+
}
116+
}
117+
}
118+
119+
impl Drop for RawLockFuture<'_> {
120+
fn drop(&mut self) {
121+
if let Some(key) = self.opt_key {
122+
let mut blocked = self.mutex.blocked.lock().unwrap();
123+
let opt_waker = blocked.remove(key);
124+
125+
if opt_waker.is_none() && !self.acquired {
126+
// We were awoken but didn't acquire the lock. Wake up another task.
127+
if let Some((_, opt_waker)) = blocked.iter_mut().next() {
128+
// If there is no waker in this entry, that means it was already woken.
129+
if let Some(w) = opt_waker.take() {
130+
w.wake();
131+
}
132+
}
133+
}
134+
135+
if blocked.is_empty() {
136+
self.mutex.state.fetch_and(!BLOCKED, Ordering::Relaxed);
137+
}
138+
}
139+
}
140+
}
141+
18142
/// A mutual exclusion primitive for protecting shared data.
19143
///
20144
/// This type is an async version of [`std::sync::Mutex`].
@@ -49,8 +173,7 @@ const BLOCKED: usize = 1 << 1;
49173
/// # })
50174
/// ```
51175
pub struct Mutex<T> {
52-
state: AtomicUsize,
53-
blocked: std::sync::Mutex<Slab<Option<Waker>>>,
176+
mutex: RawMutex,
54177
value: UnsafeCell<T>,
55178
}
56179

@@ -69,8 +192,7 @@ impl<T> Mutex<T> {
69192
/// ```
70193
pub fn new(t: T) -> Mutex<T> {
71194
Mutex {
72-
state: AtomicUsize::new(0),
73-
blocked: std::sync::Mutex::new(Slab::new()),
195+
mutex: RawMutex::new(),
74196
value: UnsafeCell::new(t),
75197
}
76198
}
@@ -102,88 +224,8 @@ impl<T> Mutex<T> {
102224
/// # })
103225
/// ```
104226
pub async fn lock(&self) -> MutexGuard<'_, T> {
105-
pub struct LockFuture<'a, T> {
106-
mutex: &'a Mutex<T>,
107-
opt_key: Option<usize>,
108-
acquired: bool,
109-
}
110-
111-
impl<'a, T> Future for LockFuture<'a, T> {
112-
type Output = MutexGuard<'a, T>;
113-
114-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
115-
match self.mutex.try_lock() {
116-
Some(guard) => {
117-
self.acquired = true;
118-
Poll::Ready(guard)
119-
}
120-
None => {
121-
let mut blocked = self.mutex.blocked.lock().unwrap();
122-
123-
// Register the current task.
124-
match self.opt_key {
125-
None => {
126-
// Insert a new entry into the list of blocked tasks.
127-
let w = cx.waker().clone();
128-
let key = blocked.insert(Some(w));
129-
self.opt_key = Some(key);
130-
131-
if blocked.len() == 1 {
132-
self.mutex.state.fetch_or(BLOCKED, Ordering::Relaxed);
133-
}
134-
}
135-
Some(key) => {
136-
// There is already an entry in the list of blocked tasks. Just
137-
// reset the waker if it was removed.
138-
if blocked[key].is_none() {
139-
let w = cx.waker().clone();
140-
blocked[key] = Some(w);
141-
}
142-
}
143-
}
144-
145-
// Try locking again because it's possible the mutex got unlocked just
146-
// before the current task was registered as a blocked task.
147-
match self.mutex.try_lock() {
148-
Some(guard) => {
149-
self.acquired = true;
150-
Poll::Ready(guard)
151-
}
152-
None => Poll::Pending,
153-
}
154-
}
155-
}
156-
}
157-
}
158-
159-
impl<T> Drop for LockFuture<'_, T> {
160-
fn drop(&mut self) {
161-
if let Some(key) = self.opt_key {
162-
let mut blocked = self.mutex.blocked.lock().unwrap();
163-
let opt_waker = blocked.remove(key);
164-
165-
if opt_waker.is_none() && !self.acquired {
166-
// We were awoken but didn't acquire the lock. Wake up another task.
167-
if let Some((_, opt_waker)) = blocked.iter_mut().next() {
168-
if let Some(w) = opt_waker.take() {
169-
w.wake();
170-
}
171-
}
172-
}
173-
174-
if blocked.is_empty() {
175-
self.mutex.state.fetch_and(!BLOCKED, Ordering::Relaxed);
176-
}
177-
}
178-
}
179-
}
180-
181-
LockFuture {
182-
mutex: self,
183-
opt_key: None,
184-
acquired: false,
185-
}
186-
.await
227+
self.mutex.lock().await;
228+
MutexGuard(self)
187229
}
188230

189231
/// Attempts to acquire the lock.
@@ -220,7 +262,7 @@ impl<T> Mutex<T> {
220262
/// # })
221263
/// ```
222264
pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
223-
if self.state.fetch_or(LOCK, Ordering::Acquire) & LOCK == 0 {
265+
if self.mutex.try_lock() {
224266
Some(MutexGuard(self))
225267
} else {
226268
None
@@ -303,19 +345,7 @@ unsafe impl<T: Sync> Sync for MutexGuard<'_, T> {}
303345

304346
impl<T> Drop for MutexGuard<'_, T> {
305347
fn drop(&mut self) {
306-
let state = self.0.state.fetch_and(!LOCK, Ordering::AcqRel);
307-
308-
// If there are any blocked tasks, wake one of them up.
309-
if state & BLOCKED != 0 {
310-
let mut blocked = self.0.blocked.lock().unwrap();
311-
312-
if let Some((_, opt_waker)) = blocked.iter_mut().next() {
313-
// If there is no waker in this entry, that means it was already woken.
314-
if let Some(w) = opt_waker.take() {
315-
w.wake();
316-
}
317-
}
318-
}
348+
self.0.mutex.unlock();
319349
}
320350
}
321351

0 commit comments

Comments
 (0)