diff --git a/src/libstd/rt/args.rs b/src/libstd/rt/args.rs index 43e8096a8b113..7b27161ab5d7c 100644 --- a/src/libstd/rt/args.rs +++ b/src/libstd/rt/args.rs @@ -32,8 +32,8 @@ pub unsafe fn init(argc: int, argv: **u8) { imp::init(argc, argv) } pub unsafe fn init(argc: int, argv: **u8) { realargs::init(argc, argv) } /// One-time global cleanup. -#[cfg(not(test))] pub fn cleanup() { imp::cleanup() } -#[cfg(test)] pub fn cleanup() { realargs::cleanup() } +#[cfg(not(test))] pub unsafe fn cleanup() { imp::cleanup() } +#[cfg(test)] pub unsafe fn cleanup() { realargs::cleanup() } /// Take the global arguments from global storage. #[cfg(not(test))] pub fn take() -> Option<~[~str]> { imp::take() } @@ -74,14 +74,16 @@ mod imp { use vec; static mut global_args_ptr: uint = 0; + static mut lock: Mutex = MUTEX_INIT; pub unsafe fn init(argc: int, argv: **u8) { let args = load_argc_and_argv(argc, argv); put(args); } - pub fn cleanup() { + pub unsafe fn cleanup() { rtassert!(take().is_some()); + lock.destroy(); } pub fn take() -> Option<~[~str]> { @@ -108,7 +110,6 @@ mod imp { } fn with_lock(f: || -> T) -> T { - static mut lock: Mutex = MUTEX_INIT; (|| { unsafe { lock.lock(); diff --git a/src/libstd/rt/local_ptr.rs b/src/libstd/rt/local_ptr.rs index e0e8750e146fc..6355de36d43bb 100644 --- a/src/libstd/rt/local_ptr.rs +++ b/src/libstd/rt/local_ptr.rs @@ -41,27 +41,49 @@ pub static mut RT_TLS_PTR: *mut c_void = 0 as *mut c_void; #[cfg(stage0)] #[cfg(windows)] static mut RT_TLS_KEY: tls::Key = -1; +#[cfg(stage0)] +#[cfg(windows)] +static mut tls_lock: Mutex = MUTEX_INIT; +static mut tls_initialized: bool = false; /// Initialize the TLS key. Other ops will fail if this isn't executed first. #[inline(never)] #[cfg(stage0)] #[cfg(windows)] pub fn init_tls_key() { - static mut lock: Mutex = MUTEX_INIT; - static mut initialized: bool = false; - unsafe { - lock.lock(); - if !initialized { + tls_lock.lock(); + if !tls_initialized { tls::create(&mut RT_TLS_KEY); - initialized = true; + tls_initialized = true; } - lock.unlock(); + tls_lock.unlock(); } } #[cfg(not(stage0), not(windows))] -pub fn init_tls_key() {} +pub fn init_tls_key() { + unsafe { + tls_initialized = true; + } +} + +#[cfg(windows)] +pub unsafe fn cleanup() { + // No real use to acquiring a lock around these operations. All we're + // going to do is destroy the lock anyway which races locking itself. This + // is why the whole function is labeled as 'unsafe' + assert!(tls_initialized); + tls::destroy(RT_TLS_KEY); + tls_lock.destroy(); + tls_initialized = false; +} + +#[cfg(not(windows))] +pub unsafe fn cleanup() { + assert!(tls_initialized); + tls_initialized = false; +} /// Give a pointer to thread-local storage. /// diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 860b65b20c665..79b7dbf2aabf4 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -215,7 +215,8 @@ pub fn start(argc: int, argv: **u8, main: proc()) -> int { init(argc, argv); let exit_code = run(main); - cleanup(); + // unsafe is ok b/c we're sure that the runtime is gone + unsafe { cleanup(); } return exit_code; } @@ -228,7 +229,8 @@ pub fn start(argc: int, argv: **u8, main: proc()) -> int { pub fn start_on_main_thread(argc: int, argv: **u8, main: proc()) -> int { init(argc, argv); let exit_code = run_on_main_thread(main); - cleanup(); + // unsafe is ok b/c we're sure that the runtime is gone + unsafe { cleanup(); } return exit_code; } @@ -249,8 +251,17 @@ pub fn init(argc: int, argv: **u8) { } /// One-time runtime cleanup. -pub fn cleanup() { +/// +/// This function is unsafe because it performs no checks to ensure that the +/// runtime has completely ceased running. It is the responsibility of the +/// caller to ensure that the runtime is entirely shut down and nothing will be +/// poking around at the internal components. +/// +/// Invoking cleanup while portions of the runtime are still in use may cause +/// undefined behavior. +pub unsafe fn cleanup() { args::cleanup(); + local_ptr::cleanup(); } /// Execute the main function in a scheduler. diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index 867d997e98d15..943b76dd1a0ec 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -336,7 +336,7 @@ pub fn spawntask_try(f: proc()) -> Result<(),()> { } /// Spawn a new task in a new scheduler and return a thread handle. -pub fn spawntask_thread(f: proc()) -> Thread { +pub fn spawntask_thread(f: proc()) -> Thread<()> { let f = Cell::new(f); diff --git a/src/libstd/rt/thread.rs b/src/libstd/rt/thread.rs index a0e66d2fd4eb4..9031147f8b139 100644 --- a/src/libstd/rt/thread.rs +++ b/src/libstd/rt/thread.rs @@ -8,13 +8,21 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +//! Native os-thread management +//! +//! This modules contains bindings necessary for managing OS-level threads. +//! These functions operate outside of the rust runtime, creating threads +//! which are not used for scheduling in any way. + #[allow(non_camel_case_types)]; use cast; +use kinds::Send; use libc; use ops::Drop; -use uint; +use option::{Option, Some, None}; use ptr; +use uint; #[cfg(windows)] use libc::types::os::arch::extra::{LPSECURITY_ATTRIBUTES, SIZE_T, @@ -22,112 +30,191 @@ use libc::types::os::arch::extra::{LPSECURITY_ATTRIBUTES, SIZE_T, #[cfg(windows)] type rust_thread = HANDLE; #[cfg(unix)] type rust_thread = libc::pthread_t; +#[cfg(windows)] type rust_thread_return = DWORD; +#[cfg(unix)] type rust_thread_return = *libc::c_void; -pub struct Thread { +type StartFn = extern "C" fn(*libc::c_void) -> rust_thread_return; + +/// This struct represents a native thread's state. This is used to join on an +/// existing thread created in the join-able state. +pub struct Thread { priv native: rust_thread, - priv joined: bool + priv joined: bool, + priv packet: ~Option, } static DEFAULT_STACK_SIZE: libc::size_t = 1024*1024; -#[cfg(windows)] type rust_thread_return = DWORD; -#[cfg(unix)] type rust_thread_return = *libc::c_void; +// This is the starting point of rust os threads. The first thing we do +// is make sure that we don't trigger __morestack (also why this has a +// no_split_stack annotation), and then we extract the main function +// and invoke it. +#[no_split_stack] +extern fn thread_start(main: *libc::c_void) -> rust_thread_return { + use rt::context; + unsafe { + context::record_stack_bounds(0, uint::max_value); + let f: ~proc() = cast::transmute(main); + (*f)(); + cast::transmute(0 as rust_thread_return) + } +} -impl Thread { - - pub fn start(main: proc()) -> Thread { - // This is the starting point of rust os threads. The first thing we do - // is make sure that we don't trigger __morestack (also why this has a - // no_split_stack annotation), and then we extract the main function - // and invoke it. - #[no_split_stack] - extern "C" fn thread_start(trampoline: *libc::c_void) -> rust_thread_return { - use rt::context; - unsafe { - context::record_stack_bounds(0, uint::max_value); - let f: ~proc() = cast::transmute(trampoline); - (*f)(); - } - unsafe { cast::transmute(0 as rust_thread_return) } - } +// There are two impl blocks b/c if T were specified at the top then it's just a +// pain to specify a type parameter on Thread::spawn (which doesn't need the +// type parameter). +impl Thread<()> { + + /// Starts execution of a new OS thread. + /// + /// This function will not wait for the thread to join, but a handle to the + /// thread will be returned. + /// + /// Note that the handle returned is used to acquire the return value of the + /// procedure `main`. The `join` function will wait for the thread to finish + /// and return the value that `main` generated. + /// + /// Also note that the `Thread` returned will *always* wait for the thread + /// to finish executing. This means that even if `join` is not explicitly + /// called, when the `Thread` falls out of scope its destructor will block + /// waiting for the OS thread. + pub fn start(main: proc() -> T) -> Thread { + + // We need the address of the packet to fill in to be stable so when + // `main` fills it in it's still valid, so allocate an extra ~ box to do + // so. + let packet = ~None; + let packet2: *mut Option = unsafe { + *cast::transmute::<&~Option, **mut Option>(&packet) + }; + let main: proc() = proc() unsafe { *packet2 = Some(main()); }; + let native = unsafe { native_thread_create(~main) }; - let native = native_thread_create(thread_start, ~main); Thread { native: native, joined: false, + packet: packet, } } - pub fn join(mut self) { + /// This will spawn a new thread, but it will not wait for the thread to + /// finish, nor is it possible to wait for the thread to finish. + /// + /// This corresponds to creating threads in the 'detached' state on unix + /// systems. Note that platforms may not keep the main program alive even if + /// there are detached thread still running around. + pub fn spawn(main: proc()) { + unsafe { + let handle = native_thread_create(~main); + native_thread_detach(handle); + } + } +} + +impl Thread { + /// Wait for this thread to finish, returning the result of the thread's + /// calculation. + pub fn join(mut self) -> T { assert!(!self.joined); - native_thread_join(self.native); + unsafe { native_thread_join(self.native) }; self.joined = true; + assert!(self.packet.is_some()); + self.packet.take_unwrap() } } -#[cfg(windows)] -fn native_thread_create(thread_start: extern "C" fn(*libc::c_void) -> rust_thread_return, - tramp: ~proc()) -> rust_thread { - unsafe { - let ptr: *mut libc::c_void = cast::transmute(tramp); - CreateThread(ptr::mut_null(), DEFAULT_STACK_SIZE, thread_start, ptr, 0, ptr::mut_null()) +#[unsafe_destructor] +impl Drop for Thread { + fn drop(&mut self) { + // This is required for correctness. If this is not done then the thread + // would fill in a return box which no longer exists. + if !self.joined { + unsafe { native_thread_join(self.native) }; + } } } #[cfg(windows)] -fn native_thread_join(native: rust_thread) { +unsafe fn native_thread_create(p: ~proc()) -> rust_thread { + let arg: *mut libc::c_void = cast::transmute(p); + CreateThread(ptr::mut_null(), DEFAULT_STACK_SIZE, thread_start, + arg, 0, ptr::mut_null()) +} + +#[cfg(windows)] +unsafe fn native_thread_join(native: rust_thread) { use libc::consts::os::extra::INFINITE; - unsafe { WaitForSingleObject(native, INFINITE); } + WaitForSingleObject(native, INFINITE); +} + +#[cfg(windows)] +unsafe fn native_thread_detach(native: rust_thread) { + assert!(libc::CloseHandle(native) != 0); } #[cfg(unix)] -fn native_thread_create(thread_start: extern "C" fn(*libc::c_void) -> rust_thread_return, - tramp: ~proc()) -> rust_thread { +unsafe fn native_thread_create(p: ~proc()) -> rust_thread { use unstable::intrinsics; - let mut native: libc::pthread_t = unsafe { intrinsics::uninit() }; - - unsafe { - use libc::consts::os::posix01::PTHREAD_CREATE_JOINABLE; + use libc::consts::os::posix01::PTHREAD_CREATE_JOINABLE; - let mut attr: libc::pthread_attr_t = intrinsics::uninit(); - assert!(pthread_attr_init(&mut attr) == 0); - assert!(pthread_attr_setstacksize(&mut attr, DEFAULT_STACK_SIZE) == 0); - assert!(pthread_attr_setdetachstate(&mut attr, PTHREAD_CREATE_JOINABLE) == 0); + let mut native: libc::pthread_t = intrinsics::uninit(); + let mut attr: libc::pthread_attr_t = intrinsics::uninit(); + assert_eq!(pthread_attr_init(&mut attr), 0); + assert_eq!(pthread_attr_setstacksize(&mut attr, DEFAULT_STACK_SIZE), 0); + assert_eq!(pthread_attr_setdetachstate(&mut attr, PTHREAD_CREATE_JOINABLE), 0); - let ptr: *libc::c_void = cast::transmute(tramp); - assert!(pthread_create(&mut native, &attr, thread_start, ptr) == 0); - } + let arg: *libc::c_void = cast::transmute(p); + assert_eq!(pthread_create(&mut native, &attr, thread_start, arg), 0); native } #[cfg(unix)] -fn native_thread_join(native: rust_thread) { - unsafe { assert!(pthread_join(native, ptr::null()) == 0) } +unsafe fn native_thread_join(native: rust_thread) { + assert_eq!(pthread_join(native, ptr::null()), 0); } -impl Drop for Thread { - fn drop(&mut self) { - assert!(self.joined); - } +#[cfg(unix)] +fn native_thread_detach(native: rust_thread) { + unsafe { assert_eq!(pthread_detach(native), 0) } } #[cfg(windows)] extern "system" { - fn CreateThread(lpThreadAttributes: LPSECURITY_ATTRIBUTES, dwStackSize: SIZE_T, - lpStartAddress: extern "C" fn(*libc::c_void) -> rust_thread_return, - lpParameter: LPVOID, dwCreationFlags: DWORD, lpThreadId: LPDWORD) -> HANDLE; + fn CreateThread(lpThreadAttributes: LPSECURITY_ATTRIBUTES, + dwStackSize: SIZE_T, + lpStartAddress: StartFn, + lpParameter: LPVOID, + dwCreationFlags: DWORD, + lpThreadId: LPDWORD) -> HANDLE; fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD; } #[cfg(unix)] extern { - fn pthread_create(native: *mut libc::pthread_t, attr: *libc::pthread_attr_t, - f: extern "C" fn(*libc::c_void) -> rust_thread_return, + fn pthread_create(native: *mut libc::pthread_t, + attr: *libc::pthread_attr_t, + f: StartFn, value: *libc::c_void) -> libc::c_int; - fn pthread_join(native: libc::pthread_t, value: **libc::c_void) -> libc::c_int; + fn pthread_join(native: libc::pthread_t, + value: **libc::c_void) -> libc::c_int; fn pthread_attr_init(attr: *mut libc::pthread_attr_t) -> libc::c_int; fn pthread_attr_setstacksize(attr: *mut libc::pthread_attr_t, stack_size: libc::size_t) -> libc::c_int; fn pthread_attr_setdetachstate(attr: *mut libc::pthread_attr_t, state: libc::c_int) -> libc::c_int; + fn pthread_detach(thread: libc::pthread_t) -> libc::c_int; +} + +#[cfg(test)] +mod tests { + use super::Thread; + + #[test] + fn smoke() { do Thread::start {}.join(); } + + #[test] + fn data() { assert_eq!(do Thread::start { 1 }.join(), 1); } + + #[test] + fn detached() { do Thread::spawn {} } } diff --git a/src/libstd/rt/thread_local_storage.rs b/src/libstd/rt/thread_local_storage.rs index 8fa64852846a8..62e1b6c50d65f 100644 --- a/src/libstd/rt/thread_local_storage.rs +++ b/src/libstd/rt/thread_local_storage.rs @@ -34,6 +34,11 @@ pub unsafe fn get(key: Key) -> *mut c_void { pthread_getspecific(key) } +#[cfg(unix)] +pub unsafe fn destroy(key: Key) { + assert_eq!(0, pthread_key_delete(key)); +} + #[cfg(target_os="macos")] #[allow(non_camel_case_types)] // foreign type type pthread_key_t = ::libc::c_ulong; @@ -47,6 +52,7 @@ type pthread_key_t = ::libc::c_uint; #[cfg(unix)] extern { fn pthread_key_create(key: *mut pthread_key_t, dtor: *u8) -> c_int; + fn pthread_key_delete(key: pthread_key_t) -> c_int; fn pthread_getspecific(key: pthread_key_t) -> *mut c_void; fn pthread_setspecific(key: pthread_key_t, value: *mut c_void) -> c_int; } @@ -71,9 +77,15 @@ pub unsafe fn get(key: Key) -> *mut c_void { TlsGetValue(key) } +#[cfg(windows)] +pub unsafe fn destroy(key: Key) { + assert!(TlsFree(key) != 0); +} + #[cfg(windows)] extern "system" { fn TlsAlloc() -> DWORD; + fn TlsFree(dwTlsIndex: DWORD) -> BOOL; fn TlsGetValue(dwTlsIndex: DWORD) -> LPVOID; fn TlsSetValue(dwTlsIndex: DWORD, lpTlsvalue: LPVOID) -> BOOL; } diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 6c1c28c980559..198fe596a896e 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -139,7 +139,7 @@ pub fn spawn_raw(mut opts: TaskOpts, f: proc()) { let join_task = do Task::build_child(None) { debug!("running join task"); let thread_port = thread_port_cell.take(); - let thread: Thread = thread_port.recv(); + let thread: Thread<()> = thread_port.recv(); thread.join(); };