From b45dd4202bb378cf32a24df80c50ffd4c6755191 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Fri, 10 Jan 2014 14:20:56 -0800 Subject: [PATCH 1/9] Remove try_send_deferred and can_resched parameter This removes all usage of `try_send_deferred` and all related functionality. --- src/libextra/sync.rs | 6 +++--- src/libgreen/simple.rs | 2 +- src/libgreen/task.rs | 11 +++-------- src/libnative/task.rs | 2 +- src/librustuv/lib.rs | 2 +- src/librustuv/queue.rs | 2 +- src/librustuv/timer.rs | 2 +- src/libstd/comm/mod.rs | 20 +++++++------------- src/libstd/rt/mod.rs | 2 +- src/libstd/rt/task.rs | 4 ++-- 10 files changed, 21 insertions(+), 32 deletions(-) diff --git a/src/libextra/sync.rs b/src/libextra/sync.rs index 12566ac85515f..a907c2a5323ca 100644 --- a/src/libextra/sync.rs +++ b/src/libextra/sync.rs @@ -52,7 +52,7 @@ impl WaitQueue { Some(ch) => { // Send a wakeup signal. If the waiter was killed, its port will // have closed. Keep trying until we get a live task. - if ch.try_send_deferred(()) { + if ch.try_send(()) { true } else { self.signal() @@ -68,7 +68,7 @@ impl WaitQueue { match self.head.try_recv() { None => break, Some(ch) => { - if ch.try_send_deferred(()) { + if ch.try_send(()) { count += 1; } } @@ -79,7 +79,7 @@ impl WaitQueue { fn wait_end(&self) -> WaitEnd { let (wait_end, signal_end) = Chan::new(); - assert!(self.tail.try_send_deferred(signal_end)); + assert!(self.tail.try_send(signal_end)); wait_end } } diff --git a/src/libgreen/simple.rs b/src/libgreen/simple.rs index 4a0523fe47a7a..21e7a99d30a43 100644 --- a/src/libgreen/simple.rs +++ b/src/libgreen/simple.rs @@ -54,7 +54,7 @@ impl Runtime for SimpleTask { } Local::put(cur_task); } - fn reawaken(mut ~self, mut to_wake: ~Task, _can_resched: bool) { + fn reawaken(mut ~self, mut to_wake: ~Task) { let me = &mut *self as *mut SimpleTask; to_wake.put_runtime(self as ~Runtime); unsafe { diff --git a/src/libgreen/task.rs b/src/libgreen/task.rs index 31752941231cb..07c7da1c99091 100644 --- a/src/libgreen/task.rs +++ b/src/libgreen/task.rs @@ -376,7 +376,7 @@ impl Runtime for GreenTask { } } - fn reawaken(mut ~self, to_wake: ~Task, can_resched: bool) { + fn reawaken(mut ~self, to_wake: ~Task) { self.put_task(to_wake); assert!(self.sched.is_none()); @@ -409,15 +409,10 @@ impl Runtime for GreenTask { match running_task.maybe_take_runtime::() { Some(mut running_green_task) => { running_green_task.put_task(running_task); - let mut sched = running_green_task.sched.take_unwrap(); + let sched = running_green_task.sched.take_unwrap(); if sched.pool_id == self.pool_id { - if can_resched { - sched.run_task(running_green_task, self); - } else { - sched.enqueue_task(self); - running_green_task.put_with_sched(sched); - } + sched.run_task(running_green_task, self); } else { self.reawaken_remotely(); diff --git a/src/libnative/task.rs b/src/libnative/task.rs index e827b495852a4..1ccb828949dc4 100644 --- a/src/libnative/task.rs +++ b/src/libnative/task.rs @@ -230,7 +230,7 @@ impl rt::Runtime for Ops { // See the comments on `deschedule` for why the task is forgotten here, and // why it's valid to do so. 
- fn reawaken(mut ~self, mut to_wake: ~Task, _can_resched: bool) { + fn reawaken(mut ~self, mut to_wake: ~Task) { unsafe { let me = &mut *self as *mut Ops; to_wake.put_runtime(self as ~rt::Runtime); diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs index 675e852ebaef0..e366c97e17bb4 100644 --- a/src/librustuv/lib.rs +++ b/src/librustuv/lib.rs @@ -207,7 +207,7 @@ fn wait_until_woken_after(slot: *mut Option, f: ||) { fn wakeup(slot: &mut Option) { assert!(slot.is_some()); - slot.take_unwrap().wake().map(|t| t.reawaken(true)); + slot.take_unwrap().wake().map(|t| t.reawaken()); } pub struct Request { diff --git a/src/librustuv/queue.rs b/src/librustuv/queue.rs index 32f8d8532a209..4eb198340d8f3 100644 --- a/src/librustuv/queue.rs +++ b/src/librustuv/queue.rs @@ -67,7 +67,7 @@ extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { loop { match state.consumer.pop() { mpsc::Data(Task(task)) => { - task.wake().map(|t| t.reawaken(true)); + task.wake().map(|t| t.reawaken()); } mpsc::Data(Increment) => unsafe { if state.refcnt == 0 { diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs index 4a0ad44d31147..8eda598c0ce2c 100644 --- a/src/librustuv/timer.rs +++ b/src/librustuv/timer.rs @@ -138,7 +138,7 @@ extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) { match timer.action.take_unwrap() { WakeTask(task) => { - task.wake().map(|t| t.reawaken(true)); + task.wake().map(|t| t.reawaken()); } SendOnce(chan) => { chan.try_send(()); } SendMany(chan, id) => { diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index bf37e5fca6a5f..1f045c20268d9 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -420,9 +420,9 @@ impl Packet { // This function must have had at least an acquire fence before it to be // properly called. - fn wakeup(&mut self, can_resched: bool) { + fn wakeup(&mut self) { match self.to_wake.take_unwrap().wake() { - Some(task) => task.reawaken(can_resched), + Some(task) => task.reawaken(), None => {} } self.selecting.store(false, Relaxed); @@ -496,7 +496,7 @@ impl Packet { match self.channels.fetch_sub(1, SeqCst) { 1 => { match self.cnt.swap(DISCONNECTED, SeqCst) { - -1 => { self.wakeup(true); } + -1 => { self.wakeup(); } DISCONNECTED => {} n => { assert!(n >= 0); } } @@ -571,20 +571,14 @@ impl Chan { /// /// Like `send`, this method will never block. If the failure of send cannot /// be tolerated, then this method should be used instead. - pub fn try_send(&self, t: T) -> bool { self.try(t, true) } - - /// This function will not stick around for very long. The purpose of this - /// function is to guarantee that no rescheduling is performed. - pub fn try_send_deferred(&self, t: T) -> bool { self.try(t, false) } - - fn try(&self, t: T, can_resched: bool) -> bool { + pub fn try_send(&self, t: T) -> bool { unsafe { let this = cast::transmute_mut(self); this.queue.push(t); let packet = this.queue.packet(); match (*packet).increment() { // As described above, -1 == wakeup - -1 => { (*packet).wakeup(can_resched); true } + -1 => { (*packet).wakeup(); true } // Also as above, SPSC queues must be >= -2 -2 => true, // We succeeded if we sent data @@ -599,7 +593,7 @@ impl Chan { // the TLS overhead can be a bit much. 
n => { assert!(n >= 0); - if can_resched && n > 0 && n % RESCHED_FREQ == 0 { + if n > 0 && n % RESCHED_FREQ == 0 { let task: ~Task = Local::take(); task.maybe_yield(); } @@ -675,7 +669,7 @@ impl SharedChan { match (*packet).increment() { DISCONNECTED => {} // oh well, we tried - -1 => { (*packet).wakeup(true); } + -1 => { (*packet).wakeup(); } n => { if n > 0 && n % RESCHED_FREQ == 0 { let task: ~Task = Local::take(); diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index e7adb5ad7ddaf..88939b7f399c1 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -148,7 +148,7 @@ pub trait Runtime { fn maybe_yield(~self, cur_task: ~Task); fn deschedule(~self, times: uint, cur_task: ~Task, f: |BlockedTask| -> Result<(), BlockedTask>); - fn reawaken(~self, to_wake: ~Task, can_resched: bool); + fn reawaken(~self, to_wake: ~Task); // Miscellaneous calls which are very different depending on what context // you're in. diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index b4ead4252ca41..40591785ff45d 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -259,9 +259,9 @@ impl Task { /// Wakes up a previously blocked task, optionally specifiying whether the /// current task can accept a change in scheduling. This function can only /// be called on tasks that were previously blocked in `deschedule`. - pub fn reawaken(mut ~self, can_resched: bool) { + pub fn reawaken(mut ~self) { let ops = self.imp.take_unwrap(); - ops.reawaken(self, can_resched); + ops.reawaken(self); } /// Yields control of this task to another task. This function will From 7753e287619c809376bb98e22c10569459a8b211 Mon Sep 17 00:00:00 2001 From: Bill Myers Date: Wed, 15 Jan 2014 02:09:12 +0100 Subject: [PATCH 2/9] mutex: fix test --- src/libstd/unstable/mutex.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libstd/unstable/mutex.rs b/src/libstd/unstable/mutex.rs index 4d12435e01a90..7fd25ba79ef5f 100644 --- a/src/libstd/unstable/mutex.rs +++ b/src/libstd/unstable/mutex.rs @@ -498,7 +498,7 @@ mod test { #[test] fn destroy_immediately() { unsafe { - let mut m = Mutex::empty(); + let mut m = Mutex::new(); m.destroy(); } } From a3f5690c3ea90c3b290169bab03ad5e386b051af Mon Sep 17 00:00:00 2001 From: Bill Myers Date: Wed, 15 Jan 2014 01:59:08 +0100 Subject: [PATCH 3/9] atomics: replace &mut self with &self &mut self implies that there are no aliases, but the whole points of atomics and mutexes is that there are aliases accessing the data concurrently! Note that the signature of all atomic intrinsics should also be fixed. --- src/libstd/sync/atomics.rs | 123 ++++++++++++++++-------------- src/libstd/unstable/intrinsics.rs | 2 + 2 files changed, 66 insertions(+), 59 deletions(-) diff --git a/src/libstd/sync/atomics.rs b/src/libstd/sync/atomics.rs index bc9d99c0f37d7..aa76d50c3539c 100644 --- a/src/libstd/sync/atomics.rs +++ b/src/libstd/sync/atomics.rs @@ -31,6 +31,7 @@ use util::NonCopyable; /** * A simple atomic flag, that can be set and cleared. The most basic atomic type. */ +#[no_freeze] pub struct AtomicFlag { priv v: int, priv nocopy: NonCopyable @@ -39,6 +40,7 @@ pub struct AtomicFlag { /** * An atomic boolean type. 
*/ +#[no_freeze] pub struct AtomicBool { priv v: uint, priv nocopy: NonCopyable @@ -47,6 +49,7 @@ pub struct AtomicBool { /** * A signed atomic integer type, supporting basic atomic arithmetic operations */ +#[no_freeze] pub struct AtomicInt { priv v: int, priv nocopy: NonCopyable @@ -55,6 +58,7 @@ pub struct AtomicInt { /** * An unsigned atomic integer type, supporting basic atomic arithmetic operations */ +#[no_freeze] pub struct AtomicUint { priv v: uint, priv nocopy: NonCopyable @@ -63,6 +67,7 @@ pub struct AtomicUint { /** * An unsafe atomic pointer. Only supports basic atomic operations */ +#[no_freeze] pub struct AtomicPtr { priv p: *mut T, priv nocopy: NonCopyable @@ -99,8 +104,8 @@ impl AtomicFlag { * Clears the atomic flag */ #[inline] - pub fn clear(&mut self, order: Ordering) { - unsafe {atomic_store(&mut self.v, 0, order)} + pub fn clear(&self, order: Ordering) { + unsafe {atomic_store(&self.v, 0, order)} } /** @@ -108,8 +113,8 @@ impl AtomicFlag { * flag. */ #[inline] - pub fn test_and_set(&mut self, order: Ordering) -> bool { - unsafe { atomic_compare_and_swap(&mut self.v, 0, 1, order) > 0 } + pub fn test_and_set(&self, order: Ordering) -> bool { + unsafe { atomic_compare_and_swap(&self.v, 0, 1, order) > 0 } } } @@ -124,57 +129,57 @@ impl AtomicBool { } #[inline] - pub fn store(&mut self, val: bool, order: Ordering) { + pub fn store(&self, val: bool, order: Ordering) { let val = if val { 1 } else { 0 }; - unsafe { atomic_store(&mut self.v, val, order); } + unsafe { atomic_store(&self.v, val, order); } } #[inline] - pub fn swap(&mut self, val: bool, order: Ordering) -> bool { + pub fn swap(&self, val: bool, order: Ordering) -> bool { let val = if val { 1 } else { 0 }; - unsafe { atomic_swap(&mut self.v, val, order) > 0 } + unsafe { atomic_swap(&self.v, val, order) > 0 } } #[inline] - pub fn compare_and_swap(&mut self, old: bool, new: bool, order: Ordering) -> bool { + pub fn compare_and_swap(&self, old: bool, new: bool, order: Ordering) -> bool { let old = if old { 1 } else { 0 }; let new = if new { 1 } else { 0 }; - unsafe { atomic_compare_and_swap(&mut self.v, old, new, order) > 0 } + unsafe { atomic_compare_and_swap(&self.v, old, new, order) > 0 } } /// Returns the old value #[inline] - pub fn fetch_and(&mut self, val: bool, order: Ordering) -> bool { + pub fn fetch_and(&self, val: bool, order: Ordering) -> bool { let val = if val { 1 } else { 0 }; - unsafe { atomic_and(&mut self.v, val, order) > 0 } + unsafe { atomic_and(&self.v, val, order) > 0 } } /// Returns the old value #[inline] - pub fn fetch_nand(&mut self, val: bool, order: Ordering) -> bool { + pub fn fetch_nand(&self, val: bool, order: Ordering) -> bool { let val = if val { 1 } else { 0 }; - unsafe { atomic_nand(&mut self.v, val, order) > 0 } + unsafe { atomic_nand(&self.v, val, order) > 0 } } /// Returns the old value #[inline] - pub fn fetch_or(&mut self, val: bool, order: Ordering) -> bool { + pub fn fetch_or(&self, val: bool, order: Ordering) -> bool { let val = if val { 1 } else { 0 }; - unsafe { atomic_or(&mut self.v, val, order) > 0 } + unsafe { atomic_or(&self.v, val, order) > 0 } } /// Returns the old value #[inline] - pub fn fetch_xor(&mut self, val: bool, order: Ordering) -> bool { + pub fn fetch_xor(&self, val: bool, order: Ordering) -> bool { let val = if val { 1 } else { 0 }; - unsafe { atomic_xor(&mut self.v, val, order) > 0 } + unsafe { atomic_xor(&self.v, val, order) > 0 } } } @@ -189,30 +194,30 @@ impl AtomicInt { } #[inline] - pub fn store(&mut self, val: int, order: Ordering) { - unsafe { 
atomic_store(&mut self.v, val, order); } + pub fn store(&self, val: int, order: Ordering) { + unsafe { atomic_store(&self.v, val, order); } } #[inline] - pub fn swap(&mut self, val: int, order: Ordering) -> int { - unsafe { atomic_swap(&mut self.v, val, order) } + pub fn swap(&self, val: int, order: Ordering) -> int { + unsafe { atomic_swap(&self.v, val, order) } } #[inline] - pub fn compare_and_swap(&mut self, old: int, new: int, order: Ordering) -> int { - unsafe { atomic_compare_and_swap(&mut self.v, old, new, order) } + pub fn compare_and_swap(&self, old: int, new: int, order: Ordering) -> int { + unsafe { atomic_compare_and_swap(&self.v, old, new, order) } } /// Returns the old value (like __sync_fetch_and_add). #[inline] - pub fn fetch_add(&mut self, val: int, order: Ordering) -> int { - unsafe { atomic_add(&mut self.v, val, order) } + pub fn fetch_add(&self, val: int, order: Ordering) -> int { + unsafe { atomic_add(&self.v, val, order) } } /// Returns the old value (like __sync_fetch_and_sub). #[inline] - pub fn fetch_sub(&mut self, val: int, order: Ordering) -> int { - unsafe { atomic_sub(&mut self.v, val, order) } + pub fn fetch_sub(&self, val: int, order: Ordering) -> int { + unsafe { atomic_sub(&self.v, val, order) } } } @@ -227,30 +232,30 @@ impl AtomicUint { } #[inline] - pub fn store(&mut self, val: uint, order: Ordering) { - unsafe { atomic_store(&mut self.v, val, order); } + pub fn store(&self, val: uint, order: Ordering) { + unsafe { atomic_store(&self.v, val, order); } } #[inline] - pub fn swap(&mut self, val: uint, order: Ordering) -> uint { - unsafe { atomic_swap(&mut self.v, val, order) } + pub fn swap(&self, val: uint, order: Ordering) -> uint { + unsafe { atomic_swap(&self.v, val, order) } } #[inline] - pub fn compare_and_swap(&mut self, old: uint, new: uint, order: Ordering) -> uint { - unsafe { atomic_compare_and_swap(&mut self.v, old, new, order) } + pub fn compare_and_swap(&self, old: uint, new: uint, order: Ordering) -> uint { + unsafe { atomic_compare_and_swap(&self.v, old, new, order) } } /// Returns the old value (like __sync_fetch_and_add). #[inline] - pub fn fetch_add(&mut self, val: uint, order: Ordering) -> uint { - unsafe { atomic_add(&mut self.v, val, order) } + pub fn fetch_add(&self, val: uint, order: Ordering) -> uint { + unsafe { atomic_add(&self.v, val, order) } } /// Returns the old value (like __sync_fetch_and_sub).. 
#[inline] - pub fn fetch_sub(&mut self, val: uint, order: Ordering) -> uint { - unsafe { atomic_sub(&mut self.v, val, order) } + pub fn fetch_sub(&self, val: uint, order: Ordering) -> uint { + unsafe { atomic_sub(&self.v, val, order) } } } @@ -265,18 +270,18 @@ impl AtomicPtr { } #[inline] - pub fn store(&mut self, ptr: *mut T, order: Ordering) { - unsafe { atomic_store(&mut self.p, ptr, order); } + pub fn store(&self, ptr: *mut T, order: Ordering) { + unsafe { atomic_store(&self.p, ptr, order); } } #[inline] - pub fn swap(&mut self, ptr: *mut T, order: Ordering) -> *mut T { - unsafe { atomic_swap(&mut self.p, ptr, order) } + pub fn swap(&self, ptr: *mut T, order: Ordering) -> *mut T { + unsafe { atomic_swap(&self.p, ptr, order) } } #[inline] - pub fn compare_and_swap(&mut self, old: *mut T, new: *mut T, order: Ordering) -> *mut T { - unsafe { atomic_compare_and_swap(&mut self.p, old, new, order) } + pub fn compare_and_swap(&self, old: *mut T, new: *mut T, order: Ordering) -> *mut T { + unsafe { atomic_compare_and_swap(&self.p, old, new, order) } } } @@ -298,11 +303,11 @@ impl AtomicOption { } #[inline] - pub fn swap(&mut self, val: ~T, order: Ordering) -> Option<~T> { + pub fn swap(&self, val: ~T, order: Ordering) -> Option<~T> { unsafe { let val = cast::transmute(val); - let p = atomic_swap(&mut self.p, val, order); + let p = atomic_swap(&self.p, val, order); let pv : &uint = cast::transmute(&p); if *pv == 0 { @@ -314,7 +319,7 @@ impl AtomicOption { } #[inline] - pub fn take(&mut self, order: Ordering) -> Option<~T> { + pub fn take(&self, order: Ordering) -> Option<~T> { unsafe { self.swap(cast::transmute(0), order) } @@ -324,11 +329,11 @@ impl AtomicOption { /// if so. If the option was already 'Some', returns 'Some' of the rejected /// value. #[inline] - pub fn fill(&mut self, val: ~T, order: Ordering) -> Option<~T> { + pub fn fill(&self, val: ~T, order: Ordering) -> Option<~T> { unsafe { let val = cast::transmute(val); let expected = cast::transmute(0); - let oldval = atomic_compare_and_swap(&mut self.p, expected, val, order); + let oldval = atomic_compare_and_swap(&self.p, expected, val, order); if oldval == expected { None } else { @@ -340,7 +345,7 @@ impl AtomicOption { /// Be careful: The caller must have some external method of ensuring the /// result does not get invalidated by another task after this returns. #[inline] - pub fn is_empty(&mut self, order: Ordering) -> bool { + pub fn is_empty(&self, order: Ordering) -> bool { unsafe { atomic_load(&self.p, order) == cast::transmute(0) } } } @@ -353,7 +358,7 @@ impl Drop for AtomicOption { } #[inline] -pub unsafe fn atomic_store(dst: &mut T, val: T, order:Ordering) { +pub unsafe fn atomic_store(dst: &T, val: T, order:Ordering) { let dst = cast::transmute(dst); let val = cast::transmute(val); @@ -376,7 +381,7 @@ pub unsafe fn atomic_load(dst: &T, order:Ordering) -> T { } #[inline] -pub unsafe fn atomic_swap(dst: &mut T, val: T, order: Ordering) -> T { +pub unsafe fn atomic_swap(dst: &T, val: T, order: Ordering) -> T { let dst = cast::transmute(dst); let val = cast::transmute(val); @@ -391,7 +396,7 @@ pub unsafe fn atomic_swap(dst: &mut T, val: T, order: Ordering) -> T { /// Returns the old value (like __sync_fetch_and_add). 
#[inline] -pub unsafe fn atomic_add(dst: &mut T, val: T, order: Ordering) -> T { +pub unsafe fn atomic_add(dst: &T, val: T, order: Ordering) -> T { let dst = cast::transmute(dst); let val = cast::transmute(val); @@ -406,7 +411,7 @@ pub unsafe fn atomic_add(dst: &mut T, val: T, order: Ordering) -> T { /// Returns the old value (like __sync_fetch_and_sub). #[inline] -pub unsafe fn atomic_sub(dst: &mut T, val: T, order: Ordering) -> T { +pub unsafe fn atomic_sub(dst: &T, val: T, order: Ordering) -> T { let dst = cast::transmute(dst); let val = cast::transmute(val); @@ -420,7 +425,7 @@ pub unsafe fn atomic_sub(dst: &mut T, val: T, order: Ordering) -> T { } #[inline] -pub unsafe fn atomic_compare_and_swap(dst:&mut T, old:T, new:T, order: Ordering) -> T { +pub unsafe fn atomic_compare_and_swap(dst:&T, old:T, new:T, order: Ordering) -> T { let dst = cast::transmute(dst); let old = cast::transmute(old); let new = cast::transmute(new); @@ -435,7 +440,7 @@ pub unsafe fn atomic_compare_and_swap(dst:&mut T, old:T, new:T, order: Orderi } #[inline] -pub unsafe fn atomic_and(dst: &mut T, val: T, order: Ordering) -> T { +pub unsafe fn atomic_and(dst: &T, val: T, order: Ordering) -> T { let dst = cast::transmute(dst); let val = cast::transmute(val); @@ -450,7 +455,7 @@ pub unsafe fn atomic_and(dst: &mut T, val: T, order: Ordering) -> T { #[inline] -pub unsafe fn atomic_nand(dst: &mut T, val: T, order: Ordering) -> T { +pub unsafe fn atomic_nand(dst: &T, val: T, order: Ordering) -> T { let dst = cast::transmute(dst); let val = cast::transmute(val); @@ -465,7 +470,7 @@ pub unsafe fn atomic_nand(dst: &mut T, val: T, order: Ordering) -> T { #[inline] -pub unsafe fn atomic_or(dst: &mut T, val: T, order: Ordering) -> T { +pub unsafe fn atomic_or(dst: &T, val: T, order: Ordering) -> T { let dst = cast::transmute(dst); let val = cast::transmute(val); @@ -480,7 +485,7 @@ pub unsafe fn atomic_or(dst: &mut T, val: T, order: Ordering) -> T { #[inline] -pub unsafe fn atomic_xor(dst: &mut T, val: T, order: Ordering) -> T { +pub unsafe fn atomic_xor(dst: &T, val: T, order: Ordering) -> T { let dst = cast::transmute(dst); let val = cast::transmute(val); diff --git a/src/libstd/unstable/intrinsics.rs b/src/libstd/unstable/intrinsics.rs index d6b33fda7455d..68e5b4e213a37 100644 --- a/src/libstd/unstable/intrinsics.rs +++ b/src/libstd/unstable/intrinsics.rs @@ -209,6 +209,8 @@ extern "rust-intrinsic" { pub fn atomic_load_relaxed(src: &int) -> int; + /// XXX: these are ALL wrong, because they can't take &mut, since that implies no aliasing + /// Atomic store, sequentially consistent. pub fn atomic_store(dst: &mut int, val: int); /// Atomic store, release ordering. 
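The `&self` receivers adopted above are the shape shared atomics need: the whole point of the type is that several threads hold aliasing references to it at once, with the synchronization done through interior mutability rather than `&mut` uniqueness. A minimal sketch of that usage pattern, written in present-day Rust syntax (where `AtomicUint`/`SeqCst` correspond to today's `AtomicUsize`/`Ordering::SeqCst`) rather than the 2014-era syntax used in this series:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

// Several threads bump one counter through shared `&` references; with
// `&mut self` methods this aliasing could not even be expressed.
static COUNT: AtomicUsize = AtomicUsize::new(0);

fn main() {
    let handles: Vec<_> = (0..4)
        .map(|_| {
            thread::spawn(|| {
                for _ in 0..1000 {
                    COUNT.fetch_add(1, Ordering::SeqCst);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    assert_eq!(COUNT.load(Ordering::SeqCst), 4000);
}
```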
From 314c577a916fafa871d004c0bfe5a72f3ca9a14c Mon Sep 17 00:00:00 2001 From: Bill Myers Date: Wed, 15 Jan 2014 02:43:54 +0100 Subject: [PATCH 4/9] mutex: improve native mutex - Refactor imp - Make Windows mutexes statically initializable - Remove rust_builtin.c - Split into Mutex and Cond - Change &mut to & --- src/libgreen/simple.rs | 2 +- src/libnative/bookeeping.rs | 7 +- src/libnative/task.rs | 10 +- src/libstd/unstable/mutex.rs | 443 ++++++++++++++++++++++------------- src/libstd/unstable/sync.rs | 27 ++- src/rt/rust_builtin.c | 20 -- 6 files changed, 300 insertions(+), 209 deletions(-) diff --git a/src/libgreen/simple.rs b/src/libgreen/simple.rs index 21e7a99d30a43..95dff0d9f3418 100644 --- a/src/libgreen/simple.rs +++ b/src/libgreen/simple.rs @@ -39,7 +39,7 @@ impl Runtime for SimpleTask { // See libnative/task.rs for what's going on here with the `awoken` // field and the while loop around wait() unsafe { - let mut guard = (*me).lock.lock(); + let guard = (*me).lock.lock(); (*me).awoken = false; match f(task) { Ok(()) => { diff --git a/src/libnative/bookeeping.rs b/src/libnative/bookeeping.rs index ca40c1a1958c9..dc42d2395e029 100644 --- a/src/libnative/bookeeping.rs +++ b/src/libnative/bookeeping.rs @@ -17,10 +17,11 @@ //! The green counterpart for this is bookeeping on sched pools. use std::sync::atomics; -use std::unstable::mutex::{Mutex, MUTEX_INIT}; +use std::unstable::mutex::{Cond, COND_INIT, Mutex, MUTEX_INIT}; static mut TASK_COUNT: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT; static mut TASK_LOCK: Mutex = MUTEX_INIT; +static mut TASK_COND: Cond = COND_INIT; pub fn increment() { unsafe { TASK_COUNT.fetch_add(1, atomics::SeqCst); } @@ -30,7 +31,7 @@ pub fn decrement() { unsafe { if TASK_COUNT.fetch_sub(1, atomics::SeqCst) == 1 { TASK_LOCK.lock(); - TASK_LOCK.signal(); + TASK_COND.signal(); TASK_LOCK.unlock(); } } @@ -42,7 +43,7 @@ pub fn wait_for_other_tasks() { unsafe { TASK_LOCK.lock(); while TASK_COUNT.load(atomics::SeqCst) > 0 { - TASK_LOCK.wait(); + TASK_COND.wait(&TASK_LOCK); } TASK_LOCK.unlock(); } diff --git a/src/libnative/task.rs b/src/libnative/task.rs index 1ccb828949dc4..d6dc0bc5fcd26 100644 --- a/src/libnative/task.rs +++ b/src/libnative/task.rs @@ -22,7 +22,7 @@ use std::rt::task::{Task, BlockedTask}; use std::rt::thread::Thread; use std::rt; use std::task::TaskOpts; -use std::unstable::mutex::Mutex; +use std::unstable::mutex::{Mutex, Cond}; use std::unstable::stack; use io; @@ -41,6 +41,7 @@ pub fn new(stack_bounds: (uint, uint)) -> ~Task { fn ops() -> ~Ops { ~Ops { lock: unsafe { Mutex::new() }, + cond: unsafe { Cond::new() }, awoken: false, io: io::IoFactory::new(), // these *should* get overwritten @@ -112,6 +113,7 @@ pub fn spawn_opts(opts: TaskOpts, f: proc()) { // structure is allocated once per task. 
struct Ops { lock: Mutex, // native synchronization + cond: Cond, awoken: bool, // used to prevent spurious wakeups io: io::IoFactory, // local I/O factory @@ -196,7 +198,7 @@ impl rt::Runtime for Ops { match f(task) { Ok(()) => { while !(*me).awoken { - (*me).lock.wait(); + (*me).cond.wait(&(*me).lock); } } Err(task) => { cast::forget(task.wake()); } @@ -216,7 +218,7 @@ impl rt::Runtime for Ops { } }); while success && !(*me).awoken { - (*me).lock.wait(); + (*me).cond.wait(&(*me).lock); } (*me).lock.unlock(); } @@ -237,7 +239,7 @@ impl rt::Runtime for Ops { cast::forget(to_wake); (*me).lock.lock(); (*me).awoken = true; - (*me).lock.signal(); + (*me).cond.signal(); (*me).lock.unlock(); } } diff --git a/src/libstd/unstable/mutex.rs b/src/libstd/unstable/mutex.rs index 7fd25ba79ef5f..1b0ecc25eac18 100644 --- a/src/libstd/unstable/mutex.rs +++ b/src/libstd/unstable/mutex.rs @@ -47,215 +47,347 @@ #[allow(non_camel_case_types)]; -use int; -use libc::c_void; -use sync::atomics; - pub struct Mutex { - // pointers for the lock/cond handles, atomically updated - priv lock: atomics::AtomicUint, - priv cond: atomics::AtomicUint, + priv inner: imp::Mutex, } pub static MUTEX_INIT: Mutex = Mutex { - lock: atomics::INIT_ATOMIC_UINT, - cond: atomics::INIT_ATOMIC_UINT, + inner: imp::MUTEX_INIT, }; impl Mutex { - /// Creates a new mutex, with the lock/condition variable pre-initialized + /// Creates a new mutex pub unsafe fn new() -> Mutex { - Mutex { - lock: atomics::AtomicUint::new(imp::init_lock()), - cond: atomics::AtomicUint::new(imp::init_cond()), - } - } - - /// Creates a new mutex, with the lock/condition variable not initialized. - /// This is the same as initializing from the MUTEX_INIT static. - pub unsafe fn empty() -> Mutex { - Mutex { - lock: atomics::AtomicUint::new(0), - cond: atomics::AtomicUint::new(0), - } - } - - /// Creates a new copy of this mutex. This is an unsafe operation because - /// there is no reference counting performed on this type. - /// - /// This function may only be called on mutexes which have had both the - /// internal condition variable and lock initialized. This means that the - /// mutex must have been created via `new`, or usage of it has already - /// initialized the internal handles. - /// - /// This is a dangerous function to call as both this mutex and the returned - /// mutex will share the same handles to the underlying mutex/condition - /// variable. Care must be taken to ensure that deallocation happens - /// accordingly. - pub unsafe fn clone(&self) -> Mutex { - let lock = self.lock.load(atomics::Relaxed); - let cond = self.cond.load(atomics::Relaxed); - assert!(lock != 0); - assert!(cond != 0); - Mutex { - lock: atomics::AtomicUint::new(lock), - cond: atomics::AtomicUint::new(cond), - } + Mutex { inner: imp::Mutex::new() } } /// Acquires this lock. This assumes that the current thread does not /// already hold the lock. - pub unsafe fn lock(&mut self) { imp::lock(self.getlock()) } + pub unsafe fn lock(&self) { self.inner.lock() } /// Attempts to acquire the lock. The value returned is whether the lock was /// acquired or not - pub unsafe fn trylock(&mut self) -> bool { imp::trylock(self.getlock()) } + pub unsafe fn trylock(&self) -> bool { self.inner.trylock() } /// Unlocks the lock. This assumes that the current thread already holds the /// lock. 
- pub unsafe fn unlock(&mut self) { imp::unlock(self.getlock()) } + pub unsafe fn unlock(&self) { self.inner.unlock() } + + /// This function is especially unsafe because there are no guarantees made + /// that no other thread is currently holding the lock or waiting on the + /// condition variable contained inside. + pub unsafe fn destroy(&self) { self.inner.destroy() } +} + +pub struct Cond { + priv inner: imp::Cond, +} + +pub static COND_INIT: Cond = Cond { + inner: imp::COND_INIT, +}; + +impl Cond { + /// Creates a new condition variable + pub unsafe fn new() -> Cond { + Cond { inner: imp::Cond::new() } + } /// Block on the internal condition variable. /// /// This function assumes that the lock is already held - pub unsafe fn wait(&mut self) { imp::wait(self.getcond(), self.getlock()) } + pub unsafe fn wait(&self, mutex: &Mutex) { self.inner.wait(&mutex.inner) } /// Signals a thread in `wait` to wake up - pub unsafe fn signal(&mut self) { imp::signal(self.getcond()) } + pub unsafe fn signal(&self) { self.inner.signal() } /// This function is especially unsafe because there are no guarantees made /// that no other thread is currently holding the lock or waiting on the /// condition variable contained inside. - pub unsafe fn destroy(&mut self) { - let lock = self.lock.swap(0, atomics::Relaxed); - let cond = self.cond.swap(0, atomics::Relaxed); - if lock != 0 { imp::free_lock(lock) } - if cond != 0 { imp::free_cond(cond) } - } - - unsafe fn getlock(&mut self) -> *c_void { - match self.lock.load(atomics::Relaxed) { - 0 => {} - n => return n as *c_void - } - let lock = imp::init_lock(); - match self.lock.compare_and_swap(0, lock, atomics::SeqCst) { - 0 => return lock as *c_void, - _ => {} - } - imp::free_lock(lock); - return self.lock.load(atomics::Relaxed) as *c_void; - } - - unsafe fn getcond(&mut self) -> *c_void { - match self.cond.load(atomics::Relaxed) { - 0 => {} - n => return n as *c_void - } - let cond = imp::init_cond(); - match self.cond.compare_and_swap(0, cond, atomics::SeqCst) { - 0 => return cond as *c_void, - _ => {} - } - imp::free_cond(cond); - return self.cond.load(atomics::Relaxed) as *c_void; - } + pub unsafe fn destroy(&self) { self.inner.destroy() } } #[cfg(unix)] mod imp { - use libc::c_void; use libc; - use ptr; - use ptr::RawPtr; + use self::os::{PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, + pthread_mutex_t, pthread_cond_t}; + use unstable::intrinsics; - type pthread_mutex_t = libc::c_void; type pthread_mutexattr_t = libc::c_void; - type pthread_cond_t = libc::c_void; type pthread_condattr_t = libc::c_void; - pub unsafe fn init_lock() -> uint { - let block = libc::malloc(rust_pthread_mutex_t_size() as libc::size_t); - assert!(!block.is_null()); - let n = pthread_mutex_init(block, ptr::null()); - assert_eq!(n, 0); - return block as uint; - } + #[cfg(target_os = "freebsd")] + mod os { + use libc; + + pub type pthread_mutex_t = *libc::c_void; + pub type pthread_cond_t = *libc::c_void; + + pub static PTHREAD_MUTEX_INITIALIZER: pthread_mutex_t = + 0 as pthread_mutex_t; + pub static PTHREAD_COND_INITIALIZER: pthread_cond_t = + 0 as pthread_cond_t; + } + + #[cfg(target_os = "macos")] + mod os { + use libc; + + #[cfg(target_arch = "x86_64")] + static __PTHREAD_MUTEX_SIZE__: uint = 56; + #[cfg(target_arch = "x86_64")] + static __PTHREAD_COND_SIZE__: uint = 40; + #[cfg(target_arch = "x86")] + static __PTHREAD_MUTEX_SIZE__: uint = 40; + #[cfg(target_arch = "x86")] + static __PTHREAD_COND_SIZE__: uint = 24; + static _PTHREAD_MUTEX_SIG_init: libc::c_long = 0x32AAABA7; 
+ static _PTHREAD_COND_SIG_init: libc::c_long = 0x3CB0B1BB; + + pub struct pthread_mutex_t { + __sig: libc::c_long, + __opaque: [u8, ..__PTHREAD_MUTEX_SIZE__], + } + pub struct pthread_cond_t { + __sig: libc::c_long, + __opaque: [u8, ..__PTHREAD_COND_SIZE__], + } - pub unsafe fn init_cond() -> uint { - let block = libc::malloc(rust_pthread_cond_t_size() as libc::size_t); - assert!(!block.is_null()); - let n = pthread_cond_init(block, ptr::null()); - assert_eq!(n, 0); - return block as uint; - } + pub static PTHREAD_MUTEX_INITIALIZER: pthread_mutex_t = pthread_mutex_t { + __sig: _PTHREAD_MUTEX_SIG_init, + __opaque: [0, ..__PTHREAD_MUTEX_SIZE__], + }; + pub static PTHREAD_COND_INITIALIZER: pthread_cond_t = pthread_cond_t { + __sig: _PTHREAD_COND_SIG_init, + __opaque: [0, ..__PTHREAD_COND_SIZE__], + }; + } + + #[cfg(target_os = "linux")] + #[cfg(target_os = "android")] + mod os { + use libc; + + // minus 8 because we have an 'align' field + #[cfg(target_arch = "x86_64")] + static __SIZEOF_PTHREAD_MUTEX_T: uint = 40 - 8; + #[cfg(target_arch = "x86")] + static __SIZEOF_PTHREAD_MUTEX_T: uint = 24 - 8; + #[cfg(target_arch = "x86_64")] + static __SIZEOF_PTHREAD_COND_T: uint = 48 - 8; + #[cfg(target_arch = "x86")] + static __SIZEOF_PTHREAD_COND_T: uint = 48 - 8; + + pub struct pthread_mutex_t { + __align: libc::c_long, + size: [u8, ..__SIZEOF_PTHREAD_MUTEX_T], + } + pub struct pthread_cond_t { + __align: libc::c_longlong, + size: [u8, ..__SIZEOF_PTHREAD_COND_T], + } - pub unsafe fn free_lock(h: uint) { - let block = h as *c_void; - assert_eq!(pthread_mutex_destroy(block), 0); - libc::free(block); + pub static PTHREAD_MUTEX_INITIALIZER: pthread_mutex_t = pthread_mutex_t { + __align: 0, + size: [0, ..__SIZEOF_PTHREAD_MUTEX_T], + }; + pub static PTHREAD_COND_INITIALIZER: pthread_cond_t = pthread_cond_t { + __align: 0, + size: [0, ..__SIZEOF_PTHREAD_COND_T], + }; } - pub unsafe fn free_cond(h: uint) { - let block = h as *c_void; - assert_eq!(pthread_cond_destroy(block), 0); - libc::free(block); + #[no_freeze] + pub struct Mutex { + priv lock: pthread_mutex_t, } - pub unsafe fn lock(l: *pthread_mutex_t) { - assert_eq!(pthread_mutex_lock(l), 0); - } + pub static MUTEX_INIT: Mutex = Mutex { + lock: PTHREAD_MUTEX_INITIALIZER, + }; - pub unsafe fn trylock(l: *c_void) -> bool { - pthread_mutex_trylock(l) == 0 - } + impl Mutex { + pub unsafe fn new() -> Mutex { + let m = Mutex { + lock: intrinsics::init(), + }; - pub unsafe fn unlock(l: *pthread_mutex_t) { - assert_eq!(pthread_mutex_unlock(l), 0); - } + pthread_mutex_init(&m.lock, 0 as *libc::c_void); + + return m; + } - pub unsafe fn wait(cond: *pthread_cond_t, m: *pthread_mutex_t) { - assert_eq!(pthread_cond_wait(cond, m), 0); + pub unsafe fn lock(&self) { pthread_mutex_lock(&self.lock); } + pub unsafe fn unlock(&self) { pthread_mutex_unlock(&self.lock); } + pub unsafe fn trylock(&self) -> bool { + pthread_mutex_trylock(&self.lock) == 0 + } + pub unsafe fn destroy(&self) { + pthread_mutex_destroy(&self.lock); + } } - pub unsafe fn signal(cond: *pthread_cond_t) { - assert_eq!(pthread_cond_signal(cond), 0); + pub struct Cond { + priv cond: pthread_cond_t, } - extern { - fn rust_pthread_mutex_t_size() -> libc::c_int; - fn rust_pthread_cond_t_size() -> libc::c_int; + pub static COND_INIT: Cond = Cond { + cond: PTHREAD_COND_INITIALIZER, + }; + + impl Cond { + pub unsafe fn new() -> Cond { + let c = Cond { + cond: intrinsics::init(), + }; + + pthread_cond_init(&c.cond, 0 as *libc::c_void); + + return c; + } + + pub unsafe fn signal(&self) { 
pthread_cond_signal(&self.cond); } + + pub unsafe fn wait(&self, mutex: &Mutex) { + pthread_cond_wait(&self.cond, &mutex.lock); + } + + pub unsafe fn destroy(&self) { + pthread_cond_destroy(&self.cond); + } } extern { fn pthread_mutex_init(lock: *pthread_mutex_t, - attr: *pthread_mutexattr_t) -> libc::c_int; - fn pthread_mutex_destroy(lock: *pthread_mutex_t) -> libc::c_int; - fn pthread_cond_init(cond: *pthread_cond_t, - attr: *pthread_condattr_t) -> libc::c_int; - fn pthread_cond_destroy(cond: *pthread_cond_t) -> libc::c_int; + attr: *libc::c_void) -> libc::c_int; fn pthread_mutex_lock(lock: *pthread_mutex_t) -> libc::c_int; fn pthread_mutex_trylock(lock: *pthread_mutex_t) -> libc::c_int; fn pthread_mutex_unlock(lock: *pthread_mutex_t) -> libc::c_int; + fn pthread_cond_init(cond: *pthread_cond_t, + attr: *libc::c_void) -> libc::c_int; fn pthread_cond_wait(cond: *pthread_cond_t, lock: *pthread_mutex_t) -> libc::c_int; fn pthread_cond_signal(cond: *pthread_cond_t) -> libc::c_int; + fn pthread_mutex_destroy(lock: *pthread_mutex_t) -> libc::c_int; + fn pthread_cond_destroy(lock: *pthread_cond_t) -> libc::c_int; } } #[cfg(windows)] mod imp { - use libc; use libc::{HANDLE, BOOL, LPSECURITY_ATTRIBUTES, c_void, DWORD, LPCSTR}; - use ptr; + use libc; use ptr::RawPtr; + use ptr; + use sync::atomics; type LPCRITICAL_SECTION = *c_void; static SPIN_COUNT: DWORD = 4000; + #[cfg(target_arch = "x86")] + static CRIT_SECTION_SIZE: uint = 24; + + #[no_freeze] + pub struct Mutex { + // pointers for the lock/cond handles, atomically updated + priv lock: atomics::AtomicUint, + } + + pub static MUTEX_INIT: Mutex = Mutex { + lock: atomics::INIT_ATOMIC_UINT, + }; + + impl Mutex { + pub unsafe fn new() -> Mutex { + Mutex { + cond: atomics::AtomicUint::new(init_cond()), + } + } + + pub unsafe fn lock(&self) { + EnterCriticalSection(self.getlock() as LPCRITICAL_SECTION) + } + pub unsafe fn trylock(&self) -> bool { + TryEnterCriticalSection(self.getlock() as LPCRITICAL_SECTION) != 0 + } + pub unsafe fn unlock(&self) { + LeaveCriticalSection(self.getlock() as LPCRITICAL_SECTION) + } + + /// This function is especially unsafe because there are no guarantees made + /// that no other thread is currently holding the lock or waiting on the + /// condition variable contained inside. + pub unsafe fn destroy(&self) { + let lock = self.lock.swap(0, atomics::Relaxed); + if lock != 0 { free_lock(lock) } + } + + unsafe fn getlock(&self) -> *c_void { + match self.lock.load(atomics::Relaxed) { + 0 => {} + n => return n as *c_void + } + let lock = init_lock(); + match self.lock.compare_and_swap(0, lock, atomics::SeqCst) { + 0 => return lock as *c_void, + _ => {} + } + free_lock(lock); + return self.lock.load(atomics::Relaxed) as *c_void; + } + } + + pub struct Cond { + priv cond: atomics::AtomicUint, + } + + pub static COND_INIT: Cond = Cond { + cond: atomics::INIT_ATOMIC_UINT, + }; + + impl Cond { + pub unsafe fn new() -> Cond { + Cond { + cond: atomics::AtomicUint::new(init_cond()), + } + } + + pub unsafe fn wait(&self, mutex: &Mutex) { + mutex.unlock(); + WaitForSingleObject(self.getcond() as HANDLE, libc::INFINITE); + mutex.lock(); + } + + pub unsafe fn signal(&self) { + assert!(SetEvent(self.getcond() as HANDLE) != 0); + } + + /// This function is especially unsafe because there are no guarantees made + /// that no other thread is currently holding the lock or waiting on the + /// condition variable contained inside. 
+ pub unsafe fn destroy(&self) { + let cond = self.cond.swap(0, atomics::Relaxed); + if cond != 0 { free_cond(cond) } + } + + unsafe fn getcond(&self) -> *c_void { + match self.cond.load(atomics::Relaxed) { + 0 => {} + n => return n as *c_void + } + let cond = init_cond(); + match self.cond.compare_and_swap(0, cond, atomics::SeqCst) { + 0 => return cond as *c_void, + _ => {} + } + free_cond(cond); + return self.cond.load(atomics::Relaxed) as *c_void; + } + } pub unsafe fn init_lock() -> uint { - let block = libc::malloc(rust_crit_section_size() as libc::size_t); + let block = libc::malloc(CRIT_SECTION_SIZE as libc::size_t); assert!(!block.is_null()); InitializeCriticalSectionAndSpinCount(block, SPIN_COUNT); return block as uint; @@ -276,32 +408,6 @@ mod imp { libc::CloseHandle(block); } - pub unsafe fn lock(l: *c_void) { - EnterCriticalSection(l as LPCRITICAL_SECTION) - } - - pub unsafe fn trylock(l: *c_void) -> bool { - TryEnterCriticalSection(l as LPCRITICAL_SECTION) != 0 - } - - pub unsafe fn unlock(l: *c_void) { - LeaveCriticalSection(l as LPCRITICAL_SECTION) - } - - pub unsafe fn wait(cond: *c_void, m: *c_void) { - unlock(m); - WaitForSingleObject(cond as HANDLE, libc::INFINITE); - lock(m); - } - - pub unsafe fn signal(cond: *c_void) { - assert!(SetEvent(cond as HANDLE) != 0); - } - - extern { - fn rust_crit_section_size() -> libc::c_int; - } - extern "system" { fn CreateEventA(lpSecurityAttributes: LPSECURITY_ATTRIBUTES, bManualReset: BOOL, @@ -423,7 +529,7 @@ mod test { use prelude::*; use rt::thread::Thread; - use super::{ONCE_INIT, Once, Mutex, MUTEX_INIT}; + use super::{ONCE_INIT, Once, Mutex, MUTEX_INIT, Cond, COND_INIT}; use task; #[test] @@ -472,7 +578,7 @@ mod test { #[test] fn somke_lock() { - static mut lock: Mutex = MUTEX_INIT; + static lock: Mutex = MUTEX_INIT; unsafe { lock.lock(); lock.unlock(); @@ -481,15 +587,16 @@ mod test { #[test] fn somke_cond() { - static mut lock: Mutex = MUTEX_INIT; + static lock: Mutex = MUTEX_INIT; + static cond: Cond = COND_INIT; unsafe { lock.lock(); let t = do Thread::start { lock.lock(); - lock.signal(); + cond.signal(); lock.unlock(); }; - lock.wait(); + cond.wait(&lock); lock.unlock(); t.join(); } @@ -498,7 +605,7 @@ mod test { #[test] fn destroy_immediately() { unsafe { - let mut m = Mutex::new(); + let m = Mutex::new(); m.destroy(); } } diff --git a/src/libstd/unstable/sync.rs b/src/libstd/unstable/sync.rs index 687efea939b52..498002bb4c89e 100644 --- a/src/libstd/unstable/sync.rs +++ b/src/libstd/unstable/sync.rs @@ -13,14 +13,15 @@ use kinds::Send; use ops::Drop; use option::{Option,Some,None}; use sync::arc::UnsafeArc; -use unstable::mutex::Mutex; +use unstable::mutex::{Mutex, Cond}; pub struct LittleLock { priv l: Mutex, + priv c: Cond, } pub struct LittleGuard<'a> { - priv l: &'a mut Mutex, + priv l: &'a LittleLock, } impl Drop for LittleLock { @@ -32,36 +33,36 @@ impl Drop for LittleLock { #[unsafe_destructor] impl<'a> Drop for LittleGuard<'a> { fn drop(&mut self) { - unsafe { self.l.unlock(); } + unsafe { self.l.l.unlock(); } } } impl LittleLock { pub fn new() -> LittleLock { - unsafe { LittleLock { l: Mutex::new() } } + unsafe { LittleLock { l: Mutex::new(), c: Cond::new() } } } - pub unsafe fn lock<'a>(&'a mut self) -> LittleGuard<'a> { + pub unsafe fn lock<'a>(&'a self) -> LittleGuard<'a> { self.l.lock(); - LittleGuard { l: &mut self.l } + LittleGuard { l: self } } - pub unsafe fn try_lock<'a>(&'a mut self) -> Option> { + pub unsafe fn try_lock<'a>(&'a self) -> Option> { if self.l.trylock() { - Some(LittleGuard { l: &mut 
self.l }) + Some(LittleGuard { l: self }) } else { None } } - pub unsafe fn signal(&mut self) { - self.l.signal(); + pub unsafe fn signal(&self) { + self.c.signal(); } } impl<'a> LittleGuard<'a> { - pub unsafe fn wait(&mut self) { - self.l.wait(); + pub unsafe fn wait(&self) { + self.l.c.wait(&self.l.l); } } @@ -144,7 +145,7 @@ impl Exclusive { #[inline] pub unsafe fn hold_and_wait(&self, f: |x: &T| -> bool) { let rec = self.x.get(); - let mut l = (*rec).lock.lock(); + let l = (*rec).lock.lock(); if (*rec).failed { fail!("Poisoned Exclusive::new - another task failed inside!"); } diff --git a/src/rt/rust_builtin.c b/src/rt/rust_builtin.c index 6de5f80829003..81eba2984dad0 100644 --- a/src/rt/rust_builtin.c +++ b/src/rt/rust_builtin.c @@ -437,26 +437,6 @@ rust_win32_rand_release() { #endif -#if defined(__WIN32__) - -int -rust_crit_section_size() { return sizeof(CRITICAL_SECTION); } -int -rust_pthread_mutex_t_size() { return 0; } -int -rust_pthread_cond_t_size() { return 0; } - -#else - -int -rust_crit_section_size() { return 0; } -int -rust_pthread_mutex_t_size() { return sizeof(pthread_mutex_t); } -int -rust_pthread_cond_t_size() { return sizeof(pthread_cond_t); } - -#endif - // // Local Variables: // mode: C++ From d41dc7a14cd96eed6773cc75913854742c0612db Mon Sep 17 00:00:00 2001 From: Bill Myers Date: Wed, 15 Jan 2014 02:46:39 +0100 Subject: [PATCH 5/9] rt: introduce task.can_block() --- src/libgreen/simple.rs | 1 + src/libgreen/task.rs | 2 ++ src/libnative/task.rs | 2 ++ src/libstd/rt/mod.rs | 1 + src/libstd/rt/task.rs | 6 ++++++ 5 files changed, 12 insertions(+) diff --git a/src/libgreen/simple.rs b/src/libgreen/simple.rs index 95dff0d9f3418..81a5528ef12a9 100644 --- a/src/libgreen/simple.rs +++ b/src/libgreen/simple.rs @@ -76,6 +76,7 @@ impl Runtime for SimpleTask { } fn local_io<'a>(&'a mut self) -> Option> { None } fn stack_bounds(&self) -> (uint, uint) { fail!() } + fn can_block(&self) -> bool { true } fn wrap(~self) -> ~Any { fail!() } } diff --git a/src/libgreen/task.rs b/src/libgreen/task.rs index 07c7da1c99091..1c451435844e6 100644 --- a/src/libgreen/task.rs +++ b/src/libgreen/task.rs @@ -457,6 +457,8 @@ impl Runtime for GreenTask { c.current_stack_segment.end() as uint) } + fn can_block(&self) -> bool { false } + fn wrap(~self) -> ~Any { self as ~Any } } diff --git a/src/libnative/task.rs b/src/libnative/task.rs index d6dc0bc5fcd26..d0d7f8ddc0c59 100644 --- a/src/libnative/task.rs +++ b/src/libnative/task.rs @@ -144,6 +144,8 @@ impl rt::Runtime for Ops { fn stack_bounds(&self) -> (uint, uint) { self.stack_bounds } + fn can_block(&self) -> bool { true } + // This function gets a little interesting. There are a few safety and // ownership violations going on here, but this is all done in the name of // shared state. Additionally, all of the violations are protected with a diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 88939b7f399c1..69d3ff39d4696 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -156,6 +156,7 @@ pub trait Runtime { fn local_io<'a>(&'a mut self) -> Option>; /// The (low, high) edges of the current stack. fn stack_bounds(&self) -> (uint, uint); // (lo, hi) + fn can_block(&self) -> bool; // XXX: This is a serious code smell and this should not exist at all. 
fn wrap(~self) -> ~Any; diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 40591785ff45d..af06541b43945 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -292,6 +292,12 @@ impl Task { pub fn stack_bounds(&self) -> (uint, uint) { self.imp.get_ref().stack_bounds() } + + /// Returns whether it is legal for this task to block the OS thread that it + /// is running on. + pub fn can_block(&self) -> bool { + self.imp.get_ref().can_block() + } } impl Drop for Task { From 45148ac77524ccfc7ff296ffdfd9542f6a594872 Mon Sep 17 00:00:00 2001 From: Bill Myers Date: Wed, 15 Jan 2014 03:40:33 +0100 Subject: [PATCH 6/9] std::rt: limit tasks to 2^16 on 32-bit and 2^32 on 64-bit --- src/libstd/rt/task.rs | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index af06541b43945..b2c5a2cbdf829 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -35,7 +35,8 @@ use rt::rtio::LocalIo; use rt::unwind::Unwinder; use send_str::SendStr; use sync::arc::UnsafeArc; -use sync::atomics::{AtomicUint, SeqCst}; +use sync::atomics; +use sync::atomics::{AtomicUint, SeqCst, Relaxed}; use task::{TaskResult, TaskOpts}; use unstable::finally::Finally; @@ -84,8 +85,31 @@ pub struct BlockedTaskIterator { priv inner: UnsafeArc, } + +static mut TASK_COUNT: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT; + +// this limit is due to the Mutex implementation +// it cannot be triggered unless the user specifies a non-default stack size < 64KB +// (the program will run out of virtual address space before it hits the task limit) +// +// it can be lifted if desired by changing Mutex to use double-uint atomics +// +// currently the limit is due to the fact that we need to store two integers counting +// tasks in a single AtomicUint +// +// we subtract 2 and not 1, because in addition to tasks having a Task structure, there +// might be up to one C thread without a task structure holding the mutex, because we +// don't require a Task structure to call try_lock +#[cfg(target_word_size = "32")] static TASK_LIMIT: uint = (1 << 16) - 2; +#[cfg(target_word_size = "64")] static TASK_LIMIT: uint = (1 << 32) - 2; + impl Task { pub fn new() -> Task { + if (unsafe { TASK_COUNT.fetch_add(1, atomics::Relaxed) } >= TASK_LIMIT { + unsafe { TASK_COUNT.fetch_sub(1, atomics::Relaxed) }; + fail!("tried to create more than {} tasks, which is the task limit", TASK_LIMIT) + } + Task { heap: LocalHeap::new(), gc: GarbageCollector, @@ -302,6 +326,8 @@ impl Task { impl Drop for Task { fn drop(&mut self) { + unsafe { TASK_COUNT.fetch_sub(1, atomics::Relaxed) }; + rtdebug!("called drop for a task: {}", borrow::to_uint(self)); rtassert!(self.destroyed); } From 15c0c93178f1206e074022441e1fb85c16c97dbc Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 9 Jan 2014 11:57:05 -0800 Subject: [PATCH 7/9] std: Introduce a standard Mutex type Much of the justification can be found in the large implementation comment found in the module itself. 
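For reference, a brief sketch of the user-facing shape of the type this patch introduces, following the doc comments in `src/libstd/sync/mutex.rs` below (2014-era Rust, as in the rest of the series): `Mutex::new()` for an owned lock, `lock()` returning an RAII `Guard`, and `MUTEX_INIT` for statically allocated `StaticMutex` values.

```rust
use std::sync::{Mutex, StaticMutex, MUTEX_INIT};

static mut LOCK: StaticMutex = MUTEX_INIT;

// Owned mutex: the Guard unlocks when it is dropped.
let mut m = Mutex::new();
{
    let _g = m.lock();
    // critical section
} // unlocked here

// Statically initialized mutex: no runtime construction needed.
unsafe {
    let _g = LOCK.lock();
    // critical section
}
```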
--- src/etc/licenseck.py | 3 +- src/libstd/sync/mod.rs | 5 + src/libstd/sync/mpsc_intrusive.rs | 98 ++++++ src/libstd/sync/mutex.rs | 567 ++++++++++++++++++++++++++++++ 4 files changed, 672 insertions(+), 1 deletion(-) create mode 100644 src/libstd/sync/mpsc_intrusive.rs create mode 100644 src/libstd/sync/mutex.rs diff --git a/src/etc/licenseck.py b/src/etc/licenseck.py index b5a721c03ff09..a14c5e8adf38e 100644 --- a/src/etc/licenseck.py +++ b/src/etc/licenseck.py @@ -41,6 +41,7 @@ "libstd/sync/mpsc_queue.rs", # BSD "libstd/sync/spsc_queue.rs", # BSD "libstd/sync/mpmc_bounded_queue.rs", # BSD + "libstd/sync/mpsc_intrusive.rs", # BSD ] def check_license(name, contents): @@ -59,4 +60,4 @@ def check_license(name, contents): if (boilerplate.find(license1) == -1 or boilerplate.find(license2) == -1) and \ (boilerplate.find(license3) == -1 or boilerplate.find(license4) == -1): return False - return True \ No newline at end of file + return True diff --git a/src/libstd/sync/mod.rs b/src/libstd/sync/mod.rs index 3213c538152c6..1dc350dd7cac4 100644 --- a/src/libstd/sync/mod.rs +++ b/src/libstd/sync/mod.rs @@ -15,9 +15,14 @@ //! and/or blocking at all, but rather provide the necessary tools to build //! other types of concurrent primitives. +pub use self::mutex::{Mutex, StaticMutex, Guard, MUTEX_INIT}; + pub mod arc; pub mod atomics; pub mod deque; pub mod mpmc_bounded_queue; pub mod mpsc_queue; pub mod spsc_queue; + +mod mpsc_intrusive; +mod mutex; diff --git a/src/libstd/sync/mpsc_intrusive.rs b/src/libstd/sync/mpsc_intrusive.rs new file mode 100644 index 0000000000000..a42c7082b587a --- /dev/null +++ b/src/libstd/sync/mpsc_intrusive.rs @@ -0,0 +1,98 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! A lock-free multi-producer, single consumer queue. +//! +//! This module implements an intrusive MPSC queue. This queue is incredibly +//! unsafe (due to use of unsafe pointers for nodes), and hence is not public. + +use cast; +use kinds::Send; +use option::{Option, Some, None}; +use sync::atomics; + +// NB: all links are done as AtomicUint instead of AtomicPtr to allow for static +// initialization. 
+ +pub struct Node { + next: *mut Node, + data: T, +} + +pub struct Queue { + producer: atomics::AtomicUint, + consumer: *mut Node, +} + +impl Queue { + pub fn new() -> Queue { + Queue { + producer: atomics::AtomicUint::new(0), + consumer: 0 as *mut Node, + } + } + + pub unsafe fn push(&self, node: *mut Node) { + // prepend the node to the producer queue + let mut a = 0; + loop { + (*node).next = cast::transmute(a); + let v = self.producer.compare_and_swap(a, node as uint, atomics::Release); + if a == v { + return; + } + a = v; + } + } + + /// This has worst case O(n) because it needs to reverse the queue + /// However it is of course amortized O(1) + pub unsafe fn pop(&self) -> Option<*mut Node> { + // self.consumer is only used by the single consumer, so let's get an &mut to it + let Queue {producer: ref ref_producer, consumer: ref ref_consumer} = *self; + let mut_consumer: &mut *mut Node = cast::transmute(ref_consumer); + + let node = *mut_consumer; + if node != 0 as *mut Node { + // pop from the consumer queue if non-empty + *mut_consumer = (*node).next; + Some(node) + } else { + // otherwise steal the producer queue, reverse it, take the last element + // and store the rest as the consumer queue + let mut node: *mut Node = cast::transmute(ref_producer.swap(0, atomics::Acquire)); + if node != 0 as *mut Node { + let mut prev = 0 as *mut Node; + + loop { + let next = (*node).next; + if next == 0 as *mut Node {break}; + (*node).next = prev; + prev = node; + node = next; + } + *mut_consumer = prev; + Some(node) + } else { + None + } + } + } +} + +impl Node { + pub fn new(t: T) -> Node { + Node { + data: t, + next: 0 as *mut Node, + } + } +} + diff --git a/src/libstd/sync/mutex.rs b/src/libstd/sync/mutex.rs new file mode 100644 index 0000000000000..3757061fbdc2d --- /dev/null +++ b/src/libstd/sync/mutex.rs @@ -0,0 +1,567 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! A proper mutex implementation regardless of the "flavor of task" which is +//! acquiring the lock. + +// # The implementation of Rust's mutexes +// +// As hinted in the doc-comment above, the fundamental problem of implementing a +// mutex for rust is that you can't "just use pthreads". Green tasks are not +// allowed to block on a pthread mutex, because this can very easily lead to +// deadlock. Otherwise, there are other properties that we would want out of an +// "official mutex": +// +// * Any flavor of task can acquire the mutex, green or native +// * Any mixing of flavors of tasks can acquire the mutex. It should be possible +// for green and native threads to contend over acquiring the mutex +// * This mutex should be "just as fast" as pthreads +// * Mutexes should be statically initializeable +// * Mutexes should really not need to have destructors (see static +// initialization) +// +// Some properties which have been deemed not critical +// +// * Enforcing bounded waiting among all tasks acquiring the mutex. Mixing +// green/native tasks is predicted to be a fairly rare case. +// +// ## Mutexes, take 1 +// +// Within these constraints, the primitives we have available to us for blocking +// a task are the `deschedule` and `reawaken` methods on the `rt::Runtime` +// trait. 
These are the obvious choices to use first because they're "what we
+// have already" and should certainly be efficient.
+//
+// The sketch behind this mutex would be to use an intrusive (to avoid
+// allocations) MPSC queue (the consumer is the lock holder) with some
+// sprinkling of atomics to wake threads up. Each `BlockedTask` would be stored
+// in the nodes of the queue.
+//
+// This implementation is all fine and dandy for green threads (user space
+// context switching is fast), but when implemented, it was found that this
+// implementation was about 50x slower than pthreads for native threads.
+//
+// Upon profiling, nearly all time was spent in cvar signal/wait (that's how
+// native threads implement deschedule/reawaken). The problem was never tracked
+// down with 100% certainty, but it was discovered that this huge slowdown
+// was only on a multicore system, not a single core system. With this knowledge
+// in hand, plus some idea of how pthread mutexes are implemented, it was
+// deduced that the kernel essentially knows what's going on when everyone's
+// contended on the same mutex (as in the pthreads case). The kernel can
+// cleverly schedule threads to *not* wake up on remote cores because all the
+// work needs to happen on the same core (that's the whole point of a mutex).
+// The deschedule/reawaken methods put threads to sleep on localized cvars, so
+// the kernel had no idea that all our threads were contending *on the same
+// mutex*.
+//
+// With this information in mind, it was concluded that it's impossible to
+// create a pthreads-competitive mutex with the deschedule/reawaken primitives.
+// We simply have no way of instructing the kernel that all native threads are
+// contended on one object and should therefore *not* be spread out on many
+// cores.
+//
+// ## Mutexes, take 2
+//
+// Back to the drawing board, the key idea was to actually have this mutex be a
+// wrapper around a pthreads mutex. This would clearly solve the native threads
+// problem (we'd be "just as fast" as pthreads), but the green problem comes
+// back into play (you can't just grab the lock).
+//
+// The solution found (and the current implementation) ended up having a hybrid
+// solution of queues/mutexes. The key idea is that green threads only ever
+// *trylock* and use an internal queue to keep track of who's waiting, and
+// native threads will simply call *lock*.
+//
+// With this scheme, we get all the benefits of both worlds:
+//
+// * Any flavor of task (even mixed) can grab a mutex, pthreads arbitrates among
+// all native and the first green tasks, and then green tasks use atomics to
+// arbitrate among themselves.
+// * We're just as fast as pthreads (within a small percentage of course)
+// * Native mutexes are statically initializeable, and some clever usage of
+// atomics can make the green halves of the mutex also statically
+// initializeable.
+// * No destructors are necessary (there is no memory allocation). The caveat
+// here is that windows doesn't have statically initialized mutexes, but it is
+// predicted that statically initialized mutexes won't be *too* common. Plus,
+// the "free" happens at program end when cleaning up doesn't matter *that*
+// much.
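The decision rule in that hybrid scheme is small enough to state directly. Below is a minimal sketch of it in present-day Rust with invented names (`can_block` corresponds to the `Task::can_block()` query added earlier in this series); take 3 below refines the rule so that native tasks also queue behind already-queued green tasks.

```rust
/// What a contending task should do under the "take 2" scheme described above.
#[derive(Debug, PartialEq)]
enum Action {
    Proceed,            // trylock succeeded: we now hold the OS mutex
    BlockOnOsMutex,     // native task: let pthreads arbitrate
    QueueAndDeschedule, // green task: must never block the OS thread
}

fn on_lock(trylock_succeeded: bool, can_block: bool) -> Action {
    if trylock_succeeded {
        Action::Proceed
    } else if can_block {
        Action::BlockOnOsMutex
    } else {
        Action::QueueAndDeschedule
    }
}

fn main() {
    // A green task that loses the trylock race queues instead of blocking.
    assert_eq!(on_lock(false, false), Action::QueueAndDeschedule);
    // A native task is allowed to park inside the OS mutex.
    assert_eq!(on_lock(false, true), Action::BlockOnOsMutex);
    assert_eq!(on_lock(true, false), Action::Proceed);
}
```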
+// +// ## Mutexes, take 3 +// +// Take 3 uses a more sophisticated atomic state, allowing it to not use yield loops: +// we use an atomic integer containing a (queue_size, lockers) tuple, where queue_size +// is the size of the queue of queued up tasks, and lockers is the number of tasks who +// have or are about to take the OS mutex using a blocking lock call. +// +// It is now as fair as the OS mutex allows, even when mixing green and native tasks, +// since native threads will queue like green tasks, if any green task is queued. +// +// This is the high-level implementation of the mutexes, but the nitty gritty +// details can be found in the code below. + +use ops::Drop; +use q = sync::mpsc_intrusive; +use option::{Option, Some, None}; +use result::{Ok, Err}; +use rt::local::Local; +use rt::task::{BlockedTask, Task}; +use sync::atomics; +use unstable::mutex; + +/// A mutual exclusion primitive useful for protecting shared data +/// +/// This mutex is an implementation of a lock for all flavors of tasks which may +/// be grabbing. A common problem with green threads is that they cannot grab +/// locks (if they reschedule during the lock a contender could deadlock the +/// system), but this mutex does *not* suffer this problem. +/// +/// This mutex will properly block tasks waiting for the lock to become +/// available. The mutex can also be statically initialized or created via a +/// `new` constructor. +/// +/// # Example +/// +/// ```rust +/// use std::sync::Mutex; +/// +/// let mut m = Mutex::new(); +/// let guard = m.lock(); +/// // do some work +/// drop(guard); // unlock the lock +/// +/// { +/// let _g = m.lock(); +/// // do some work in a scope +/// } +/// +/// // now the mutex is unlocked +/// ``` +pub struct Mutex { + priv lock: StaticMutex, +} + +/// The static mutex type is provided to allow for static allocation of mutexes. +/// +/// Note that this is a separate type because using a Mutex correctly means that +/// it needs to have a destructor run. In Rust, statics are not allowed to have +/// destructors. As a result, a `StaticMutex` has one extra method when compared +/// to a `Mutex`, a `destroy` method. This method is unsafe to call, and +/// documentation can be found directly on the method. +/// +/// # Example +/// +/// ```rust +/// use std::sync::{StaticMutex, MUTEX_INIT}; +/// +/// static mut LOCK: StaticMutex = MUTEX_INIT; +/// +/// unsafe { +/// let _g = LOCK.lock(); +/// // do some productive work +/// } +/// // lock is unlocked here. +/// ``` +pub struct StaticMutex { + /// The OS mutex (pthreads/windows equivalent) that we're wrapping. + priv lock: mutex::Mutex, + /// Internal mutex state + priv state: MutexState, + /// Internal queue that all green threads will be blocked on. + priv q: q::Queue, +} + +/// An RAII implementation of a "scoped lock" of a mutex. When this structure is +/// dropped (falls out of scope), the lock will be unlocked. +pub struct Guard<'a> { + priv lock: &'a StaticMutex, +} + +/// Static initialization of a mutex. This constant can be used to initialize +/// other mutex constants. 
+pub static MUTEX_INIT: StaticMutex = StaticMutex { + lock: mutex::MUTEX_INIT, + state: INIT_MUTEX_STATE, + q: q::Queue { + producer: atomics::INIT_ATOMIC_UINT, + consumer: 0 as *mut q::Node, + } +}; + +/// this is logically an atomic tuple of (lockers, queue_size) +/// lockers is the number of tasks about to call lock() or holding the mutex +/// queue_size is the number of queued up tasks +struct MutexState { + priv state: atomics::AtomicUint // XXX: this needs to become AtomicU64 +} + +static INIT_MUTEX_STATE: MutexState = MutexState {state: atomics::INIT_ATOMIC_UINT}; + +static LOCKERS_SHIFT: uint = 0; +// XXX: this limits 32-bit tasks to 2^16; we need to use 64-bit atomics on 32-bit too to fix this +#[cfg(target_word_size = "32")] static QUEUE_SIZE_SHIFT: uint = 16; +#[cfg(target_word_size = "64")] static QUEUE_SIZE_SHIFT: uint = 32; + +static LOCKERS_MASK: uint = (1 << QUEUE_SIZE_SHIFT) - (1 << LOCKERS_SHIFT); +static QUEUE_SIZE_MASK: uint = -(1 << QUEUE_SIZE_SHIFT); + +impl MutexState { + pub fn new() -> MutexState { + MutexState {state: atomics::AtomicUint::new(0)} + } + + // if queue_size == 0 {++lockers; true} else {false} + pub fn should_lock(&self) -> bool { + // optimistically speculate we have no contention + let mut a = self.state.compare_and_swap(0, (1 << LOCKERS_SHIFT), atomics::SeqCst); + if a == 0 {return true;} + + loop { + let (b, r) = if (a & QUEUE_SIZE_MASK) != 0 { + return false; + } else { + (a + (1 << LOCKERS_SHIFT), true) + }; + let v = self.state.compare_and_swap(a, b, atomics::SeqCst); + if a == v {return r;} + a = v; + } + } + + // ++queue_size; if(lockers == 0) {++lockers; true} else {false} + pub fn queue_and_should_lock(&self) -> bool { + // optimistically speculate we have only green tasks and nothing MUST_QUEUE + let mut a = self.state.compare_and_swap((1 << LOCKERS_SHIFT), + (1 << LOCKERS_SHIFT) + (1 << QUEUE_SIZE_SHIFT), atomics::SeqCst); + if a == (1 << LOCKERS_SHIFT) {return false;} + + loop { + let (b, r) = if (a & LOCKERS_MASK) == 0 { + (a + (1 << LOCKERS_SHIFT) + (1 << QUEUE_SIZE_SHIFT), true) + } else { + (a + (1 << QUEUE_SIZE_SHIFT), false) + }; + let v = self.state.compare_and_swap(a, b, atomics::SeqCst); + if a == v {return r;} + a = v; + } + } + + // --queue_size; + pub fn dequeue(&self) { + self.state.fetch_sub((1 << QUEUE_SIZE_SHIFT), atomics::SeqCst); + } + + // if(queue_size != 0 && lockers == 1) {--queue_size; true} else {--lockers; false} + pub fn should_dequeue(&self) -> bool { + // optimistically speculate we have no contention + let mut a = self.state.compare_and_swap((1 << LOCKERS_SHIFT), 0, atomics::SeqCst); + if a == (1 << LOCKERS_SHIFT) {return false;} + + loop { + let (b, r) = if ((a & LOCKERS_MASK) == (1 << LOCKERS_SHIFT) + && (a & QUEUE_SIZE_MASK) != 0) { + (a - (1 << QUEUE_SIZE_SHIFT), true) + } else { + (a - (1 << LOCKERS_SHIFT), false) + }; + let v = self.state.compare_and_swap(a, b, atomics::SeqCst); + if a == v {return r;} + a = v; + } + } + + // queue_size == 0 && lockers == 0 + pub fn can_try_lock(&self) -> bool { + self.state.load(atomics::SeqCst) == 0 + } +} + +// try_lock() { +// if atomically {queue_size == 0 && lockers == 0} && lock.try_lock() { +// if atomically {if queue_size == 0 {++lockers; true} else {false}} { +// ok +// } else { +// lock.unlock() +// fail +// } +// } else { +// fail +// } +// } +// +// lock() { +// if try_lock() { +// return guard; +// } +// +// if can_block && atomically {if queue_size == 0 {++lockers; true} else {false}} { +// lock.lock(); +// } else { +// q.push(); +// if atomically 
+//        {++queue_size; if(lockers == 0) {++lockers; true} else {false}} {
+//             // this never blocks indefinitely
+//             // this is because lockers was 0, so we have no one having or trying to get the lock
+//             // and we atomically set queue_size to a positive value, so no one will start blocking
+//             lock.lock();
+//             atomically {--queue_size}
+//             t = q.pop();
+//             if t != ourselves {
+//                 t.wakeup();
+//                 go to sleep
+//             }
+//         } else {
+//             go to sleep
+//         }
+//     }
+// }
+//
+// unlock() {
+//     if atomically
+//        {if(queue_size != 0 && lockers == 1) {--queue_size; true} else {--lockers; false}}
+//     {
+//         t = q.pop();
+//         t.wakeup();
+//     } else {
+//         lock.unlock()
+//     }
+// }
+impl StaticMutex {
+    /// Try to acquire this lock, see `Mutex::try_lock`
+    fn try_lock<'a>(&'a self) -> Option<Guard<'a>> {
+        // note that we can't implement this by first calling should_lock()
+        // and then try_lock(), because once should_lock() succeeds we have
+        // committed to waking up tasks, and we can only do that by blocking on the mutex
+
+        // also, this is the only place in the Mutex code where we aren't guaranteed that a
+        // Task structure exists, and thus the task number limit doesn't limit this
+        // however, we don't change self.state unless we manage to get the lock, so this
+        // can only account for a single extra "task without ~Task", which is accounted for by
+        // having the Task limit be (1 << 16) - 2 or (1 << 32) - 2
+        if self.state.can_try_lock() && unsafe { self.lock.trylock() } {
+            // here we have the lock, but haven't told anyone about it
+            // this means that a green task might be blocking expecting to get the lock
+            // so if queue_size != 0 we abort and unlock, otherwise we atomically increase lockers
+
+            // this is the same code used for the blocking lock(), because since we already have
+            // the lock, we don't have the problem of possibly "blocking" on other tasks
+            if self.state.should_lock() {
+                Some(Guard{ lock: self })
+            } else {
+                // oops, we shouldn't have taken the lock because a task got queued in between;
+                // just unlock it and return failure, no one will know since we changed no state
+                unsafe { self.lock.unlock(); }
+                None
+            }
+        } else {
+            None
+        }
+    }
+
+    /// Acquires this lock, see `Mutex::lock`
+    pub fn lock<'a>(&'a self) -> Guard<'a> {
+        // Remember that an explicit goal of these mutexes is to be "just as
+        // fast" as pthreads. Note that at some point our implementation
+        // requires an answer to the question "can we block" and implies a hit
+        // to OS TLS. In an attempt to avoid this hit and to maintain efficiency
+        // in the uncontended case (very important) we start off by hitting a
+        // trylock on the OS mutex. If we succeed, then we're lucky!
+        match self.try_lock() {
+            Some(guard) => { return guard; },
+            None => {}
+        }
+
+        let t: ~Task = Local::take();
+        let can_block = t.can_block();
+        if can_block && self.state.should_lock() {
+            // Tasks which can block are super easy. These tasks just accept the
+            // TLS hit we just made, and then call the blocking `lock()`
+            // function. Turns out the TLS hit is essentially 0 on contention.
+            Local::put(t);
+            unsafe { self.lock.lock(); }
+        } else {
+            let mut our_node = q::Node::new(0);
+            t.deschedule(1, |task| {
+                our_node.data = unsafe { task.cast_to_uint() };
+                unsafe { self.q.push(&mut our_node); }
+
+                if self.state.queue_and_should_lock() {
+                    // this code generally only gets executed in a race window, since typically
+                    // either the trylock succeeds and we return early, or we have someone else
+                    // running (lockers != 0), so we take the other branch of this if and wait
+                    // for someone else to wake us up
+                    //
+                    // in particular, this code only runs if someone unlocked the mutex between
+                    // the try_lock and the self.state.queue_and_should_lock above
+                    unsafe { self.lock.lock(); }
+                    self.state.dequeue();
+
+                    let node = unsafe { self.q.pop() }.expect("the queue is empty but queue_size was != 0");
+
+                    // If we popped ourselves, then we just unblock. If it's someone
+                    // else, we wake up the task and go to sleep
+                    if node == &mut our_node as *mut q::Node<uint> {
+                        Err(unsafe { BlockedTask::cast_from_uint(our_node.data) })
+                    } else {
+                        unsafe { BlockedTask::cast_from_uint((*node).data) }.wake().map(|t| t.reawaken());
+                        Ok(())
+                    }
+                } else {
+                    Ok(())
+                }
+            });
+        }
+
+        Guard { lock: self }
+    }
+
+    fn unlock(&self) {
+        // If we are the only locker and someone is queued, dequeue and wake them up;
+        // otherwise unlock, either to let another locker run or to completely unlock the mutex.
+
+        // This helps preserve fairness by prioritizing tasks acquiring the OS mutex over
+        // queued-up tasks.
+        // Note that once the queue is non-empty, everyone will queue, so fairness is preserved
+        // in the other sense too.
+        if self.state.should_dequeue() {
+            let node = unsafe { self.q.pop() }.expect("the queue is empty but queue_size was != 0");
+            unsafe { BlockedTask::cast_from_uint((*node).data) }.wake().map(|t| t.reawaken());
+        } else {
+            unsafe { self.lock.unlock(); }
+        }
+    }
+
+    /// Deallocates resources associated with this static mutex.
+    ///
+    /// This method is unsafe because it provides no guarantees that there are
+    /// no active users of this mutex; if there are, safety is not guaranteed.
+    ///
+    /// This method is required to ensure that there are no memory leaks on
+    /// *all* platforms. It may be the case that some platforms do not leak
+    /// memory if this method is not called, but this is not guaranteed to be
+    /// true on all platforms.
+    pub unsafe fn destroy(&self) {
+        self.lock.destroy()
+    }
+}
+
+impl Mutex {
+    /// Creates a new mutex in an unlocked state ready for use.
+    pub fn new() -> Mutex {
+        Mutex {
+            lock: StaticMutex {
+                state: MutexState::new(),
+                q: q::Queue::new(),
+                lock: unsafe { mutex::Mutex::new() },
+            }
+        }
+    }
+
+    /// Attempts to acquire this lock.
+    ///
+    /// If the lock could not be acquired at this time, then `None` is returned.
+    /// Otherwise, an RAII guard is returned. The lock will be unlocked when the
+    /// guard is dropped.
+    ///
+    /// This function does not block.
+    pub fn try_lock<'a>(&'a self) -> Option<Guard<'a>> {
+        self.lock.try_lock()
+    }
+
+    /// Acquires a mutex, blocking the current task until it is able to do so.
+    ///
+    /// This function will block the local task until it is available to acquire
+    /// the mutex. Upon returning, the task is the only task with the mutex
+    /// held. An RAII guard is returned to allow scoped unlock of the lock. When
+    /// the guard goes out of scope, the mutex will be unlocked.
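Because the `(queue_size, lockers)` transitions used by `lock()` and `unlock()` above are easy to lose inside the CAS loops, here is the same state machine restated with today's `AtomicUsize::fetch_update`. It is an illustrative sketch only: the layout (lockers in the low half, queue size in the high half) mirrors the patch's `MutexState`, but the helper names and the `main` walkthrough are mine and not part of the patch.

```rust
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};

const LOCKERS_SHIFT: u32 = 0;
const QUEUE_SIZE_SHIFT: u32 = usize::BITS / 2;
const LOCKERS_MASK: usize = (1 << QUEUE_SIZE_SHIFT) - 1;

/// Low half: lockers, high half: queue_size (mirrors the patch's MutexState).
struct MutexState {
    state: AtomicUsize,
}

impl MutexState {
    fn new() -> MutexState {
        MutexState { state: AtomicUsize::new(0) }
    }

    fn lockers(v: usize) -> usize { (v >> LOCKERS_SHIFT) & LOCKERS_MASK }
    fn queue_size(v: usize) -> usize { v >> QUEUE_SIZE_SHIFT }

    /// if queue_size == 0 { lockers += 1; true } else { false }
    fn should_lock(&self) -> bool {
        self.state
            .fetch_update(SeqCst, SeqCst, |v| {
                if Self::queue_size(v) == 0 { Some(v + (1 << LOCKERS_SHIFT)) } else { None }
            })
            .is_ok()
    }

    /// queue_size += 1; if lockers == 0 { lockers += 1; true } else { false }
    fn queue_and_should_lock(&self) -> bool {
        let old = self.state
            .fetch_update(SeqCst, SeqCst, |v| {
                let bump = if Self::lockers(v) == 0 { 1 << LOCKERS_SHIFT } else { 0 };
                Some(v + (1 << QUEUE_SIZE_SHIFT) + bump)
            })
            .unwrap();
        Self::lockers(old) == 0
    }

    /// queue_size -= 1 (used on the race-window path after taking the OS lock).
    fn dequeue(&self) {
        self.state.fetch_sub(1 << QUEUE_SIZE_SHIFT, SeqCst);
    }

    /// if queue_size != 0 && lockers == 1 { queue_size -= 1; true }
    /// else { lockers -= 1; false } -- callers must currently hold the lock.
    fn should_dequeue(&self) -> bool {
        let old = self.state
            .fetch_update(SeqCst, SeqCst, |v| {
                if Self::queue_size(v) != 0 && Self::lockers(v) == 1 {
                    Some(v - (1 << QUEUE_SIZE_SHIFT))
                } else {
                    Some(v - (1 << LOCKERS_SHIFT))
                }
            })
            .unwrap();
        Self::queue_size(old) != 0 && Self::lockers(old) == 1
    }
}

fn main() {
    let s = MutexState::new();

    // Uncontended: take and release the OS mutex directly.
    assert!(s.should_lock());
    assert!(!s.should_dequeue());

    // Contended: one holder, one queued (green) waiter.
    assert!(s.should_lock());            // holder: lockers = 1
    assert!(!s.queue_and_should_lock()); // waiter: queue_size = 1, goes to sleep
    assert!(s.should_dequeue());         // holder: wakes the waiter and hands the
                                         // OS mutex over without unlocking it
    assert!(!s.should_dequeue());        // the waiter's own unlock releases it

    // Race window: the lock was released between trylock and queueing, so the
    // queued task is also the one that must take the OS mutex itself.
    assert!(s.queue_and_should_lock());  // queue_size = 1, lockers = 1
    s.dequeue();                         // it pops itself back off the queue
    assert!(!s.should_dequeue());        // and its unlock releases the OS mutex
}
```

The walkthrough in `main` mirrors the lock()/unlock() paths above: on a clean hand-off the OS mutex is never released, which is where the fairness note in `unlock` comes from.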
+ pub fn lock<'a>(&'a self) -> Guard<'a> { self.lock.lock() } +} + +#[unsafe_destructor] +impl<'a> Drop for Guard<'a> { + #[inline] + fn drop(&mut self) { + self.lock.unlock(); + } +} + +impl Drop for Mutex { + fn drop(&mut self) { + // This is actually safe b/c we know that there is no further usage of + // this mutex (it's up to the user to arrange for a mutex to get + // dropped, that's not our job) + unsafe { self.lock.destroy() } + } +} + +#[cfg(test)] +mod test { + use prelude::*; + use super::{Mutex, StaticMutex, MUTEX_INIT}; + use native; + + #[test] + fn smoke() { + let mut m = Mutex::new(); + drop(m.lock()); + drop(m.lock()); + } + + #[test] + fn smoke_static() { + static mut m: StaticMutex = MUTEX_INIT; + unsafe { + drop(m.lock()); + drop(m.lock()); + m.destroy(); + } + } + + #[test] + fn lots_and_lots() { + static mut m: StaticMutex = MUTEX_INIT; + static mut CNT: uint = 0; + static M: uint = 10000; + static N: uint = 3; + + fn inc() { + for _ in range(0, M) { + unsafe { + let _g = m.lock(); + CNT += 1; + } + } + } + + let (p, c) = SharedChan::new(); + for _ in range(0, N) { + let c2 = c.clone(); + do native::task::spawn { inc(); c2.send(()); } + let c2 = c.clone(); + do spawn { inc(); c2.send(()); } + } + + drop(c); + for _ in range(0, 2 * N) { + p.recv(); + } + assert_eq!(unsafe {CNT}, M * N * 2); + unsafe { + m.destroy(); + } + } + + #[test] + fn trylock() { + let mut m = Mutex::new(); + assert!(m.try_lock().is_some()); + } +} From 5c1da18c8415312ce7375feebd139d425b9c279c Mon Sep 17 00:00:00 2001 From: Bill Myers Date: Wed, 15 Jan 2014 02:48:30 +0100 Subject: [PATCH 8/9] once: replace Once with an Once based on the high-level Mutex --- src/libnative/io/net.rs | 2 +- src/librustc/back/link.rs | 2 +- src/librustc/middle/trans/base.rs | 2 +- src/libstd/sync/mod.rs | 2 + src/libstd/sync/one.rs | 170 ++++++++++++++++++++++++++++++ src/libstd/unstable/mutex.rs | 145 +------------------------ 6 files changed, 176 insertions(+), 147 deletions(-) create mode 100644 src/libstd/sync/one.rs diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index adcd21f0ac4c5..a60034a7170ca 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -201,7 +201,7 @@ pub fn init() { } unsafe { - use std::unstable::mutex::{Once, ONCE_INIT}; + use std::sync::{Once, ONCE_INIT}; static mut INIT: Once = ONCE_INIT; INIT.doit(|| { let mut data: WSADATA = intrinsics::init(); diff --git a/src/librustc/back/link.rs b/src/librustc/back/link.rs index ffb9cce033ed7..04c4c9ce99f2b 100644 --- a/src/librustc/back/link.rs +++ b/src/librustc/back/link.rs @@ -311,7 +311,7 @@ pub mod write { } unsafe fn configure_llvm(sess: Session) { - use std::unstable::mutex::{Once, ONCE_INIT}; + use std::sync::{Once, ONCE_INIT}; static mut INIT: Once = ONCE_INIT; // Copy what clan does by turning on loop vectorization at O2 and diff --git a/src/librustc/middle/trans/base.rs b/src/librustc/middle/trans/base.rs index aaa8d071aff5d..f2c9425293236 100644 --- a/src/librustc/middle/trans/base.rs +++ b/src/librustc/middle/trans/base.rs @@ -3295,7 +3295,7 @@ pub fn trans_crate(sess: session::Session, output: &Path) -> CrateTranslation { // Before we touch LLVM, make sure that multithreading is enabled. 
unsafe { - use std::unstable::mutex::{Once, ONCE_INIT}; + use std::sync::{Once, ONCE_INIT}; static mut INIT: Once = ONCE_INIT; static mut POISONED: bool = false; INIT.doit(|| { diff --git a/src/libstd/sync/mod.rs b/src/libstd/sync/mod.rs index 1dc350dd7cac4..e206ba6129f16 100644 --- a/src/libstd/sync/mod.rs +++ b/src/libstd/sync/mod.rs @@ -16,6 +16,7 @@ //! other types of concurrent primitives. pub use self::mutex::{Mutex, StaticMutex, Guard, MUTEX_INIT}; +pub use self::one::{Once, ONCE_INIT}; pub mod arc; pub mod atomics; @@ -26,3 +27,4 @@ pub mod spsc_queue; mod mpsc_intrusive; mod mutex; +mod one; diff --git a/src/libstd/sync/one.rs b/src/libstd/sync/one.rs new file mode 100644 index 0000000000000..1c395b9cb08ab --- /dev/null +++ b/src/libstd/sync/one.rs @@ -0,0 +1,170 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! A "once initialization" primitive +//! +//! This primitive is meant to be used to run one-time initialization. An +//! example use case would be for initializing an FFI library. + +use int; +use sync::atomics; +use sync::{StaticMutex, MUTEX_INIT}; + +/// A type which can be used to run a one-time global initialization. This type +/// is *unsafe* to use because it is built on top of the `Mutex` in this module. +/// It does not know whether the currently running task is in a green or native +/// context, and a blocking mutex should *not* be used under normal +/// circumstances on a green task. +/// +/// Despite its unsafety, it is often useful to have a one-time initialization +/// routine run for FFI bindings or related external functionality. This type +/// can only be statically constructed with the `ONCE_INIT` value. +/// +/// # Example +/// +/// ```rust +/// use std::unstable::mutex::{Once, ONCE_INIT}; +/// +/// static mut START: Once = ONCE_INIT; +/// unsafe { +/// START.doit(|| { +/// // run initialization here +/// }); +/// } +/// ``` +pub struct Once { + priv mutex: StaticMutex, + priv cnt: atomics::AtomicInt, + priv lock_cnt: atomics::AtomicInt, +} + +/// Initialization value for static `Once` values. +pub static ONCE_INIT: Once = Once { + mutex: MUTEX_INIT, + cnt: atomics::INIT_ATOMIC_INT, + lock_cnt: atomics::INIT_ATOMIC_INT, +}; + +impl Once { + /// Perform an initialization routine once and only once. The given closure + /// will be executed if this is the first time `doit` has been called, and + /// otherwise the routine will *not* be invoked. + /// + /// This method will block the calling *os thread* if another initialization + /// routine is currently running. + /// + /// When this function returns, it is guaranteed that some initialization + /// has run and completed (it may not be the closure specified). + pub fn doit(&mut self, f: ||) { + // Implementation-wise, this would seem like a fairly trivial primitive. + // The stickler part is where our mutexes currently require an + // allocation, and usage of a `Once` should't leak this allocation. + // + // This means that there must be a deterministic destroyer of the mutex + // contained within (because it's not needed after the initialization + // has run). 
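The counting scheme that the rest of this comment goes on to describe (gate latecomers with a "very negative" count, and let the last thread through the mutex be the one to destroy it) translates almost verbatim to today's std types. The sketch below is illustrative only: `Once`, `doit`, and the small `main` are stand-ins of mine, and since std's `Mutex` needs no explicit `destroy()`, the "last one out" branch is just a stub where the patch tears down its static mutex.

```rust
use std::sync::atomic::{AtomicIsize, Ordering::SeqCst};
use std::sync::Mutex;

struct Once {
    mutex: Mutex<()>,
    cnt: AtomicIsize,
    lock_cnt: AtomicIsize,
}

impl Once {
    const fn new() -> Once {
        Once {
            mutex: Mutex::new(()),
            cnt: AtomicIsize::new(0),
            lock_cnt: AtomicIsize::new(0),
        }
    }

    fn doit(&self, f: impl FnOnce()) {
        // Fast path: a very negative count means initialization already finished.
        let prev = self.cnt.fetch_add(1, SeqCst);
        if prev < 0 {
            // Pin the counter far below zero so it can never creep back up to 0.
            self.cnt.store(isize::MIN, SeqCst);
            return;
        }

        // Slow path: everyone who saw a non-negative count serializes on the mutex.
        {
            let _guard = self.mutex.lock().unwrap();
            if self.cnt.load(SeqCst) > 0 {
                f();
                // Swap in the "very negative" value only *after* f has run, and
                // remember how many threads still have to pass through the mutex.
                let waiters = self.cnt.swap(isize::MIN, SeqCst);
                self.lock_cnt.store(waiters, SeqCst);
            }
        }

        // Last one out is the only thread allowed to destroy the mutex.
        if self.lock_cnt.fetch_add(-1, SeqCst) == 1 {
            // With std's Mutex there is nothing to free; the patch calls
            // self.mutex.destroy() at this point.
        }
    }
}

fn main() {
    static INIT: Once = Once::new();
    let mut runs = 0;
    INIT.doit(|| runs += 1);
    INIT.doit(|| runs += 1);
    assert_eq!(runs, 1);
}
```

Concurrent first callers behave as in the patch: whichever thread wins the race runs `f`, records in `lock_cnt` how many others are already past the fast path, and the last of those to leave is the one permitted to free the mutex.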
+ // + // The general scheme here is to gate all future threads once + // initialization has completed with a "very negative" count, and to + // allow through threads to lock the mutex if they see a non negative + // count. For all threads grabbing the mutex, exactly one of them should + // be responsible for unlocking the mutex, and this should only be done + // once everyone else is done with the mutex. + // + // This atomicity is achieved by swapping a very negative value into the + // shared count when the initialization routine has completed. This will + // read the number of threads which will at some point attempt to + // acquire the mutex. This count is then squirreled away in a separate + // variable, and the last person on the way out of the mutex is then + // responsible for destroying the mutex. + // + // It is crucial that the negative value is swapped in *after* the + // initialization routine has completed because otherwise new threads + // calling `doit` will return immediately before the initialization has + // completed. + + let prev = self.cnt.fetch_add(1, atomics::SeqCst); + if prev < 0 { + // Make sure we never overflow, we'll never have int::min_value + // simultaneous calls to `doit` to make this value go back to 0 + self.cnt.store(int::min_value, atomics::SeqCst); + return + } + + // If the count is negative, then someone else finished the job, + // otherwise we run the job and record how many people will try to grab + // this lock + { + let _guard = self.mutex.lock(); + if self.cnt.load(atomics::SeqCst) > 0 { + f(); + let prev = self.cnt.swap(int::min_value, atomics::SeqCst); + self.lock_cnt.store(prev, atomics::SeqCst); + } + } + + // Last one out cleans up after everyone else, no leaks! + if self.lock_cnt.fetch_add(-1, atomics::SeqCst) == 1 { + unsafe { self.mutex.destroy() } + } + } +} + +#[cfg(test)] +mod test { + use prelude::*; + + use super::{ONCE_INIT, Once}; + use task; + + #[test] + fn smoke_once() { + static mut o: Once = ONCE_INIT; + let mut a = 0; + unsafe { o.doit(|| a += 1); } + assert_eq!(a, 1); + unsafe { o.doit(|| a += 1); } + assert_eq!(a, 1); + } + + #[test] + fn stampede_once() { + static mut o: Once = ONCE_INIT; + static mut run: bool = false; + + let (p, c) = SharedChan::new(); + for _ in range(0, 10) { + let c = c.clone(); + do spawn { + for _ in range(0, 4) { task::deschedule() } + unsafe { + o.doit(|| { + assert!(!run); + run = true; + }); + assert!(run); + } + c.send(()); + } + } + + unsafe { + o.doit(|| { + assert!(!run); + run = true; + }); + assert!(run); + } + + for _ in range(0, 10) { + p.recv(); + } + } +} diff --git a/src/libstd/unstable/mutex.rs b/src/libstd/unstable/mutex.rs index 1b0ecc25eac18..d9d4f956eae36 100644 --- a/src/libstd/unstable/mutex.rs +++ b/src/libstd/unstable/mutex.rs @@ -425,157 +425,14 @@ mod imp { } } -/// A type which can be used to run a one-time global initialization. This type -/// is *unsafe* to use because it is built on top of the `Mutex` in this module. -/// It does not know whether the currently running task is in a green or native -/// context, and a blocking mutex should *not* be used under normal -/// circumstances on a green task. -/// -/// Despite its unsafety, it is often useful to have a one-time initialization -/// routine run for FFI bindings or related external functionality. This type -/// can only be statically constructed with the `ONCE_INIT` value. 
-/// -/// # Example -/// -/// ```rust -/// use std::unstable::mutex::{Once, ONCE_INIT}; -/// -/// static mut START: Once = ONCE_INIT; -/// unsafe { -/// START.doit(|| { -/// // run initialization here -/// }); -/// } -/// ``` -pub struct Once { - priv mutex: Mutex, - priv cnt: atomics::AtomicInt, - priv lock_cnt: atomics::AtomicInt, -} - -/// Initialization value for static `Once` values. -pub static ONCE_INIT: Once = Once { - mutex: MUTEX_INIT, - cnt: atomics::INIT_ATOMIC_INT, - lock_cnt: atomics::INIT_ATOMIC_INT, -}; - -impl Once { - /// Perform an initialization routine once and only once. The given closure - /// will be executed if this is the first time `doit` has been called, and - /// otherwise the routine will *not* be invoked. - /// - /// This method will block the calling *os thread* if another initialization - /// routine is currently running. - /// - /// When this function returns, it is guaranteed that some initialization - /// has run and completed (it may not be the closure specified). - pub fn doit(&mut self, f: ||) { - // Implementation-wise, this would seem like a fairly trivial primitive. - // The stickler part is where our mutexes currently require an - // allocation, and usage of a `Once` should't leak this allocation. - // - // This means that there must be a deterministic destroyer of the mutex - // contained within (because it's not needed after the initialization - // has run). - // - // The general scheme here is to gate all future threads once - // initialization has completed with a "very negative" count, and to - // allow through threads to lock the mutex if they see a non negative - // count. For all threads grabbing the mutex, exactly one of them should - // be responsible for unlocking the mutex, and this should only be done - // once everyone else is done with the mutex. - // - // This atomicity is achieved by swapping a very negative value into the - // shared count when the initialization routine has completed. This will - // read the number of threads which will at some point attempt to - // acquire the mutex. This count is then squirreled away in a separate - // variable, and the last person on the way out of the mutex is then - // responsible for destroying the mutex. - // - // It is crucial that the negative value is swapped in *after* the - // initialization routine has completed because otherwise new threads - // calling `doit` will return immediately before the initialization has - // completed. - - let prev = self.cnt.fetch_add(1, atomics::SeqCst); - if prev < 0 { - // Make sure we never overflow, we'll never have int::min_value - // simultaneous calls to `doit` to make this value go back to 0 - self.cnt.store(int::min_value, atomics::SeqCst); - return - } - - // If the count is negative, then someone else finished the job, - // otherwise we run the job and record how many people will try to grab - // this lock - unsafe { self.mutex.lock() } - if self.cnt.load(atomics::SeqCst) > 0 { - f(); - let prev = self.cnt.swap(int::min_value, atomics::SeqCst); - self.lock_cnt.store(prev, atomics::SeqCst); - } - unsafe { self.mutex.unlock() } - - // Last one out cleans up after everyone else, no leaks! 
- if self.lock_cnt.fetch_add(-1, atomics::SeqCst) == 1 { - unsafe { self.mutex.destroy() } - } - } -} - #[cfg(test)] mod test { use prelude::*; use rt::thread::Thread; - use super::{ONCE_INIT, Once, Mutex, MUTEX_INIT, Cond, COND_INIT}; + use super::{Mutex, MUTEX_INIT, Cond, COND_INIT}; use task; - #[test] - fn smoke_once() { - static mut o: Once = ONCE_INIT; - let mut a = 0; - unsafe { o.doit(|| a += 1); } - assert_eq!(a, 1); - unsafe { o.doit(|| a += 1); } - assert_eq!(a, 1); - } - - #[test] - fn stampede_once() { - static mut o: Once = ONCE_INIT; - static mut run: bool = false; - - let (p, c) = SharedChan::new(); - for _ in range(0, 10) { - let c = c.clone(); - do spawn { - for _ in range(0, 4) { task::deschedule() } - unsafe { - o.doit(|| { - assert!(!run); - run = true; - }); - assert!(run); - } - c.send(()); - } - } - - unsafe { - o.doit(|| { - assert!(!run); - run = true; - }); - assert!(run); - } - - for _ in range(0, 10) { - p.recv(); - } - } - #[test] fn somke_lock() { static lock: Mutex = MUTEX_INIT; From ddf77e4215bd9884f9eff3ac25a64166be570048 Mon Sep 17 00:00:00 2001 From: Bill Myers Date: Wed, 15 Jan 2014 02:09:28 +0100 Subject: [PATCH 9/9] extra::sync: make semaphore use sync::Mutex --- src/libextra/sync.rs | 45 ++++++++++++++++++++++++-------------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/src/libextra/sync.rs b/src/libextra/sync.rs index a907c2a5323ca..14cb7b9709ed2 100644 --- a/src/libextra/sync.rs +++ b/src/libextra/sync.rs @@ -19,12 +19,13 @@ use std::borrow; -use std::unstable::sync::Exclusive; +use std::cast; use std::sync::arc::UnsafeArc; use std::sync::atomics; +use std::sync; use std::unstable::finally::Finally; -use std::util; use std::util::NonCopyable; +use std::util; /**************************************************************************** * Internals @@ -85,30 +86,38 @@ impl WaitQueue { } // The building-block used to make semaphores, mutexes, and rwlocks. -#[doc(hidden)] struct SemInner { + lock: sync::Mutex, count: int, - waiters: WaitQueue, + waiters: WaitQueue, // Can be either unit or another waitqueue. Some sems shouldn't come with // a condition variable attached, others should. - blocked: Q + blocked: Q } -#[doc(hidden)] -struct Sem(Exclusive>); +struct Sem(UnsafeArc>); -#[doc(hidden)] impl Sem { fn new(count: int, q: Q) -> Sem { - Sem(Exclusive::new(SemInner { - count: count, waiters: WaitQueue::new(), blocked: q })) + Sem(UnsafeArc::new(SemInner { + count: count, + waiters: WaitQueue::new(), + blocked: q, + lock: sync::Mutex::new(), + })) + } + + unsafe fn with(&self, f: |&mut SemInner|) { + let Sem(ref arc) = *self; + let state = arc.get(); + let _g = (*state).lock.lock(); + f(cast::transmute(state)); } pub fn acquire(&self) { unsafe { let mut waiter_nobe = None; - let Sem(ref lock) = *self; - lock.with(|state| { + self.with(|state| { state.count -= 1; if state.count < 0 { // Create waiter nobe, enqueue ourself, and tell @@ -127,8 +136,7 @@ impl Sem { pub fn release(&self) { unsafe { - let Sem(ref lock) = *self; - lock.with(|state| { + self.with(|state| { state.count += 1; if state.count <= 0 { state.waiters.signal(); @@ -208,8 +216,7 @@ impl<'a> Condvar<'a> { let mut out_of_bounds = None; // Release lock, 'atomically' enqueuing ourselves in so doing. unsafe { - let Sem(ref queue) = *self.sem; - queue.with(|state| { + self.sem.with(|state| { if condvar_id < state.blocked.len() { // Drop the lock. 
state.count += 1; @@ -251,8 +258,7 @@ impl<'a> Condvar<'a> { unsafe { let mut out_of_bounds = None; let mut result = false; - let Sem(ref lock) = *self.sem; - lock.with(|state| { + self.sem.with(|state| { if condvar_id < state.blocked.len() { result = state.blocked[condvar_id].signal(); } else { @@ -274,8 +280,7 @@ impl<'a> Condvar<'a> { let mut out_of_bounds = None; let mut queue = None; unsafe { - let Sem(ref lock) = *self.sem; - lock.with(|state| { + self.sem.with(|state| { if condvar_id < state.blocked.len() { // To avoid :broadcast_heavy, we make a new waitqueue, // swap it out with the old one, and broadcast on the