diff --git a/src/libgreen/basic.rs b/src/libgreen/basic.rs index 9e42e2f67c485..1ebebbe555e8b 100644 --- a/src/libgreen/basic.rs +++ b/src/libgreen/basic.rs @@ -15,6 +15,8 @@ //! This implementation is also used as the fallback implementation of an event //! loop if no other one is provided (and M:N scheduling is desired). +use alloc::arc::Arc; +use std::sync::atomics; use std::mem; use std::rt::rtio::{EventLoop, IoFactory, RemoteCallback}; use std::rt::rtio::{PausableIdleCallback, Callback}; @@ -27,10 +29,11 @@ pub fn event_loop() -> Box { struct BasicLoop { work: Vec, // pending work - idle: Option<*mut BasicPausable>, // only one is allowed remotes: Vec<(uint, Box)>, next_remote: uint, messages: Exclusive>, + idle: Option>, + idle_active: Option>, } enum Message { RunRemote(uint), RemoveRemote(uint) } @@ -40,6 +43,7 @@ impl BasicLoop { BasicLoop { work: vec![], idle: None, + idle_active: None, next_remote: 0, remotes: vec![], messages: Exclusive::new(vec![]), @@ -92,20 +96,18 @@ impl BasicLoop { /// Run the idle callback if one is registered fn idle(&mut self) { - unsafe { - match self.idle { - Some(idle) => { - if (*idle).active { - (*idle).work.call(); - } + match self.idle { + Some(ref mut idle) => { + if self.idle_active.get_ref().load(atomics::SeqCst) { + idle.call(); } - None => {} } + None => {} } } fn has_idle(&self) -> bool { - unsafe { self.idle.is_some() && (**self.idle.get_ref()).active } + self.idle.is_some() && self.idle_active.get_ref().load(atomics::SeqCst) } } @@ -141,13 +143,11 @@ impl EventLoop for BasicLoop { // FIXME: Seems like a really weird requirement to have an event loop provide. fn pausable_idle_callback(&mut self, cb: Box) -> Box { - let callback = box BasicPausable::new(self, cb); rtassert!(self.idle.is_none()); - unsafe { - let cb_ptr: &*mut BasicPausable = mem::transmute(&callback); - self.idle = Some(*cb_ptr); - } - callback as Box + self.idle = Some(cb); + let a = Arc::new(atomics::AtomicBool::new(true)); + self.idle_active = Some(a.clone()); + box BasicPausable { active: a } as Box } fn remote_callback(&mut self, f: Box) @@ -196,35 +196,21 @@ impl Drop for BasicRemote { } struct BasicPausable { - eloop: *mut BasicLoop, - work: Box, - active: bool, -} - -impl BasicPausable { - fn new(eloop: &mut BasicLoop, cb: Box) -> BasicPausable { - BasicPausable { - active: false, - work: cb, - eloop: eloop, - } - } + active: Arc, } impl PausableIdleCallback for BasicPausable { fn pause(&mut self) { - self.active = false; + self.active.store(false, atomics::SeqCst); } fn resume(&mut self) { - self.active = true; + self.active.store(true, atomics::SeqCst); } } impl Drop for BasicPausable { fn drop(&mut self) { - unsafe { - (*self.eloop).idle = None; - } + self.active.store(false, atomics::SeqCst); } } diff --git a/src/libgreen/lib.rs b/src/libgreen/lib.rs index eec413635a50a..39b6485716315 100644 --- a/src/libgreen/lib.rs +++ b/src/libgreen/lib.rs @@ -214,7 +214,9 @@ #[cfg(test)] extern crate rustuv; extern crate rand; extern crate libc; +extern crate alloc; +use alloc::arc::Arc; use std::mem::replace; use std::os; use std::rt::rtio; @@ -223,7 +225,6 @@ use std::rt; use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT}; use std::sync::deque; use std::task::TaskOpts; -use std::sync::arc::UnsafeArc; use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, NewNeighbor}; use sleeper_list::SleeperList; @@ -375,7 +376,7 @@ pub struct SchedPool { /// sending on a channel once the entire pool has been drained of all tasks. 
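The hunk above swaps basic.rs's raw `*mut BasicPausable` back-pointer for a shared `Arc<AtomicBool>`: the loop keeps one clone and runs the boxed callback only while the flag is set, while the handle's pause/resume/drop just store into the flag. Below is a minimal sketch of that shape in current Rust syntax; the names (`PausableHandle`, `IdleSlot`) are illustrative, not the PR's.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Handle returned to the caller; pausing/resuming only touches the shared flag.
struct PausableHandle {
    active: Arc<AtomicBool>,
}

impl PausableHandle {
    fn pause(&self) { self.active.store(false, Ordering::SeqCst); }
    fn resume(&self) { self.active.store(true, Ordering::SeqCst); }
}

impl Drop for PausableHandle {
    // Dropping the handle deactivates the callback, mirroring BasicPausable's Drop.
    fn drop(&mut self) { self.active.store(false, Ordering::SeqCst); }
}

/// Event-loop side: owns the callback and a clone of the flag.
struct IdleSlot {
    work: Box<dyn FnMut()>,
    active: Arc<AtomicBool>,
}

impl IdleSlot {
    fn register(work: Box<dyn FnMut()>) -> (IdleSlot, PausableHandle) {
        let flag = Arc::new(AtomicBool::new(true));
        (IdleSlot { work, active: flag.clone() },
         PausableHandle { active: flag })
    }

    fn run_if_active(&mut self) {
        if self.active.load(Ordering::SeqCst) {
            (self.work)();
        }
    }
}

fn main() {
    let (mut slot, handle) = IdleSlot::register(Box::new(|| println!("idle tick")));
    slot.run_if_active(); // prints
    handle.pause();
    slot.run_if_active(); // skipped
}
```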
#[deriving(Clone)] struct TaskState { - cnt: UnsafeArc, + cnt: Arc, done: Sender<()>, } @@ -434,7 +435,6 @@ impl SchedPool { pool.sleepers.clone(), pool.task_state.clone()); pool.handles.push(sched.make_handle()); - let sched = sched; pool.threads.push(Thread::start(proc() { sched.bootstrap(); })); } @@ -496,7 +496,6 @@ impl SchedPool { self.task_state.clone()); let ret = sched.make_handle(); self.handles.push(sched.make_handle()); - let sched = sched; self.threads.push(Thread::start(proc() { sched.bootstrap() })); return ret; @@ -537,21 +536,21 @@ impl TaskState { fn new() -> (Receiver<()>, TaskState) { let (tx, rx) = channel(); (rx, TaskState { - cnt: UnsafeArc::new(AtomicUint::new(0)), + cnt: Arc::new(AtomicUint::new(0)), done: tx, }) } fn increment(&mut self) { - unsafe { (*self.cnt.get()).fetch_add(1, SeqCst); } + self.cnt.fetch_add(1, SeqCst); } fn active(&self) -> bool { - unsafe { (*self.cnt.get()).load(SeqCst) != 0 } + self.cnt.load(SeqCst) != 0 } fn decrement(&mut self) { - let prev = unsafe { (*self.cnt.get()).fetch_sub(1, SeqCst) }; + let prev = self.cnt.fetch_sub(1, SeqCst); if prev == 1 { self.done.send(()); } diff --git a/src/libgreen/message_queue.rs b/src/libgreen/message_queue.rs index 50666b8c649bc..137c493364520 100644 --- a/src/libgreen/message_queue.rs +++ b/src/libgreen/message_queue.rs @@ -8,8 +8,9 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use alloc::arc::Arc; use mpsc = std::sync::mpsc_queue; -use std::sync::arc::UnsafeArc; +use std::kinds::marker; pub enum PopResult { Inconsistent, @@ -18,29 +19,32 @@ pub enum PopResult { } pub fn queue() -> (Consumer, Producer) { - let (a, b) = UnsafeArc::new2(mpsc::Queue::new()); - (Consumer { inner: a }, Producer { inner: b }) + let a = Arc::new(mpsc::Queue::new()); + (Consumer { inner: a.clone(), noshare: marker::NoShare }, + Producer { inner: a, noshare: marker::NoShare }) } pub struct Producer { - inner: UnsafeArc>, + inner: Arc>, + noshare: marker::NoShare, } pub struct Consumer { - inner: UnsafeArc>, + inner: Arc>, + noshare: marker::NoShare, } impl Consumer { - pub fn pop(&mut self) -> PopResult { - match unsafe { (*self.inner.get()).pop() } { + pub fn pop(&self) -> PopResult { + match self.inner.pop() { mpsc::Inconsistent => Inconsistent, mpsc::Empty => Empty, mpsc::Data(t) => Data(t), } } - pub fn casual_pop(&mut self) -> Option { - match unsafe { (*self.inner.get()).pop() } { + pub fn casual_pop(&self) -> Option { + match self.inner.pop() { mpsc::Inconsistent => None, mpsc::Empty => None, mpsc::Data(t) => Some(t), @@ -49,13 +53,13 @@ impl Consumer { } impl Producer { - pub fn push(&mut self, t: T) { - unsafe { (*self.inner.get()).push(t); } + pub fn push(&self, t: T) { + self.inner.push(t); } } impl Clone for Producer { fn clone(&self) -> Producer { - Producer { inner: self.inner.clone() } + Producer { inner: self.inner.clone(), noshare: marker::NoShare } } } diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs index 7faa9207bbb1f..d28e74a2b80b7 100644 --- a/src/libgreen/sched.rs +++ b/src/libgreen/sched.rs @@ -1142,7 +1142,7 @@ mod test { Thread::start(proc() { let sleepers = SleeperList::new(); - let mut pool = BufferPool::new(); + let pool = BufferPool::new(); let (normal_worker, normal_stealer) = pool.deque(); let (special_worker, special_stealer) = pool.deque(); let queues = vec![normal_stealer, special_stealer]; diff --git a/src/libnative/io/file_unix.rs b/src/libnative/io/file_unix.rs index 046d2875d5531..5e357ec9cca0f 100644 --- 
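`TaskState` gets the same treatment: the shared counter moves from `UnsafeArc<AtomicUint>` to `Arc<AtomicUint>`, so increment/decrement lose their `unsafe` blocks while the drain notification still goes out over a channel once the count returns to zero. A hedged sketch of that counter-plus-done-channel shape in today's std (modern names such as `AtomicUsize` substituted):

```rust
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;

#[derive(Clone)]
struct TaskState {
    cnt: Arc<AtomicUsize>, // live-task count shared by all schedulers
    done: Sender<()>,      // fired when the count returns to zero
}

impl TaskState {
    fn new() -> (Receiver<()>, TaskState) {
        let (tx, rx) = channel();
        (rx, TaskState { cnt: Arc::new(AtomicUsize::new(0)), done: tx })
    }
    fn increment(&self) { self.cnt.fetch_add(1, SeqCst); }
    fn active(&self) -> bool { self.cnt.load(SeqCst) != 0 }
    fn decrement(&self) {
        // fetch_sub returns the previous value; 1 means we were the last task.
        if self.cnt.fetch_sub(1, SeqCst) == 1 {
            let _ = self.done.send(());
        }
    }
}

fn main() {
    let (rx, state) = TaskState::new();
    state.increment();
    assert!(state.active());
    state.decrement();
    rx.recv().unwrap(); // pool is drained
}
```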
a/src/libnative/io/file_unix.rs +++ b/src/libnative/io/file_unix.rs @@ -10,6 +10,7 @@ //! Blocking posix-based file I/O +use alloc::arc::Arc; use libc::{c_int, c_void}; use libc; use std::c_str::CString; @@ -17,7 +18,6 @@ use std::io::IoError; use std::io; use std::mem; use std::rt::rtio; -use std::sync::arc::UnsafeArc; use io::{IoResult, retry, keep_going}; @@ -29,7 +29,7 @@ struct Inner { } pub struct FileDesc { - inner: UnsafeArc + inner: Arc } impl FileDesc { @@ -42,7 +42,7 @@ impl FileDesc { /// Note that all I/O operations done on this object will be *blocking*, but /// they do not require the runtime to be active. pub fn new(fd: fd_t, close_on_drop: bool) -> FileDesc { - FileDesc { inner: UnsafeArc::new(Inner { + FileDesc { inner: Arc::new(Inner { fd: fd, close_on_drop: close_on_drop }) } @@ -79,11 +79,7 @@ impl FileDesc { } } - pub fn fd(&self) -> fd_t { - // This unsafety is fine because we're just reading off the file - // descriptor, no one is modifying this. - unsafe { (*self.inner.get()).fd } - } + pub fn fd(&self) -> fd_t { self.inner.fd } } impl io::Reader for FileDesc { diff --git a/src/libnative/io/file_win32.rs b/src/libnative/io/file_win32.rs index 3222c912dd085..3cc6cc2f47c0b 100644 --- a/src/libnative/io/file_win32.rs +++ b/src/libnative/io/file_win32.rs @@ -10,17 +10,17 @@ //! Blocking win32-based file I/O +use alloc::arc::Arc; +use libc::{c_int, c_void}; +use libc; use std::c_str::CString; use std::io::IoError; use std::io; -use libc::{c_int, c_void}; -use libc; use std::mem; use std::os::win32::{as_utf16_p, fill_utf16_buf_and_decode}; use std::ptr; use std::rt::rtio; use std::str; -use std::sync::arc::UnsafeArc; use std::vec; use io::IoResult; @@ -33,7 +33,7 @@ struct Inner { } pub struct FileDesc { - inner: UnsafeArc + inner: Arc } impl FileDesc { @@ -46,7 +46,7 @@ impl FileDesc { /// Note that all I/O operations done on this object will be *blocking*, but /// they do not require the runtime to be active. pub fn new(fd: fd_t, close_on_drop: bool) -> FileDesc { - FileDesc { inner: UnsafeArc::new(Inner { + FileDesc { inner: Arc::new(Inner { fd: fd, close_on_drop: close_on_drop }) } @@ -85,11 +85,7 @@ impl FileDesc { Ok(()) } - pub fn fd(&self) -> fd_t { - // This unsafety is fine because we're just reading off the file - // descriptor, no one is modifying this. - unsafe { (*self.inner.get()).fd } - } + pub fn fd(&self) -> fd_t { self.inner.fd } pub fn handle(&self) -> libc::HANDLE { unsafe { libc::get_osfhandle(self.fd()) as libc::HANDLE } diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index 40b66cc526f2a..8bd8bc71a49f1 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -8,12 +8,12 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. 
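In `file_unix.rs` and `file_win32.rs` the descriptor now sits behind a plain `Arc<Inner>`, so `fd()` is a safe field read and the resource is released only when the last clone drops. The Unix-only sketch below uses a std `File` as a stand-in resource to show the same sharing semantics; none of it is libnative's actual API.

```rust
use std::fs::File;
use std::io::Write;
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::Arc;

/// All clones share one underlying file; it is closed when the last Arc drops.
#[derive(Clone)]
struct FileDesc {
    inner: Arc<File>,
}

impl FileDesc {
    fn new(file: File) -> FileDesc {
        FileDesc { inner: Arc::new(file) }
    }

    /// A read-only accessor needs no unsafety: the Arc guarantees `inner`
    /// stays alive for as long as any handle exists.
    fn fd(&self) -> RawFd {
        self.inner.as_raw_fd()
    }
}

fn main() -> std::io::Result<()> {
    let f = File::create("/tmp/filedesc-demo")?;
    let a = FileDesc::new(f);
    let b = a.clone();
    assert_eq!(a.fd(), b.fd()); // same descriptor, shared refcount
    drop(a);                    // file stays open: `b` still holds a reference
    let mut w: &File = &b.inner;
    writeln!(w, "still open")?;
    Ok(())
}
```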
+use alloc::arc::Arc; use libc; use std::io::net::ip; use std::io; use std::mem; use std::rt::rtio; -use std::sync::arc::UnsafeArc; use std::unstable::mutex; use super::{IoResult, retry, keep_going}; @@ -235,7 +235,7 @@ pub fn init() { //////////////////////////////////////////////////////////////////////////////// pub struct TcpStream { - inner: UnsafeArc, + inner: Arc, read_deadline: u64, write_deadline: u64, } @@ -282,16 +282,13 @@ impl TcpStream { fn new(inner: Inner) -> TcpStream { TcpStream { - inner: UnsafeArc::new(inner), + inner: Arc::new(inner), read_deadline: 0, write_deadline: 0, } } - pub fn fd(&self) -> sock_t { - // This unsafety is fine because it's just a read-only arc - unsafe { (*self.inner.get()).fd } - } + pub fn fd(&self) -> sock_t { self.inner.fd } fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> { setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY, @@ -329,7 +326,7 @@ impl TcpStream { fn lock_nonblocking<'a>(&'a self) -> Guard<'a> { let ret = Guard { fd: self.fd(), - guard: unsafe { (*self.inner.get()).lock.lock() }, + guard: unsafe { self.inner.lock.lock() }, }; assert!(util::set_nonblocking(self.fd(), true).is_ok()); ret @@ -536,7 +533,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor { //////////////////////////////////////////////////////////////////////////////// pub struct UdpSocket { - inner: UnsafeArc, + inner: Arc, read_deadline: u64, write_deadline: u64, } @@ -545,7 +542,7 @@ impl UdpSocket { pub fn bind(addr: ip::SocketAddr) -> IoResult { let fd = try!(socket(addr, libc::SOCK_DGRAM)); let ret = UdpSocket { - inner: UnsafeArc::new(Inner::new(fd)), + inner: Arc::new(Inner::new(fd)), read_deadline: 0, write_deadline: 0, }; @@ -560,10 +557,7 @@ impl UdpSocket { } } - pub fn fd(&self) -> sock_t { - // unsafety is fine because it's just a read-only arc - unsafe { (*self.inner.get()).fd } - } + pub fn fd(&self) -> sock_t { self.inner.fd } pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> { setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST, @@ -603,7 +597,7 @@ impl UdpSocket { fn lock_nonblocking<'a>(&'a self) -> Guard<'a> { let ret = Guard { fd: self.fd(), - guard: unsafe { (*self.inner.get()).lock.lock() }, + guard: unsafe { self.inner.lock.lock() }, }; assert!(util::set_nonblocking(self.fd(), true).is_ok()); ret diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs index d66075f44190d..a53a58b6cec43 100644 --- a/src/libnative/io/pipe_unix.rs +++ b/src/libnative/io/pipe_unix.rs @@ -8,13 +8,13 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. 
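`net.rs` keeps its `Guard` trick, acquiring the inner mutex while a socket is temporarily switched to nonblocking mode; the Arc change only means the lock is reached as `self.inner.lock` instead of through a raw pointer. A rough modern sketch of that guard pattern, with an integer standing in for the socket and `std::sync::Mutex` for the old `NativeMutex`:

```rust
use std::sync::{Arc, Mutex, MutexGuard};

struct Inner {
    // In libnative this is the socket plus a NativeMutex; here the protected
    // "resource" is just a descriptor-like integer.
    fd: i32,
    lock: Mutex<()>,
}

struct Stream {
    inner: Arc<Inner>,
}

/// Held for the duration of a would-block I/O call; the lock serializes the
/// "switch to nonblocking, do the call, switch back" sequence.
struct Guard<'a> {
    fd: i32,
    _held: MutexGuard<'a, ()>,
}

impl Stream {
    fn lock_nonblocking(&self) -> Guard<'_> {
        let held = self.inner.lock.lock().unwrap();
        // A real implementation would set O_NONBLOCK on self.inner.fd here.
        Guard { fd: self.inner.fd, _held: held }
    }
}

fn main() {
    let s = Stream { inner: Arc::new(Inner { fd: 3, lock: Mutex::new(()) }) };
    let g = s.lock_nonblocking();
    println!("fd {} is locked for a nonblocking operation", g.fd);
    // Guard drops here, releasing the mutex.
}
```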
+use alloc::arc::Arc; use libc; use std::c_str::CString; use std::intrinsics; use std::io; use std::mem; use std::rt::rtio; -use std::sync::arc::UnsafeArc; use std::unstable::mutex; use super::{IoResult, retry}; @@ -108,7 +108,7 @@ fn bind(addr: &CString, ty: libc::c_int) -> IoResult { //////////////////////////////////////////////////////////////////////////////// pub struct UnixStream { - inner: UnsafeArc, + inner: Arc, read_deadline: u64, write_deadline: u64, } @@ -117,11 +117,11 @@ impl UnixStream { pub fn connect(addr: &CString, timeout: Option) -> IoResult { connect(addr, libc::SOCK_STREAM, timeout).map(|inner| { - UnixStream::new(UnsafeArc::new(inner)) + UnixStream::new(Arc::new(inner)) }) } - fn new(inner: UnsafeArc) -> UnixStream { + fn new(inner: Arc) -> UnixStream { UnixStream { inner: inner, read_deadline: 0, @@ -129,7 +129,7 @@ impl UnixStream { } } - fn fd(&self) -> fd_t { unsafe { (*self.inner.get()).fd } } + fn fd(&self) -> fd_t { self.inner.fd } #[cfg(target_os = "linux")] fn lock_nonblocking(&self) {} @@ -138,7 +138,7 @@ impl UnixStream { fn lock_nonblocking<'a>(&'a self) -> net::Guard<'a> { let ret = net::Guard { fd: self.fd(), - guard: unsafe { (*self.inner.get()).lock.lock() }, + guard: unsafe { self.inner.lock.lock() }, }; assert!(util::set_nonblocking(self.fd(), true).is_ok()); ret @@ -254,7 +254,7 @@ impl UnixAcceptor { &mut size as *mut libc::socklen_t) as libc::c_int }) { -1 => Err(super::last_error()), - fd => Ok(UnixStream::new(UnsafeArc::new(Inner::new(fd)))) + fd => Ok(UnixStream::new(Arc::new(Inner::new(fd)))) } } } diff --git a/src/libnative/io/pipe_win32.rs b/src/libnative/io/pipe_win32.rs index af80c7174f21c..c9dbdc8331bbe 100644 --- a/src/libnative/io/pipe_win32.rs +++ b/src/libnative/io/pipe_win32.rs @@ -84,6 +84,7 @@ //! the test suite passing (the suite is in libstd), and that's good enough for //! me! +use alloc::arc::Arc; use libc; use std::c_str::CString; use std::intrinsics; @@ -92,7 +93,6 @@ use std::os::win32::as_utf16_p; use std::os; use std::ptr; use std::rt::rtio; -use std::sync::arc::UnsafeArc; use std::sync::atomics; use std::unstable::mutex; @@ -195,7 +195,7 @@ pub fn await(handle: libc::HANDLE, deadline: u64, //////////////////////////////////////////////////////////////////////////////// pub struct UnixStream { - inner: UnsafeArc, + inner: Arc, write: Option, read: Option, read_deadline: u64, @@ -273,7 +273,7 @@ impl UnixStream { Err(super::last_error()) } else { Ok(UnixStream { - inner: UnsafeArc::new(inner), + inner: Arc::new(inner), read: None, write: None, read_deadline: 0, @@ -317,14 +317,14 @@ impl UnixStream { }) } - fn handle(&self) -> libc::HANDLE { unsafe { (*self.inner.get()).handle } } + fn handle(&self) -> libc::HANDLE { self.inner.handle } fn read_closed(&self) -> bool { - unsafe { (*self.inner.get()).read_closed.load(atomics::SeqCst) } + self.inner.read_closed.load(atomics::SeqCst) } fn write_closed(&self) -> bool { - unsafe { (*self.inner.get()).write_closed.load(atomics::SeqCst) } + self.inner.write_closed.load(atomics::SeqCst) } fn cancel_io(&self) -> IoResult<()> { @@ -353,7 +353,7 @@ impl rtio::RtioPipe for UnixStream { // acquire the lock. // // See comments in close_read() about why this lock is necessary. - let guard = unsafe { (*self.inner.get()).lock.lock() }; + let guard = unsafe { self.inner.lock.lock() }; if self.read_closed() { return Err(io::standard_error(io::EndOfFile)) } @@ -429,7 +429,7 @@ impl rtio::RtioPipe for UnixStream { // going after we woke up. 
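The pipe code above pairs atomic `read_closed`/`write_closed` flags with a lock: readers and writers check the flag and start their I/O while holding the lock, and `close_read`/`close_write` take the same lock before storing `true`, so a closer can never slip in between a peer's check and its blocking call. A compressed sketch of just that handshake, with the actual I/O elided:

```rust
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use std::sync::{Arc, Mutex};

struct Inner {
    lock: Mutex<()>,        // serializes "check flag + start I/O" vs. close_read
    read_closed: AtomicBool,
}

#[derive(Clone)]
struct Pipe {
    inner: Arc<Inner>,
}

impl Pipe {
    fn read(&self) -> Result<(), &'static str> {
        let _guard = self.inner.lock.lock().unwrap();
        if self.inner.read_closed.load(SeqCst) {
            return Err("end of file");
        }
        // ... issue the (overlapped) read while still holding the guard ...
        Ok(())
    }

    fn close_read(&self) {
        // Taking the same lock guarantees no reader sits between its flag
        // check and the start of its blocking call when the flag flips.
        let _guard = self.inner.lock.lock().unwrap();
        self.inner.read_closed.store(true, SeqCst);
        // ... cancel any outstanding I/O here ...
    }
}

fn main() {
    let p = Pipe {
        inner: Arc::new(Inner { lock: Mutex::new(()), read_closed: AtomicBool::new(false) }),
    };
    assert!(p.read().is_ok());
    p.close_read();
    assert!(p.read().is_err());
}
```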
// // See comments in close_read() about why this lock is necessary. - let guard = unsafe { (*self.inner.get()).lock.lock() }; + let guard = unsafe { self.inner.lock.lock() }; if self.write_closed() { return Err(io::standard_error(io::BrokenPipe)) } @@ -514,15 +514,15 @@ impl rtio::RtioPipe for UnixStream { // close_read() between steps 1 and 2. By atomically executing steps 1 // and 2 with a lock with respect to close_read(), we're guaranteed that // no thread will erroneously sit in a read forever. - let _guard = unsafe { (*self.inner.get()).lock.lock() }; - unsafe { (*self.inner.get()).read_closed.store(true, atomics::SeqCst) } + let _guard = unsafe { self.inner.lock.lock() }; + self.inner.read_closed.store(true, atomics::SeqCst); self.cancel_io() } fn close_write(&mut self) -> IoResult<()> { // see comments in close_read() for why this lock is necessary - let _guard = unsafe { (*self.inner.get()).lock.lock() }; - unsafe { (*self.inner.get()).write_closed.store(true, atomics::SeqCst) } + let _guard = unsafe { self.inner.lock.lock() }; + self.inner.write_closed.store(true, atomics::SeqCst); self.cancel_io() } @@ -683,7 +683,7 @@ impl UnixAcceptor { // Transfer ownership of our handle into this stream Ok(UnixStream { - inner: UnsafeArc::new(Inner::new(handle)), + inner: Arc::new(Inner::new(handle)), read: None, write: None, read_deadline: 0, diff --git a/src/libnative/lib.rs b/src/libnative/lib.rs index 8ba0613336924..634bf1dcedb07 100644 --- a/src/libnative/lib.rs +++ b/src/libnative/lib.rs @@ -57,6 +57,7 @@ // answer is that you don't need them) #![feature(macro_rules)] +extern crate alloc; extern crate libc; use std::os; diff --git a/src/librustuv/access.rs b/src/librustuv/access.rs index 81ddc9d32eb2c..63d9aa7ead0a0 100644 --- a/src/librustuv/access.rs +++ b/src/librustuv/access.rs @@ -14,15 +14,16 @@ /// It is assumed that all invocations of this struct happen on the same thread /// (the uv event loop). +use alloc::arc::Arc; use std::mem; use std::rt::local::Local; use std::rt::task::{BlockedTask, Task}; -use std::sync::arc::UnsafeArc; +use std::ty::Unsafe; use homing::HomingMissile; pub struct Access { - inner: UnsafeArc, + inner: Arc>, } pub struct Guard<'a> { @@ -39,11 +40,11 @@ struct Inner { impl Access { pub fn new() -> Access { Access { - inner: UnsafeArc::new(Inner { + inner: Arc::new(Unsafe::new(Inner { queue: vec![], held: false, closed: false, - }) + })) } } diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs index a9b449e63be4c..0a6a305a3b780 100644 --- a/src/librustuv/lib.rs +++ b/src/librustuv/lib.rs @@ -46,6 +46,7 @@ via `close` and `delete` methods. #[cfg(test)] extern crate green; #[cfg(test)] extern crate realrustuv = "rustuv"; extern crate libc; +extern crate alloc; use libc::{c_int, c_void}; use std::fmt; diff --git a/src/librustuv/queue.rs b/src/librustuv/queue.rs index 57b5e7105b20e..98ae865cb1da3 100644 --- a/src/librustuv/queue.rs +++ b/src/librustuv/queue.rs @@ -20,10 +20,10 @@ #![allow(dead_code)] +use alloc::arc::Arc; use libc::c_void; use std::mem; use std::rt::task::BlockedTask; -use std::sync::arc::UnsafeArc; use std::unstable::mutex::NativeMutex; use mpsc = std::sync::mpsc_queue; @@ -46,20 +46,20 @@ struct State { /// This structure is intended to be stored next to the event loop, and it is /// used to create new `Queue` structures. pub struct QueuePool { - queue: UnsafeArc, + queue: Arc, refcnt: uint, } /// This type is used to send messages back to the original event loop. 
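Where the old code genuinely needed unsynchronized shared mutation (librustuv's `Access`, and `Exclusive` at the end of the diff), the replacement is `Arc<Unsafe<T>>`, today's `Arc<UnsafeCell<T>>`. Sharing such a cell across threads now requires an explicit `unsafe impl Sync`, which is a useful place to write down the contract. A sketch of that wrapper and its obligation; `RacyCell` and `with_queue` are invented names:

```rust
use std::cell::UnsafeCell;
use std::sync::Arc;

/// A shared, unsynchronized cell: the modern spelling of `Arc<Unsafe<T>>`.
/// SAFETY CONTRACT: callers provide their own mutual exclusion (in librustuv,
/// everything runs on the single uv event-loop thread).
struct RacyCell<T>(UnsafeCell<T>);

unsafe impl<T: Send> Sync for RacyCell<T> {}

struct Access {
    inner: Arc<RacyCell<Vec<u32>>>,
}

impl Access {
    fn new() -> Access {
        Access { inner: Arc::new(RacyCell(UnsafeCell::new(Vec::new()))) }
    }

    /// Only sound while the caller guarantees exclusive access
    /// (e.g. a homed task on the event-loop thread).
    unsafe fn with_queue<R>(&self, f: impl FnOnce(&mut Vec<u32>) -> R) -> R {
        let queue = unsafe { &mut *self.inner.0.get() };
        f(queue)
    }
}

fn main() {
    let a = Access::new();
    // Single-threaded use here, so the exclusivity contract trivially holds.
    unsafe { a.with_queue(|q| q.push(7)) };
    let len = unsafe { a.with_queue(|q| q.len()) };
    assert_eq!(len, 1);
}
```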
pub struct Queue { - queue: UnsafeArc, + queue: Arc, } extern fn async_cb(handle: *uvll::uv_async_t) { let pool: &mut QueuePool = unsafe { mem::transmute(uvll::get_data_for_uv_handle(handle)) }; - let state: &mut State = unsafe { mem::transmute(pool.queue.get()) }; + let state: &State = &*pool.queue; // Remember that there is no guarantee about how many times an async // callback is called with relation to the number of sends, so process the @@ -109,7 +109,7 @@ extern fn async_cb(handle: *uvll::uv_async_t) { impl QueuePool { pub fn new(loop_: &mut Loop) -> Box { let handle = UvHandle::alloc(None::, uvll::UV_ASYNC); - let state = UnsafeArc::new(State { + let state = Arc::new(State { handle: handle, lock: unsafe {NativeMutex::new()}, queue: mpsc::Queue::new(), @@ -132,24 +132,20 @@ impl QueuePool { pub fn queue(&mut self) -> Queue { unsafe { if self.refcnt == 0 { - uvll::uv_ref((*self.queue.get()).handle); + uvll::uv_ref(self.queue.handle); } self.refcnt += 1; } Queue { queue: self.queue.clone() } } - pub fn handle(&self) -> *uvll::uv_async_t { - unsafe { (*self.queue.get()).handle } - } + pub fn handle(&self) -> *uvll::uv_async_t { self.queue.handle } } impl Queue { pub fn push(&mut self, task: BlockedTask) { - unsafe { - (*self.queue.get()).queue.push(Task(task)); - uvll::uv_async_send((*self.queue.get()).handle); - } + self.queue.queue.push(Task(task)); + unsafe { uvll::uv_async_send(self.queue.handle); } } } @@ -160,9 +156,7 @@ impl Clone for Queue { // that the count is at least one (because we have a queue right here), // and if the queue is dropped later on it'll see the increment for the // decrement anyway. - unsafe { - (*self.queue.get()).queue.push(Increment); - } + self.queue.queue.push(Increment); Queue { queue: self.queue.clone() } } } @@ -172,10 +166,9 @@ impl Drop for Queue { // See the comments in the async_cb function for why there is a lock // that is acquired only on a drop. unsafe { - let state = self.queue.get(); - let _l = (*state).lock.lock(); - (*state).queue.push(Decrement); - uvll::uv_async_send((*state).handle); + let _l = self.queue.lock.lock(); + self.queue.queue.push(Decrement); + uvll::uv_async_send(self.queue.handle); } } } diff --git a/src/librustuv/rc.rs b/src/librustuv/rc.rs index 86c6c44238c06..2a1a6b9f26d47 100644 --- a/src/librustuv/rc.rs +++ b/src/librustuv/rc.rs @@ -16,16 +16,17 @@ /// the same underlying uv object, hence Rc is not used and this simple counter /// should suffice. -use std::sync::arc::UnsafeArc; +use alloc::arc::Arc; +use std::ty::Unsafe; pub struct Refcount { - rc: UnsafeArc, + rc: Arc>, } impl Refcount { /// Creates a new refcount of 1 pub fn new() -> Refcount { - Refcount { rc: UnsafeArc::new(1) } + Refcount { rc: Arc::new(Unsafe::new(1)) } } fn increment(&self) { diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index df0c6f3b8d397..fd5b92ba46913 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -271,6 +271,8 @@ // And now that you've seen all the races that I found and attempted to fix, // here's the code for you to find some more! +use alloc::arc::Arc; + use cell::Cell; use clone::Clone; use iter::Iterator; @@ -283,7 +285,6 @@ use owned::Box; use result::{Ok, Err, Result}; use rt::local::Local; use rt::task::{Task, BlockedTask}; -use sync::arc::UnsafeArc; use ty::Unsafe; pub use comm::select::{Select, Handle}; @@ -352,7 +353,7 @@ pub struct Sender { /// The sending-half of Rust's synchronous channel type. This half can only be /// owned by one task, but it can be cloned to send to other tasks. 
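`librustuv::rc::Refcount` keeps its manual count, just behind `Arc<Unsafe<uint>>`, because it is only ever touched on the event-loop thread. Where such a count really is contended, the same idea can be written safely with an atomic; a small sketch of that variant (this is the pattern, not the rustuv code):

```rust
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use std::sync::Arc;

/// A tiny shared refcount for handles that all point at one underlying
/// resource (rustuv uses one to decide when to close a uv handle).
#[derive(Clone)]
struct Refcount {
    rc: Arc<AtomicUsize>,
}

impl Refcount {
    fn new() -> Refcount {
        Refcount { rc: Arc::new(AtomicUsize::new(1)) }
    }
    fn increment(&self) {
        self.rc.fetch_add(1, SeqCst);
    }
    /// Returns true when this call dropped the count to zero,
    /// i.e. the caller should tear the resource down.
    fn decrement(&self) -> bool {
        self.rc.fetch_sub(1, SeqCst) == 1
    }
}

fn main() {
    let a = Refcount::new();
    a.increment();
    assert!(!a.decrement()); // one reference still logically alive
    assert!(a.decrement());  // last one out closes the handle
}
```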
pub struct SyncSender { - inner: UnsafeArc>, + inner: Arc>>, // can't share in an arc marker: marker::NoShare, } @@ -386,10 +387,10 @@ pub enum TrySendError { } enum Flavor { - Oneshot(UnsafeArc>), - Stream(UnsafeArc>), - Shared(UnsafeArc>), - Sync(UnsafeArc>), + Oneshot(Arc>>), + Stream(Arc>>), + Shared(Arc>>), + Sync(Arc>>), } #[doc(hidden)] @@ -435,8 +436,8 @@ impl UnsafeFlavor for Receiver { /// println!("{}", rx.recv()); /// ``` pub fn channel() -> (Sender, Receiver) { - let (a, b) = UnsafeArc::new2(oneshot::Packet::new()); - (Sender::new(Oneshot(b)), Receiver::new(Oneshot(a))) + let a = Arc::new(Unsafe::new(oneshot::Packet::new())); + (Sender::new(Oneshot(a.clone())), Receiver::new(Oneshot(a))) } /// Creates a new synchronous, bounded channel. @@ -471,8 +472,8 @@ pub fn channel() -> (Sender, Receiver) { /// assert_eq!(rx.recv(), 2); /// ``` pub fn sync_channel(bound: uint) -> (SyncSender, Receiver) { - let (a, b) = UnsafeArc::new2(sync::Packet::new(bound)); - (SyncSender::new(a), Receiver::new(Sync(b))) + let a = Arc::new(Unsafe::new(sync::Packet::new(bound))); + (SyncSender::new(a.clone()), Receiver::new(Sync(a))) } //////////////////////////////////////////////////////////////////////////////// @@ -557,13 +558,13 @@ impl Sender { let (new_inner, ret) = match *unsafe { self.inner() } { Oneshot(ref p) => { - let p = p.get(); unsafe { + let p = p.get(); if !(*p).sent() { return (*p).send(t); } else { - let (a, b) = UnsafeArc::new2(stream::Packet::new()); - match (*p).upgrade(Receiver::new(Stream(b))) { + let a = Arc::new(Unsafe::new(stream::Packet::new())); + match (*p).upgrade(Receiver::new(Stream(a.clone()))) { oneshot::UpSuccess => { let ret = (*a.get()).send(t); (a, ret) @@ -598,17 +599,21 @@ impl Clone for Sender { fn clone(&self) -> Sender { let (packet, sleeper) = match *unsafe { self.inner() } { Oneshot(ref p) => { - let (a, b) = UnsafeArc::new2(shared::Packet::new()); - match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } { - oneshot::UpSuccess | oneshot::UpDisconnected => (b, None), - oneshot::UpWoke(task) => (b, Some(task)) + let a = Arc::new(Unsafe::new(shared::Packet::new())); + match unsafe { + (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) + } { + oneshot::UpSuccess | oneshot::UpDisconnected => (a, None), + oneshot::UpWoke(task) => (a, Some(task)) } } Stream(ref p) => { - let (a, b) = UnsafeArc::new2(shared::Packet::new()); - match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } { - stream::UpSuccess | stream::UpDisconnected => (b, None), - stream::UpWoke(task) => (b, Some(task)), + let a = Arc::new(Unsafe::new(shared::Packet::new())); + match unsafe { + (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) + } { + stream::UpSuccess | stream::UpDisconnected => (a, None), + stream::UpWoke(task) => (a, Some(task)), } } Shared(ref p) => { @@ -645,7 +650,7 @@ impl Drop for Sender { //////////////////////////////////////////////////////////////////////////////// impl SyncSender { - fn new(inner: UnsafeArc>) -> SyncSender { + fn new(inner: Arc>>) -> SyncSender { SyncSender { inner: inner, marker: marker::NoShare } } diff --git a/src/libstd/comm/oneshot.rs b/src/libstd/comm/oneshot.rs index a7124e50b663e..f9e8fd1e534bf 100644 --- a/src/libstd/comm/oneshot.rs +++ b/src/libstd/comm/oneshot.rs @@ -15,7 +15,7 @@ /// this type is to have one and exactly one allocation when the chan/port pair /// is created. 
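`UnsafeArc::new2`, the one-allocation-two-handles constructor, disappears from `comm`: the new code makes a single `Arc::new(Unsafe::new(packet))` and clones it for the other half. The sketch below shows that construction in safe terms, with a `Mutex<VecDeque<T>>` standing in for the channel's real lock-free packet:

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

// Stand-in for the channel's internal packet; the real one is lock-free.
struct Packet<T> {
    queue: Mutex<VecDeque<T>>,
}

struct Sender<T> {
    inner: Arc<Packet<T>>,
}

struct Receiver<T> {
    inner: Arc<Packet<T>>,
}

/// Both halves come from one allocation: `Arc::new` plus one `clone()`,
/// the shape that replaced `UnsafeArc::new2` in the hunks above.
fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let a = Arc::new(Packet { queue: Mutex::new(VecDeque::new()) });
    (Sender { inner: a.clone() }, Receiver { inner: a })
}

impl<T> Sender<T> {
    fn send(&self, t: T) {
        self.inner.queue.lock().unwrap().push_back(t);
    }
}

impl<T> Receiver<T> {
    fn try_recv(&self) -> Option<T> {
        self.inner.queue.lock().unwrap().pop_front()
    }
}

fn main() {
    let (tx, rx) = channel();
    tx.send(42);
    assert_eq!(rx.try_recv(), Some(42));
    assert_eq!(rx.try_recv(), None);
}
```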
/// -/// Another possible optimization would be to not use an UnsafeArc box because +/// Another possible optimization would be to not use an Arc box because /// in theory we know when the shared packet can be deallocated (no real need /// for the atomic reference counting), but I was having trouble how to destroy /// the data early in a drop of a Port. diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 31a2014530607..8968747d9908e 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -13,6 +13,8 @@ //! local storage, and logging. Even a 'freestanding' Rust would likely want //! to implement this. +use alloc::arc::Arc; + use cleanup; use clone::Clone; use comm::Sender; @@ -32,7 +34,6 @@ use rt::local_heap::LocalHeap; use rt::rtio::LocalIo; use rt::unwind::Unwinder; use str::SendStr; -use sync::arc::UnsafeArc; use sync::atomics::{AtomicUint, SeqCst}; use task::{TaskResult, TaskOpts}; use unstable::finally::Finally; @@ -65,7 +66,7 @@ pub struct LocalStorage(pub Option); /// at any time. pub enum BlockedTask { Owned(Box), - Shared(UnsafeArc), + Shared(Arc), } pub enum DeathAction { @@ -82,7 +83,7 @@ pub struct Death { } pub struct BlockedTasks { - inner: UnsafeArc, + inner: Arc, } impl Task { @@ -313,10 +314,10 @@ impl BlockedTask { pub fn wake(self) -> Option> { match self { Owned(task) => Some(task), - Shared(arc) => unsafe { - match (*arc.get()).swap(0, SeqCst) { + Shared(arc) => { + match arc.swap(0, SeqCst) { 0 => None, - n => Some(mem::transmute(n)), + n => Some(unsafe { mem::transmute(n) }), } } } @@ -343,7 +344,7 @@ impl BlockedTask { let arc = match self { Owned(task) => { let flag = unsafe { AtomicUint::new(mem::transmute(task)) }; - UnsafeArc::new(flag) + Arc::new(flag) } Shared(arc) => arc.clone(), }; @@ -375,7 +376,7 @@ impl BlockedTask { if blocked_task_ptr & 0x1 == 0 { Owned(mem::transmute(blocked_task_ptr)) } else { - let ptr: Box> = + let ptr: Box> = mem::transmute(blocked_task_ptr & !1); Shared(*ptr) } diff --git a/src/libstd/sync/arc.rs b/src/libstd/sync/arc.rs deleted file mode 100644 index 7dcfe62ffb8a6..0000000000000 --- a/src/libstd/sync/arc.rs +++ /dev/null @@ -1,189 +0,0 @@ -// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! Atomically reference counted data -//! -//! This modules contains the implementation of an atomically reference counted -//! pointer for the purpose of sharing data between tasks. This is obviously a -//! very unsafe primitive to use, but it has its use cases when implementing -//! concurrent data structures and similar tasks. -//! -//! Great care must be taken to ensure that data races do not arise through the -//! usage of `UnsafeArc`, and this often requires some form of external -//! synchronization. The only guarantee provided to you by this class is that -//! the underlying data will remain valid (not free'd) so long as the reference -//! count is greater than one. - -use clone::Clone; -use iter::Iterator; -use kinds::Send; -use mem; -use ops::Drop; -use owned::Box; -use ptr::RawPtr; -use sync::atomics::{fence, AtomicUint, Relaxed, Acquire, Release}; -use ty::Unsafe; -use vec::Vec; - -/// An atomically reference counted pointer. -/// -/// Enforces no shared-memory safety. 
-#[unsafe_no_drop_flag] -pub struct UnsafeArc { - data: *mut ArcData, -} - -struct ArcData { - count: AtomicUint, - data: Unsafe, -} - -unsafe fn new_inner(data: T, refcount: uint) -> *mut ArcData { - let data = box ArcData { - count: AtomicUint::new(refcount), - data: Unsafe::new(data) - }; - mem::transmute(data) -} - -impl UnsafeArc { - /// Creates a new `UnsafeArc` which wraps the given data. - pub fn new(data: T) -> UnsafeArc { - unsafe { UnsafeArc { data: new_inner(data, 1) } } - } - - /// As new(), but returns an extra pre-cloned handle. - pub fn new2(data: T) -> (UnsafeArc, UnsafeArc) { - unsafe { - let ptr = new_inner(data, 2); - (UnsafeArc { data: ptr }, UnsafeArc { data: ptr }) - } - } - - /// As new(), but returns a vector of as many pre-cloned handles as - /// requested. - pub fn newN(data: T, num_handles: uint) -> Vec> { - unsafe { - if num_handles == 0 { - vec![] // need to free data here - } else { - let ptr = new_inner(data, num_handles); - let v = Vec::from_fn(num_handles, |_| UnsafeArc { data: ptr }); - v - } - } - } - - /// Gets a pointer to the inner shared data. Note that care must be taken to - /// ensure that the outer `UnsafeArc` does not fall out of scope while this - /// pointer is in use, otherwise it could possibly contain a use-after-free. - #[inline] - pub fn get(&self) -> *mut T { - unsafe { - debug_assert!((*self.data).count.load(Relaxed) > 0); - return (*self.data).data.get(); - } - } - - /// Gets an immutable pointer to the inner shared data. This has the same - /// caveats as the `get` method. - #[inline] - pub fn get_immut(&self) -> *T { - unsafe { - debug_assert!((*self.data).count.load(Relaxed) > 0); - return (*self.data).data.get() as *T; - } - } - - /// checks if this is the only reference to the arc protected data - #[inline] - pub fn is_owned(&self) -> bool { - unsafe { - (*self.data).count.load(Relaxed) == 1 - } - } -} - -impl Clone for UnsafeArc { - fn clone(&self) -> UnsafeArc { - unsafe { - // Using a relaxed ordering is alright here, as knowledge of the original reference - // prevents other threads from erroneously deleting the object. - // - // As explained in the [Boost documentation][1], - // Increasing the reference counter can always be done with memory_order_relaxed: New - // references to an object can only be formed from an existing reference, and passing - // an existing reference from one thread to another must already provide any required - // synchronization. - // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) - let old_count = (*self.data).count.fetch_add(1, Relaxed); - debug_assert!(old_count >= 1); - return UnsafeArc { data: self.data }; - } - } -} - -#[unsafe_destructor] -impl Drop for UnsafeArc{ - fn drop(&mut self) { - unsafe { - // Happens when destructing an unwrapper's handle and from - // `#[unsafe_no_drop_flag]` - if self.data.is_null() { - return - } - // Because `fetch_sub` is already atomic, we do not need to synchronize with other - // threads unless we are going to delete the object. - let old_count = (*self.data).count.fetch_sub(1, Release); - debug_assert!(old_count >= 1); - if old_count == 1 { - // This fence is needed to prevent reordering of use of the data and deletion of - // the data. Because it is marked `Release`, the decreasing of the reference count - // sychronizes with this `Acquire` fence. This means that use of the data happens - // before decreasing the refernce count, which happens before this fence, which - // happens before the deletion of the data. 
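The deleted `UnsafeArc` spells out the ordering discipline that `alloc::arc::Arc` also follows: clones may bump the count with `Relaxed`, drops decrement with `Release`, and whoever sees the count reach zero runs an `Acquire` fence before freeing. A compressed, single-threaded sketch of just that drop path (real `Arc` also tracks weak counts and much more):

```rust
use std::sync::atomic::{fence, AtomicUsize, Ordering};

struct Shared<T> {
    count: AtomicUsize,
    data: T,
}

struct Handle<T> {
    ptr: *mut Shared<T>,
}

impl<T> Clone for Handle<T> {
    fn clone(&self) -> Handle<T> {
        // Relaxed is enough: a new reference can only be made from an
        // existing one, which already provides the needed synchronization.
        unsafe { (*self.ptr).count.fetch_add(1, Ordering::Relaxed) };
        Handle { ptr: self.ptr }
    }
}

impl<T> Drop for Handle<T> {
    fn drop(&mut self) {
        unsafe {
            if (*self.ptr).count.fetch_sub(1, Ordering::Release) == 1 {
                // Pair the Release decrement with an Acquire fence so every
                // use of `data` happens-before the deallocation below.
                fence(Ordering::Acquire);
                drop(Box::from_raw(self.ptr));
            }
        }
    }
}

fn main() {
    let h = Handle {
        ptr: Box::into_raw(Box::new(Shared {
            count: AtomicUsize::new(1),
            data: String::from("shared"),
        })),
    };
    let h2 = h.clone();
    drop(h);
    drop(h2); // last drop frees the allocation
}
```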
- // - // As explained in the [Boost documentation][1], - // It is important to enforce any possible access to the object in one thread - // (through an existing reference) to *happen before* deleting the object in a - // different thread. This is achieved by a "release" operation after dropping a - // reference (any access to the object through this reference must obviously - // happened before), and an "acquire" operation before deleting the object. - // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) - fence(Acquire); - let _: Box> = mem::transmute(self.data); - } - } - } -} - -#[cfg(test)] -mod tests { - use prelude::*; - use super::UnsafeArc; - use mem::size_of; - - #[test] - fn test_size() { - assert_eq!(size_of::>(), size_of::<*[int, ..10]>()); - } - - #[test] - fn arclike_newN() { - // Tests that the many-refcounts-at-once constructors don't leak. - let _ = UnsafeArc::new2("hello".to_owned().to_owned()); - let x = UnsafeArc::newN("hello".to_owned().to_owned(), 0); - assert_eq!(x.len(), 0) - let x = UnsafeArc::newN("hello".to_owned().to_owned(), 1); - assert_eq!(x.len(), 1) - let x = UnsafeArc::newN("hello".to_owned().to_owned(), 10); - assert_eq!(x.len(), 10) - } -} diff --git a/src/libstd/sync/deque.rs b/src/libstd/sync/deque.rs index 30b95ffb34f69..a3fdc4d3eaff7 100644 --- a/src/libstd/sync/deque.rs +++ b/src/libstd/sync/deque.rs @@ -48,20 +48,22 @@ // FIXME: all atomic operations in this module use a SeqCst ordering. That is // probably overkill +use alloc::arc::Arc; + use clone::Clone; use iter::{range, Iterator}; use kinds::Send; +use kinds::marker; use mem::{forget, min_align_of, size_of, transmute}; use ops::Drop; use option::{Option, Some, None}; use owned::Box; use ptr::RawPtr; use ptr; +use rt::heap::{allocate, deallocate}; use slice::ImmutableVector; -use sync::arc::UnsafeArc; use sync::atomics::{AtomicInt, AtomicPtr, SeqCst}; use unstable::sync::Exclusive; -use rt::heap::{allocate, deallocate}; use vec::Vec; // Once the queue is less than 1/K full, then it will be downsized. Note that @@ -87,14 +89,16 @@ struct Deque { /// /// There may only be one worker per deque. pub struct Worker { - deque: UnsafeArc>, + deque: Arc>, + noshare: marker::NoShare, } /// The stealing half of the work-stealing deque. Stealers have access to the /// opposite end of the deque from the worker, and they only have access to the /// `steal` method. pub struct Stealer { - deque: UnsafeArc>, + deque: Arc>, + noshare: marker::NoShare, } /// When stealing some data, this is an enumeration of the possible outcomes. @@ -149,12 +153,14 @@ impl BufferPool { /// Allocates a new work-stealing deque which will send/receiving memory to /// and from this buffer pool. - pub fn deque(&mut self) -> (Worker, Stealer) { - let (a, b) = UnsafeArc::new2(Deque::new(self.clone())); - (Worker { deque: a }, Stealer { deque: b }) + pub fn deque(&self) -> (Worker, Stealer) { + let a = Arc::new(Deque::new(self.clone())); + let b = a.clone(); + (Worker { deque: a, noshare: marker::NoShare }, + Stealer { deque: b, noshare: marker::NoShare }) } - fn alloc(&mut self, bits: int) -> Box> { + fn alloc(&self, bits: int) -> Box> { unsafe { self.pool.with(|pool| { match pool.iter().position(|x| x.size() >= (1 << bits)) { @@ -165,7 +171,7 @@ impl BufferPool { } } - fn free(&mut self, buf: Box>) { + fn free(&self, buf: Box>) { unsafe { let mut buf = Some(buf); self.pool.with(|pool| { @@ -185,46 +191,48 @@ impl Clone for BufferPool { impl Worker { /// Pushes data onto the front of this work queue. 
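`Worker`, `Stealer`, and green's `Producer`/`Consumer` gain a `marker::NoShare` field because a bare `Arc<Deque<T>>` would otherwise be freely shareable, while each end is meant to be used from one thread at a time. The modern spelling of that marker is `PhantomData` of a `!Sync` type; a sketch of the trick in isolation (type names assumed):

```rust
use std::cell::Cell;
use std::marker::PhantomData;
use std::sync::Arc;

struct Deque<T> {
    items: Vec<T>, // placeholder for the real lock-free buffer
}

/// Sendable to another thread, but not shareable by reference across threads:
/// `Cell<()>` is `Send + !Sync`, and PhantomData propagates exactly that.
struct Worker<T> {
    deque: Arc<Deque<T>>,
    _not_sync: PhantomData<Cell<()>>,
}

fn assert_send<T: Send>() {}
// fn assert_sync<T: Sync>() {}  // Worker<i32> would fail this bound.

fn main() {
    assert_send::<Worker<i32>>();
    let w = Worker::<i32> {
        deque: Arc::new(Deque { items: Vec::new() }),
        _not_sync: PhantomData,
    };
    let _ = &w.deque.items;
}
```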
- pub fn push(&mut self, t: T) { - unsafe { (*self.deque.get()).push(t) } + pub fn push(&self, t: T) { + unsafe { self.deque.push(t) } } /// Pops data off the front of the work queue, returning `None` on an empty /// queue. - pub fn pop(&mut self) -> Option { - unsafe { (*self.deque.get()).pop() } + pub fn pop(&self) -> Option { + unsafe { self.deque.pop() } } /// Gets access to the buffer pool that this worker is attached to. This can /// be used to create more deques which share the same buffer pool as this /// deque. - pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool { - unsafe { &mut (*self.deque.get()).pool } + pub fn pool<'a>(&'a self) -> &'a BufferPool { + &self.deque.pool } } impl Stealer { /// Steals work off the end of the queue (opposite of the worker's end) - pub fn steal(&mut self) -> Stolen { - unsafe { (*self.deque.get()).steal() } + pub fn steal(&self) -> Stolen { + unsafe { self.deque.steal() } } /// Gets access to the buffer pool that this stealer is attached to. This /// can be used to create more deques which share the same buffer pool as /// this deque. - pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool { - unsafe { &mut (*self.deque.get()).pool } + pub fn pool<'a>(&'a self) -> &'a BufferPool { + &self.deque.pool } } impl Clone for Stealer { - fn clone(&self) -> Stealer { Stealer { deque: self.deque.clone() } } + fn clone(&self) -> Stealer { + Stealer { deque: self.deque.clone(), noshare: marker::NoShare } + } } // Almost all of this code can be found directly in the paper so I'm not // personally going to heavily comment what's going on here. impl Deque { - fn new(mut pool: BufferPool) -> Deque { + fn new(pool: BufferPool) -> Deque { let buf = pool.alloc(MIN_BITS); Deque { bottom: AtomicInt::new(0), @@ -234,7 +242,7 @@ impl Deque { } } - unsafe fn push(&mut self, data: T) { + unsafe fn push(&self, data: T) { let mut b = self.bottom.load(SeqCst); let t = self.top.load(SeqCst); let mut a = self.array.load(SeqCst); @@ -250,7 +258,7 @@ impl Deque { self.bottom.store(b + 1, SeqCst); } - unsafe fn pop(&mut self) -> Option { + unsafe fn pop(&self) -> Option { let b = self.bottom.load(SeqCst); let a = self.array.load(SeqCst); let b = b - 1; @@ -276,7 +284,7 @@ impl Deque { } } - unsafe fn steal(&mut self) -> Stolen { + unsafe fn steal(&self) -> Stolen { let t = self.top.load(SeqCst); let old = self.array.load(SeqCst); let b = self.bottom.load(SeqCst); @@ -298,7 +306,7 @@ impl Deque { } } - unsafe fn maybe_shrink(&mut self, b: int, t: int) { + unsafe fn maybe_shrink(&self, b: int, t: int) { let a = self.array.load(SeqCst); if b - t < (*a).size() / K && b - t > (1 << MIN_BITS) { self.swap_buffer(b, a, (*a).resize(b, t, -1)); @@ -312,7 +320,7 @@ impl Deque { // after this method has called 'free' on it. The continued usage is simply // a read followed by a forget, but we must make sure that the memory can // continue to be read after we flag this buffer for reclamation. - unsafe fn swap_buffer(&mut self, b: int, old: *mut Buffer, + unsafe fn swap_buffer(&self, b: int, old: *mut Buffer, buf: Buffer) -> *mut Buffer { let newbuf: *mut Buffer = transmute(box buf); self.array.store(newbuf, SeqCst); @@ -373,7 +381,7 @@ impl Buffer { // Unsafe because this unsafely overwrites possibly uninitialized or // initialized data. 
- unsafe fn put(&mut self, i: int, t: T) { + unsafe fn put(&self, i: int, t: T) { let ptr = self.storage.offset(i & self.mask()); ptr::copy_nonoverlapping_memory(ptr as *mut T, &t as *T, 1); forget(t); @@ -382,7 +390,7 @@ impl Buffer { // Again, unsafe because this has incredibly dubious ownership violations. // It is assumed that this buffer is immediately dropped. unsafe fn resize(&self, b: int, t: int, delta: int) -> Buffer { - let mut buf = Buffer::new(self.log_size + delta); + let buf = Buffer::new(self.log_size + delta); for i in range(t, b) { buf.put(i, self.get(i)); } @@ -415,8 +423,8 @@ mod tests { #[test] fn smoke() { - let mut pool = BufferPool::new(); - let (mut w, mut s) = pool.deque(); + let pool = BufferPool::new(); + let (w, s) = pool.deque(); assert_eq!(w.pop(), None); assert_eq!(s.steal(), Empty); w.push(1); @@ -430,10 +438,9 @@ mod tests { #[test] fn stealpush() { static AMT: int = 100000; - let mut pool = BufferPool::::new(); - let (mut w, s) = pool.deque(); + let pool = BufferPool::::new(); + let (w, s) = pool.deque(); let t = Thread::start(proc() { - let mut s = s; let mut left = AMT; while left > 0 { match s.steal() { @@ -456,10 +463,9 @@ mod tests { #[test] fn stealpush_large() { static AMT: int = 100000; - let mut pool = BufferPool::<(int, int)>::new(); - let (mut w, s) = pool.deque(); + let pool = BufferPool::<(int, int)>::new(); + let (w, s) = pool.deque(); let t = Thread::start(proc() { - let mut s = s; let mut left = AMT; while left > 0 { match s.steal() { @@ -477,7 +483,7 @@ mod tests { t.join(); } - fn stampede(mut w: Worker>, s: Stealer>, + fn stampede(w: Worker>, s: Stealer>, nthreads: int, amt: uint) { for _ in range(0, amt) { w.push(box 20); @@ -489,7 +495,6 @@ mod tests { let s = s.clone(); Thread::start(proc() { unsafe { - let mut s = s; while (*unsafe_remaining).load(SeqCst) > 0 { match s.steal() { Data(box 20) => { @@ -518,7 +523,7 @@ mod tests { #[test] fn run_stampede() { - let mut pool = BufferPool::>::new(); + let pool = BufferPool::>::new(); let (w, s) = pool.deque(); stampede(w, s, 8, 10000); } @@ -526,7 +531,7 @@ mod tests { #[test] fn many_stampede() { static AMT: uint = 4; - let mut pool = BufferPool::>::new(); + let pool = BufferPool::>::new(); let threads = range(0, AMT).map(|_| { let (w, s) = pool.deque(); Thread::start(proc() { @@ -545,14 +550,13 @@ mod tests { static NTHREADS: int = 8; static mut DONE: AtomicBool = INIT_ATOMIC_BOOL; static mut HITS: AtomicUint = INIT_ATOMIC_UINT; - let mut pool = BufferPool::::new(); - let (mut w, s) = pool.deque(); + let pool = BufferPool::::new(); + let (w, s) = pool.deque(); let threads = range(0, NTHREADS).map(|_| { let s = s.clone(); Thread::start(proc() { unsafe { - let mut s = s; loop { match s.steal() { Data(2) => { HITS.fetch_add(1, SeqCst); } @@ -604,8 +608,8 @@ mod tests { static AMT: int = 10000; static NTHREADS: int = 4; static mut DONE: AtomicBool = INIT_ATOMIC_BOOL; - let mut pool = BufferPool::<(int, uint)>::new(); - let (mut w, s) = pool.deque(); + let pool = BufferPool::<(int, uint)>::new(); + let (w, s) = pool.deque(); let (threads, hits) = vec::unzip(range(0, NTHREADS).map(|_| { let s = s.clone(); @@ -615,7 +619,6 @@ mod tests { }; (Thread::start(proc() { unsafe { - let mut s = s; loop { match s.steal() { Data((1, 2)) => { diff --git a/src/libstd/sync/mod.rs b/src/libstd/sync/mod.rs index 3213c538152c6..b2cf427edc812 100644 --- a/src/libstd/sync/mod.rs +++ b/src/libstd/sync/mod.rs @@ -15,7 +15,6 @@ //! and/or blocking at all, but rather provide the necessary tools to build //! 
other types of concurrent primitives. -pub mod arc; pub mod atomics; pub mod deque; pub mod mpmc_bounded_queue; diff --git a/src/libstd/sync/mpmc_bounded_queue.rs b/src/libstd/sync/mpmc_bounded_queue.rs index 2df5031b4826c..ffad9c1c583d8 100644 --- a/src/libstd/sync/mpmc_bounded_queue.rs +++ b/src/libstd/sync/mpmc_bounded_queue.rs @@ -29,13 +29,15 @@ // http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue +use alloc::arc::Arc; + use clone::Clone; use kinds::Send; use num::next_power_of_two; use option::{Option, Some, None}; -use sync::arc::UnsafeArc; use sync::atomics::{AtomicUint,Relaxed,Release,Acquire}; use vec::Vec; +use ty::Unsafe; struct Node { sequence: AtomicUint, @@ -44,7 +46,7 @@ struct Node { struct State { pad0: [u8, ..64], - buffer: Vec>, + buffer: Vec>>, mask: uint, pad1: [u8, ..64], enqueue_pos: AtomicUint, @@ -54,7 +56,7 @@ struct State { } pub struct Queue { - state: UnsafeArc>, + state: Arc>, } impl State { @@ -70,7 +72,7 @@ impl State { capacity }; let buffer = Vec::from_fn(capacity, |i| { - Node { sequence:AtomicUint::new(i), value: None } + Unsafe::new(Node { sequence:AtomicUint::new(i), value: None }) }); State{ pad0: [0, ..64], @@ -84,19 +86,21 @@ impl State { } } - fn push(&mut self, value: T) -> bool { + fn push(&self, value: T) -> bool { let mask = self.mask; let mut pos = self.enqueue_pos.load(Relaxed); loop { - let node = self.buffer.get_mut(pos & mask); - let seq = node.sequence.load(Acquire); + let node = self.buffer.get(pos & mask); + let seq = unsafe { (*node.get()).sequence.load(Acquire) }; let diff: int = seq as int - pos as int; if diff == 0 { let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed); if enqueue_pos == pos { - node.value = Some(value); - node.sequence.store(pos+1, Release); + unsafe { + (*node.get()).value = Some(value); + (*node.get()).sequence.store(pos+1, Release); + } break } else { pos = enqueue_pos; @@ -110,19 +114,21 @@ impl State { true } - fn pop(&mut self) -> Option { + fn pop(&self) -> Option { let mask = self.mask; let mut pos = self.dequeue_pos.load(Relaxed); loop { - let node = self.buffer.get_mut(pos & mask); - let seq = node.sequence.load(Acquire); + let node = self.buffer.get(pos & mask); + let seq = unsafe { (*node.get()).sequence.load(Acquire) }; let diff: int = seq as int - (pos + 1) as int; if diff == 0 { let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed); if dequeue_pos == pos { - let value = node.value.take(); - node.sequence.store(pos + mask + 1, Release); - return value + unsafe { + let value = (*node.get()).value.take(); + (*node.get()).sequence.store(pos + mask + 1, Release); + return value + } } else { pos = dequeue_pos; } @@ -138,24 +144,22 @@ impl State { impl Queue { pub fn with_capacity(capacity: uint) -> Queue { Queue{ - state: UnsafeArc::new(State::with_capacity(capacity)) + state: Arc::new(State::with_capacity(capacity)) } } - pub fn push(&mut self, value: T) -> bool { - unsafe { (*self.state.get()).push(value) } + pub fn push(&self, value: T) -> bool { + self.state.push(value) } - pub fn pop(&mut self) -> Option { - unsafe { (*self.state.get()).pop() } + pub fn pop(&self) -> Option { + self.state.pop() } } impl Clone for Queue { fn clone(&self) -> Queue { - Queue { - state: self.state.clone() - } + Queue { state: self.state.clone() } } } @@ -169,7 +173,7 @@ mod tests { fn test() { let nthreads = 8u; let nmsgs = 1000u; - let mut q = Queue::with_capacity(nthreads*nmsgs); + let q = Queue::with_capacity(nthreads*nmsgs); assert_eq!(None, q.pop()); 
let (tx, rx) = channel(); @@ -177,7 +181,7 @@ mod tests { let q = q.clone(); let tx = tx.clone(); native::task::spawn(proc() { - let mut q = q; + let q = q; for i in range(0, nmsgs) { assert!(q.push(i)); } @@ -191,7 +195,7 @@ mod tests { completion_rxs.push(rx); let q = q.clone(); native::task::spawn(proc() { - let mut q = q; + let q = q; let mut i = 0u; loop { match q.pop() { diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs index 4cdcd05e9b450..4db24e82d3709 100644 --- a/src/libstd/sync/mpsc_queue.rs +++ b/src/libstd/sync/mpsc_queue.rs @@ -45,6 +45,7 @@ use option::{Option, None, Some}; use owned::Box; use ptr::RawPtr; use sync::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed}; +use ty::Unsafe; /// A result of the `pop` function. pub enum PopResult { @@ -69,7 +70,7 @@ struct Node { /// popper at a time (many pushers are allowed). pub struct Queue { head: AtomicPtr>, - tail: *mut Node, + tail: Unsafe<*mut Node>, } impl Node { @@ -88,12 +89,12 @@ impl Queue { let stub = unsafe { Node::new(None) }; Queue { head: AtomicPtr::new(stub), - tail: stub, + tail: Unsafe::new(stub), } } /// Pushes a new value onto this queue. - pub fn push(&mut self, t: T) { + pub fn push(&self, t: T) { unsafe { let n = Node::new(Some(t)); let prev = self.head.swap(n, AcqRel); @@ -111,13 +112,13 @@ impl Queue { /// /// This inconsistent state means that this queue does indeed have data, but /// it does not currently have access to it at this time. - pub fn pop(&mut self) -> PopResult { + pub fn pop(&self) -> PopResult { unsafe { - let tail = self.tail; + let tail = *self.tail.get(); let next = (*tail).next.load(Acquire); if !next.is_null() { - self.tail = next; + *self.tail.get() = next; assert!((*tail).value.is_none()); assert!((*next).value.is_some()); let ret = (*next).value.take_unwrap(); @@ -131,7 +132,7 @@ impl Queue { /// Attempts to pop data from this queue, but doesn't attempt too hard. This /// will canonicalize inconsistent states to a `None` value. - pub fn casual_pop(&mut self) -> Option { + pub fn casual_pop(&self) -> Option { match self.pop() { Data(t) => Some(t), Empty | Inconsistent => None, @@ -143,7 +144,7 @@ impl Queue { impl Drop for Queue { fn drop(&mut self) { unsafe { - let mut cur = self.tail; + let mut cur = *self.tail.get(); while !cur.is_null() { let next = (*cur).next.load(Relaxed); let _: Box> = mem::transmute(cur); @@ -157,13 +158,14 @@ impl Drop for Queue { mod tests { use prelude::*; + use alloc::arc::Arc; + use native; use super::{Queue, Data, Empty, Inconsistent}; - use sync::arc::UnsafeArc; #[test] fn test_full() { - let mut q = Queue::new(); + let q = Queue::new(); q.push(box 1); q.push(box 2); } @@ -172,20 +174,20 @@ mod tests { fn test() { let nthreads = 8u; let nmsgs = 1000u; - let mut q = Queue::new(); + let q = Queue::new(); match q.pop() { Empty => {} Inconsistent | Data(..) 
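The rewritten queue tests follow one template: clone the `Arc`'d queue into N producer threads, push a fixed number of messages from each, and drain everything from a single consumer. The same drill using the standard `mpsc` channel as a stand-in for `sync::mpsc_queue` (thread and message counts are arbitrary):

```rust
use std::sync::mpsc::channel;
use std::thread;

fn main() {
    const NTHREADS: usize = 8;
    const NMSGS: usize = 1000;

    // The std channel Sender plays the role of the Arc<Queue<T>> clones
    // handed to each producer in the libstd test.
    let (tx, rx) = channel();
    let mut handles = Vec::new();
    for _ in 0..NTHREADS {
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            for i in 0..NMSGS {
                tx.send(i).unwrap();
            }
        }));
    }
    drop(tx); // close the channel once all producers finish

    // Single consumer drains everything, mirroring the test's pop loop.
    let mut received = 0;
    while rx.recv().is_ok() {
        received += 1;
    }
    assert_eq!(received, NTHREADS * NMSGS);

    for h in handles {
        h.join().unwrap();
    }
}
```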
=> fail!() } let (tx, rx) = channel(); - let q = UnsafeArc::new(q); + let q = Arc::new(q); for _ in range(0, nthreads) { let tx = tx.clone(); let q = q.clone(); native::task::spawn(proc() { for i in range(0, nmsgs) { - unsafe { (*q.get()).push(i); } + q.push(i); } tx.send(()); }); @@ -193,7 +195,7 @@ mod tests { let mut i = 0u; while i < nthreads * nmsgs { - match unsafe { (*q.get()).pop() } { + match q.pop() { Empty | Inconsistent => {}, Data(_) => { i += 1 } } diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs index ed6d690def06a..fb515c9db6e4a 100644 --- a/src/libstd/sync/spsc_queue.rs +++ b/src/libstd/sync/spsc_queue.rs @@ -40,6 +40,7 @@ use option::{Some, None, Option}; use owned::Box; use ptr::RawPtr; use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release}; +use ty::Unsafe; // Node within the linked list queue of messages to send struct Node { @@ -51,18 +52,18 @@ struct Node { } /// The single-producer single-consumer queue. This structure is not cloneable, -/// but it can be safely shared in an UnsafeArc if it is guaranteed that there +/// but it can be safely shared in an Arc if it is guaranteed that there /// is only one popper and one pusher touching the queue at any one point in /// time. pub struct Queue { // consumer fields - tail: *mut Node, // where to pop from + tail: Unsafe<*mut Node>, // where to pop from tail_prev: AtomicPtr>, // where to pop from // producer fields - head: *mut Node, // where to push to - first: *mut Node, // where to get new nodes from - tail_copy: *mut Node, // between first/tail + head: Unsafe<*mut Node>, // where to push to + first: Unsafe<*mut Node>, // where to get new nodes from + tail_copy: Unsafe<*mut Node>, // between first/tail // Cache maintenance fields. Additions and subtractions are stored // separately in order to allow them to use nonatomic addition/subtraction. @@ -101,11 +102,11 @@ impl Queue { let n2 = Node::new(); unsafe { (*n1).next.store(n2, Relaxed) } Queue { - tail: n2, + tail: Unsafe::new(n2), tail_prev: AtomicPtr::new(n1), - head: n2, - first: n1, - tail_copy: n1, + head: Unsafe::new(n2), + first: Unsafe::new(n1), + tail_copy: Unsafe::new(n1), cache_bound: bound, cache_additions: AtomicUint::new(0), cache_subtractions: AtomicUint::new(0), @@ -114,7 +115,7 @@ impl Queue { /// Pushes a new value onto this queue. Note that to use this function /// safely, it must be externally guaranteed that there is only one pusher. - pub fn push(&mut self, t: T) { + pub fn push(&self, t: T) { unsafe { // Acquire a node (which either uses a cached one or allocates a new // one), and then append this to the 'head' node. @@ -122,35 +123,35 @@ impl Queue { assert!((*n).value.is_none()); (*n).value = Some(t); (*n).next.store(0 as *mut Node, Relaxed); - (*self.head).next.store(n, Release); - self.head = n; + (**self.head.get()).next.store(n, Release); + *self.head.get() = n; } } - unsafe fn alloc(&mut self) -> *mut Node { + unsafe fn alloc(&self) -> *mut Node { // First try to see if we can consume the 'first' node for our uses. // We try to avoid as many atomic instructions as possible here, so // the addition to cache_subtractions is not atomic (plus we're the // only one subtracting from the cache). 
- if self.first != self.tail_copy { + if *self.first.get() != *self.tail_copy.get() { if self.cache_bound > 0 { let b = self.cache_subtractions.load(Relaxed); self.cache_subtractions.store(b + 1, Relaxed); } - let ret = self.first; - self.first = (*ret).next.load(Relaxed); + let ret = *self.first.get(); + *self.first.get() = (*ret).next.load(Relaxed); return ret; } // If the above fails, then update our copy of the tail and try // again. - self.tail_copy = self.tail_prev.load(Acquire); - if self.first != self.tail_copy { + *self.tail_copy.get() = self.tail_prev.load(Acquire); + if *self.first.get() != *self.tail_copy.get() { if self.cache_bound > 0 { let b = self.cache_subtractions.load(Relaxed); self.cache_subtractions.store(b + 1, Relaxed); } - let ret = self.first; - self.first = (*ret).next.load(Relaxed); + let ret = *self.first.get(); + *self.first.get() = (*ret).next.load(Relaxed); return ret; } // If all of that fails, then we have to allocate a new node @@ -160,19 +161,19 @@ impl Queue { /// Attempts to pop a value from this queue. Remember that to use this type /// safely you must ensure that there is only one popper at a time. - pub fn pop(&mut self) -> Option { + pub fn pop(&self) -> Option { unsafe { // The `tail` node is not actually a used node, but rather a // sentinel from where we should start popping from. Hence, look at // tail's next field and see if we can use it. If we do a pop, then // the current tail node is a candidate for going into the cache. - let tail = self.tail; + let tail = *self.tail.get(); let next = (*tail).next.load(Acquire); if next.is_null() { return None } assert!((*next).value.is_some()); let ret = (*next).value.take(); - self.tail = next; + *self.tail.get() = next; if self.cache_bound == 0 { self.tail_prev.store(tail, Release); } else { @@ -197,11 +198,11 @@ impl Queue { /// Attempts to peek at the head of the queue, returning `None` if the queue /// has no data currently - pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> { + pub fn peek<'a>(&'a self) -> Option<&'a mut T> { // This is essentially the same as above with all the popping bits // stripped out. 
unsafe { - let tail = self.tail; + let tail = *self.tail.get(); let next = (*tail).next.load(Acquire); if next.is_null() { return None } return (*next).value.as_mut(); @@ -213,7 +214,7 @@ impl Queue { impl Drop for Queue { fn drop(&mut self) { unsafe { - let mut cur = self.first; + let mut cur = *self.first.get(); while !cur.is_null() { let next = (*cur).next.load(Relaxed); let _n: Box> = mem::transmute(cur); @@ -226,13 +227,15 @@ impl Drop for Queue { #[cfg(test)] mod test { use prelude::*; + + use alloc::arc::Arc; use native; + use super::Queue; - use sync::arc::UnsafeArc; #[test] fn smoke() { - let mut q = Queue::new(0); + let q = Queue::new(0); q.push(1); q.push(2); assert_eq!(q.pop(), Some(1)); @@ -247,14 +250,14 @@ mod test { #[test] fn drop_full() { - let mut q = Queue::new(0); + let q = Queue::new(0); q.push(box 1); q.push(box 2); } #[test] fn smoke_bound() { - let mut q = Queue::new(1); + let q = Queue::new(1); q.push(1); q.push(2); assert_eq!(q.pop(), Some(1)); @@ -273,12 +276,13 @@ mod test { stress_bound(1); fn stress_bound(bound: uint) { - let (a, b) = UnsafeArc::new2(Queue::new(bound)); + let a = Arc::new(Queue::new(bound)); + let b = a.clone(); let (tx, rx) = channel(); native::task::spawn(proc() { for _ in range(0, 100000) { loop { - match unsafe { (*b.get()).pop() } { + match b.pop() { Some(1) => break, Some(_) => fail!(), None => {} @@ -288,7 +292,7 @@ mod test { tx.send(()); }); for _ in range(0, 100000) { - unsafe { (*a.get()).push(1); } + a.push(1); } rx.recv(); } diff --git a/src/libstd/unstable/sync.rs b/src/libstd/unstable/sync.rs index 5be10fc27dfe5..f0f7e40ce09b8 100644 --- a/src/libstd/unstable/sync.rs +++ b/src/libstd/unstable/sync.rs @@ -8,9 +8,11 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use alloc::arc::Arc; + use clone::Clone; use kinds::Send; -use sync::arc::UnsafeArc; +use ty::Unsafe; use unstable::mutex::NativeMutex; struct ExData { @@ -30,7 +32,7 @@ struct ExData { * need to block or deschedule while accessing shared state, use extra::sync::RWArc. */ pub struct Exclusive { - x: UnsafeArc> + x: Arc>> } impl Clone for Exclusive { @@ -48,7 +50,7 @@ impl Exclusive { data: user_data }; Exclusive { - x: UnsafeArc::new(data) + x: Arc::new(Unsafe::new(data)) } }
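The last hunk leaves `Exclusive<T>` as an `Arc<Unsafe<ExData<T>>>` guarded by a native mutex; in today's standard library that whole construction is essentially `Arc<Mutex<T>>`. A closing sketch of the equivalence, with `lock()` playing the role of `Exclusive::with`:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Exclusive<Vec<i32>> in the old runtime is, in today's std, roughly
    // Arc<Mutex<Vec<i32>>>: atomically reference counted, with a lock
    // guarding every access to the shared data.
    let shared = Arc::new(Mutex::new(Vec::new()));

    let handles: Vec<_> = (0..4)
        .map(|i| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                shared.lock().unwrap().push(i);
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
    assert_eq!(shared.lock().unwrap().len(), 4);
}
```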