From 586b7ed3060bdbbf568f9585c0d40c5d7223377c Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Fri, 17 Jun 2011 16:19:04 -0700 Subject: [PATCH 1/7] Added a parallel fibonacci program. It doesn't actually run in parallel yet, but it will give us something fun to test threading with. --- src/test/bench/shootout/pfib.rs | 41 +++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 src/test/bench/shootout/pfib.rs diff --git a/src/test/bench/shootout/pfib.rs b/src/test/bench/shootout/pfib.rs new file mode 100644 index 0000000000000..60f2d3779c194 --- /dev/null +++ b/src/test/bench/shootout/pfib.rs @@ -0,0 +1,41 @@ +// -*- rust -*- + +/* + A parallel version of fibonacci numbers. +*/ + +fn recv[T](&port[T] p) -> T { + let T x; + p |> x; + ret x; +} + +fn fib(int n) -> int { + fn pfib(chan[int] c, int n) { + if (n == 0) { + c <| 0; + } + else if (n <= 2) { + c <| 1; + } + else { + let port[int] p = port(); + + auto t1 = spawn pfib(chan(p), n - 1); + auto t2 = spawn pfib(chan(p), n - 2); + + c <| recv(p) + recv(p); + } + } + + let port[int] p = port(); + auto t = spawn pfib(chan(p), n); + ret recv(p); +} + +fn main() { + assert (fib(8) == 21); + assert (fib(15) == 610); + log fib(8); + log fib(15); +} From dbc51a2d2ffb1e06f2b482586abe59905ac48aad Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Mon, 20 Jun 2011 17:19:50 -0700 Subject: [PATCH 2/7] Basic multithreading support. The infinite loops test successfully maxes out the CPU. 
--- src/rt/rust.cpp | 2 +- src/rt/rust_dom.cpp | 85 +++++++++++++++++++++++------ src/rt/rust_dom.h | 16 +++++- src/rt/rust_task.cpp | 29 +++++++--- src/rt/rust_task.h | 6 ++ src/rt/sync/lock_and_signal.cpp | 23 ++++++++ src/rt/sync/lock_and_signal.h | 14 +++++ src/rt/test/rust_test_runtime.cpp | 2 +- src/rt/util/array_list.h | 1 + src/rt/util/indexed_list.h | 1 + src/test/run-pass/infinite-loops.rs | 29 ++++++++++ 11 files changed, 178 insertions(+), 30 deletions(-) create mode 100644 src/test/run-pass/infinite-loops.rs diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp index c7cece2ef8152..f1666b2e7a754 100644 --- a/src/rt/rust.cpp +++ b/src/rt/rust.cpp @@ -95,7 +95,7 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) { dom->root_task->start(main_fn, (uintptr_t)args->args); - int ret = dom->start_main_loop(); + int ret = dom->start_main_loops(8); delete args; kernel->destroy_domain(dom); kernel->join_all_domains(); diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp index 1a5e14635099f..7b5467ddeb17a 100644 --- a/src/rt/rust_dom.cpp +++ b/src/rt/rust_dom.cpp @@ -47,16 +47,14 @@ rust_dom::~rust_dom() { void rust_dom::activate(rust_task *task) { - curr_task = task; - context ctx; task->ctx.next = &ctx; DLOG(this, task, "descheduling..."); + scheduler_lock.unlock(); task->ctx.swap(ctx); + scheduler_lock.lock(); DLOG(this, task, "task has returned"); - - curr_task = NULL; } void @@ -211,10 +209,13 @@ rust_dom::schedule_task() { // FIXME: in the face of failing tasks, this is not always right. // I(this, n_live_tasks() > 0); if (running_tasks.length() > 0) { - size_t i = rand(&rctx); - i %= running_tasks.length(); - if (running_tasks[i]->yield_timer.has_timed_out()) { - return (rust_task *)running_tasks[i]; + size_t k = rand(&rctx); + // Look around for a runnable task, starting at k. 
+ for(size_t j = 0; j < running_tasks.length(); ++j) { + size_t i = (j + k) % running_tasks.length(); + if (running_tasks[i]->can_schedule()) { + return (rust_task *)running_tasks[i]; + } } } return NULL; @@ -261,15 +262,20 @@ rust_dom::log_state() { * drop to zero. */ int -rust_dom::start_main_loop() { +rust_dom::start_main_loop(int id) { + scheduler_lock.lock(); + // Make sure someone is watching, to pull us out of infinite loops. rust_timer timer(this); - DLOG(this, dom, "started domain loop"); + DLOG(this, dom, "started domain loop %d", id); while (number_of_live_tasks() > 0) { A(this, kernel->is_deadlocked() == false, "deadlock"); + DLOG(this, dom, "worker %d, number_of_live_tasks = %d", + id, number_of_live_tasks()); + drain_incoming_message_queue(true); rust_task *scheduled_task = schedule_task(); @@ -281,8 +287,11 @@ rust_dom::start_main_loop() { if (scheduled_task == NULL) { log_state(); DLOG(this, task, - "all tasks are blocked, scheduler yielding ..."); + "all tasks are blocked, scheduler id %d yielding ...", + id); + scheduler_lock.unlock(); sync::sleep(100); + scheduler_lock.lock(); DLOG(this, task, "scheduler resuming ..."); continue; @@ -303,15 +312,21 @@ rust_dom::start_main_loop() { interrupt_flag = 0; + DLOG(this, task, + "Running task %p on worker %d", + scheduled_task, id); + scheduled_task->active = true; activate(scheduled_task); + scheduled_task->active = false; DLOG(this, task, - "returned from task %s @0x%" PRIxPTR - " in state '%s', sp=0x%" PRIxPTR, - scheduled_task->name, - (uintptr_t)scheduled_task, - scheduled_task->state->name, - scheduled_task->rust_sp); + "returned from task %s @0x%" PRIxPTR + " in state '%s', sp=0x%, worker id=%d" PRIxPTR, + scheduled_task->name, + (uintptr_t)scheduled_task, + scheduled_task->state->name, + scheduled_task->rust_sp, + id); /* // These invariants are no longer valid, as rust_sp is not @@ -341,10 +356,32 @@ rust_dom::start_main_loop() { reap_dead_tasks(); } - DLOG(this, dom, "finished main-loop 
(dom.rval = %d)", rval); + DLOG(this, dom, "finished main-loop %d (dom.rval = %d)", id, rval); + + scheduler_lock.unlock(); return rval; } +int rust_dom::start_main_loops(int num_threads) +{ + dom_worker *worker = NULL; + + // -1, because this thread will also be a worker. + for(int i = 0; i < num_threads - 1; ++i) { + worker = new dom_worker(i + 1, this); + worker->start(); + threads.push(worker); + } + + start_main_loop(0); + + while(threads.pop(&worker)) { + worker->join(); + delete worker; + } + + return rval; +} rust_crate_cache * rust_dom::get_cache() { @@ -353,14 +390,26 @@ rust_dom::get_cache() { rust_task * rust_dom::create_task(rust_task *spawner, const char *name) { + scheduler_lock.lock(); rust_task *task = new (this) rust_task (this, &newborn_tasks, spawner, name); DLOG(this, task, "created task: " PTR ", spawner: %s, name: %s", task, spawner ? spawner->name : "null", name); newborn_tasks.append(task); + scheduler_lock.unlock(); return task; } +rust_dom::dom_worker::dom_worker(int id, rust_dom *owner) + : id(id), owner(owner) +{ +} + +void rust_dom::dom_worker::run() +{ + owner->start_main_loop(id); +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_dom.h b/src/rt/rust_dom.h index 2b88a2c713f71..7f9fa7a2901ed 100644 --- a/src/rt/rust_dom.h +++ b/src/rt/rust_dom.h @@ -96,11 +96,25 @@ struct rust_dom : public kernel_owned, rc_base void reap_dead_tasks(); rust_task *schedule_task(); - int start_main_loop(); + int start_main_loop(int id); + int start_main_loops(int num_threads); void log_state(); rust_task *create_task(rust_task *spawner, const char *name); + + class dom_worker : public rust_thread { + int id; + rust_dom *owner; + + public: + dom_worker(int id, rust_dom *owner); + + virtual void run(); + }; + + lock_and_signal scheduler_lock; + array_list threads; }; inline rust_log & diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index bb8261bb78a0d..f42f40510bd99 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ 
-70,7 +70,8 @@ rust_task::rust_task(rust_dom *dom, rust_task_list *state, list_index(-1), rendezvous_ptr(0), alarm(this), - handle(NULL) + handle(NULL), + active(false) { LOGPTR(dom, "new task", (uintptr_t)this); DLOG(dom, task, "sizeof(task) = %d (0x%x)", sizeof *this, sizeof *this); @@ -123,17 +124,12 @@ struct spawn_args { uintptr_t, uintptr_t); }; -// TODO: rewrite this in LLVM assembly so we can be sure the calling -// conventions will match. extern "C" CDECL void task_start_wrapper(spawn_args *a) { rust_task *task = a->task; int rval = 42; - // This is used by the context switching code. LLVM generates fastcall - // functions, but ucontext needs cdecl functions. This massages the - // calling conventions into the right form. a->f(&rval, task, a->a3, a->a4); LOG(task, task, "task exited with value %d", rval); @@ -174,7 +170,10 @@ rust_task::start(uintptr_t spawnee_fn, ctx.call((void *)task_start_wrapper, a, sp); yield_timer.reset(0); - transition(&dom->newborn_tasks, &dom->running_tasks); + { + scoped_lock sync(dom->scheduler_lock); + transition(&dom->newborn_tasks, &dom->running_tasks); + } } void @@ -425,7 +424,10 @@ rust_task::block(rust_cond *on, const char* name) { A(dom, cond == NULL, "Cannot block an already blocked task."); A(dom, on != NULL, "Cannot block on a NULL object."); - transition(&dom->running_tasks, &dom->blocked_tasks); + { + scoped_lock sync(dom->scheduler_lock); + transition(&dom->running_tasks, &dom->blocked_tasks); + } cond = on; cond_name = name; } @@ -437,7 +439,10 @@ rust_task::wakeup(rust_cond *from) { (uintptr_t) cond, (uintptr_t) from); A(dom, cond == from, "Cannot wake up blocked task on wrong condition."); - transition(&dom->blocked_tasks, &dom->running_tasks); + { + scoped_lock sync(dom->scheduler_lock); + transition(&dom->blocked_tasks, &dom->running_tasks); + } I(dom, cond == from); cond = NULL; cond_name = "none"; @@ -445,6 +450,7 @@ rust_task::wakeup(rust_cond *from) { void rust_task::die() { + scoped_lock 
sync(dom->scheduler_lock); transition(&dom->running_tasks, &dom->dead_tasks); } @@ -482,6 +488,11 @@ rust_task::get_handle() { return handle; } +bool rust_task::can_schedule() +{ + return yield_timer.has_timed_out() && !active; +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h index a022a348667ab..db3c0367adb53 100644 --- a/src/rt/rust_task.h +++ b/src/rt/rust_task.h @@ -50,6 +50,10 @@ rust_task : public maybe_proxy, rust_handle *handle; context ctx; + + // This flag indicates that a worker is either currently running the task + // or is about to run this task. + bool active; // Only a pointer to 'name' is kept, so it must live as long as this task. rust_task(rust_dom *dom, @@ -111,6 +115,8 @@ rust_task : public maybe_proxy, frame_glue_fns *get_frame_glue_fns(uintptr_t fp); rust_crate_cache * get_crate_cache(); + + bool can_schedule(); }; // diff --git a/src/rt/sync/lock_and_signal.cpp b/src/rt/sync/lock_and_signal.cpp index 4f2622858261b..337a1efa4bc90 100644 --- a/src/rt/sync/lock_and_signal.cpp +++ b/src/rt/sync/lock_and_signal.cpp @@ -41,6 +41,8 @@ void lock_and_signal::lock() { EnterCriticalSection(&_cs); #else CHECKED(pthread_mutex_lock(&_mutex)); + _holding_thread = pthread_self(); + _locked = true; #endif } @@ -48,6 +50,7 @@ void lock_and_signal::unlock() { #if defined(__WIN32__) LeaveCriticalSection(&_cs); #else + _locked = false; CHECKED(pthread_mutex_unlock(&_mutex)); #endif } @@ -100,6 +103,26 @@ void lock_and_signal::signal_all() { #endif } +bool lock_and_signal::lock_held_by_current_thread() +{ +#if defined(__WIN32__) + // TODO: implement this functionality for win32. 
+ return false; +#else + return _locked && _holding_thread == pthread_self(); +#endif +} + +scoped_lock::scoped_lock(lock_and_signal &lock) + : lock(lock) +{ + lock.lock(); +} + +scoped_lock::~scoped_lock() +{ + lock.unlock(); +} // // Local Variables: diff --git a/src/rt/sync/lock_and_signal.h b/src/rt/sync/lock_and_signal.h index 1f1cfb4124b23..2794027d5c73f 100644 --- a/src/rt/sync/lock_and_signal.h +++ b/src/rt/sync/lock_and_signal.h @@ -1,3 +1,4 @@ +// -*- c++ -*- #ifndef LOCK_AND_SIGNAL_H #define LOCK_AND_SIGNAL_H @@ -8,6 +9,9 @@ class lock_and_signal { #else pthread_cond_t _cond; pthread_mutex_t _mutex; + + pthread_t _holding_thread; + bool _locked; #endif public: lock_and_signal(); @@ -19,6 +23,16 @@ class lock_and_signal { void timed_wait(size_t timeout_in_ns); void signal(); void signal_all(); + + bool lock_held_by_current_thread(); +}; + +class scoped_lock { + lock_and_signal &lock; + +public: + scoped_lock(lock_and_signal &lock); + ~scoped_lock(); }; #endif /* LOCK_AND_SIGNAL_H */ diff --git a/src/rt/test/rust_test_runtime.cpp b/src/rt/test/rust_test_runtime.cpp index 0fed35dad8249..18a957edd603f 100644 --- a/src/rt/test/rust_test_runtime.cpp +++ b/src/rt/test/rust_test_runtime.cpp @@ -53,7 +53,7 @@ rust_task_test::worker::run() { kernel->create_domain("test"); rust_dom *domain = handle->referent(); domain->root_task->start((uintptr_t)&task_entry, (uintptr_t)NULL); - domain->start_main_loop(); + domain->start_main_loop(0); kernel->destroy_domain(domain); } diff --git a/src/rt/util/array_list.h b/src/rt/util/array_list.h index 9ad4b2080417f..495594cf7e68a 100644 --- a/src/rt/util/array_list.h +++ b/src/rt/util/array_list.h @@ -1,3 +1,4 @@ +// -*- c++ -*- #ifndef ARRAY_LIST_H #define ARRAY_LIST_H diff --git a/src/rt/util/indexed_list.h b/src/rt/util/indexed_list.h index 173e9ede2ba07..0c36604328fab 100644 --- a/src/rt/util/indexed_list.h +++ b/src/rt/util/indexed_list.h @@ -1,3 +1,4 @@ +// -*- c++ -*- #ifndef INDEXED_LIST_H #define INDEXED_LIST_H diff 
--git a/src/test/run-pass/infinite-loops.rs b/src/test/run-pass/infinite-loops.rs new file mode 100644 index 0000000000000..be9fed9ea1c83 --- /dev/null +++ b/src/test/run-pass/infinite-loops.rs @@ -0,0 +1,29 @@ +/* + A simple way to make sure threading works. This should use all the + CPU cycles on any machines that we're likely to see for a while. +*/ + +// xfail-stage0 +// xfail-stage1 +// xfail-stage2 +// xfail-stage3 + +use std; +import std::task::join; + +fn loop(int n) { + let task t1; + let task t2; + + if(n > 0) { + t1 = spawn loop(n - 1); + t2 = spawn loop(n - 1); + } + + while(true) {} +} + +fn main() { + let task t = spawn loop(5); + join(t); +} From 7cc9bd6e8286b689cbffdb535000a2d76bc64ecc Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Mon, 20 Jun 2011 18:01:38 -0700 Subject: [PATCH 3/7] Added an environment variable to control how many threads to use. --- src/rt/rust.cpp | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp index f1666b2e7a754..62cb6fe3bf3e2 100644 --- a/src/rt/rust.cpp +++ b/src/rt/rust.cpp @@ -71,6 +71,19 @@ command_line_args : public dom_owned } }; +int get_num_threads() +{ + char *env = getenv("RUST_THREADS"); + if(env) { + int num = atoi(env); + if(num > 0) + return num; + } + // TODO: in this case, determine the number of CPUs present on the + // machine. + return 1; +} + /** * Main entry point into the Rust runtime. Here we create a Rust service, * initialize the kernel, create the root domain and run it. 
@@ -95,7 +108,11 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) { dom->root_task->start(main_fn, (uintptr_t)args->args); - int ret = dom->start_main_loops(8); + int num_threads = get_num_threads(); + + DLOG(dom, dom, "Using %d worker threads.", num_threads); + + int ret = dom->start_main_loops(num_threads); delete args; kernel->destroy_domain(dom); kernel->join_all_domains(); From c2898961044f9f4d08fcb66dcc3dcfd7bac41c38 Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Tue, 21 Jun 2011 15:10:55 -0700 Subject: [PATCH 4/7] Added some locking to ports to prevent the case where two threads simultaneously wake up a task blocked on a certain port. --- src/rt/rust_chan.cpp | 3 +++ src/rt/rust_port.h | 2 ++ src/rt/rust_task.cpp | 17 ++++------------- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp index fdc7f2704d315..4349794e658f7 100644 --- a/src/rt/rust_chan.cpp +++ b/src/rt/rust_chan.cpp @@ -31,6 +31,7 @@ rust_chan::~rust_chan() { void rust_chan::associate(maybe_proxy *port) { this->port = port; if (port->is_proxy() == false) { + scoped_lock sync(port->referent()->lock); LOG(task, task, "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR, this, port); @@ -49,6 +50,7 @@ void rust_chan::disassociate() { A(task->dom, is_associated(), "Channel must be associated with a port."); if (port->is_proxy() == false) { + scoped_lock sync(port->referent()->lock); LOG(task, task, "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR, this, port->referent()); @@ -81,6 +83,7 @@ void rust_chan::send(void *sptr) { buffer.dequeue(NULL); } else { rust_port *target_port = port->referent(); + scoped_lock sync(target_port->lock); if (target_port->task->blocked_on(target_port)) { DLOG(dom, comm, "dequeued in rendezvous_ptr"); buffer.dequeue(target_port->task->rendezvous_ptr); diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h index 7a58f839c44f0..144a3b45cecd4 100644 --- a/src/rt/rust_port.h +++ 
b/src/rt/rust_port.h @@ -13,6 +13,8 @@ class rust_port : public maybe_proxy, // Data sent to this port from remote tasks is buffered in this channel. rust_chan *remote_channel; + lock_and_signal lock; + rust_port(rust_task *task, size_t unit_sz); ~rust_port(); void log_state(); diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index f42f40510bd99..c26e793b979b4 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -170,10 +170,7 @@ rust_task::start(uintptr_t spawnee_fn, ctx.call((void *)task_start_wrapper, a, sp); yield_timer.reset(0); - { - scoped_lock sync(dom->scheduler_lock); - transition(&dom->newborn_tasks, &dom->running_tasks); - } + transition(&dom->newborn_tasks, &dom->running_tasks); } void @@ -408,6 +405,7 @@ rust_task::free(void *p, bool is_gc) void rust_task::transition(rust_task_list *src, rust_task_list *dst) { + scoped_lock sync(dom->scheduler_lock); DLOG(dom, task, "task %s " PTR " state change '%s' -> '%s' while in '%s'", name, (uintptr_t)this, src->name, dst->name, state->name); @@ -424,10 +422,7 @@ rust_task::block(rust_cond *on, const char* name) { A(dom, cond == NULL, "Cannot block an already blocked task."); A(dom, on != NULL, "Cannot block on a NULL object."); - { - scoped_lock sync(dom->scheduler_lock); - transition(&dom->running_tasks, &dom->blocked_tasks); - } + transition(&dom->running_tasks, &dom->blocked_tasks); cond = on; cond_name = name; } @@ -439,10 +434,7 @@ rust_task::wakeup(rust_cond *from) { (uintptr_t) cond, (uintptr_t) from); A(dom, cond == from, "Cannot wake up blocked task on wrong condition."); - { - scoped_lock sync(dom->scheduler_lock); - transition(&dom->blocked_tasks, &dom->running_tasks); - } + transition(&dom->blocked_tasks, &dom->running_tasks); I(dom, cond == from); cond = NULL; cond_name = "none"; @@ -450,7 +442,6 @@ rust_task::wakeup(rust_cond *from) { void rust_task::die() { - scoped_lock sync(dom->scheduler_lock); transition(&dom->running_tasks, &dom->dead_tasks); } From 
124762bcf4c983f5b6112d68af9d5a9cb5a4bda0 Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Tue, 21 Jun 2011 18:08:34 -0700 Subject: [PATCH 5/7] Fixed a few concurrency bugs. Still not perfect, but overall it seems much more reliable. --- src/rt/rust_dom.cpp | 9 +++++++-- src/rt/rust_srv.cpp | 3 ++- src/rt/rust_task.h | 2 +- src/rt/rust_upcall.cpp | 9 ++++++--- src/test/bench/shootout/pfib.rs | 8 ++++---- 5 files changed, 20 insertions(+), 11 deletions(-) diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp index 7b5467ddeb17a..ac258f5c97329 100644 --- a/src/rt/rust_dom.cpp +++ b/src/rt/rust_dom.cpp @@ -166,9 +166,11 @@ rust_dom::number_of_live_tasks() { */ void rust_dom::reap_dead_tasks() { + I(this, scheduler_lock.lock_held_by_current_thread()); for (size_t i = 0; i < dead_tasks.length(); ) { rust_task *task = dead_tasks[i]; - if (task->ref_count == 0) { + // Make sure this task isn't still running somewhere else... + if (task->ref_count == 0 && task->can_schedule()) { I(this, task->tasks_waiting_to_join.is_empty()); dead_tasks.remove(task); DLOG(this, task, @@ -315,13 +317,14 @@ rust_dom::start_main_loop(int id) { DLOG(this, task, "Running task %p on worker %d", scheduled_task, id); + I(this, !scheduled_task->active); scheduled_task->active = true; activate(scheduled_task); scheduled_task->active = false; DLOG(this, task, "returned from task %s @0x%" PRIxPTR - " in state '%s', sp=0x%, worker id=%d" PRIxPTR, + " in state '%s', sp=0x%x, worker id=%d" PRIxPTR, scheduled_task->name, (uintptr_t)scheduled_task, scheduled_task->state->name, @@ -349,7 +352,9 @@ rust_dom::start_main_loop(int id) { "scheduler yielding ...", dead_tasks.length()); log_state(); + scheduler_lock.unlock(); sync::yield(); + scheduler_lock.lock(); } else { drain_incoming_message_queue(true); } diff --git a/src/rt/rust_srv.cpp b/src/rt/rust_srv.cpp index 64befd6ee42d6..7f49b52019d81 100644 --- a/src/rt/rust_srv.cpp +++ b/src/rt/rust_srv.cpp @@ -54,7 +54,8 @@ rust_srv::fatal(const char *expression, 
"fatal, '%s' failed, %s:%d %s", expression, file, (int)line, buf); log(msg); - exit(1); + abort(); + //exit(1); } void diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h index db3c0367adb53..3f9a0660300c6 100644 --- a/src/rt/rust_task.h +++ b/src/rt/rust_task.h @@ -53,7 +53,7 @@ rust_task : public maybe_proxy, // This flag indicates that a worker is either currently running the task // or is about to run this task. - bool active; + volatile bool active; // Only a pointer to 'name' is kept, so it must live as long as this task. rust_task(rust_dom *dom, diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index 7fb6bc4c84dc5..ba29203ca1d56 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -223,9 +223,12 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) { // on the port. Remember the rendezvous location so that any sender // task can write to it before waking up this task. - LOG(task, comm, "<=== waiting for rendezvous data ==="); - task->rendezvous_ptr = dptr; - task->block(port, "waiting for rendezvous data"); + { + scoped_lock sync(port->lock); + LOG(task, comm, "<=== waiting for rendezvous data ==="); + task->rendezvous_ptr = dptr; + task->block(port, "waiting for rendezvous data"); + } task->yield(3); } diff --git a/src/test/bench/shootout/pfib.rs b/src/test/bench/shootout/pfib.rs index 60f2d3779c194..f7fda8b811025 100644 --- a/src/test/bench/shootout/pfib.rs +++ b/src/test/bench/shootout/pfib.rs @@ -34,8 +34,8 @@ fn fib(int n) -> int { } fn main() { - assert (fib(8) == 21); - assert (fib(15) == 610); - log fib(8); - log fib(15); + assert (fib(8) == 21); + assert (fib(15) == 610); + log fib(8); + log fib(15); } From 698a690f04dab0d4feb5cbf3eedae4422f794f3f Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Wed, 22 Jun 2011 15:44:47 -0700 Subject: [PATCH 6/7] Conservatively serialize nearly all upcalls. Successfully ran make check with RUST_THREADS=8, so we're probably fairly safe now. 
In the future we can relax the synchronization to get better performance. --- src/rt/rust_builtin.cpp | 5 +++ src/rt/rust_chan.cpp | 7 ++-- src/rt/rust_dom.cpp | 6 ++-- src/rt/rust_port.h | 2 -- src/rt/rust_task.cpp | 30 ++++++++++------ src/rt/rust_upcall.cpp | 64 ++++++++++++++++++++++----------- src/rt/sync/lock_and_signal.cpp | 4 ++- 7 files changed, 78 insertions(+), 40 deletions(-) diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index 74b1075f70180..27fe45e42d7a1 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -391,12 +391,17 @@ task_yield(rust_task *task) { extern "C" CDECL void task_join(rust_task *task, rust_task *join_task) { + task->dom->scheduler_lock.lock(); // If the other task is already dying, we don't have to wait for it. if (join_task->dead() == false) { join_task->tasks_waiting_to_join.push(task); task->block(join_task, "joining local task"); + task->dom->scheduler_lock.unlock(); task->yield(2); } + else { + task->dom->scheduler_lock.unlock(); + } } /* Debug builtins for std.dbg. 
*/ diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp index 4349794e658f7..cc03c227acda9 100644 --- a/src/rt/rust_chan.cpp +++ b/src/rt/rust_chan.cpp @@ -10,6 +10,7 @@ rust_chan::rust_chan(rust_task *task, task(task), port(port), buffer(task->dom, unit_sz) { + ++task->ref_count; if (port) { associate(port); } @@ -23,6 +24,7 @@ rust_chan::~rust_chan() { A(task->dom, is_associated() == false, "Channel must be disassociated before being freed."); + --task->ref_count; } /** @@ -31,10 +33,10 @@ rust_chan::~rust_chan() { void rust_chan::associate(maybe_proxy *port) { this->port = port; if (port->is_proxy() == false) { - scoped_lock sync(port->referent()->lock); LOG(task, task, "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR, this, port); + ++this->ref_count; this->port->referent()->chans.push(this); } } @@ -50,10 +52,10 @@ void rust_chan::disassociate() { A(task->dom, is_associated(), "Channel must be associated with a port."); if (port->is_proxy() == false) { - scoped_lock sync(port->referent()->lock); LOG(task, task, "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR, this, port->referent()); + --this->ref_count; port->referent()->chans.swap_delete(this); } @@ -83,7 +85,6 @@ void rust_chan::send(void *sptr) { buffer.dequeue(NULL); } else { rust_port *target_port = port->referent(); - scoped_lock sync(target_port->lock); if (target_port->task->blocked_on(target_port)) { DLOG(dom, comm, "dequeued in rendezvous_ptr"); buffer.dequeue(target_port->task->rendezvous_ptr); diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp index ac258f5c97329..5a2175f36a261 100644 --- a/src/rt/rust_dom.cpp +++ b/src/rt/rust_dom.cpp @@ -268,7 +268,7 @@ rust_dom::start_main_loop(int id) { scheduler_lock.lock(); // Make sure someone is watching, to pull us out of infinite loops. 
- rust_timer timer(this); + //rust_timer timer(this); DLOG(this, dom, "started domain loop %d", id); @@ -395,13 +395,13 @@ rust_dom::get_cache() { rust_task * rust_dom::create_task(rust_task *spawner, const char *name) { - scheduler_lock.lock(); + //scheduler_lock.lock(); rust_task *task = new (this) rust_task (this, &newborn_tasks, spawner, name); DLOG(this, task, "created task: " PTR ", spawner: %s, name: %s", task, spawner ? spawner->name : "null", name); newborn_tasks.append(task); - scheduler_lock.unlock(); + //scheduler_lock.unlock(); return task; } diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h index 144a3b45cecd4..7a58f839c44f0 100644 --- a/src/rt/rust_port.h +++ b/src/rt/rust_port.h @@ -13,8 +13,6 @@ class rust_port : public maybe_proxy, // Data sent to this port from remote tasks is buffered in this channel. rust_chan *remote_channel; - lock_and_signal lock; - rust_port(rust_task *task, size_t unit_sz); ~rust_port(); void log_state(); diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index c26e793b979b4..4b300c1a52471 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -131,19 +131,24 @@ void task_start_wrapper(spawn_args *a) int rval = 42; a->f(&rval, task, a->a3, a->a4); - + LOG(task, task, "task exited with value %d", rval); - // TODO: the old exit glue does some magical argument copying stuff. This - // is probably still needed. + { + scoped_lock with(task->dom->scheduler_lock); + + // TODO: the old exit glue does some magical argument copying + // stuff. This is probably still needed. + + // This is duplicated from upcall_exit, which is probably dead code by + // now. + LOG(task, task, "task ref_count: %d", task->ref_count); + A(task->dom, task->ref_count >= 0, + "Task ref_count should not be negative on exit!"); + task->die(); + task->notify_tasks_waiting_to_join(); - // This is duplicated from upcall_exit, which is probably dead code by - // now. 
- LOG(task, task, "task ref_count: %d", task->ref_count); - A(task->dom, task->ref_count >= 0, - "Task ref_count should not be negative on exit!"); - task->die(); - task->notify_tasks_waiting_to_join(); + } task->yield(1); } @@ -154,6 +159,9 @@ rust_task::start(uintptr_t spawnee_fn, LOGPTR(dom, "from spawnee", spawnee_fn); I(dom, stk->data != NULL); + I(dom, !dom->scheduler_lock.lock_held_by_current_thread()); + + scoped_lock with(dom->scheduler_lock); char *sp = (char *)rust_sp; @@ -405,7 +413,7 @@ rust_task::free(void *p, bool is_gc) void rust_task::transition(rust_task_list *src, rust_task_list *dst) { - scoped_lock sync(dom->scheduler_lock); + I(dom, dom->scheduler_lock.lock_held_by_current_thread()); DLOG(dom, task, "task %s " PTR " state change '%s' -> '%s' while in '%s'", name, (uintptr_t)this, src->name, dst->name, state->name); diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index ba29203ca1d56..885f82ddac393 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -23,6 +23,7 @@ str_buf(rust_task *task, rust_str *s); extern "C" void upcall_grow_task(rust_task *task, size_t n_frame_bytes) { + I(task->dom, false); LOG_UPCALL_ENTRY(task); task->grow(n_frame_bytes); } @@ -74,6 +75,7 @@ extern "C" CDECL rust_port* upcall_new_port(rust_task *task, size_t unit_sz) { LOG_UPCALL_ENTRY(task); rust_dom *dom = task->dom; + scoped_lock with(dom->scheduler_lock); LOG(task, comm, "upcall_new_port(task=0x%" PRIxPTR " (%s), unit_sz=%d)", (uintptr_t) task, task->name, unit_sz); return new (dom) rust_port(task, unit_sz); @@ -82,6 +84,7 @@ upcall_new_port(rust_task *task, size_t unit_sz) { extern "C" CDECL void upcall_del_port(rust_task *task, rust_port *port) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); LOG(task, comm, "upcall del_port(0x%" PRIxPTR ")", (uintptr_t) port); I(task->dom, !port->ref_count); delete port; @@ -121,6 +124,7 @@ upcall_flush_chan(rust_task *task, rust_chan *chan) { extern "C" CDECL void 
upcall_del_chan(rust_task *task, rust_chan *chan) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); LOG(task, comm, "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan); @@ -143,7 +147,7 @@ void upcall_del_chan(rust_task *task, rust_chan *chan) { // here is that we can get ourselves in a deadlock if the // parent task tries to join us. // - // 2. We can leave the channel in a "dormnat" state by not freeing + // 2. We can leave the channel in a "dormant" state by not freeing // it and letting the receiver task delete it for us instead. if (chan->buffer.is_empty() == false) { return; @@ -162,6 +166,7 @@ extern "C" CDECL rust_chan * upcall_clone_chan(rust_task *task, maybe_proxy *target, rust_chan *chan) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); size_t unit_sz = chan->buffer.unit_sz; maybe_proxy *port = chan->port; rust_task *target_task = NULL; @@ -203,28 +208,30 @@ upcall_sleep(rust_task *task, size_t time_in_us) { extern "C" CDECL void upcall_send(rust_task *task, rust_chan *chan, void *sptr) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); chan->send(sptr); LOG(task, comm, "=== sent data ===>"); } extern "C" CDECL void upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) { - LOG_UPCALL_ENTRY(task); - LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR - ", size: 0x%" PRIxPTR ", chan_no: %d", - (uintptr_t) port, (uintptr_t) dptr, port->unit_sz, - port->chans.length()); + { + LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); - if (port->receive(dptr)) { - return; - } + LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR + ", size: 0x%" PRIxPTR ", chan_no: %d", + (uintptr_t) port, (uintptr_t) dptr, port->unit_sz, + port->chans.length()); - // No data was buffered on any incoming channel, so block this task - // on the port. Remember the rendezvous location so that any sender - // task can write to it before waking up this task. 
+ if (port->receive(dptr)) { + return; + } + + // No data was buffered on any incoming channel, so block this task + // on the port. Remember the rendezvous location so that any sender + // task can write to it before waking up this task. - { - scoped_lock sync(port->lock); LOG(task, comm, "<=== waiting for rendezvous data ==="); task->rendezvous_ptr = dptr; task->block(port, "waiting for rendezvous data"); @@ -248,6 +255,7 @@ upcall_fail(rust_task *task, extern "C" CDECL void upcall_kill(rust_task *task, maybe_proxy *target) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); if (target->is_proxy()) { notify_message:: send(notify_message::KILL, "kill", task->get_handle(), @@ -264,18 +272,22 @@ upcall_kill(rust_task *task, maybe_proxy *target) { */ extern "C" CDECL void upcall_exit(rust_task *task) { - LOG_UPCALL_ENTRY(task); - LOG(task, task, "task ref_count: %d", task->ref_count); - A(task->dom, task->ref_count >= 0, - "Task ref_count should not be negative on exit!"); - task->die(); - task->notify_tasks_waiting_to_join(); + { + LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); + LOG(task, task, "task ref_count: %d", task->ref_count); + A(task->dom, task->ref_count >= 0, + "Task ref_count should not be negative on exit!"); + task->die(); + task->notify_tasks_waiting_to_join(); + } task->yield(1); } extern "C" CDECL uintptr_t upcall_malloc(rust_task *task, size_t nbytes, type_desc *td) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); LOG(task, mem, "upcall malloc(%" PRIdPTR ", 0x%" PRIxPTR ")" @@ -296,6 +308,7 @@ upcall_malloc(rust_task *task, size_t nbytes, type_desc *td) { extern "C" CDECL void upcall_free(rust_task *task, void* ptr, uintptr_t is_gc) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); rust_dom *dom = task->dom; DLOG(dom, mem, "upcall free(0x%" PRIxPTR ", is_gc=%" PRIdPTR ")", @@ -306,6 +319,7 @@ upcall_free(rust_task *task, void* ptr, uintptr_t is_gc) { extern 
"C" CDECL uintptr_t upcall_mark(rust_task *task, void* ptr) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); rust_dom *dom = task->dom; if (ptr) { @@ -336,6 +350,7 @@ rust_str *make_str(rust_task *task, char const *s, size_t fill) { extern "C" CDECL rust_str * upcall_new_str(rust_task *task, char const *s, size_t fill) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); return make_str(task, s, fill); } @@ -343,6 +358,7 @@ upcall_new_str(rust_task *task, char const *s, size_t fill) { extern "C" CDECL rust_str * upcall_dup_str(rust_task *task, rust_str *str) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); return make_str(task, (char const *)str->data, str->fill); } @@ -350,6 +366,7 @@ upcall_dup_str(rust_task *task, rust_str *str) { extern "C" CDECL rust_vec * upcall_new_vec(rust_task *task, size_t fill, type_desc *td) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); rust_dom *dom = task->dom; DLOG(dom, mem, "upcall new_vec(%" PRIdPTR ")", fill); size_t alloc = next_power_of_two(sizeof(rust_vec) + fill); @@ -454,6 +471,7 @@ upcall_vec_append(rust_task *task, type_desc *t, type_desc *elem_t, rust_vec **dst_ptr, rust_vec *src, bool skip_null) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); rust_vec *dst = *dst_ptr; uintptr_t need_copy; size_t n_src_bytes = skip_null ? src->fill - 1 : src->fill; @@ -483,6 +501,7 @@ upcall_get_type_desc(rust_task *task, size_t n_descs, type_desc const **descs) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); LOG(task, cache, "upcall get_type_desc with size=%" PRIdPTR ", align=%" PRIdPTR ", %" PRIdPTR " descs", size, align, n_descs); @@ -496,6 +515,7 @@ extern "C" CDECL rust_task * upcall_new_task(rust_task *spawner, rust_vec *name) { // name is a rust string structure. 
LOG_UPCALL_ENTRY(spawner); + scoped_lock with(spawner->dom->scheduler_lock); rust_dom *dom = spawner->dom; rust_task *task = dom->create_task(spawner, (const char *)name->data); return task; @@ -535,6 +555,7 @@ upcall_start_task(rust_task *spawner, */ extern "C" CDECL maybe_proxy * upcall_new_thread(rust_task *task, const char *name) { + I(task->dom, false); LOG_UPCALL_ENTRY(task); rust_dom *parent_dom = task->dom; rust_kernel *kernel = parent_dom->kernel; @@ -583,6 +604,7 @@ upcall_start_thread(rust_task *task, rust_proxy *child_task_proxy, uintptr_t spawnee_fn, size_t callsz) { + I(task->dom, false); LOG_UPCALL_ENTRY(task); #if 0 rust_dom *parenet_dom = task->dom; @@ -615,6 +637,7 @@ extern "C" CDECL void upcall_ivec_resize(rust_task *task, rust_ivec *v, size_t newsz) { + scoped_lock with(task->dom->scheduler_lock); I(task->dom, !v->fill); size_t new_alloc = next_power_of_two(newsz); @@ -633,6 +656,7 @@ extern "C" CDECL void upcall_ivec_spill(rust_task *task, rust_ivec *v, size_t newsz) { + scoped_lock with(task->dom->scheduler_lock); size_t new_alloc = next_power_of_two(newsz); rust_ivec_heap *heap_part = (rust_ivec_heap *) diff --git a/src/rt/sync/lock_and_signal.cpp b/src/rt/sync/lock_and_signal.cpp index 337a1efa4bc90..6934a7965215e 100644 --- a/src/rt/sync/lock_and_signal.cpp +++ b/src/rt/sync/lock_and_signal.cpp @@ -21,7 +21,9 @@ lock_and_signal::lock_and_signal() { } #else -lock_and_signal::lock_and_signal() { +lock_and_signal::lock_and_signal() + : _locked(false) +{ CHECKED(pthread_cond_init(&_cond, NULL)); CHECKED(pthread_mutex_init(&_mutex, NULL)); } From c21ec35648e2cb6be5c24077adda86356406707c Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Thu, 23 Jun 2011 12:22:45 -0700 Subject: [PATCH 7/7] Implemented missing features in lock_and_signal for Win32. Also lowered the minimum stack size to get the pfib benchmark to run without exhausting its address space on Windows. 
--- src/rt/rust_dom.cpp | 1 + src/rt/rust_srv.cpp | 3 +-- src/rt/rust_task.cpp | 6 +++--- src/rt/sync/lock_and_signal.cpp | 8 ++++---- src/rt/sync/lock_and_signal.h | 3 ++- 5 files changed, 11 insertions(+), 10 deletions(-) diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp index 5a2175f36a261..ca3ee9c6a30e6 100644 --- a/src/rt/rust_dom.cpp +++ b/src/rt/rust_dom.cpp @@ -87,6 +87,7 @@ rust_dom::malloc(size_t size, memory_region::memory_region_type type) { } else if (type == memory_region::SYNCHRONIZED) { return synchronized_region.malloc(size); } + I(this, false); return NULL; } diff --git a/src/rt/rust_srv.cpp b/src/rt/rust_srv.cpp index 7f49b52019d81..64befd6ee42d6 100644 --- a/src/rt/rust_srv.cpp +++ b/src/rt/rust_srv.cpp @@ -54,8 +54,7 @@ rust_srv::fatal(const char *expression, "fatal, '%s' failed, %s:%d %s", expression, file, (int)line, buf); log(msg); - abort(); - //exit(1); + exit(1); } void diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index 4b300c1a52471..754ff1edaf13b 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -14,9 +14,9 @@ // FIXME (issue #151): This should be 0x300; the change here is for // practicality's sake until stack growth is working. -static size_t const min_stk_bytes = 0x300000; -// static size_t const min_stk_bytes = 0x10000; - +//static size_t const min_stk_bytes = 0x300000; +//static size_t const min_stk_bytes = 0x10000; +static size_t const min_stk_bytes = 0x100000; // Task stack segments. Heap allocated and chained together. 
diff --git a/src/rt/sync/lock_and_signal.cpp b/src/rt/sync/lock_and_signal.cpp index 6934a7965215e..93275ca3e07a3 100644 --- a/src/rt/sync/lock_and_signal.cpp +++ b/src/rt/sync/lock_and_signal.cpp @@ -41,18 +41,19 @@ lock_and_signal::~lock_and_signal() { void lock_and_signal::lock() { #if defined(__WIN32__) EnterCriticalSection(&_cs); + _holding_thread = GetCurrentThreadId(); #else CHECKED(pthread_mutex_lock(&_mutex)); _holding_thread = pthread_self(); - _locked = true; #endif + _locked = true; } void lock_and_signal::unlock() { + _locked = false; #if defined(__WIN32__) LeaveCriticalSection(&_cs); #else - _locked = false; CHECKED(pthread_mutex_unlock(&_mutex)); #endif } @@ -108,8 +109,7 @@ void lock_and_signal::signal_all() { bool lock_and_signal::lock_held_by_current_thread() { #if defined(__WIN32__) - // TODO: implement this functionality for win32. - return false; + return _locked && _holding_thread == GetCurrentThreadId(); #else return _locked && _holding_thread == pthread_self(); #endif diff --git a/src/rt/sync/lock_and_signal.h b/src/rt/sync/lock_and_signal.h index 2794027d5c73f..608c1fcbe3e8f 100644 --- a/src/rt/sync/lock_and_signal.h +++ b/src/rt/sync/lock_and_signal.h @@ -6,13 +6,14 @@ class lock_and_signal { #if defined(__WIN32__) HANDLE _event; CRITICAL_SECTION _cs; + DWORD _holding_thread; #else pthread_cond_t _cond; pthread_mutex_t _mutex; pthread_t _holding_thread; - bool _locked; #endif + bool _locked; public: lock_and_signal(); virtual ~lock_and_signal();