Skip to content

Commit 6cb463c

Browse files
committed
Add futex-based RwLock on Linux.
1 parent f1a4041 commit 6cb463c

File tree

2 files changed

+295
-2
lines changed

2 files changed

+295
-2
lines changed
Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
use crate::sync::atomic::{
2+
AtomicI32,
3+
Ordering::{Acquire, Relaxed, Release},
4+
};
5+
use crate::sys::futex::{futex_wait, futex_wake, futex_wake_all};
6+
7+
pub type MovableRwLock = RwLock;
8+
9+
pub struct RwLock {
10+
// The state consists of a 30-bit reader counter, a 'readers waiting' flag, and a 'writers waiting' flag.
11+
// Bits 0..30:
12+
// 0: Unlocked
13+
// 1..=0x3FFF_FFFE: Locked by N readers
14+
// 0x3FFF_FFFF: Write locked
15+
// Bit 30: Readers are waiting on this futex.
16+
// Bit 31: Writers are waiting on the writer_notify futex.
17+
state: AtomicI32,
18+
// The 'condition variable' to notify writers through.
19+
// Incremented on every signal.
20+
writer_notify: AtomicI32,
21+
}
22+
23+
const READ_LOCKED: i32 = 1;
24+
const MASK: i32 = (1 << 30) - 1;
25+
const WRITE_LOCKED: i32 = MASK;
26+
const MAX_READERS: i32 = MASK - 1;
27+
const READERS_WAITING: i32 = 1 << 30;
28+
const WRITERS_WAITING: i32 = 1 << 31;
29+
30+
fn unlocked(state: i32) -> bool {
31+
state & MASK == 0
32+
}
33+
34+
fn write_locked(state: i32) -> bool {
35+
state & MASK == WRITE_LOCKED
36+
}
37+
38+
fn readers_waiting(state: i32) -> bool {
39+
state & READERS_WAITING != 0
40+
}
41+
42+
fn writers_waiting(state: i32) -> bool {
43+
state & WRITERS_WAITING != 0
44+
}
45+
46+
fn read_lockable(state: i32) -> bool {
47+
// This also returns false if the counter could overflow if we tried to read lock it.
48+
state & MASK < MAX_READERS && !readers_waiting(state) && !writers_waiting(state)
49+
}
50+
51+
fn reached_max_readers(state: i32) -> bool {
52+
state & MASK == MAX_READERS
53+
}
54+
55+
impl RwLock {
56+
#[inline]
57+
pub const fn new() -> Self {
58+
Self { state: AtomicI32::new(0), writer_notify: AtomicI32::new(0) }
59+
}
60+
61+
#[inline]
62+
pub unsafe fn destroy(&self) {}
63+
64+
#[inline]
65+
pub unsafe fn try_read(&self) -> bool {
66+
self.state
67+
.fetch_update(Acquire, Relaxed, |s| read_lockable(s).then(|| s + READ_LOCKED))
68+
.is_ok()
69+
}
70+
71+
#[inline]
72+
pub unsafe fn read(&self) {
73+
if !self.try_read() {
74+
self.read_contended();
75+
}
76+
}
77+
78+
#[inline]
79+
pub unsafe fn read_unlock(&self) {
80+
let state = self.state.fetch_sub(READ_LOCKED, Release) - 1;
81+
82+
// It's impossible for a reader to be waiting on a read-locked RwLock,
83+
// except if there is also a writer waiting.
84+
debug_assert!(!readers_waiting(state) || writers_waiting(state));
85+
86+
// Wake up a writer if we were the last reader and there's a writer waiting.
87+
if unlocked(state) && writers_waiting(state) {
88+
self.wake_writer_or_readers(state);
89+
}
90+
}
91+
92+
#[cold]
93+
fn read_contended(&self) {
94+
let mut state = self.spin_read();
95+
96+
loop {
97+
// If we can lock it, lock it.
98+
if read_lockable(state) {
99+
match self.state.compare_exchange(state, state + READ_LOCKED, Acquire, Relaxed) {
100+
Ok(_) => return, // Locked!
101+
Err(s) => {
102+
state = s;
103+
continue;
104+
}
105+
}
106+
}
107+
108+
// Check for overflow.
109+
if reached_max_readers(state) {
110+
panic!("too many active read locks on RwLock");
111+
}
112+
113+
// Make sure the readers waiting bit is set before we go to sleep.
114+
if !readers_waiting(state) {
115+
if let Err(s) =
116+
self.state.compare_exchange(state, state | READERS_WAITING, Relaxed, Relaxed)
117+
{
118+
state = s;
119+
continue;
120+
}
121+
}
122+
123+
// Wait for the state to change.
124+
futex_wait(&self.state, state | READERS_WAITING, None);
125+
126+
// Spin again after waking up.
127+
state = self.spin_read();
128+
}
129+
}
130+
131+
#[inline]
132+
pub unsafe fn try_write(&self) -> bool {
133+
self.state.fetch_update(Acquire, Relaxed, |s| unlocked(s).then(|| s + WRITE_LOCKED)).is_ok()
134+
}
135+
136+
#[inline]
137+
pub unsafe fn write(&self) {
138+
if !self.try_write() {
139+
self.write_contended();
140+
}
141+
}
142+
143+
#[inline]
144+
pub unsafe fn write_unlock(&self) {
145+
let state = self.state.fetch_sub(WRITE_LOCKED, Release) - WRITE_LOCKED;
146+
147+
debug_assert!(unlocked(state));
148+
149+
if writers_waiting(state) || readers_waiting(state) {
150+
self.wake_writer_or_readers(state);
151+
}
152+
}
153+
154+
#[cold]
155+
fn write_contended(&self) {
156+
let mut state = self.spin_write();
157+
158+
let mut other_writers_waiting = 0;
159+
160+
loop {
161+
// If it's unlocked, we try to lock it.
162+
if unlocked(state) {
163+
match self.state.compare_exchange(
164+
state,
165+
state | WRITE_LOCKED | other_writers_waiting,
166+
Acquire,
167+
Relaxed,
168+
) {
169+
Ok(_) => return, // Locked!
170+
Err(s) => {
171+
state = s;
172+
continue;
173+
}
174+
}
175+
}
176+
177+
// Set the waiting bit indicating that we're waiting on it.
178+
if !writers_waiting(state) {
179+
if let Err(s) =
180+
self.state.compare_exchange(state, state | WRITERS_WAITING, Relaxed, Relaxed)
181+
{
182+
state = s;
183+
continue;
184+
}
185+
}
186+
187+
// Other writers might be waiting now too, so we should make sure
188+
// we keep that bit on once we manage lock it.
189+
other_writers_waiting = WRITERS_WAITING;
190+
191+
// Examine the notification counter before we check if `state` has changed,
192+
// to make sure we don't miss any notifications.
193+
let seq = self.writer_notify.load(Acquire);
194+
195+
// Don't go to sleep if the lock has become available,
196+
// or if the writers waiting bit is no longer set.
197+
let s = self.state.load(Relaxed);
198+
if unlocked(state) || !writers_waiting(s) {
199+
state = s;
200+
continue;
201+
}
202+
203+
// Wait for the state to change.
204+
futex_wait(&self.writer_notify, seq, None);
205+
206+
// Spin again after waking up.
207+
state = self.spin_write();
208+
}
209+
}
210+
211+
/// Wake up waiting threads after unlocking.
212+
///
213+
/// If both are waiting, this will wake up only one writer, but will fall
214+
/// back to waking up readers if there was no writer to wake up.
215+
#[cold]
216+
fn wake_writer_or_readers(&self, mut state: i32) {
217+
assert!(unlocked(state));
218+
219+
// The readers waiting bit might be turned on at any point now,
220+
// since readers will block when there's anything waiting.
221+
// Writers will just lock the lock though, regardless of the waiting bits,
222+
// so we don't have to worry about the writer waiting bit.
223+
//
224+
// If the lock gets locked in the meantime, we don't have to do
225+
// anything, because then the thread that locked the lock will take
226+
// care of waking up waiters when it unlocks.
227+
228+
// If only writers are waiting, wake one of them up.
229+
if state == WRITERS_WAITING {
230+
match self.state.compare_exchange(state, 0, Relaxed, Relaxed) {
231+
Ok(_) => {
232+
self.wake_writer();
233+
return;
234+
}
235+
Err(s) => {
236+
// Maybe some readers are now waiting too. So, continue to the next `if`.
237+
state = s;
238+
}
239+
}
240+
}
241+
242+
// If both writers and readers are waiting, leave the readers waiting
243+
// and only wake up one writer.
244+
if state == READERS_WAITING + WRITERS_WAITING {
245+
if self.state.compare_exchange(state, READERS_WAITING, Relaxed, Relaxed).is_err() {
246+
// The lock got locked. Not our problem anymore.
247+
return;
248+
}
249+
if self.wake_writer() {
250+
return;
251+
}
252+
// No writers were actually waiting. Continue to wake up readers instead.
253+
state = READERS_WAITING;
254+
}
255+
256+
// If readers are waiting, wake them all up.
257+
if state == READERS_WAITING {
258+
if self.state.compare_exchange(state, 0, Relaxed, Relaxed).is_ok() {
259+
futex_wake_all(&self.state);
260+
}
261+
}
262+
}
263+
264+
fn wake_writer(&self) -> bool {
265+
self.writer_notify.fetch_add(1, Release);
266+
futex_wake(&self.writer_notify)
267+
}
268+
269+
/// Spin for a while, but stop directly at the given condition.
270+
fn spin_until(&self, f: impl Fn(i32) -> bool) -> i32 {
271+
let mut spin = 100; // Chosen by fair dice roll.
272+
loop {
273+
let state = self.state.load(Relaxed);
274+
if f(state) || spin == 0 {
275+
return state;
276+
}
277+
crate::hint::spin_loop();
278+
spin -= 1;
279+
}
280+
}
281+
282+
fn spin_write(&self) -> i32 {
283+
// Stop spinning when it's unlocked or when there's waiting writers, to keep things somewhat fair.
284+
self.spin_until(|state| unlocked(state) || writers_waiting(state))
285+
}
286+
287+
fn spin_read(&self) -> i32 {
288+
// Stop spinning when it's unlocked or read locked, or when there's waiting threads.
289+
self.spin_until(|state| {
290+
!write_locked(state) || readers_waiting(state) || writers_waiting(state)
291+
})
292+
}
293+
}

library/std/src/sys/unix/locks/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ cfg_if::cfg_if! {
44
target_os = "android",
55
))] {
66
mod futex;
7+
mod futex_rwlock;
78
#[allow(dead_code)]
89
mod pthread_mutex; // Only used for PthreadMutexAttr, needed by pthread_remutex.
910
mod pthread_remutex; // FIXME: Implement this using a futex
10-
mod pthread_rwlock; // FIXME: Implement this using a futex
1111
pub use futex::{Mutex, MovableMutex, Condvar, MovableCondvar};
1212
pub use pthread_remutex::ReentrantMutex;
13-
pub use pthread_rwlock::{RwLock, MovableRwLock};
13+
pub use futex_rwlock::{RwLock, MovableRwLock};
1414
} else {
1515
mod pthread_mutex;
1616
mod pthread_remutex;

0 commit comments

Comments
 (0)