From efbd3724c012d68afd428beaa22f0d5aabff007d Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 19 May 2014 15:51:31 -0700 Subject: [PATCH 01/13] std: Rebuild sync::deque on Arc This also removes the `&mut self` requirement, using the correct `&self` requirement for concurrent types. --- src/libstd/sync/deque.rs | 52 +++++++++++++++++++++------------------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/src/libstd/sync/deque.rs b/src/libstd/sync/deque.rs index 30b95ffb34f69..42a8bd886525b 100644 --- a/src/libstd/sync/deque.rs +++ b/src/libstd/sync/deque.rs @@ -48,6 +48,8 @@ // FIXME: all atomic operations in this module use a SeqCst ordering. That is // probably overkill +use alloc::arc::Arc; + use clone::Clone; use iter::{range, Iterator}; use kinds::Send; @@ -58,7 +60,6 @@ use owned::Box; use ptr::RawPtr; use ptr; use slice::ImmutableVector; -use sync::arc::UnsafeArc; use sync::atomics::{AtomicInt, AtomicPtr, SeqCst}; use unstable::sync::Exclusive; use rt::heap::{allocate, deallocate}; @@ -87,14 +88,14 @@ struct Deque { /// /// There may only be one worker per deque. pub struct Worker { - deque: UnsafeArc>, + deque: Arc>, } /// The stealing half of the work-stealing deque. Stealers have access to the /// opposite end of the deque from the worker, and they only have access to the /// `steal` method. pub struct Stealer { - deque: UnsafeArc>, + deque: Arc>, } /// When stealing some data, this is an enumeration of the possible outcomes. @@ -149,12 +150,13 @@ impl BufferPool { /// Allocates a new work-stealing deque which will send/receiving memory to /// and from this buffer pool. - pub fn deque(&mut self) -> (Worker, Stealer) { - let (a, b) = UnsafeArc::new2(Deque::new(self.clone())); + pub fn deque(&self) -> (Worker, Stealer) { + let a = Arc::new(Deque::new(self.clone())); + let b = a.clone(); (Worker { deque: a }, Stealer { deque: b }) } - fn alloc(&mut self, bits: int) -> Box> { + fn alloc(&self, bits: int) -> Box> { unsafe { self.pool.with(|pool| { match pool.iter().position(|x| x.size() >= (1 << bits)) { @@ -165,7 +167,7 @@ impl BufferPool { } } - fn free(&mut self, buf: Box>) { + fn free(&self, buf: Box>) { unsafe { let mut buf = Some(buf); self.pool.with(|pool| { @@ -185,34 +187,34 @@ impl Clone for BufferPool { impl Worker { /// Pushes data onto the front of this work queue. - pub fn push(&mut self, t: T) { - unsafe { (*self.deque.get()).push(t) } + pub fn push(&self, t: T) { + unsafe { self.deque.push(t) } } /// Pops data off the front of the work queue, returning `None` on an empty /// queue. - pub fn pop(&mut self) -> Option { - unsafe { (*self.deque.get()).pop() } + pub fn pop(&self) -> Option { + unsafe { self.deque.pop() } } /// Gets access to the buffer pool that this worker is attached to. This can /// be used to create more deques which share the same buffer pool as this /// deque. - pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool { - unsafe { &mut (*self.deque.get()).pool } + pub fn pool<'a>(&'a self) -> &'a BufferPool { + &self.deque.pool } } impl Stealer { /// Steals work off the end of the queue (opposite of the worker's end) - pub fn steal(&mut self) -> Stolen { - unsafe { (*self.deque.get()).steal() } + pub fn steal(&self) -> Stolen { + unsafe { self.deque.steal() } } /// Gets access to the buffer pool that this stealer is attached to. This /// can be used to create more deques which share the same buffer pool as /// this deque. - pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool { - unsafe { &mut (*self.deque.get()).pool } + pub fn pool<'a>(&'a self) -> &'a BufferPool { + &self.deque.pool } } @@ -224,7 +226,7 @@ impl Clone for Stealer { // personally going to heavily comment what's going on here. impl Deque { - fn new(mut pool: BufferPool) -> Deque { + fn new(pool: BufferPool) -> Deque { let buf = pool.alloc(MIN_BITS); Deque { bottom: AtomicInt::new(0), @@ -234,7 +236,7 @@ impl Deque { } } - unsafe fn push(&mut self, data: T) { + unsafe fn push(&self, data: T) { let mut b = self.bottom.load(SeqCst); let t = self.top.load(SeqCst); let mut a = self.array.load(SeqCst); @@ -250,7 +252,7 @@ impl Deque { self.bottom.store(b + 1, SeqCst); } - unsafe fn pop(&mut self) -> Option { + unsafe fn pop(&self) -> Option { let b = self.bottom.load(SeqCst); let a = self.array.load(SeqCst); let b = b - 1; @@ -276,7 +278,7 @@ impl Deque { } } - unsafe fn steal(&mut self) -> Stolen { + unsafe fn steal(&self) -> Stolen { let t = self.top.load(SeqCst); let old = self.array.load(SeqCst); let b = self.bottom.load(SeqCst); @@ -298,7 +300,7 @@ impl Deque { } } - unsafe fn maybe_shrink(&mut self, b: int, t: int) { + unsafe fn maybe_shrink(&self, b: int, t: int) { let a = self.array.load(SeqCst); if b - t < (*a).size() / K && b - t > (1 << MIN_BITS) { self.swap_buffer(b, a, (*a).resize(b, t, -1)); @@ -312,7 +314,7 @@ impl Deque { // after this method has called 'free' on it. The continued usage is simply // a read followed by a forget, but we must make sure that the memory can // continue to be read after we flag this buffer for reclamation. - unsafe fn swap_buffer(&mut self, b: int, old: *mut Buffer, + unsafe fn swap_buffer(&self, b: int, old: *mut Buffer, buf: Buffer) -> *mut Buffer { let newbuf: *mut Buffer = transmute(box buf); self.array.store(newbuf, SeqCst); @@ -373,7 +375,7 @@ impl Buffer { // Unsafe because this unsafely overwrites possibly uninitialized or // initialized data. - unsafe fn put(&mut self, i: int, t: T) { + unsafe fn put(&self, i: int, t: T) { let ptr = self.storage.offset(i & self.mask()); ptr::copy_nonoverlapping_memory(ptr as *mut T, &t as *T, 1); forget(t); @@ -382,7 +384,7 @@ impl Buffer { // Again, unsafe because this has incredibly dubious ownership violations. // It is assumed that this buffer is immediately dropped. unsafe fn resize(&self, b: int, t: int, delta: int) -> Buffer { - let mut buf = Buffer::new(self.log_size + delta); + let buf = Buffer::new(self.log_size + delta); for i in range(t, b) { buf.put(i, self.get(i)); } From 7db02e20f2140530a9402f7d7452b10cac6fdf7b Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 19 May 2014 16:01:48 -0700 Subject: [PATCH 02/13] std: Rebuild mpmc queues on Unsafe/Arc This removes usage of UnsafeArc and uses proper self mutability for concurrent types. --- src/libstd/sync/mpmc_bounded_queue.rs | 50 +++++++++++++++------------ 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/src/libstd/sync/mpmc_bounded_queue.rs b/src/libstd/sync/mpmc_bounded_queue.rs index 2df5031b4826c..7fb98e140865d 100644 --- a/src/libstd/sync/mpmc_bounded_queue.rs +++ b/src/libstd/sync/mpmc_bounded_queue.rs @@ -29,13 +29,15 @@ // http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue +use alloc::arc::Arc; + use clone::Clone; use kinds::Send; use num::next_power_of_two; use option::{Option, Some, None}; -use sync::arc::UnsafeArc; use sync::atomics::{AtomicUint,Relaxed,Release,Acquire}; use vec::Vec; +use ty::Unsafe; struct Node { sequence: AtomicUint, @@ -44,7 +46,7 @@ struct Node { struct State { pad0: [u8, ..64], - buffer: Vec>, + buffer: Vec>>, mask: uint, pad1: [u8, ..64], enqueue_pos: AtomicUint, @@ -54,7 +56,7 @@ struct State { } pub struct Queue { - state: UnsafeArc>, + state: Arc>, } impl State { @@ -70,7 +72,7 @@ impl State { capacity }; let buffer = Vec::from_fn(capacity, |i| { - Node { sequence:AtomicUint::new(i), value: None } + Unsafe::new(Node { sequence:AtomicUint::new(i), value: None }) }); State{ pad0: [0, ..64], @@ -84,19 +86,21 @@ impl State { } } - fn push(&mut self, value: T) -> bool { + fn push(&self, value: T) -> bool { let mask = self.mask; let mut pos = self.enqueue_pos.load(Relaxed); loop { - let node = self.buffer.get_mut(pos & mask); - let seq = node.sequence.load(Acquire); + let node = self.buffer.get(pos & mask); + let seq = unsafe { (*node.get()).sequence.load(Acquire) }; let diff: int = seq as int - pos as int; if diff == 0 { let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed); if enqueue_pos == pos { - node.value = Some(value); - node.sequence.store(pos+1, Release); + unsafe { + (*node.get()).value = Some(value); + (*node.get()).sequence.store(pos+1, Release); + } break } else { pos = enqueue_pos; @@ -110,19 +114,21 @@ impl State { true } - fn pop(&mut self) -> Option { + fn pop(&self) -> Option { let mask = self.mask; let mut pos = self.dequeue_pos.load(Relaxed); loop { - let node = self.buffer.get_mut(pos & mask); - let seq = node.sequence.load(Acquire); + let node = self.buffer.get(pos & mask); + let seq = unsafe { (*node.get()).sequence.load(Acquire) }; let diff: int = seq as int - (pos + 1) as int; if diff == 0 { let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed); if dequeue_pos == pos { - let value = node.value.take(); - node.sequence.store(pos + mask + 1, Release); - return value + unsafe { + let value = (*node.get()).value.take(); + (*node.get()).sequence.store(pos + mask + 1, Release); + return value + } } else { pos = dequeue_pos; } @@ -138,24 +144,22 @@ impl State { impl Queue { pub fn with_capacity(capacity: uint) -> Queue { Queue{ - state: UnsafeArc::new(State::with_capacity(capacity)) + state: Arc::new(State::with_capacity(capacity)) } } - pub fn push(&mut self, value: T) -> bool { - unsafe { (*self.state.get()).push(value) } + pub fn push(&self, value: T) -> bool { + self.state.push(value) } - pub fn pop(&mut self) -> Option { - unsafe { (*self.state.get()).pop() } + pub fn pop(&self) -> Option { + self.state.pop() } } impl Clone for Queue { fn clone(&self) -> Queue { - Queue { - state: self.state.clone() - } + Queue { state: self.state.clone() } } } From 2966e970cabdf7103ad61c840c72bf58352150e0 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 19 May 2014 17:27:52 -0700 Subject: [PATCH 03/13] std: Rebuild mpsc queue with Unsafe/&self This removes the incorrect `&mut self` taken because it can alias among many threads. --- src/libstd/sync/mpsc_queue.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs index 4cdcd05e9b450..23afb9487ec23 100644 --- a/src/libstd/sync/mpsc_queue.rs +++ b/src/libstd/sync/mpsc_queue.rs @@ -45,6 +45,7 @@ use option::{Option, None, Some}; use owned::Box; use ptr::RawPtr; use sync::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed}; +use ty::Unsafe; /// A result of the `pop` function. pub enum PopResult { @@ -69,7 +70,7 @@ struct Node { /// popper at a time (many pushers are allowed). pub struct Queue { head: AtomicPtr>, - tail: *mut Node, + tail: Unsafe<*mut Node>, } impl Node { @@ -88,12 +89,12 @@ impl Queue { let stub = unsafe { Node::new(None) }; Queue { head: AtomicPtr::new(stub), - tail: stub, + tail: Unsafe::new(stub), } } /// Pushes a new value onto this queue. - pub fn push(&mut self, t: T) { + pub fn push(&self, t: T) { unsafe { let n = Node::new(Some(t)); let prev = self.head.swap(n, AcqRel); @@ -111,13 +112,13 @@ impl Queue { /// /// This inconsistent state means that this queue does indeed have data, but /// it does not currently have access to it at this time. - pub fn pop(&mut self) -> PopResult { + pub fn pop(&self) -> PopResult { unsafe { - let tail = self.tail; + let tail = *self.tail.get(); let next = (*tail).next.load(Acquire); if !next.is_null() { - self.tail = next; + *self.tail.get() = next; assert!((*tail).value.is_none()); assert!((*next).value.is_some()); let ret = (*next).value.take_unwrap(); @@ -131,7 +132,7 @@ impl Queue { /// Attempts to pop data from this queue, but doesn't attempt too hard. This /// will canonicalize inconsistent states to a `None` value. - pub fn casual_pop(&mut self) -> Option { + pub fn casual_pop(&self) -> Option { match self.pop() { Data(t) => Some(t), Empty | Inconsistent => None, @@ -143,7 +144,7 @@ impl Queue { impl Drop for Queue { fn drop(&mut self) { unsafe { - let mut cur = self.tail; + let mut cur = *self.tail.get(); while !cur.is_null() { let next = (*cur).next.load(Relaxed); let _: Box> = mem::transmute(cur); From fe93c3d47ed0cdf0a0cbac66a9f35ddb4c6783a2 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 19 May 2014 17:32:04 -0700 Subject: [PATCH 04/13] std: Rebuild spsc with Unsafe/&self This removes the incorrect usage of `&mut self` in a concurrent setting. --- src/libstd/sync/spsc_queue.rs | 51 ++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs index ed6d690def06a..b9827ee6b2a53 100644 --- a/src/libstd/sync/spsc_queue.rs +++ b/src/libstd/sync/spsc_queue.rs @@ -40,6 +40,7 @@ use option::{Some, None, Option}; use owned::Box; use ptr::RawPtr; use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release}; +use ty::Unsafe; // Node within the linked list queue of messages to send struct Node { @@ -56,13 +57,13 @@ struct Node { /// time. pub struct Queue { // consumer fields - tail: *mut Node, // where to pop from + tail: Unsafe<*mut Node>, // where to pop from tail_prev: AtomicPtr>, // where to pop from // producer fields - head: *mut Node, // where to push to - first: *mut Node, // where to get new nodes from - tail_copy: *mut Node, // between first/tail + head: Unsafe<*mut Node>, // where to push to + first: Unsafe<*mut Node>, // where to get new nodes from + tail_copy: Unsafe<*mut Node>, // between first/tail // Cache maintenance fields. Additions and subtractions are stored // separately in order to allow them to use nonatomic addition/subtraction. @@ -101,11 +102,11 @@ impl Queue { let n2 = Node::new(); unsafe { (*n1).next.store(n2, Relaxed) } Queue { - tail: n2, + tail: Unsafe::new(n2), tail_prev: AtomicPtr::new(n1), - head: n2, - first: n1, - tail_copy: n1, + head: Unsafe::new(n2), + first: Unsafe::new(n1), + tail_copy: Unsafe::new(n1), cache_bound: bound, cache_additions: AtomicUint::new(0), cache_subtractions: AtomicUint::new(0), @@ -114,7 +115,7 @@ impl Queue { /// Pushes a new value onto this queue. Note that to use this function /// safely, it must be externally guaranteed that there is only one pusher. - pub fn push(&mut self, t: T) { + pub fn push(&self, t: T) { unsafe { // Acquire a node (which either uses a cached one or allocates a new // one), and then append this to the 'head' node. @@ -122,35 +123,35 @@ impl Queue { assert!((*n).value.is_none()); (*n).value = Some(t); (*n).next.store(0 as *mut Node, Relaxed); - (*self.head).next.store(n, Release); - self.head = n; + (**self.head.get()).next.store(n, Release); + *self.head.get() = n; } } - unsafe fn alloc(&mut self) -> *mut Node { + unsafe fn alloc(&self) -> *mut Node { // First try to see if we can consume the 'first' node for our uses. // We try to avoid as many atomic instructions as possible here, so // the addition to cache_subtractions is not atomic (plus we're the // only one subtracting from the cache). - if self.first != self.tail_copy { + if *self.first.get() != *self.tail_copy.get() { if self.cache_bound > 0 { let b = self.cache_subtractions.load(Relaxed); self.cache_subtractions.store(b + 1, Relaxed); } - let ret = self.first; - self.first = (*ret).next.load(Relaxed); + let ret = *self.first.get(); + *self.first.get() = (*ret).next.load(Relaxed); return ret; } // If the above fails, then update our copy of the tail and try // again. - self.tail_copy = self.tail_prev.load(Acquire); - if self.first != self.tail_copy { + *self.tail_copy.get() = self.tail_prev.load(Acquire); + if *self.first.get() != *self.tail_copy.get() { if self.cache_bound > 0 { let b = self.cache_subtractions.load(Relaxed); self.cache_subtractions.store(b + 1, Relaxed); } - let ret = self.first; - self.first = (*ret).next.load(Relaxed); + let ret = *self.first.get(); + *self.first.get() = (*ret).next.load(Relaxed); return ret; } // If all of that fails, then we have to allocate a new node @@ -160,19 +161,19 @@ impl Queue { /// Attempts to pop a value from this queue. Remember that to use this type /// safely you must ensure that there is only one popper at a time. - pub fn pop(&mut self) -> Option { + pub fn pop(&self) -> Option { unsafe { // The `tail` node is not actually a used node, but rather a // sentinel from where we should start popping from. Hence, look at // tail's next field and see if we can use it. If we do a pop, then // the current tail node is a candidate for going into the cache. - let tail = self.tail; + let tail = *self.tail.get(); let next = (*tail).next.load(Acquire); if next.is_null() { return None } assert!((*next).value.is_some()); let ret = (*next).value.take(); - self.tail = next; + *self.tail.get() = next; if self.cache_bound == 0 { self.tail_prev.store(tail, Release); } else { @@ -197,11 +198,11 @@ impl Queue { /// Attempts to peek at the head of the queue, returning `None` if the queue /// has no data currently - pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> { + pub fn peek<'a>(&'a self) -> Option<&'a mut T> { // This is essentially the same as above with all the popping bits // stripped out. unsafe { - let tail = self.tail; + let tail = *self.tail.get(); let next = (*tail).next.load(Acquire); if next.is_null() { return None } return (*next).value.as_mut(); @@ -213,7 +214,7 @@ impl Queue { impl Drop for Queue { fn drop(&mut self) { unsafe { - let mut cur = self.first; + let mut cur = *self.first.get(); while !cur.is_null() { let next = (*cur).next.load(Relaxed); let _n: Box> = mem::transmute(cur); From 84378b0b5af9ec09ce627fdd59353b408d7f7fb4 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 19 May 2014 17:33:40 -0700 Subject: [PATCH 05/13] std: Use Arc instead of UnsafeArc in BlockedTask --- src/libstd/rt/task.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 31a2014530607..8968747d9908e 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -13,6 +13,8 @@ //! local storage, and logging. Even a 'freestanding' Rust would likely want //! to implement this. +use alloc::arc::Arc; + use cleanup; use clone::Clone; use comm::Sender; @@ -32,7 +34,6 @@ use rt::local_heap::LocalHeap; use rt::rtio::LocalIo; use rt::unwind::Unwinder; use str::SendStr; -use sync::arc::UnsafeArc; use sync::atomics::{AtomicUint, SeqCst}; use task::{TaskResult, TaskOpts}; use unstable::finally::Finally; @@ -65,7 +66,7 @@ pub struct LocalStorage(pub Option); /// at any time. pub enum BlockedTask { Owned(Box), - Shared(UnsafeArc), + Shared(Arc), } pub enum DeathAction { @@ -82,7 +83,7 @@ pub struct Death { } pub struct BlockedTasks { - inner: UnsafeArc, + inner: Arc, } impl Task { @@ -313,10 +314,10 @@ impl BlockedTask { pub fn wake(self) -> Option> { match self { Owned(task) => Some(task), - Shared(arc) => unsafe { - match (*arc.get()).swap(0, SeqCst) { + Shared(arc) => { + match arc.swap(0, SeqCst) { 0 => None, - n => Some(mem::transmute(n)), + n => Some(unsafe { mem::transmute(n) }), } } } @@ -343,7 +344,7 @@ impl BlockedTask { let arc = match self { Owned(task) => { let flag = unsafe { AtomicUint::new(mem::transmute(task)) }; - UnsafeArc::new(flag) + Arc::new(flag) } Shared(arc) => arc.clone(), }; @@ -375,7 +376,7 @@ impl BlockedTask { if blocked_task_ptr & 0x1 == 0 { Owned(mem::transmute(blocked_task_ptr)) } else { - let ptr: Box> = + let ptr: Box> = mem::transmute(blocked_task_ptr & !1); Shared(*ptr) } From d49aef7c024dba42fe9e440c06065235fa8a73f7 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 19 May 2014 17:36:04 -0700 Subject: [PATCH 06/13] std: Build Exclusive on Arc> This removes the usage of UnsafeArc --- src/libstd/unstable/sync.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/libstd/unstable/sync.rs b/src/libstd/unstable/sync.rs index 5be10fc27dfe5..f0f7e40ce09b8 100644 --- a/src/libstd/unstable/sync.rs +++ b/src/libstd/unstable/sync.rs @@ -8,9 +8,11 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use alloc::arc::Arc; + use clone::Clone; use kinds::Send; -use sync::arc::UnsafeArc; +use ty::Unsafe; use unstable::mutex::NativeMutex; struct ExData { @@ -30,7 +32,7 @@ struct ExData { * need to block or deschedule while accessing shared state, use extra::sync::RWArc. */ pub struct Exclusive { - x: UnsafeArc> + x: Arc>> } impl Clone for Exclusive { @@ -48,7 +50,7 @@ impl Exclusive { data: user_data }; Exclusive { - x: UnsafeArc::new(data) + x: Arc::new(Unsafe::new(data)) } } From 73729e94c87281dd7193dbdc86b4de2963b8fd72 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 19 May 2014 17:50:57 -0700 Subject: [PATCH 07/13] std: Move comm primitives away from UnsafeArc They currently still use `&mut self`, this migration was aimed towards moving from UnsafeArc to Arc> --- src/libstd/comm/mod.rs | 49 +++++++++++++++++++---------------- src/libstd/comm/oneshot.rs | 2 +- src/libstd/sync/mpsc_queue.rs | 9 ++++--- src/libstd/sync/spsc_queue.rs | 9 ++++--- 4 files changed, 39 insertions(+), 30 deletions(-) diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index df0c6f3b8d397..fd5b92ba46913 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -271,6 +271,8 @@ // And now that you've seen all the races that I found and attempted to fix, // here's the code for you to find some more! +use alloc::arc::Arc; + use cell::Cell; use clone::Clone; use iter::Iterator; @@ -283,7 +285,6 @@ use owned::Box; use result::{Ok, Err, Result}; use rt::local::Local; use rt::task::{Task, BlockedTask}; -use sync::arc::UnsafeArc; use ty::Unsafe; pub use comm::select::{Select, Handle}; @@ -352,7 +353,7 @@ pub struct Sender { /// The sending-half of Rust's synchronous channel type. This half can only be /// owned by one task, but it can be cloned to send to other tasks. pub struct SyncSender { - inner: UnsafeArc>, + inner: Arc>>, // can't share in an arc marker: marker::NoShare, } @@ -386,10 +387,10 @@ pub enum TrySendError { } enum Flavor { - Oneshot(UnsafeArc>), - Stream(UnsafeArc>), - Shared(UnsafeArc>), - Sync(UnsafeArc>), + Oneshot(Arc>>), + Stream(Arc>>), + Shared(Arc>>), + Sync(Arc>>), } #[doc(hidden)] @@ -435,8 +436,8 @@ impl UnsafeFlavor for Receiver { /// println!("{}", rx.recv()); /// ``` pub fn channel() -> (Sender, Receiver) { - let (a, b) = UnsafeArc::new2(oneshot::Packet::new()); - (Sender::new(Oneshot(b)), Receiver::new(Oneshot(a))) + let a = Arc::new(Unsafe::new(oneshot::Packet::new())); + (Sender::new(Oneshot(a.clone())), Receiver::new(Oneshot(a))) } /// Creates a new synchronous, bounded channel. @@ -471,8 +472,8 @@ pub fn channel() -> (Sender, Receiver) { /// assert_eq!(rx.recv(), 2); /// ``` pub fn sync_channel(bound: uint) -> (SyncSender, Receiver) { - let (a, b) = UnsafeArc::new2(sync::Packet::new(bound)); - (SyncSender::new(a), Receiver::new(Sync(b))) + let a = Arc::new(Unsafe::new(sync::Packet::new(bound))); + (SyncSender::new(a.clone()), Receiver::new(Sync(a))) } //////////////////////////////////////////////////////////////////////////////// @@ -557,13 +558,13 @@ impl Sender { let (new_inner, ret) = match *unsafe { self.inner() } { Oneshot(ref p) => { - let p = p.get(); unsafe { + let p = p.get(); if !(*p).sent() { return (*p).send(t); } else { - let (a, b) = UnsafeArc::new2(stream::Packet::new()); - match (*p).upgrade(Receiver::new(Stream(b))) { + let a = Arc::new(Unsafe::new(stream::Packet::new())); + match (*p).upgrade(Receiver::new(Stream(a.clone()))) { oneshot::UpSuccess => { let ret = (*a.get()).send(t); (a, ret) @@ -598,17 +599,21 @@ impl Clone for Sender { fn clone(&self) -> Sender { let (packet, sleeper) = match *unsafe { self.inner() } { Oneshot(ref p) => { - let (a, b) = UnsafeArc::new2(shared::Packet::new()); - match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } { - oneshot::UpSuccess | oneshot::UpDisconnected => (b, None), - oneshot::UpWoke(task) => (b, Some(task)) + let a = Arc::new(Unsafe::new(shared::Packet::new())); + match unsafe { + (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) + } { + oneshot::UpSuccess | oneshot::UpDisconnected => (a, None), + oneshot::UpWoke(task) => (a, Some(task)) } } Stream(ref p) => { - let (a, b) = UnsafeArc::new2(shared::Packet::new()); - match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } { - stream::UpSuccess | stream::UpDisconnected => (b, None), - stream::UpWoke(task) => (b, Some(task)), + let a = Arc::new(Unsafe::new(shared::Packet::new())); + match unsafe { + (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) + } { + stream::UpSuccess | stream::UpDisconnected => (a, None), + stream::UpWoke(task) => (a, Some(task)), } } Shared(ref p) => { @@ -645,7 +650,7 @@ impl Drop for Sender { //////////////////////////////////////////////////////////////////////////////// impl SyncSender { - fn new(inner: UnsafeArc>) -> SyncSender { + fn new(inner: Arc>>) -> SyncSender { SyncSender { inner: inner, marker: marker::NoShare } } diff --git a/src/libstd/comm/oneshot.rs b/src/libstd/comm/oneshot.rs index a7124e50b663e..f9e8fd1e534bf 100644 --- a/src/libstd/comm/oneshot.rs +++ b/src/libstd/comm/oneshot.rs @@ -15,7 +15,7 @@ /// this type is to have one and exactly one allocation when the chan/port pair /// is created. /// -/// Another possible optimization would be to not use an UnsafeArc box because +/// Another possible optimization would be to not use an Arc box because /// in theory we know when the shared packet can be deallocated (no real need /// for the atomic reference counting), but I was having trouble how to destroy /// the data early in a drop of a Port. diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs index 23afb9487ec23..f2f95da18425b 100644 --- a/src/libstd/sync/mpsc_queue.rs +++ b/src/libstd/sync/mpsc_queue.rs @@ -158,9 +158,10 @@ impl Drop for Queue { mod tests { use prelude::*; + use alloc::arc::Arc; + use native; use super::{Queue, Data, Empty, Inconsistent}; - use sync::arc::UnsafeArc; #[test] fn test_full() { @@ -179,14 +180,14 @@ mod tests { Inconsistent | Data(..) => fail!() } let (tx, rx) = channel(); - let q = UnsafeArc::new(q); + let q = Arc::new(q); for _ in range(0, nthreads) { let tx = tx.clone(); let q = q.clone(); native::task::spawn(proc() { for i in range(0, nmsgs) { - unsafe { (*q.get()).push(i); } + q.push(i); } tx.send(()); }); @@ -194,7 +195,7 @@ mod tests { let mut i = 0u; while i < nthreads * nmsgs { - match unsafe { (*q.get()).pop() } { + match q.pop() { Empty | Inconsistent => {}, Data(_) => { i += 1 } } diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs index b9827ee6b2a53..093933c82fc16 100644 --- a/src/libstd/sync/spsc_queue.rs +++ b/src/libstd/sync/spsc_queue.rs @@ -52,7 +52,7 @@ struct Node { } /// The single-producer single-consumer queue. This structure is not cloneable, -/// but it can be safely shared in an UnsafeArc if it is guaranteed that there +/// but it can be safely shared in an Arc if it is guaranteed that there /// is only one popper and one pusher touching the queue at any one point in /// time. pub struct Queue { @@ -227,9 +227,11 @@ impl Drop for Queue { #[cfg(test)] mod test { use prelude::*; + + use alloc::arc::Arc; use native; + use super::Queue; - use sync::arc::UnsafeArc; #[test] fn smoke() { @@ -274,7 +276,8 @@ mod test { stress_bound(1); fn stress_bound(bound: uint) { - let (a, b) = UnsafeArc::new2(Queue::new(bound)); + let a = Arc::new(Queue::new(bound)); + let b = a.clone(); let (tx, rx) = channel(); native::task::spawn(proc() { for _ in range(0, 100000) { From 4c8a4d241a984fdc0b8a015dceca2a006f2b7146 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 19 May 2014 17:51:50 -0700 Subject: [PATCH 08/13] std: Remove UnsafeArc This type has been superseded by Arc>. The UnsafeArc type is a relic of an era that has long since past, and with the introduction of liballoc the standard library is able to use the Arc smart pointer. With little need left for UnsafeArc, it was removed. All existing code using UnsafeArc should either be reevaluated to whether it can use only Arc, or it should transition to Arc> [breaking-change] --- src/libstd/sync/arc.rs | 189 -------------------------- src/libstd/sync/deque.rs | 31 ++--- src/libstd/sync/mod.rs | 1 - src/libstd/sync/mpmc_bounded_queue.rs | 6 +- src/libstd/sync/mpsc_queue.rs | 4 +- src/libstd/sync/spsc_queue.rs | 10 +- 6 files changed, 23 insertions(+), 218 deletions(-) delete mode 100644 src/libstd/sync/arc.rs diff --git a/src/libstd/sync/arc.rs b/src/libstd/sync/arc.rs deleted file mode 100644 index 7dcfe62ffb8a6..0000000000000 --- a/src/libstd/sync/arc.rs +++ /dev/null @@ -1,189 +0,0 @@ -// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! Atomically reference counted data -//! -//! This modules contains the implementation of an atomically reference counted -//! pointer for the purpose of sharing data between tasks. This is obviously a -//! very unsafe primitive to use, but it has its use cases when implementing -//! concurrent data structures and similar tasks. -//! -//! Great care must be taken to ensure that data races do not arise through the -//! usage of `UnsafeArc`, and this often requires some form of external -//! synchronization. The only guarantee provided to you by this class is that -//! the underlying data will remain valid (not free'd) so long as the reference -//! count is greater than one. - -use clone::Clone; -use iter::Iterator; -use kinds::Send; -use mem; -use ops::Drop; -use owned::Box; -use ptr::RawPtr; -use sync::atomics::{fence, AtomicUint, Relaxed, Acquire, Release}; -use ty::Unsafe; -use vec::Vec; - -/// An atomically reference counted pointer. -/// -/// Enforces no shared-memory safety. -#[unsafe_no_drop_flag] -pub struct UnsafeArc { - data: *mut ArcData, -} - -struct ArcData { - count: AtomicUint, - data: Unsafe, -} - -unsafe fn new_inner(data: T, refcount: uint) -> *mut ArcData { - let data = box ArcData { - count: AtomicUint::new(refcount), - data: Unsafe::new(data) - }; - mem::transmute(data) -} - -impl UnsafeArc { - /// Creates a new `UnsafeArc` which wraps the given data. - pub fn new(data: T) -> UnsafeArc { - unsafe { UnsafeArc { data: new_inner(data, 1) } } - } - - /// As new(), but returns an extra pre-cloned handle. - pub fn new2(data: T) -> (UnsafeArc, UnsafeArc) { - unsafe { - let ptr = new_inner(data, 2); - (UnsafeArc { data: ptr }, UnsafeArc { data: ptr }) - } - } - - /// As new(), but returns a vector of as many pre-cloned handles as - /// requested. - pub fn newN(data: T, num_handles: uint) -> Vec> { - unsafe { - if num_handles == 0 { - vec![] // need to free data here - } else { - let ptr = new_inner(data, num_handles); - let v = Vec::from_fn(num_handles, |_| UnsafeArc { data: ptr }); - v - } - } - } - - /// Gets a pointer to the inner shared data. Note that care must be taken to - /// ensure that the outer `UnsafeArc` does not fall out of scope while this - /// pointer is in use, otherwise it could possibly contain a use-after-free. - #[inline] - pub fn get(&self) -> *mut T { - unsafe { - debug_assert!((*self.data).count.load(Relaxed) > 0); - return (*self.data).data.get(); - } - } - - /// Gets an immutable pointer to the inner shared data. This has the same - /// caveats as the `get` method. - #[inline] - pub fn get_immut(&self) -> *T { - unsafe { - debug_assert!((*self.data).count.load(Relaxed) > 0); - return (*self.data).data.get() as *T; - } - } - - /// checks if this is the only reference to the arc protected data - #[inline] - pub fn is_owned(&self) -> bool { - unsafe { - (*self.data).count.load(Relaxed) == 1 - } - } -} - -impl Clone for UnsafeArc { - fn clone(&self) -> UnsafeArc { - unsafe { - // Using a relaxed ordering is alright here, as knowledge of the original reference - // prevents other threads from erroneously deleting the object. - // - // As explained in the [Boost documentation][1], - // Increasing the reference counter can always be done with memory_order_relaxed: New - // references to an object can only be formed from an existing reference, and passing - // an existing reference from one thread to another must already provide any required - // synchronization. - // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) - let old_count = (*self.data).count.fetch_add(1, Relaxed); - debug_assert!(old_count >= 1); - return UnsafeArc { data: self.data }; - } - } -} - -#[unsafe_destructor] -impl Drop for UnsafeArc{ - fn drop(&mut self) { - unsafe { - // Happens when destructing an unwrapper's handle and from - // `#[unsafe_no_drop_flag]` - if self.data.is_null() { - return - } - // Because `fetch_sub` is already atomic, we do not need to synchronize with other - // threads unless we are going to delete the object. - let old_count = (*self.data).count.fetch_sub(1, Release); - debug_assert!(old_count >= 1); - if old_count == 1 { - // This fence is needed to prevent reordering of use of the data and deletion of - // the data. Because it is marked `Release`, the decreasing of the reference count - // sychronizes with this `Acquire` fence. This means that use of the data happens - // before decreasing the refernce count, which happens before this fence, which - // happens before the deletion of the data. - // - // As explained in the [Boost documentation][1], - // It is important to enforce any possible access to the object in one thread - // (through an existing reference) to *happen before* deleting the object in a - // different thread. This is achieved by a "release" operation after dropping a - // reference (any access to the object through this reference must obviously - // happened before), and an "acquire" operation before deleting the object. - // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) - fence(Acquire); - let _: Box> = mem::transmute(self.data); - } - } - } -} - -#[cfg(test)] -mod tests { - use prelude::*; - use super::UnsafeArc; - use mem::size_of; - - #[test] - fn test_size() { - assert_eq!(size_of::>(), size_of::<*[int, ..10]>()); - } - - #[test] - fn arclike_newN() { - // Tests that the many-refcounts-at-once constructors don't leak. - let _ = UnsafeArc::new2("hello".to_owned().to_owned()); - let x = UnsafeArc::newN("hello".to_owned().to_owned(), 0); - assert_eq!(x.len(), 0) - let x = UnsafeArc::newN("hello".to_owned().to_owned(), 1); - assert_eq!(x.len(), 1) - let x = UnsafeArc::newN("hello".to_owned().to_owned(), 10); - assert_eq!(x.len(), 10) - } -} diff --git a/src/libstd/sync/deque.rs b/src/libstd/sync/deque.rs index 42a8bd886525b..c6446775b0c3e 100644 --- a/src/libstd/sync/deque.rs +++ b/src/libstd/sync/deque.rs @@ -417,8 +417,8 @@ mod tests { #[test] fn smoke() { - let mut pool = BufferPool::new(); - let (mut w, mut s) = pool.deque(); + let pool = BufferPool::new(); + let (w, s) = pool.deque(); assert_eq!(w.pop(), None); assert_eq!(s.steal(), Empty); w.push(1); @@ -432,10 +432,9 @@ mod tests { #[test] fn stealpush() { static AMT: int = 100000; - let mut pool = BufferPool::::new(); - let (mut w, s) = pool.deque(); + let pool = BufferPool::::new(); + let (w, s) = pool.deque(); let t = Thread::start(proc() { - let mut s = s; let mut left = AMT; while left > 0 { match s.steal() { @@ -458,10 +457,9 @@ mod tests { #[test] fn stealpush_large() { static AMT: int = 100000; - let mut pool = BufferPool::<(int, int)>::new(); - let (mut w, s) = pool.deque(); + let pool = BufferPool::<(int, int)>::new(); + let (w, s) = pool.deque(); let t = Thread::start(proc() { - let mut s = s; let mut left = AMT; while left > 0 { match s.steal() { @@ -479,7 +477,7 @@ mod tests { t.join(); } - fn stampede(mut w: Worker>, s: Stealer>, + fn stampede(w: Worker>, s: Stealer>, nthreads: int, amt: uint) { for _ in range(0, amt) { w.push(box 20); @@ -491,7 +489,6 @@ mod tests { let s = s.clone(); Thread::start(proc() { unsafe { - let mut s = s; while (*unsafe_remaining).load(SeqCst) > 0 { match s.steal() { Data(box 20) => { @@ -520,7 +517,7 @@ mod tests { #[test] fn run_stampede() { - let mut pool = BufferPool::>::new(); + let pool = BufferPool::>::new(); let (w, s) = pool.deque(); stampede(w, s, 8, 10000); } @@ -528,7 +525,7 @@ mod tests { #[test] fn many_stampede() { static AMT: uint = 4; - let mut pool = BufferPool::>::new(); + let pool = BufferPool::>::new(); let threads = range(0, AMT).map(|_| { let (w, s) = pool.deque(); Thread::start(proc() { @@ -547,14 +544,13 @@ mod tests { static NTHREADS: int = 8; static mut DONE: AtomicBool = INIT_ATOMIC_BOOL; static mut HITS: AtomicUint = INIT_ATOMIC_UINT; - let mut pool = BufferPool::::new(); - let (mut w, s) = pool.deque(); + let pool = BufferPool::::new(); + let (w, s) = pool.deque(); let threads = range(0, NTHREADS).map(|_| { let s = s.clone(); Thread::start(proc() { unsafe { - let mut s = s; loop { match s.steal() { Data(2) => { HITS.fetch_add(1, SeqCst); } @@ -606,8 +602,8 @@ mod tests { static AMT: int = 10000; static NTHREADS: int = 4; static mut DONE: AtomicBool = INIT_ATOMIC_BOOL; - let mut pool = BufferPool::<(int, uint)>::new(); - let (mut w, s) = pool.deque(); + let pool = BufferPool::<(int, uint)>::new(); + let (w, s) = pool.deque(); let (threads, hits) = vec::unzip(range(0, NTHREADS).map(|_| { let s = s.clone(); @@ -617,7 +613,6 @@ mod tests { }; (Thread::start(proc() { unsafe { - let mut s = s; loop { match s.steal() { Data((1, 2)) => { diff --git a/src/libstd/sync/mod.rs b/src/libstd/sync/mod.rs index 3213c538152c6..b2cf427edc812 100644 --- a/src/libstd/sync/mod.rs +++ b/src/libstd/sync/mod.rs @@ -15,7 +15,6 @@ //! and/or blocking at all, but rather provide the necessary tools to build //! other types of concurrent primitives. -pub mod arc; pub mod atomics; pub mod deque; pub mod mpmc_bounded_queue; diff --git a/src/libstd/sync/mpmc_bounded_queue.rs b/src/libstd/sync/mpmc_bounded_queue.rs index 7fb98e140865d..ffad9c1c583d8 100644 --- a/src/libstd/sync/mpmc_bounded_queue.rs +++ b/src/libstd/sync/mpmc_bounded_queue.rs @@ -173,7 +173,7 @@ mod tests { fn test() { let nthreads = 8u; let nmsgs = 1000u; - let mut q = Queue::with_capacity(nthreads*nmsgs); + let q = Queue::with_capacity(nthreads*nmsgs); assert_eq!(None, q.pop()); let (tx, rx) = channel(); @@ -181,7 +181,7 @@ mod tests { let q = q.clone(); let tx = tx.clone(); native::task::spawn(proc() { - let mut q = q; + let q = q; for i in range(0, nmsgs) { assert!(q.push(i)); } @@ -195,7 +195,7 @@ mod tests { completion_rxs.push(rx); let q = q.clone(); native::task::spawn(proc() { - let mut q = q; + let q = q; let mut i = 0u; loop { match q.pop() { diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs index f2f95da18425b..4db24e82d3709 100644 --- a/src/libstd/sync/mpsc_queue.rs +++ b/src/libstd/sync/mpsc_queue.rs @@ -165,7 +165,7 @@ mod tests { #[test] fn test_full() { - let mut q = Queue::new(); + let q = Queue::new(); q.push(box 1); q.push(box 2); } @@ -174,7 +174,7 @@ mod tests { fn test() { let nthreads = 8u; let nmsgs = 1000u; - let mut q = Queue::new(); + let q = Queue::new(); match q.pop() { Empty => {} Inconsistent | Data(..) => fail!() diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs index 093933c82fc16..fb515c9db6e4a 100644 --- a/src/libstd/sync/spsc_queue.rs +++ b/src/libstd/sync/spsc_queue.rs @@ -235,7 +235,7 @@ mod test { #[test] fn smoke() { - let mut q = Queue::new(0); + let q = Queue::new(0); q.push(1); q.push(2); assert_eq!(q.pop(), Some(1)); @@ -250,14 +250,14 @@ mod test { #[test] fn drop_full() { - let mut q = Queue::new(0); + let q = Queue::new(0); q.push(box 1); q.push(box 2); } #[test] fn smoke_bound() { - let mut q = Queue::new(1); + let q = Queue::new(1); q.push(1); q.push(2); assert_eq!(q.pop(), Some(1)); @@ -282,7 +282,7 @@ mod test { native::task::spawn(proc() { for _ in range(0, 100000) { loop { - match unsafe { (*b.get()).pop() } { + match b.pop() { Some(1) => break, Some(_) => fail!(), None => {} @@ -292,7 +292,7 @@ mod test { tx.send(()); }); for _ in range(0, 100000) { - unsafe { (*a.get()).push(1); } + a.push(1); } rx.recv(); } From 88b322c5fdcdf5b3dc2bb635dd9696a58ec48ea2 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 19 May 2014 18:00:52 -0700 Subject: [PATCH 09/13] native: Remove UnsafeArc in favor of just Arc --- src/libnative/io/file_unix.rs | 12 ++++-------- src/libnative/io/file_win32.rs | 16 ++++++---------- src/libnative/io/net.rs | 20 +++++++------------- src/libnative/io/pipe_unix.rs | 14 +++++++------- src/libnative/io/pipe_win32.rs | 10 +++++----- src/libnative/lib.rs | 1 + 6 files changed, 30 insertions(+), 43 deletions(-) diff --git a/src/libnative/io/file_unix.rs b/src/libnative/io/file_unix.rs index 046d2875d5531..5e357ec9cca0f 100644 --- a/src/libnative/io/file_unix.rs +++ b/src/libnative/io/file_unix.rs @@ -10,6 +10,7 @@ //! Blocking posix-based file I/O +use alloc::arc::Arc; use libc::{c_int, c_void}; use libc; use std::c_str::CString; @@ -17,7 +18,6 @@ use std::io::IoError; use std::io; use std::mem; use std::rt::rtio; -use std::sync::arc::UnsafeArc; use io::{IoResult, retry, keep_going}; @@ -29,7 +29,7 @@ struct Inner { } pub struct FileDesc { - inner: UnsafeArc + inner: Arc } impl FileDesc { @@ -42,7 +42,7 @@ impl FileDesc { /// Note that all I/O operations done on this object will be *blocking*, but /// they do not require the runtime to be active. pub fn new(fd: fd_t, close_on_drop: bool) -> FileDesc { - FileDesc { inner: UnsafeArc::new(Inner { + FileDesc { inner: Arc::new(Inner { fd: fd, close_on_drop: close_on_drop }) } @@ -79,11 +79,7 @@ impl FileDesc { } } - pub fn fd(&self) -> fd_t { - // This unsafety is fine because we're just reading off the file - // descriptor, no one is modifying this. - unsafe { (*self.inner.get()).fd } - } + pub fn fd(&self) -> fd_t { self.inner.fd } } impl io::Reader for FileDesc { diff --git a/src/libnative/io/file_win32.rs b/src/libnative/io/file_win32.rs index 3222c912dd085..3cc6cc2f47c0b 100644 --- a/src/libnative/io/file_win32.rs +++ b/src/libnative/io/file_win32.rs @@ -10,17 +10,17 @@ //! Blocking win32-based file I/O +use alloc::arc::Arc; +use libc::{c_int, c_void}; +use libc; use std::c_str::CString; use std::io::IoError; use std::io; -use libc::{c_int, c_void}; -use libc; use std::mem; use std::os::win32::{as_utf16_p, fill_utf16_buf_and_decode}; use std::ptr; use std::rt::rtio; use std::str; -use std::sync::arc::UnsafeArc; use std::vec; use io::IoResult; @@ -33,7 +33,7 @@ struct Inner { } pub struct FileDesc { - inner: UnsafeArc + inner: Arc } impl FileDesc { @@ -46,7 +46,7 @@ impl FileDesc { /// Note that all I/O operations done on this object will be *blocking*, but /// they do not require the runtime to be active. pub fn new(fd: fd_t, close_on_drop: bool) -> FileDesc { - FileDesc { inner: UnsafeArc::new(Inner { + FileDesc { inner: Arc::new(Inner { fd: fd, close_on_drop: close_on_drop }) } @@ -85,11 +85,7 @@ impl FileDesc { Ok(()) } - pub fn fd(&self) -> fd_t { - // This unsafety is fine because we're just reading off the file - // descriptor, no one is modifying this. - unsafe { (*self.inner.get()).fd } - } + pub fn fd(&self) -> fd_t { self.inner.fd } pub fn handle(&self) -> libc::HANDLE { unsafe { libc::get_osfhandle(self.fd()) as libc::HANDLE } diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index 40b66cc526f2a..a67d0439dbf40 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -8,12 +8,12 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use alloc::arc::Arc; use libc; use std::io::net::ip; use std::io; use std::mem; use std::rt::rtio; -use std::sync::arc::UnsafeArc; use std::unstable::mutex; use super::{IoResult, retry, keep_going}; @@ -235,7 +235,7 @@ pub fn init() { //////////////////////////////////////////////////////////////////////////////// pub struct TcpStream { - inner: UnsafeArc, + inner: Arc, read_deadline: u64, write_deadline: u64, } @@ -282,16 +282,13 @@ impl TcpStream { fn new(inner: Inner) -> TcpStream { TcpStream { - inner: UnsafeArc::new(inner), + inner: Arc::new(inner), read_deadline: 0, write_deadline: 0, } } - pub fn fd(&self) -> sock_t { - // This unsafety is fine because it's just a read-only arc - unsafe { (*self.inner.get()).fd } - } + pub fn fd(&self) -> sock_t { self.inner.fd } fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> { setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY, @@ -536,7 +533,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor { //////////////////////////////////////////////////////////////////////////////// pub struct UdpSocket { - inner: UnsafeArc, + inner: Arc, read_deadline: u64, write_deadline: u64, } @@ -545,7 +542,7 @@ impl UdpSocket { pub fn bind(addr: ip::SocketAddr) -> IoResult { let fd = try!(socket(addr, libc::SOCK_DGRAM)); let ret = UdpSocket { - inner: UnsafeArc::new(Inner::new(fd)), + inner: Arc::new(Inner::new(fd)), read_deadline: 0, write_deadline: 0, }; @@ -560,10 +557,7 @@ impl UdpSocket { } } - pub fn fd(&self) -> sock_t { - // unsafety is fine because it's just a read-only arc - unsafe { (*self.inner.get()).fd } - } + pub fn fd(&self) -> sock_t { self.inner.fd } pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> { setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST, diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs index d66075f44190d..8742fc58af135 100644 --- a/src/libnative/io/pipe_unix.rs +++ b/src/libnative/io/pipe_unix.rs @@ -8,13 +8,13 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use alloc::arc::Arc; use libc; use std::c_str::CString; use std::intrinsics; use std::io; use std::mem; use std::rt::rtio; -use std::sync::arc::UnsafeArc; use std::unstable::mutex; use super::{IoResult, retry}; @@ -108,7 +108,7 @@ fn bind(addr: &CString, ty: libc::c_int) -> IoResult { //////////////////////////////////////////////////////////////////////////////// pub struct UnixStream { - inner: UnsafeArc, + inner: Arc, read_deadline: u64, write_deadline: u64, } @@ -117,11 +117,11 @@ impl UnixStream { pub fn connect(addr: &CString, timeout: Option) -> IoResult { connect(addr, libc::SOCK_STREAM, timeout).map(|inner| { - UnixStream::new(UnsafeArc::new(inner)) + UnixStream::new(Arc::new(inner)) }) } - fn new(inner: UnsafeArc) -> UnixStream { + fn new(inner: Arc) -> UnixStream { UnixStream { inner: inner, read_deadline: 0, @@ -129,7 +129,7 @@ impl UnixStream { } } - fn fd(&self) -> fd_t { unsafe { (*self.inner.get()).fd } } + fn fd(&self) -> fd_t { self.inner.fd } #[cfg(target_os = "linux")] fn lock_nonblocking(&self) {} @@ -138,7 +138,7 @@ impl UnixStream { fn lock_nonblocking<'a>(&'a self) -> net::Guard<'a> { let ret = net::Guard { fd: self.fd(), - guard: unsafe { (*self.inner.get()).lock.lock() }, + guard: self.inner.lock.lock(), }; assert!(util::set_nonblocking(self.fd(), true).is_ok()); ret @@ -254,7 +254,7 @@ impl UnixAcceptor { &mut size as *mut libc::socklen_t) as libc::c_int }) { -1 => Err(super::last_error()), - fd => Ok(UnixStream::new(UnsafeArc::new(Inner::new(fd)))) + fd => Ok(UnixStream::new(Arc::new(Inner::new(fd)))) } } } diff --git a/src/libnative/io/pipe_win32.rs b/src/libnative/io/pipe_win32.rs index af80c7174f21c..c90c3824bbaa3 100644 --- a/src/libnative/io/pipe_win32.rs +++ b/src/libnative/io/pipe_win32.rs @@ -84,6 +84,7 @@ //! the test suite passing (the suite is in libstd), and that's good enough for //! me! +use alloc::arc::Arc; use libc; use std::c_str::CString; use std::intrinsics; @@ -92,7 +93,6 @@ use std::os::win32::as_utf16_p; use std::os; use std::ptr; use std::rt::rtio; -use std::sync::arc::UnsafeArc; use std::sync::atomics; use std::unstable::mutex; @@ -195,7 +195,7 @@ pub fn await(handle: libc::HANDLE, deadline: u64, //////////////////////////////////////////////////////////////////////////////// pub struct UnixStream { - inner: UnsafeArc, + inner: Arc, write: Option, read: Option, read_deadline: u64, @@ -273,7 +273,7 @@ impl UnixStream { Err(super::last_error()) } else { Ok(UnixStream { - inner: UnsafeArc::new(inner), + inner: Arc::new(inner), read: None, write: None, read_deadline: 0, @@ -317,7 +317,7 @@ impl UnixStream { }) } - fn handle(&self) -> libc::HANDLE { unsafe { (*self.inner.get()).handle } } + fn handle(&self) -> libc::HANDLE { self.inner.handle } fn read_closed(&self) -> bool { unsafe { (*self.inner.get()).read_closed.load(atomics::SeqCst) } @@ -683,7 +683,7 @@ impl UnixAcceptor { // Transfer ownership of our handle into this stream Ok(UnixStream { - inner: UnsafeArc::new(Inner::new(handle)), + inner: Arc::new(Inner::new(handle)), read: None, write: None, read_deadline: 0, diff --git a/src/libnative/lib.rs b/src/libnative/lib.rs index 8ba0613336924..634bf1dcedb07 100644 --- a/src/libnative/lib.rs +++ b/src/libnative/lib.rs @@ -57,6 +57,7 @@ // answer is that you don't need them) #![feature(macro_rules)] +extern crate alloc; extern crate libc; use std::os; From 5e10d373b597192f101b52060c95adaa83c48663 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 19 May 2014 18:08:33 -0700 Subject: [PATCH 10/13] rustuv: Remove usage of UnsafeArc --- src/librustuv/access.rs | 9 +++++---- src/librustuv/lib.rs | 1 + src/librustuv/queue.rs | 33 +++++++++++++-------------------- src/librustuv/rc.rs | 7 ++++--- 4 files changed, 23 insertions(+), 27 deletions(-) diff --git a/src/librustuv/access.rs b/src/librustuv/access.rs index 81ddc9d32eb2c..63d9aa7ead0a0 100644 --- a/src/librustuv/access.rs +++ b/src/librustuv/access.rs @@ -14,15 +14,16 @@ /// It is assumed that all invocations of this struct happen on the same thread /// (the uv event loop). +use alloc::arc::Arc; use std::mem; use std::rt::local::Local; use std::rt::task::{BlockedTask, Task}; -use std::sync::arc::UnsafeArc; +use std::ty::Unsafe; use homing::HomingMissile; pub struct Access { - inner: UnsafeArc, + inner: Arc>, } pub struct Guard<'a> { @@ -39,11 +40,11 @@ struct Inner { impl Access { pub fn new() -> Access { Access { - inner: UnsafeArc::new(Inner { + inner: Arc::new(Unsafe::new(Inner { queue: vec![], held: false, closed: false, - }) + })) } } diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs index a9b449e63be4c..0a6a305a3b780 100644 --- a/src/librustuv/lib.rs +++ b/src/librustuv/lib.rs @@ -46,6 +46,7 @@ via `close` and `delete` methods. #[cfg(test)] extern crate green; #[cfg(test)] extern crate realrustuv = "rustuv"; extern crate libc; +extern crate alloc; use libc::{c_int, c_void}; use std::fmt; diff --git a/src/librustuv/queue.rs b/src/librustuv/queue.rs index 57b5e7105b20e..98ae865cb1da3 100644 --- a/src/librustuv/queue.rs +++ b/src/librustuv/queue.rs @@ -20,10 +20,10 @@ #![allow(dead_code)] +use alloc::arc::Arc; use libc::c_void; use std::mem; use std::rt::task::BlockedTask; -use std::sync::arc::UnsafeArc; use std::unstable::mutex::NativeMutex; use mpsc = std::sync::mpsc_queue; @@ -46,20 +46,20 @@ struct State { /// This structure is intended to be stored next to the event loop, and it is /// used to create new `Queue` structures. pub struct QueuePool { - queue: UnsafeArc, + queue: Arc, refcnt: uint, } /// This type is used to send messages back to the original event loop. pub struct Queue { - queue: UnsafeArc, + queue: Arc, } extern fn async_cb(handle: *uvll::uv_async_t) { let pool: &mut QueuePool = unsafe { mem::transmute(uvll::get_data_for_uv_handle(handle)) }; - let state: &mut State = unsafe { mem::transmute(pool.queue.get()) }; + let state: &State = &*pool.queue; // Remember that there is no guarantee about how many times an async // callback is called with relation to the number of sends, so process the @@ -109,7 +109,7 @@ extern fn async_cb(handle: *uvll::uv_async_t) { impl QueuePool { pub fn new(loop_: &mut Loop) -> Box { let handle = UvHandle::alloc(None::, uvll::UV_ASYNC); - let state = UnsafeArc::new(State { + let state = Arc::new(State { handle: handle, lock: unsafe {NativeMutex::new()}, queue: mpsc::Queue::new(), @@ -132,24 +132,20 @@ impl QueuePool { pub fn queue(&mut self) -> Queue { unsafe { if self.refcnt == 0 { - uvll::uv_ref((*self.queue.get()).handle); + uvll::uv_ref(self.queue.handle); } self.refcnt += 1; } Queue { queue: self.queue.clone() } } - pub fn handle(&self) -> *uvll::uv_async_t { - unsafe { (*self.queue.get()).handle } - } + pub fn handle(&self) -> *uvll::uv_async_t { self.queue.handle } } impl Queue { pub fn push(&mut self, task: BlockedTask) { - unsafe { - (*self.queue.get()).queue.push(Task(task)); - uvll::uv_async_send((*self.queue.get()).handle); - } + self.queue.queue.push(Task(task)); + unsafe { uvll::uv_async_send(self.queue.handle); } } } @@ -160,9 +156,7 @@ impl Clone for Queue { // that the count is at least one (because we have a queue right here), // and if the queue is dropped later on it'll see the increment for the // decrement anyway. - unsafe { - (*self.queue.get()).queue.push(Increment); - } + self.queue.queue.push(Increment); Queue { queue: self.queue.clone() } } } @@ -172,10 +166,9 @@ impl Drop for Queue { // See the comments in the async_cb function for why there is a lock // that is acquired only on a drop. unsafe { - let state = self.queue.get(); - let _l = (*state).lock.lock(); - (*state).queue.push(Decrement); - uvll::uv_async_send((*state).handle); + let _l = self.queue.lock.lock(); + self.queue.queue.push(Decrement); + uvll::uv_async_send(self.queue.handle); } } } diff --git a/src/librustuv/rc.rs b/src/librustuv/rc.rs index 86c6c44238c06..2a1a6b9f26d47 100644 --- a/src/librustuv/rc.rs +++ b/src/librustuv/rc.rs @@ -16,16 +16,17 @@ /// the same underlying uv object, hence Rc is not used and this simple counter /// should suffice. -use std::sync::arc::UnsafeArc; +use alloc::arc::Arc; +use std::ty::Unsafe; pub struct Refcount { - rc: UnsafeArc, + rc: Arc>, } impl Refcount { /// Creates a new refcount of 1 pub fn new() -> Refcount { - Refcount { rc: UnsafeArc::new(1) } + Refcount { rc: Arc::new(Unsafe::new(1)) } } fn increment(&self) { From 05a453edb3f97aab4c15efdeae238aaea21849a5 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 19 May 2014 18:08:44 -0700 Subject: [PATCH 11/13] green: Remove usage of UnsafeArc --- src/libgreen/lib.rs | 13 +++++++------ src/libgreen/message_queue.rs | 22 +++++++++++----------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/src/libgreen/lib.rs b/src/libgreen/lib.rs index eec413635a50a..53e2574df5993 100644 --- a/src/libgreen/lib.rs +++ b/src/libgreen/lib.rs @@ -214,7 +214,9 @@ #[cfg(test)] extern crate rustuv; extern crate rand; extern crate libc; +extern crate alloc; +use alloc::arc::Arc; use std::mem::replace; use std::os; use std::rt::rtio; @@ -223,7 +225,6 @@ use std::rt; use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT}; use std::sync::deque; use std::task::TaskOpts; -use std::sync::arc::UnsafeArc; use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, NewNeighbor}; use sleeper_list::SleeperList; @@ -375,7 +376,7 @@ pub struct SchedPool { /// sending on a channel once the entire pool has been drained of all tasks. #[deriving(Clone)] struct TaskState { - cnt: UnsafeArc, + cnt: Arc, done: Sender<()>, } @@ -537,21 +538,21 @@ impl TaskState { fn new() -> (Receiver<()>, TaskState) { let (tx, rx) = channel(); (rx, TaskState { - cnt: UnsafeArc::new(AtomicUint::new(0)), + cnt: Arc::new(AtomicUint::new(0)), done: tx, }) } fn increment(&mut self) { - unsafe { (*self.cnt.get()).fetch_add(1, SeqCst); } + self.cnt.fetch_add(1, SeqCst); } fn active(&self) -> bool { - unsafe { (*self.cnt.get()).load(SeqCst) != 0 } + self.cnt.load(SeqCst) != 0 } fn decrement(&mut self) { - let prev = unsafe { (*self.cnt.get()).fetch_sub(1, SeqCst) }; + let prev = self.cnt.fetch_sub(1, SeqCst); if prev == 1 { self.done.send(()); } diff --git a/src/libgreen/message_queue.rs b/src/libgreen/message_queue.rs index 50666b8c649bc..99dbf9c8919e6 100644 --- a/src/libgreen/message_queue.rs +++ b/src/libgreen/message_queue.rs @@ -8,8 +8,8 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use alloc::arc::Arc; use mpsc = std::sync::mpsc_queue; -use std::sync::arc::UnsafeArc; pub enum PopResult { Inconsistent, @@ -18,29 +18,29 @@ pub enum PopResult { } pub fn queue() -> (Consumer, Producer) { - let (a, b) = UnsafeArc::new2(mpsc::Queue::new()); - (Consumer { inner: a }, Producer { inner: b }) + let a = Arc::new(mpsc::Queue::new()); + (Consumer { inner: a.clone() }, Producer { inner: a }) } pub struct Producer { - inner: UnsafeArc>, + inner: Arc>, } pub struct Consumer { - inner: UnsafeArc>, + inner: Arc>, } impl Consumer { - pub fn pop(&mut self) -> PopResult { - match unsafe { (*self.inner.get()).pop() } { + pub fn pop(&self) -> PopResult { + match self.inner.pop() { mpsc::Inconsistent => Inconsistent, mpsc::Empty => Empty, mpsc::Data(t) => Data(t), } } - pub fn casual_pop(&mut self) -> Option { - match unsafe { (*self.inner.get()).pop() } { + pub fn casual_pop(&self) -> Option { + match self.inner.pop() { mpsc::Inconsistent => None, mpsc::Empty => None, mpsc::Data(t) => Some(t), @@ -49,8 +49,8 @@ impl Consumer { } impl Producer { - pub fn push(&mut self, t: T) { - unsafe { (*self.inner.get()).push(t); } + pub fn push(&self, t: T) { + self.inner.push(t); } } From 54f6eacf34c1ec368750051832aba2735fbf0880 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Tue, 20 May 2014 18:51:29 -0700 Subject: [PATCH 12/13] green: Remove some unsafe code in BasicLoop --- src/libgreen/basic.rs | 52 ++++++++++++++++--------------------------- src/libgreen/lib.rs | 2 -- src/libgreen/sched.rs | 2 +- 3 files changed, 20 insertions(+), 36 deletions(-) diff --git a/src/libgreen/basic.rs b/src/libgreen/basic.rs index 9e42e2f67c485..1ebebbe555e8b 100644 --- a/src/libgreen/basic.rs +++ b/src/libgreen/basic.rs @@ -15,6 +15,8 @@ //! This implementation is also used as the fallback implementation of an event //! loop if no other one is provided (and M:N scheduling is desired). +use alloc::arc::Arc; +use std::sync::atomics; use std::mem; use std::rt::rtio::{EventLoop, IoFactory, RemoteCallback}; use std::rt::rtio::{PausableIdleCallback, Callback}; @@ -27,10 +29,11 @@ pub fn event_loop() -> Box { struct BasicLoop { work: Vec, // pending work - idle: Option<*mut BasicPausable>, // only one is allowed remotes: Vec<(uint, Box)>, next_remote: uint, messages: Exclusive>, + idle: Option>, + idle_active: Option>, } enum Message { RunRemote(uint), RemoveRemote(uint) } @@ -40,6 +43,7 @@ impl BasicLoop { BasicLoop { work: vec![], idle: None, + idle_active: None, next_remote: 0, remotes: vec![], messages: Exclusive::new(vec![]), @@ -92,20 +96,18 @@ impl BasicLoop { /// Run the idle callback if one is registered fn idle(&mut self) { - unsafe { - match self.idle { - Some(idle) => { - if (*idle).active { - (*idle).work.call(); - } + match self.idle { + Some(ref mut idle) => { + if self.idle_active.get_ref().load(atomics::SeqCst) { + idle.call(); } - None => {} } + None => {} } } fn has_idle(&self) -> bool { - unsafe { self.idle.is_some() && (**self.idle.get_ref()).active } + self.idle.is_some() && self.idle_active.get_ref().load(atomics::SeqCst) } } @@ -141,13 +143,11 @@ impl EventLoop for BasicLoop { // FIXME: Seems like a really weird requirement to have an event loop provide. fn pausable_idle_callback(&mut self, cb: Box) -> Box { - let callback = box BasicPausable::new(self, cb); rtassert!(self.idle.is_none()); - unsafe { - let cb_ptr: &*mut BasicPausable = mem::transmute(&callback); - self.idle = Some(*cb_ptr); - } - callback as Box + self.idle = Some(cb); + let a = Arc::new(atomics::AtomicBool::new(true)); + self.idle_active = Some(a.clone()); + box BasicPausable { active: a } as Box } fn remote_callback(&mut self, f: Box) @@ -196,35 +196,21 @@ impl Drop for BasicRemote { } struct BasicPausable { - eloop: *mut BasicLoop, - work: Box, - active: bool, -} - -impl BasicPausable { - fn new(eloop: &mut BasicLoop, cb: Box) -> BasicPausable { - BasicPausable { - active: false, - work: cb, - eloop: eloop, - } - } + active: Arc, } impl PausableIdleCallback for BasicPausable { fn pause(&mut self) { - self.active = false; + self.active.store(false, atomics::SeqCst); } fn resume(&mut self) { - self.active = true; + self.active.store(true, atomics::SeqCst); } } impl Drop for BasicPausable { fn drop(&mut self) { - unsafe { - (*self.eloop).idle = None; - } + self.active.store(false, atomics::SeqCst); } } diff --git a/src/libgreen/lib.rs b/src/libgreen/lib.rs index 53e2574df5993..39b6485716315 100644 --- a/src/libgreen/lib.rs +++ b/src/libgreen/lib.rs @@ -435,7 +435,6 @@ impl SchedPool { pool.sleepers.clone(), pool.task_state.clone()); pool.handles.push(sched.make_handle()); - let sched = sched; pool.threads.push(Thread::start(proc() { sched.bootstrap(); })); } @@ -497,7 +496,6 @@ impl SchedPool { self.task_state.clone()); let ret = sched.make_handle(); self.handles.push(sched.make_handle()); - let sched = sched; self.threads.push(Thread::start(proc() { sched.bootstrap() })); return ret; diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs index 7faa9207bbb1f..d28e74a2b80b7 100644 --- a/src/libgreen/sched.rs +++ b/src/libgreen/sched.rs @@ -1142,7 +1142,7 @@ mod test { Thread::start(proc() { let sleepers = SleeperList::new(); - let mut pool = BufferPool::new(); + let pool = BufferPool::new(); let (normal_worker, normal_stealer) = pool.deque(); let (special_worker, special_stealer) = pool.deque(); let queues = vec![normal_stealer, special_stealer]; From fdf935a5249edd0be0f14385a099963e43c7a29b Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Tue, 20 May 2014 18:54:31 -0700 Subject: [PATCH 13/13] std,green: Mark some queue types as NoShare --- src/libgreen/message_queue.rs | 8 ++++++-- src/libnative/io/net.rs | 4 ++-- src/libnative/io/pipe_unix.rs | 2 +- src/libnative/io/pipe_win32.rs | 16 ++++++++-------- src/libstd/sync/deque.rs | 12 +++++++++--- 5 files changed, 26 insertions(+), 16 deletions(-) diff --git a/src/libgreen/message_queue.rs b/src/libgreen/message_queue.rs index 99dbf9c8919e6..137c493364520 100644 --- a/src/libgreen/message_queue.rs +++ b/src/libgreen/message_queue.rs @@ -10,6 +10,7 @@ use alloc::arc::Arc; use mpsc = std::sync::mpsc_queue; +use std::kinds::marker; pub enum PopResult { Inconsistent, @@ -19,15 +20,18 @@ pub enum PopResult { pub fn queue() -> (Consumer, Producer) { let a = Arc::new(mpsc::Queue::new()); - (Consumer { inner: a.clone() }, Producer { inner: a }) + (Consumer { inner: a.clone(), noshare: marker::NoShare }, + Producer { inner: a, noshare: marker::NoShare }) } pub struct Producer { inner: Arc>, + noshare: marker::NoShare, } pub struct Consumer { inner: Arc>, + noshare: marker::NoShare, } impl Consumer { @@ -56,6 +60,6 @@ impl Producer { impl Clone for Producer { fn clone(&self) -> Producer { - Producer { inner: self.inner.clone() } + Producer { inner: self.inner.clone(), noshare: marker::NoShare } } } diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index a67d0439dbf40..8bd8bc71a49f1 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -326,7 +326,7 @@ impl TcpStream { fn lock_nonblocking<'a>(&'a self) -> Guard<'a> { let ret = Guard { fd: self.fd(), - guard: unsafe { (*self.inner.get()).lock.lock() }, + guard: unsafe { self.inner.lock.lock() }, }; assert!(util::set_nonblocking(self.fd(), true).is_ok()); ret @@ -597,7 +597,7 @@ impl UdpSocket { fn lock_nonblocking<'a>(&'a self) -> Guard<'a> { let ret = Guard { fd: self.fd(), - guard: unsafe { (*self.inner.get()).lock.lock() }, + guard: unsafe { self.inner.lock.lock() }, }; assert!(util::set_nonblocking(self.fd(), true).is_ok()); ret diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs index 8742fc58af135..a53a58b6cec43 100644 --- a/src/libnative/io/pipe_unix.rs +++ b/src/libnative/io/pipe_unix.rs @@ -138,7 +138,7 @@ impl UnixStream { fn lock_nonblocking<'a>(&'a self) -> net::Guard<'a> { let ret = net::Guard { fd: self.fd(), - guard: self.inner.lock.lock(), + guard: unsafe { self.inner.lock.lock() }, }; assert!(util::set_nonblocking(self.fd(), true).is_ok()); ret diff --git a/src/libnative/io/pipe_win32.rs b/src/libnative/io/pipe_win32.rs index c90c3824bbaa3..c9dbdc8331bbe 100644 --- a/src/libnative/io/pipe_win32.rs +++ b/src/libnative/io/pipe_win32.rs @@ -320,11 +320,11 @@ impl UnixStream { fn handle(&self) -> libc::HANDLE { self.inner.handle } fn read_closed(&self) -> bool { - unsafe { (*self.inner.get()).read_closed.load(atomics::SeqCst) } + self.inner.read_closed.load(atomics::SeqCst) } fn write_closed(&self) -> bool { - unsafe { (*self.inner.get()).write_closed.load(atomics::SeqCst) } + self.inner.write_closed.load(atomics::SeqCst) } fn cancel_io(&self) -> IoResult<()> { @@ -353,7 +353,7 @@ impl rtio::RtioPipe for UnixStream { // acquire the lock. // // See comments in close_read() about why this lock is necessary. - let guard = unsafe { (*self.inner.get()).lock.lock() }; + let guard = unsafe { self.inner.lock.lock() }; if self.read_closed() { return Err(io::standard_error(io::EndOfFile)) } @@ -429,7 +429,7 @@ impl rtio::RtioPipe for UnixStream { // going after we woke up. // // See comments in close_read() about why this lock is necessary. - let guard = unsafe { (*self.inner.get()).lock.lock() }; + let guard = unsafe { self.inner.lock.lock() }; if self.write_closed() { return Err(io::standard_error(io::BrokenPipe)) } @@ -514,15 +514,15 @@ impl rtio::RtioPipe for UnixStream { // close_read() between steps 1 and 2. By atomically executing steps 1 // and 2 with a lock with respect to close_read(), we're guaranteed that // no thread will erroneously sit in a read forever. - let _guard = unsafe { (*self.inner.get()).lock.lock() }; - unsafe { (*self.inner.get()).read_closed.store(true, atomics::SeqCst) } + let _guard = unsafe { self.inner.lock.lock() }; + self.inner.read_closed.store(true, atomics::SeqCst); self.cancel_io() } fn close_write(&mut self) -> IoResult<()> { // see comments in close_read() for why this lock is necessary - let _guard = unsafe { (*self.inner.get()).lock.lock() }; - unsafe { (*self.inner.get()).write_closed.store(true, atomics::SeqCst) } + let _guard = unsafe { self.inner.lock.lock() }; + self.inner.write_closed.store(true, atomics::SeqCst); self.cancel_io() } diff --git a/src/libstd/sync/deque.rs b/src/libstd/sync/deque.rs index c6446775b0c3e..a3fdc4d3eaff7 100644 --- a/src/libstd/sync/deque.rs +++ b/src/libstd/sync/deque.rs @@ -53,16 +53,17 @@ use alloc::arc::Arc; use clone::Clone; use iter::{range, Iterator}; use kinds::Send; +use kinds::marker; use mem::{forget, min_align_of, size_of, transmute}; use ops::Drop; use option::{Option, Some, None}; use owned::Box; use ptr::RawPtr; use ptr; +use rt::heap::{allocate, deallocate}; use slice::ImmutableVector; use sync::atomics::{AtomicInt, AtomicPtr, SeqCst}; use unstable::sync::Exclusive; -use rt::heap::{allocate, deallocate}; use vec::Vec; // Once the queue is less than 1/K full, then it will be downsized. Note that @@ -89,6 +90,7 @@ struct Deque { /// There may only be one worker per deque. pub struct Worker { deque: Arc>, + noshare: marker::NoShare, } /// The stealing half of the work-stealing deque. Stealers have access to the @@ -96,6 +98,7 @@ pub struct Worker { /// `steal` method. pub struct Stealer { deque: Arc>, + noshare: marker::NoShare, } /// When stealing some data, this is an enumeration of the possible outcomes. @@ -153,7 +156,8 @@ impl BufferPool { pub fn deque(&self) -> (Worker, Stealer) { let a = Arc::new(Deque::new(self.clone())); let b = a.clone(); - (Worker { deque: a }, Stealer { deque: b }) + (Worker { deque: a, noshare: marker::NoShare }, + Stealer { deque: b, noshare: marker::NoShare }) } fn alloc(&self, bits: int) -> Box> { @@ -219,7 +223,9 @@ impl Stealer { } impl Clone for Stealer { - fn clone(&self) -> Stealer { Stealer { deque: self.deque.clone() } } + fn clone(&self) -> Stealer { + Stealer { deque: self.deque.clone(), noshare: marker::NoShare } + } } // Almost all of this code can be found directly in the paper so I'm not