diff --git a/src/libcore/task.rs b/src/libcore/task.rs
index ec6f98fbf45c3..637a684b8d8f2 100644
--- a/src/libcore/task.rs
+++ b/src/libcore/task.rs
@@ -44,6 +44,7 @@ export get_task;
 export spawn;
 export spawn_joinable;
 export spawn_connected;
+export spawn_sched;
 export connected_fn;
 export connected_task;
 export currently_unwinding;
@@ -62,10 +63,15 @@ type rust_closure = {
 #[link_name = "rustrt"]
 #[abi = "cdecl"]
 native mod rustrt {
+    fn rust_get_sched_id() -> sched_id;
+    fn rust_new_sched(num_threads: c::uintptr_t) -> sched_id;
+
     fn get_task_id() -> task_id;
     fn rust_get_task() -> *rust_task;
     fn new_task() -> task_id;
+    fn rust_new_task_in_sched(id: sched_id) -> task_id;
+
     fn drop_task(task_id: *rust_task);
     fn get_task_pointer(id: task_id) -> *rust_task;
@@ -85,6 +91,7 @@ type rust_task =
 resource rust_task_ptr(task: *rust_task) {
     rustrt::drop_task(task);
 }
+type sched_id = int;
 type task_id = int;
 
 /*
@@ -111,14 +118,17 @@ Returns:
 A handle to the new task
 */
 fn spawn(+f: fn~()) -> task {
-    spawn_inner(f, none)
+    spawn_inner(f, none, new_task_in_this_sched)
 }
 
-fn spawn_inner(-f: fn~(),
-               notify: option<comm::chan<task_notification>>) -> task unsafe {
+fn spawn_inner(
+    -f: fn~(),
+    notify: option<comm::chan<task_notification>>,
+    new_task: fn() -> task_id
+) -> task unsafe {
     let closure: *rust_closure = unsafe::reinterpret_cast(ptr::addr_of(f));
     #debug("spawn: closure={%x,%x}", (*closure).fnptr, (*closure).envptr);
-    let id = rustrt::new_task();
+    let id = new_task();
 
     // set up notifications if they are enabled.
     option::may(notify) {|c|
@@ -132,6 +142,39 @@ fn spawn_inner(-f: fn~(),
     ret id;
 }
 
+fn new_task_in_this_sched() -> task_id {
+    rustrt::new_task()
+}
+
+fn new_task_in_new_sched(num_threads: uint) -> task_id {
+    let sched_id = rustrt::rust_new_sched(num_threads);
+    rustrt::rust_new_task_in_sched(sched_id)
+}
+
+/*
+Function: spawn_sched
+
+Creates a new scheduler and executes a task on it. Tasks subsequently
+spawned by that task will also execute on the new scheduler. When
+there are no more tasks to execute, the scheduler terminates.
+
+Arguments:
+
+num_threads - The number of OS threads to dedicate to scheduling tasks
+f - A unique closure to execute as a task on the new scheduler
+
+Failure:
+
+The number of threads must be greater than 0
+
+*/
+fn spawn_sched(num_threads: uint, +f: fn~()) -> task {
+    if num_threads < 1u {
+        fail "Cannot create a scheduler with no threads";
+    }
+    spawn_inner(f, none, bind new_task_in_new_sched(num_threads))
+}
+
 /*
 Type: joinable_task
@@ -142,7 +185,7 @@ type joinable_task = (task, comm::port<task_notification>);
 fn spawn_joinable(+f: fn~()) -> joinable_task {
     let notify_port = comm::port();
     let notify_chan = comm::chan(notify_port);
-    let task = spawn_inner(f, some(notify_chan));
+    let task = spawn_inner(f, some(notify_chan), new_task_in_this_sched);
     ret (task, notify_port);
     /*
     resource notify_rsrc(data: (comm::chan<task_notification>,
@@ -411,6 +454,124 @@ mod tests {
           _ { fail; }
         }
     }
+
+    #[test]
+    #[should_fail]
+    #[ignore(cfg(target_os = "win32"))]
+    fn spawn_sched_no_threads() {
+        spawn_sched(0u) {|| };
+    }
+
+    #[test]
+    fn spawn_sched_1() {
+        let po = comm::port();
+        let ch = comm::chan(po);
+
+        fn f(i: int, ch: comm::chan<()>) {
+            let parent_sched_id = rustrt::rust_get_sched_id();
+
+            spawn_sched(1u) {||
+                let child_sched_id = rustrt::rust_get_sched_id();
+                assert parent_sched_id != child_sched_id;
+
+                if (i == 0) {
+                    comm::send(ch, ());
+                } else {
+                    f(i - 1, ch);
+                }
+            };
+
+        }
+        f(10, ch);
+        comm::recv(po);
+    }
+
+    #[test]
+    fn spawn_sched_childs_on_same_sched() {
+        let po = comm::port();
+        let ch = comm::chan(po);
+
+        spawn_sched(1u) {||
+            let parent_sched_id = rustrt::rust_get_sched_id();
+            spawn {||
+                let child_sched_id = rustrt::rust_get_sched_id();
+                // This should be on the same scheduler
+                assert parent_sched_id == child_sched_id;
+                comm::send(ch, ());
+            };
+        };
+
+        comm::recv(po);
+    }
+
+    #[nolink]
+    native mod rt {
+        fn rust_dbg_lock_create() -> *ctypes::void;
+        fn rust_dbg_lock_destroy(lock: *ctypes::void);
+        fn rust_dbg_lock_lock(lock: *ctypes::void);
+        fn rust_dbg_lock_unlock(lock: *ctypes::void);
+        fn rust_dbg_lock_wait(lock: *ctypes::void);
+        fn rust_dbg_lock_signal(lock: *ctypes::void);
+    }
+
+    #[test]
+    fn spawn_sched_blocking() {
+
+        // Testing that a task in one scheduler can block natively
+        // without affecting other schedulers
+        iter::repeat(20u) {||
+
+            let start_po = comm::port();
+            let start_ch = comm::chan(start_po);
+            let fin_po = comm::port();
+            let fin_ch = comm::chan(fin_po);
+
+            let lock = rt::rust_dbg_lock_create();
+
+            spawn_sched(1u) {||
+                rt::rust_dbg_lock_lock(lock);
+
+                comm::send(start_ch, ());
+
+                // Block the scheduler thread
+                rt::rust_dbg_lock_wait(lock);
+                rt::rust_dbg_lock_unlock(lock);
+
+                comm::send(fin_ch, ());
+            };
+
+            // Wait until the other task has its lock
+            comm::recv(start_po);
+
+            fn pingpong(po: comm::port<int>, ch: comm::chan<int>) {
+                let val = 20;
+                while val > 0 {
+                    val = comm::recv(po);
+                    comm::send(ch, val - 1);
+                }
+            }
+
+            let setup_po = comm::port();
+            let setup_ch = comm::chan(setup_po);
+            let parent_po = comm::port();
+            let parent_ch = comm::chan(parent_po);
+            spawn {||
+                let child_po = comm::port();
+                comm::send(setup_ch, comm::chan(child_po));
+                pingpong(child_po, parent_ch);
+            };
+
+            let child_ch = comm::recv(setup_po);
+            comm::send(child_ch, 20);
+            pingpong(parent_po, child_ch);
+            rt::rust_dbg_lock_lock(lock);
+            rt::rust_dbg_lock_signal(lock);
+            rt::rust_dbg_lock_unlock(lock);
+            comm::recv(fin_po);
+            rt::rust_dbg_lock_destroy(lock);
+        }
+    }
+
 }
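For a sense of how the new API reads in practice, here is a minimal usage sketch in the same dialect as the tests above. It is not part of the patch: the function name is invented, and it assumes only spawn_sched (addressed as task::spawn_sched from user code) plus the existing comm primitives. The dedicated scheduler tears itself down once the spawned task, and anything it spawns, has exited:

    fn run_on_own_scheduler() {
        let po = comm::port();
        let ch = comm::chan(po);
        // Dedicate one OS thread to a fresh scheduler; the thread is
        // reclaimed when the scheduler's last task exits.
        task::spawn_sched(1u) {||
            // Work done here cannot starve tasks on the main scheduler.
            comm::send(ch, ());
        };
        comm::recv(po);
    }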
diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp
index dacbcee1045f1..121f3d4bcfc8b 100644
--- a/src/rt/rust.cpp
+++ b/src/rt/rust.cpp
@@ -78,8 +78,9 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) {
     check_claims = env->check_claims;
 
     rust_srv *srv = new rust_srv(env);
-    rust_kernel *kernel = new rust_kernel(srv, env->num_sched_threads);
-    rust_scheduler *sched = kernel->get_default_scheduler();
+    rust_kernel *kernel = new rust_kernel(srv);
+    rust_sched_id sched_id = kernel->create_scheduler(env->num_sched_threads);
+    rust_scheduler *sched = kernel->get_scheduler_by_id(sched_id);
     rust_task_id root_id = sched->create_task(NULL, "main", MAIN_STACK_SIZE);
     rust_task *root_task = kernel->get_task_by_id(root_id);
     I(kernel, root_task != NULL);
@@ -98,7 +99,7 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) {
     root_task->deref();
     root_task = NULL;
 
-    int ret = kernel->start_schedulers();
+    int ret = kernel->wait_for_schedulers();
 
     delete args;
     delete kernel;
     delete srv;
diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp
index 76f80192e89b0..192951675189d 100644
--- a/src/rt/rust_builtin.cpp
+++ b/src/rt/rust_builtin.cpp
@@ -431,16 +431,43 @@ nano_time(uint64_t *ns) {
     *ns = t.time_ns();
 }
 
+extern "C" CDECL rust_sched_id
+rust_get_sched_id() {
+    rust_task *task = rust_task_thread::get_task();
+    return task->sched->get_id();
+}
+
+extern "C" CDECL rust_sched_id
+rust_new_sched(uintptr_t threads) {
+    rust_task *task = rust_task_thread::get_task();
+    A(task->thread, threads > 0,
+      "Can't create a scheduler with no threads, silly!");
+    return task->kernel->create_scheduler(threads);
+}
+
 extern "C" CDECL rust_task_id
 get_task_id() {
     rust_task *task = rust_task_thread::get_task();
     return task->user.id;
 }
 
+static rust_task_id
+new_task_common(rust_scheduler *sched, rust_task *parent) {
+    return sched->create_task(parent, NULL);
+}
+
 extern "C" CDECL rust_task_id
 new_task() {
     rust_task *task = rust_task_thread::get_task();
-    return task->sched->create_task(task, NULL);
+    return new_task_common(task->sched, task);
+}
+
+extern "C" CDECL rust_task_id
+rust_new_task_in_sched(rust_sched_id id) {
+    rust_task *task = rust_task_thread::get_task();
+    rust_scheduler *sched = task->kernel->get_scheduler_by_id(id);
+    // FIXME: What if we didn't get the scheduler?
+    return new_task_common(sched, task);
 }
 
 extern "C" CDECL void
@@ -599,6 +626,46 @@ rust_log_console_off() {
     log_console_off(task->kernel->env);
 }
 
+extern "C" CDECL lock_and_signal *
+rust_dbg_lock_create() {
+    return new lock_and_signal();
+}
+
+extern "C" CDECL void
+rust_dbg_lock_destroy(lock_and_signal *lock) {
+    rust_task *task = rust_task_thread::get_task();
+    I(task->thread, lock);
+    delete lock;
+}
+
+extern "C" CDECL void
+rust_dbg_lock_lock(lock_and_signal *lock) {
+    rust_task *task = rust_task_thread::get_task();
+    I(task->thread, lock);
+    lock->lock();
+}
+
+extern "C" CDECL void
+rust_dbg_lock_unlock(lock_and_signal *lock) {
+    rust_task *task = rust_task_thread::get_task();
+    I(task->thread, lock);
+    lock->unlock();
+}
+
+extern "C" CDECL void
+rust_dbg_lock_wait(lock_and_signal *lock) {
+    rust_task *task = rust_task_thread::get_task();
+    I(task->thread, lock);
+    lock->wait();
+}
+
+extern "C" CDECL void
+rust_dbg_lock_signal(lock_and_signal *lock) {
+    rust_task *task = rust_task_thread::get_task();
+    I(task->thread, lock);
+    lock->signal();
+}
+
 //
 // Local Variables:
 // mode: C++
diff --git a/src/rt/rust_internal.h b/src/rt/rust_internal.h
index beb772edbe249..b6bd20fc98bd8 100644
--- a/src/rt/rust_internal.h
+++ b/src/rt/rust_internal.h
@@ -61,6 +61,7 @@ struct stk_seg;
 struct type_desc;
 struct frame_glue_fns;
 
+typedef intptr_t rust_sched_id;
 typedef intptr_t rust_task_id;
 typedef intptr_t rust_port_id;
 
@@ -103,15 +104,14 @@ static size_t const BUF_BYTES = 2048;
     void ref() { ++ref_count; } \
     void deref() { if (--ref_count == 0) { dtor; } }
 
-#define RUST_ATOMIC_REFCOUNT()                                             \
-private:                                                                   \
-    intptr_t ref_count;                                                    \
-public:                                                                    \
-    void ref() {                                                           \
-        intptr_t old = sync::increment(ref_count);                         \
-        assert(old > 0);                                                   \
-    }                                                                      \
-    void deref() { if(0 == sync::decrement(ref_count)) { delete this; } }
+#define RUST_ATOMIC_REFCOUNT()                                             \
+public:                                                                    \
+    intptr_t ref_count;                                                    \
+    void ref() {                                                           \
+        intptr_t old = sync::increment(ref_count);                         \
+        assert(old > 0);                                                   \
+    }                                                                      \
+    void deref() { if(0 == sync::decrement(ref_count)) { delete_this(); } }
 
 template <typename T> struct task_owned {
     inline void *operator new(size_t size, rust_task *task,
                               const char *tag);
diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp
index f8e7e41971e62..d014877c2786c 100644
--- a/src/rt/rust_kernel.cpp
+++ b/src/rt/rust_kernel.cpp
@@ -1,3 +1,4 @@
+#include <vector>
 #include "rust_internal.h"
 #include "rust_util.h"
 #include "rust_scheduler.h"
@@ -7,17 +8,17 @@
 #define KLOG_ERR_(field, ...)                                   \
     KLOG_LVL(this, field, log_err, __VA_ARGS__)
 
-rust_kernel::rust_kernel(rust_srv *srv, size_t num_threads) :
+rust_kernel::rust_kernel(rust_srv *srv) :
     _region(srv, true),
     _log(srv, NULL),
     srv(srv),
     live_tasks(0),
    max_task_id(0),
     rval(0),
+    live_schedulers(0),
+    max_sched_id(0),
     env(srv->env)
 {
-    sched = new (this, "rust_scheduler")
-        rust_scheduler(this, srv, num_threads);
 }
 
 void
@@ -41,10 +42,6 @@ rust_kernel::fatal(char const *fmt, ...)
 {
     va_end(args);
 }
 
-rust_kernel::~rust_kernel() {
-    delete sched;
-}
-
 void *
 rust_kernel::malloc(size_t size, const char *tag) {
     return _region.malloc(size, tag);
@@ -59,17 +56,67 @@ void
 rust_kernel::free(void *mem) {
     _region.free(mem);
 }
 
-int rust_kernel::start_schedulers()
-{
+rust_sched_id
+rust_kernel::create_scheduler(size_t num_threads) {
+    I(this, !sched_lock.lock_held_by_current_thread());
+    rust_sched_id id;
+    rust_scheduler *sched;
+    {
+        scoped_lock with(sched_lock);
+        id = max_sched_id++;
+        K(srv, id != INTPTR_MAX, "Hit the maximum scheduler id");
+        sched = new (this, "rust_scheduler")
+            rust_scheduler(this, srv, num_threads, id);
+        bool is_new = sched_table
+            .insert(std::pair<rust_sched_id, rust_scheduler*>(id, sched))
+            .second;
+        A(this, is_new, "Reusing a sched id?");
+        live_schedulers++;
+    }
     sched->start_task_threads();
-    return rval;
+    return id;
 }
 
 rust_scheduler *
-rust_kernel::get_default_scheduler() {
-    return sched;
+rust_kernel::get_scheduler_by_id(rust_sched_id id) {
+    I(this, !sched_lock.lock_held_by_current_thread());
+    scoped_lock with(sched_lock);
+    sched_map::iterator iter = sched_table.find(id);
+    if (iter != sched_table.end()) {
+        return iter->second;
+    } else {
+        return NULL;
+    }
+}
+
+void
+rust_kernel::release_scheduler_id(rust_sched_id id) {
+    I(this, !sched_lock.lock_held_by_current_thread());
+    scoped_lock with(sched_lock);
+    sched_map::iterator iter = sched_table.find(id);
+    I(this, iter != sched_table.end());
+    rust_scheduler *sched = iter->second;
+    sched_table.erase(iter);
+    delete sched;
+    live_schedulers--;
+    if (live_schedulers == 0) {
+        // We're all done. Tell the main thread to continue
+        sched_lock.signal();
+    }
+}
+
+int
+rust_kernel::wait_for_schedulers()
+{
+    I(this, !sched_lock.lock_held_by_current_thread());
+    scoped_lock with(sched_lock);
+    // Schedulers could possibly have already exited
+    if (live_schedulers != 0) {
+        sched_lock.wait();
+    }
+    return rval;
 }
 
+// FIXME: Fix all these FIXMEs
 void
 rust_kernel::fail() {
     // FIXME: On windows we're getting "Application has requested the
@@ -79,7 +126,29 @@ rust_kernel::fail() {
 #if defined(__WIN32__)
     exit(rval);
 #endif
-    sched->kill_all_tasks();
+    // Copy the list of schedulers so that we don't hold the lock while
+    // running kill_all_tasks.
+    // FIXME: There's a lot that happens under kill_all_tasks, and I don't
+    // know that holding sched_lock here is ok, but we need to hold the
+    // sched lock to prevent the scheduler from being destroyed while
+    // we are using it. Probably we need to make rust_scheduler atomically
+    // reference counted.
+    std::vector<rust_scheduler*> scheds;
+    {
+        scoped_lock with(sched_lock);
+        for (sched_map::iterator iter = sched_table.begin();
+             iter != sched_table.end(); iter++) {
+            scheds.push_back(iter->second);
+        }
+    }
+
+    // FIXME: This is not a foolproof way to kill all tasks while ensuring
+    // that no new tasks or schedulers are created in the meantime that
+    // keep the scheduler alive.
+    for (std::vector<rust_scheduler*>::iterator iter = scheds.begin();
+         iter != scheds.end(); iter++) {
+        (*iter)->kill_all_tasks();
+    }
 }
 
 void
@@ -106,11 +175,6 @@ rust_kernel::release_task_id(rust_task_id id) {
         new_live_tasks = --live_tasks;
     }
     KLOG_("Total outstanding tasks: %d", new_live_tasks);
-    if (new_live_tasks == 0) {
-        // There are no more tasks and there never will be.
-        // Tell all the schedulers to exit.
-        sched->exit();
-    }
 }
 
 rust_task *
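One user-visible consequence of the rewritten shutdown protocol is worth spelling out: rust_start now blocks in wait_for_schedulers() until live_schedulers reaches zero, rather than returning when the global task count does, so the process stays alive while any scheduler still owns tasks. A hypothetical sketch of what that permits at the language level (not part of the patch):

    fn main() {
        // main's body may finish first; the kernel keeps waiting,
        // because this task's scheduler only calls
        // release_scheduler_id() once its last task and thread exit.
        task::spawn_sched(1u) {||
            #error("still running after main's body returned");
        };
    }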
diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h
index 047cd73bf0389..2b7662220fd1c 100644
--- a/src/rt/rust_kernel.h
+++ b/src/rt/rust_kernel.h
@@ -2,12 +2,15 @@
 #ifndef RUST_KERNEL_H
 #define RUST_KERNEL_H
 
+#include <map>
+
 #include "memory_region.h"
 #include "rust_log.h"
 
 struct rust_task_thread;
 struct rust_scheduler;
 
+typedef std::map<rust_sched_id, rust_scheduler*> sched_map;
+
 /**
  * A global object shared by all thread domains. Most of the data structures
  * in this class are synchronized since they are accessed from multiple
@@ -20,8 +23,6 @@ class rust_kernel {
 public:
     rust_srv *srv;
 private:
-    rust_scheduler *sched;
-
     // Protects live_tasks, max_task_id and task_table
     lock_and_signal task_lock;
     // Tracks the number of tasks that are being managed by
@@ -35,12 +36,20 @@ class rust_kernel {
     lock_and_signal rval_lock;
     int rval;
 
+    // Protects live_schedulers, max_sched_id and sched_table
+    lock_and_signal sched_lock;
+    // Tracks the number of schedulers currently running.
+    // When this hits 0 we will signal the sched_lock and the
+    // kernel will terminate.
+    uintptr_t live_schedulers;
+    rust_sched_id max_sched_id;
+    sched_map sched_table;
+
 public:
     struct rust_env *env;
 
-    rust_kernel(rust_srv *srv, size_t num_threads);
-    ~rust_kernel();
+    rust_kernel(rust_srv *srv);
 
     void log(uint32_t level, char const *fmt, ...);
     void fatal(char const *fmt, ...);
@@ -51,8 +60,11 @@ class rust_kernel {
 
     void fail();
 
-    int start_schedulers();
-    rust_scheduler* get_default_scheduler();
+    rust_sched_id create_scheduler(size_t num_threads);
+    rust_scheduler* get_scheduler_by_id(rust_sched_id id);
+    // Called by a scheduler to indicate that it is terminating
+    void release_scheduler_id(rust_sched_id id);
+    int wait_for_schedulers();
 
 #ifdef __WIN32__
     void win32_require(LPCTSTR fn, BOOL ok);
diff --git a/src/rt/rust_scheduler.cpp b/src/rt/rust_scheduler.cpp
index 3e306cc418d4f..629df420bd1b9 100644
--- a/src/rt/rust_scheduler.cpp
+++ b/src/rt/rust_scheduler.cpp
@@ -3,11 +3,15 @@
 rust_scheduler::rust_scheduler(rust_kernel *kernel,
                                rust_srv *srv,
-                               size_t num_threads) :
+                               size_t num_threads,
+                               rust_sched_id id) :
     kernel(kernel),
     srv(srv),
     env(srv->env),
-    num_threads(num_threads)
+    live_threads(num_threads),
+    live_tasks(0),
+    num_threads(num_threads),
+    id(id)
 {
     isaac_init(kernel, &rctx);
     create_task_threads();
@@ -55,15 +59,15 @@ rust_scheduler::destroy_task_threads() {
 
 void
 rust_scheduler::start_task_threads()
 {
+    // Copy num_threads because it's possible for the last thread
+    // to terminate and have the kernel delete us before we
+    // hit the last check against num_threads, in which case
+    // we would be accessing invalid memory.
+    uintptr_t num_threads = this->num_threads;
     for(size_t i = 0; i < num_threads; ++i) {
         rust_task_thread *thread = threads[i];
         thread->start();
     }
-
-    for(size_t i = 0; i < num_threads; ++i) {
-        rust_task_thread *thread = threads[i];
-        thread->join();
-    }
 }
 
 void
@@ -81,6 +85,7 @@ rust_scheduler::create_task(rust_task *spawner, const char *name,
     {
         scoped_lock with(lock);
         thread_no = isaac_rand(&rctx) % num_threads;
+        live_tasks++;
     }
     rust_task_thread *thread = threads[thread_no];
     return thread->create_task(spawner, name, init_stack_sz);
@@ -91,9 +96,28 @@ rust_scheduler::create_task(rust_task *spawner, const char *name) {
     return create_task(spawner, name, env->min_stack_size);
 }
 
+void
+rust_scheduler::release_task() {
+    bool need_exit = false;
+    {
+        scoped_lock with(lock);
+        live_tasks--;
+        if (live_tasks == 0) {
+            need_exit = true;
+        }
+    }
+    if (need_exit) {
+        // There are no more tasks on this scheduler. Time to leave
+        exit();
+    }
+}
+
 void
 rust_scheduler::exit() {
-    for(size_t i = 0; i < num_threads; ++i) {
+    // Take a copy of num_threads. After the last thread exits this
+    // scheduler will get destroyed, and our fields will cease to exist.
+    size_t current_num_threads = num_threads;
+    for(size_t i = 0; i < current_num_threads; ++i) {
         threads[i]->exit();
     }
 }
@@ -102,3 +126,16 @@ size_t
 rust_scheduler::number_of_threads() {
     return num_threads;
 }
+
+void
+rust_scheduler::release_task_thread() {
+    I(this, !lock.lock_held_by_current_thread());
+    uintptr_t new_live_threads;
+    {
+        scoped_lock with(lock);
+        new_live_threads = --live_threads;
+    }
+    if (new_live_threads == 0) {
+        kernel->release_scheduler_id(id);
+    }
+}
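Because new_task_in_new_sched calls rust_new_sched each time, every spawn_sched call gets a scheduler of its own: ids come from max_sched_id and are never reused (note the "Reusing a sched id?" assertion above), and each scheduler is released independently once its live_tasks count drops to zero. A sketch of that property, with an invented function name and a local native import like the one in rt-sched-1.rs at the end of this patch:

    native mod rustrt {
        fn rust_get_sched_id() -> int;
    }

    fn two_fresh_scheds() {
        let po = comm::port();
        let ch = comm::chan(po);
        task::spawn_sched(1u) {|| comm::send(ch, rustrt::rust_get_sched_id()); };
        task::spawn_sched(1u) {|| comm::send(ch, rustrt::rust_get_sched_id()); };
        // Two distinct scheduler ids arrive, in either order.
        assert comm::recv(po) != comm::recv(po);
    }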
diff --git a/src/rt/rust_scheduler.h b/src/rt/rust_scheduler.h
index 03eb3186ed342..5a931cad348e8 100644
--- a/src/rt/rust_scheduler.h
+++ b/src/rt/rust_scheduler.h
@@ -10,11 +10,18 @@ class rust_scheduler : public kernel_owned<rust_scheduler> {
     rust_srv *srv;
     rust_env *env;
 private:
+    // Protects the random number context and live_threads
     lock_and_signal lock;
-    array_list<rust_task_thread *> threads;
+    // When this hits zero we'll tell the kernel to release us
+    uintptr_t live_threads;
+    // When this hits zero we'll tell the threads to exit
+    uintptr_t live_tasks;
     randctx rctx;
+
+    array_list<rust_task_thread *> threads;
     const size_t num_threads;
-    int rval;
+
+    rust_sched_id id;
 
     void create_task_threads();
     void destroy_task_threads();
@@ -22,8 +29,11 @@ class rust_scheduler : public kernel_owned<rust_scheduler> {
     rust_task_thread *create_task_thread(int id);
     void destroy_task_thread(rust_task_thread *thread);
 
+    void exit();
+
 public:
-    rust_scheduler(rust_kernel *kernel, rust_srv *srv, size_t num_threads);
+    rust_scheduler(rust_kernel *kernel, rust_srv *srv, size_t num_threads,
+                   rust_sched_id id);
     ~rust_scheduler();
 
     void start_task_threads();
@@ -32,8 +42,15 @@ class rust_scheduler : public kernel_owned<rust_scheduler> {
                              const char *name, size_t init_stack_sz);
     rust_task_id create_task(rust_task *spawner, const char *name);
 
-    void exit();
+
+    void release_task();
+
     size_t number_of_threads();
+    // Called by each thread when it terminates. When all threads
+    // terminate, the scheduler does as well.
+    void release_task_thread();
+
+    rust_sched_id get_id() { return id; }
 };
 
 #endif /* RUST_SCHEDULER_H */
diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp
index 136b00e1f56ef..9443d4f370602 100644
--- a/src/rt/rust_task.cpp
+++ b/src/rt/rust_task.cpp
@@ -268,7 +268,8 @@ rust_task::rust_task(rust_task_thread *thread, rust_task_list *state,
     }
 }
 
-rust_task::~rust_task()
+void
+rust_task::delete_this()
 {
     I(thread, !thread->lock.lock_held_by_current_thread());
     I(thread, port_table.is_empty());
@@ -291,6 +292,8 @@ rust_task::~rust_task()
     while (stk != NULL) {
         del_stk(this, stk);
     }
+
+    thread->release_task(this);
 }
 
 struct spawn_args {
diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h
index 7e407b382000d..33e0da5af337a 100644
--- a/src/rt/rust_task.h
+++ b/src/rt/rust_task.h
@@ -122,6 +122,11 @@ rust_task : public kernel_owned<rust_task>, rust_cond
     // The amount of stack we're using, excluding red zones
     size_t total_stack_sz;
 
+private:
+    // Called when the atomic refcount reaches zero
+    void delete_this();
+public:
+
     // Only a pointer to 'name' is kept, so it must live as long as this task.
     rust_task(rust_task_thread *thread,
               rust_task_list *state,
@@ -129,8 +134,6 @@ rust_task : public kernel_owned<rust_task>, rust_cond
               const char *name,
               size_t init_stack_sz);
 
-    ~rust_task();
-
     void start(spawn_fn spawnee_fn,
                rust_opaque_box *env,
                void *args);
diff --git a/src/rt/rust_task_thread.cpp b/src/rt/rust_task_thread.cpp
index a3bf729ad2f6d..236eaaab98e75 100644
--- a/src/rt/rust_task_thread.cpp
+++ b/src/rt/rust_task_thread.cpp
@@ -136,16 +136,30 @@ rust_task_thread::reap_dead_tasks() {
     for (size_t i = 0; i < dead_tasks_len; ++i) {
         rust_task *task = dead_tasks_copy[i];
-        if (task) {
-            kernel->release_task_id(task->user.id);
-            task->deref();
-        }
+        // Release the task from the kernel so nobody else can get at it
+        kernel->release_task_id(task->user.id);
+        // Deref the task, which may cause it to request us to release it
+        task->deref();
     }
     srv->free(dead_tasks_copy);
     lock.lock();
 }
 
+void
+rust_task_thread::release_task(rust_task *task) {
+    // Nobody should have a ref to the task at this point
+    I(this, task->ref_count == 0);
+    // Kernel should not know about the task any more
+    I(this, kernel->get_task_by_id(task->user.id) == NULL);
+    // Now delete the task, which will require using this thread's
+    // memory region.
+    delete task;
+    // Now release the task from the scheduler, which may trigger this
+    // thread to exit
+    sched->release_task();
+}
+
 /**
  * Schedules a running task for execution. Only running tasks can be
  * activated.  Blocked tasks have to be unblocked before they can be
@@ -296,6 +310,7 @@ rust_task_thread::create_task(rust_task *spawner, const char *name,
 void
 rust_task_thread::run() {
     this->start_main_loop();
+    sched->release_task_thread();
 }
 
 #ifndef _WIN32
diff --git a/src/rt/rust_task_thread.h b/src/rt/rust_task_thread.h
index e8fb70fee9343..541d95f646056 100644
--- a/src/rt/rust_task_thread.h
+++ b/src/rt/rust_task_thread.h
@@ -122,6 +122,9 @@ struct rust_task_thread : public kernel_owned<rust_task_thread>,
 
     static rust_task *get_task();
 
+    // Called by each task when it is ready to be destroyed
+    void release_task(rust_task *task);
+
     // Tells the scheduler to exit its scheduling loop and thread
     void exit();
 };
diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in
index 81f8a9c25f787..b0acbacf4e31d 100644
--- a/src/rt/rustrt.def.in
+++ b/src/rt/rustrt.def.in
@@ -24,6 +24,9 @@ rand_free
 rand_new
 rand_next
 refcount
+rust_get_sched_id
+rust_new_sched
+rust_new_task_in_sched
 rust_path_is_dir
 rust_path_exists
 rust_getcwd
@@ -97,3 +100,9 @@ rust_uvtmp_read_start
 rust_uvtmp_timer
 rust_uvtmp_delete_buf
 rust_uvtmp_get_req_id
+rust_dbg_lock_create
+rust_dbg_lock_destroy
+rust_dbg_lock_lock
+rust_dbg_lock_unlock
+rust_dbg_lock_wait
+rust_dbg_lock_signal
diff --git a/src/rt/sync/sync.cpp b/src/rt/sync/sync.cpp
index fd8c48936a46e..31162d35b0311 100644
--- a/src/rt/sync/sync.cpp
+++ b/src/rt/sync/sync.cpp
@@ -19,7 +19,7 @@ void sync::sleep(size_t timeout_in_ms) {
 #endif
 }
 
-rust_thread::rust_thread() : _is_running(false), thread(0) {
+rust_thread::rust_thread() : thread(0) {
 }
 
 #if defined(__WIN32__)
@@ -46,7 +46,6 @@ rust_thread::start() {
     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
     pthread_create(&thread, &attr, rust_thread_start, (void *) this);
 #endif
-    _is_running = true;
 }
 
 void
@@ -59,10 +58,4 @@ rust_thread::join() {
     pthread_join(thread, NULL);
 #endif
     thread = 0;
-    _is_running = false;
-}
-
-bool
-rust_thread::is_running() {
-    return _is_running;
 }
diff --git a/src/rt/sync/sync.h b/src/rt/sync/sync.h
index 8298f4028818d..9c911e2c929a4 100644
--- a/src/rt/sync/sync.h
+++ b/src/rt/sync/sync.h
@@ -37,8 +37,6 @@ class sync {
 
 /**
  * Thread utility class. Derive and implement your own run() method.
 */
 class rust_thread {
-private:
-    volatile bool _is_running;
 public:
 #if defined(__WIN32__)
     HANDLE thread;
@@ -54,8 +52,6 @@ class rust_thread {
 
     void join();
 
-    bool is_running();
-
     virtual ~rust_thread() {} // quiet the compiler
 };
 
diff --git a/src/test/run-pass/rt-sched-1.rs b/src/test/run-pass/rt-sched-1.rs
new file mode 100644
index 0000000000000..67cdf6d5c9e06
--- /dev/null
+++ b/src/test/run-pass/rt-sched-1.rs
@@ -0,0 +1,36 @@
+// Tests of the runtime's scheduler interface
+
+type sched_id = int;
+type task_id = int;
+
+type task = *ctypes::void;
+type closure = *ctypes::void;
+
+native mod rustrt {
+    fn rust_new_sched(num_threads: uint) -> sched_id;
+    fn rust_get_sched_id() -> sched_id;
+    fn rust_new_task_in_sched(id: sched_id) -> task_id;
+    fn start_task(id: task_id, f: closure);
+}
+
+fn main() unsafe {
+    let po = comm::port();
+    let ch = comm::chan(po);
+    let parent_sched_id = rustrt::rust_get_sched_id();
+    #error("parent %?", parent_sched_id);
+    let num_threads = 1u;
+    let new_sched_id = rustrt::rust_new_sched(num_threads);
+    #error("new_sched_id %?", new_sched_id);
+    let new_task_id = rustrt::rust_new_task_in_sched(new_sched_id);
+    let f = fn~() {
+        let child_sched_id = rustrt::rust_get_sched_id();
+        #error("child_sched_id %?", child_sched_id);
+        assert child_sched_id != parent_sched_id;
+        assert child_sched_id == new_sched_id;
+        comm::send(ch, ());
+    };
+    let fptr = unsafe::reinterpret_cast(ptr::addr_of(f));
+    rustrt::start_task(new_task_id, fptr);
+    unsafe::leak(f);
+    comm::recv(po);
+}
\ No newline at end of file
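The spawn_sched_blocking test earlier in this patch encodes the main motivation for the feature: a task that blocks its OS thread in native code stalls only its own scheduler. A condensed sketch of the usage pattern (hypothetical name; any natively-blocking call plays the role rust_dbg_lock_wait plays in the test):

    fn isolate_blocking_work(done_ch: comm::chan<()>) {
        task::spawn_sched(1u) {||
            // A long blocking native call here would pin this
            // scheduler's only thread, but tasks multiplexed on other
            // schedulers keep making progress.
            comm::send(done_ch, ());
        };
    }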