-
Notifications
You must be signed in to change notification settings - Fork 13.4k
Replace RwLock by a futex based one on Linux #95801
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
f1a4041
6cb463c
307aa58
7c28791
1f2c2bb
c4a4f48
8339381
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,293 @@ | ||
use crate::sync::atomic::{ | ||
AtomicI32, | ||
Ordering::{Acquire, Relaxed, Release}, | ||
}; | ||
use crate::sys::futex::{futex_wait, futex_wake, futex_wake_all}; | ||
|
||
pub type MovableRwLock = RwLock; | ||
|
||
/// Reader-writer lock whose whole state lives in two 32-bit atomics.
///
/// Readers sleep on `state` itself; writers sleep on the separate
/// `writer_notify` word.
pub struct RwLock {
    /// Packed lock state: a 30-bit reader counter plus two 'waiting' flags.
    ///
    /// Bits 0..30 (the counter):
    /// * `0` -- unlocked
    /// * `1..=0x3FFF_FFFE` -- read locked by that many readers
    /// * `0x3FFF_FFFF` -- write locked
    ///
    /// Bit 30: readers are waiting on this futex.
    /// Bit 31: writers are waiting on the `writer_notify` futex.
    state: AtomicI32,
    /// The 'condition variable' that writers are notified through.
    /// It is incremented on every signal.
    writer_notify: AtomicI32,
}
|
||
/// Amount added to the state word for each read lock taken (one counter unit).
const READ_LOCKED: i32 = 1;
/// Selects bits 0..30 of the state word, i.e. the reader counter.
const MASK: i32 = 0x3FFF_FFFF;
/// Counter value that means "write locked": every counter bit set.
const WRITE_LOCKED: i32 = MASK;
Review thread on the constants above (GitHub "hide comment" boilerplate removed; "[…]" marks inline code or text missing from this capture):
- Comment: It seems a bit strange to represent […]
- Reply: I'll try that out and see if it simplifies things. It probably does. Thanks. :)
- Reply: I tried it, but it does not seem to simplify things. Simplifying the overflow check doesn't help, because at the points where it is checked, we also need to check the waiting bits. I also prefer to combine the fields, to make invalid states unrepresentable. (Effectively […]
||
const MAX_READERS: i32 = MASK - 1; | ||
const READERS_WAITING: i32 = 1 << 30; | ||
const WRITERS_WAITING: i32 = 1 << 31; | ||
Review thread on the flag constants above (GitHub "hide comment" boilerplate removed; "[…]" marks inline code or text missing from this capture):
- Comment: Is there any reason why these are placed in the high bits rather than the low bits? In parking_lot I put the flags in the low bits so that counter overflows can be caught with […]
- Reply: Putting them in the high bits makes it possible for the read condition to compile down to a single comparison: […] is then equal to […]. (Not sure if LLVM sees that though.)
- Reply: LLVM doesn't currently see this, although it sees a related pattern where the […] Reported upstream: llvm/llvm-project#54856 (Note that I looked at […]
||
|
||
fn unlocked(state: i32) -> bool { | ||
state & MASK == 0 | ||
} | ||
|
||
fn write_locked(state: i32) -> bool { | ||
state & MASK == WRITE_LOCKED | ||
} | ||
|
||
fn readers_waiting(state: i32) -> bool { | ||
state & READERS_WAITING != 0 | ||
} | ||
|
||
fn writers_waiting(state: i32) -> bool { | ||
state & WRITERS_WAITING != 0 | ||
} | ||
|
||
fn read_lockable(state: i32) -> bool { | ||
// This also returns false if the counter could overflow if we tried to read lock it. | ||
state & MASK < MAX_READERS && !readers_waiting(state) && !writers_waiting(state) | ||
m-ou-se marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
fn reached_max_readers(state: i32) -> bool { | ||
state & MASK == MAX_READERS | ||
} | ||
m-ou-se marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
impl RwLock { | ||
#[inline] | ||
pub const fn new() -> Self { | ||
Self { state: AtomicI32::new(0), writer_notify: AtomicI32::new(0) } | ||
} | ||
|
||
#[inline] | ||
pub unsafe fn destroy(&self) {} | ||
|
||
#[inline] | ||
pub unsafe fn try_read(&self) -> bool { | ||
self.state | ||
.fetch_update(Acquire, Relaxed, |s| read_lockable(s).then(|| s + READ_LOCKED)) | ||
.is_ok() | ||
} | ||
|
||
#[inline] | ||
pub unsafe fn read(&self) { | ||
if !self.try_read() { | ||
m-ou-se marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.read_contended(); | ||
} | ||
} | ||
|
||
#[inline] | ||
pub unsafe fn read_unlock(&self) { | ||
let state = self.state.fetch_sub(READ_LOCKED, Release) - 1; | ||
m-ou-se marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// It's impossible for a reader to be waiting on a read-locked RwLock, | ||
// except if there is also a writer waiting. | ||
debug_assert!(!readers_waiting(state) || writers_waiting(state)); | ||
m-ou-se marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Wake up a writer if we were the last reader and there's a writer waiting. | ||
if unlocked(state) && writers_waiting(state) { | ||
self.wake_writer_or_readers(state); | ||
} | ||
} | ||
|
||
#[cold] | ||
fn read_contended(&self) { | ||
let mut state = self.spin_read(); | ||
|
||
loop { | ||
// If we can lock it, lock it. | ||
if read_lockable(state) { | ||
match self.state.compare_exchange(state, state + READ_LOCKED, Acquire, Relaxed) { | ||
m-ou-se marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Ok(_) => return, // Locked! | ||
Err(s) => { | ||
state = s; | ||
continue; | ||
} | ||
} | ||
} | ||
|
||
// Check for overflow. | ||
if reached_max_readers(state) { | ||
panic!("too many active read locks on RwLock"); | ||
m-ou-se marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
// Make sure the readers waiting bit is set before we go to sleep. | ||
if !readers_waiting(state) { | ||
if let Err(s) = | ||
self.state.compare_exchange(state, state | READERS_WAITING, Relaxed, Relaxed) | ||
{ | ||
state = s; | ||
continue; | ||
} | ||
} | ||
|
||
// Wait for the state to change. | ||
futex_wait(&self.state, state | READERS_WAITING, None); | ||
|
||
// Spin again after waking up. | ||
state = self.spin_read(); | ||
} | ||
} | ||
|
||
#[inline] | ||
pub unsafe fn try_write(&self) -> bool { | ||
self.state.fetch_update(Acquire, Relaxed, |s| unlocked(s).then(|| s + WRITE_LOCKED)).is_ok() | ||
} | ||
|
||
#[inline] | ||
pub unsafe fn write(&self) { | ||
if !self.try_write() { | ||
self.write_contended(); | ||
} | ||
} | ||
|
||
#[inline] | ||
pub unsafe fn write_unlock(&self) { | ||
let state = self.state.fetch_sub(WRITE_LOCKED, Release) - WRITE_LOCKED; | ||
|
||
debug_assert!(unlocked(state)); | ||
|
||
if writers_waiting(state) || readers_waiting(state) { | ||
self.wake_writer_or_readers(state); | ||
} | ||
m-ou-se marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
#[cold] | ||
fn write_contended(&self) { | ||
let mut state = self.spin_write(); | ||
|
||
let mut other_writers_waiting = 0; | ||
|
||
loop { | ||
// If it's unlocked, we try to lock it. | ||
if unlocked(state) { | ||
match self.state.compare_exchange( | ||
m-ou-se marked this conversation as resolved.
Show resolved
Hide resolved
|
||
state, | ||
state | WRITE_LOCKED | other_writers_waiting, | ||
Acquire, | ||
Relaxed, | ||
) { | ||
Ok(_) => return, // Locked! | ||
Err(s) => { | ||
state = s; | ||
continue; | ||
} | ||
} | ||
} | ||
|
||
// Set the waiting bit indicating that we're waiting on it. | ||
if !writers_waiting(state) { | ||
if let Err(s) = | ||
self.state.compare_exchange(state, state | WRITERS_WAITING, Relaxed, Relaxed) | ||
{ | ||
state = s; | ||
continue; | ||
} | ||
} | ||
|
||
// Other writers might be waiting now too, so we should make sure | ||
// we keep that bit on once we manage lock it. | ||
other_writers_waiting = WRITERS_WAITING; | ||
|
||
// Examine the notification counter before we check if `state` has changed, | ||
// to make sure we don't miss any notifications. | ||
let seq = self.writer_notify.load(Acquire); | ||
|
||
// Don't go to sleep if the lock has become available, | ||
// or if the writers waiting bit is no longer set. | ||
let s = self.state.load(Relaxed); | ||
if unlocked(state) || !writers_waiting(s) { | ||
state = s; | ||
continue; | ||
} | ||
|
||
// Wait for the state to change. | ||
futex_wait(&self.writer_notify, seq, None); | ||
|
||
// Spin again after waking up. | ||
state = self.spin_write(); | ||
} | ||
} | ||
|
||
/// Wake up waiting threads after unlocking. | ||
/// | ||
/// If both are waiting, this will wake up only one writer, but will fall | ||
/// back to waking up readers if there was no writer to wake up. | ||
#[cold] | ||
fn wake_writer_or_readers(&self, mut state: i32) { | ||
assert!(unlocked(state)); | ||
|
||
// The readers waiting bit might be turned on at any point now, | ||
// since readers will block when there's anything waiting. | ||
// Writers will just lock the lock though, regardless of the waiting bits, | ||
// so we don't have to worry about the writer waiting bit. | ||
// | ||
// If the lock gets locked in the meantime, we don't have to do | ||
// anything, because then the thread that locked the lock will take | ||
// care of waking up waiters when it unlocks. | ||
|
||
// If only writers are waiting, wake one of them up. | ||
if state == WRITERS_WAITING { | ||
match self.state.compare_exchange(state, 0, Relaxed, Relaxed) { | ||
m-ou-se marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Ok(_) => { | ||
self.wake_writer(); | ||
return; | ||
} | ||
Err(s) => { | ||
// Maybe some readers are now waiting too. So, continue to the next `if`. | ||
state = s; | ||
} | ||
} | ||
} | ||
|
||
// If both writers and readers are waiting, leave the readers waiting | ||
// and only wake up one writer. | ||
if state == READERS_WAITING + WRITERS_WAITING { | ||
if self.state.compare_exchange(state, READERS_WAITING, Relaxed, Relaxed).is_err() { | ||
// The lock got locked. Not our problem anymore. | ||
return; | ||
} | ||
if self.wake_writer() { | ||
return; | ||
} | ||
// No writers were actually waiting. Continue to wake up readers instead. | ||
m-ou-se marked this conversation as resolved.
Show resolved
Hide resolved
|
||
state = READERS_WAITING; | ||
} | ||
|
||
// If readers are waiting, wake them all up. | ||
if state == READERS_WAITING { | ||
if self.state.compare_exchange(state, 0, Relaxed, Relaxed).is_ok() { | ||
futex_wake_all(&self.state); | ||
} | ||
} | ||
} | ||
|
||
fn wake_writer(&self) -> bool { | ||
m-ou-se marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.writer_notify.fetch_add(1, Release); | ||
futex_wake(&self.writer_notify) | ||
} | ||
|
||
/// Spin for a while, but stop directly at the given condition. | ||
fn spin_until(&self, f: impl Fn(i32) -> bool) -> i32 { | ||
let mut spin = 100; // Chosen by fair dice roll. | ||
loop { | ||
let state = self.state.load(Relaxed); | ||
m-ou-se marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if f(state) || spin == 0 { | ||
return state; | ||
} | ||
crate::hint::spin_loop(); | ||
spin -= 1; | ||
} | ||
} | ||
|
||
fn spin_write(&self) -> i32 { | ||
// Stop spinning when it's unlocked or when there's waiting writers, to keep things somewhat fair. | ||
self.spin_until(|state| unlocked(state) || writers_waiting(state)) | ||
} | ||
|
||
fn spin_read(&self) -> i32 { | ||
// Stop spinning when it's unlocked or read locked, or when there's waiting threads. | ||
self.spin_until(|state| { | ||
!write_locked(state) || readers_waiting(state) || writers_waiting(state) | ||
}) | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.