diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp
index c7cece2ef8152..62cb6fe3bf3e2 100644
--- a/src/rt/rust.cpp
+++ b/src/rt/rust.cpp
@@ -71,6 +71,19 @@ command_line_args : public dom_owned<command_line_args>
     }
 };
 
+int get_num_threads()
+{
+    char *env = getenv("RUST_THREADS");
+    if(env) {
+        int num = atoi(env);
+        if(num > 0)
+            return num;
+    }
+    // TODO: in this case, determine the number of CPUs present on the
+    // machine.
+    return 1;
+}
+
 /**
  * Main entry point into the Rust runtime. Here we create a Rust service,
  * initialize the kernel, create the root domain and run it.
@@ -95,7 +108,11 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) {
     dom->root_task->start(main_fn, (uintptr_t)args->args);
 
-    int ret = dom->start_main_loop();
+    int num_threads = get_num_threads();
+
+    DLOG(dom, dom, "Using %d worker threads.", num_threads);
+
+    int ret = dom->start_main_loops(num_threads);
 
     delete args;
     kernel->destroy_domain(dom);
     kernel->join_all_domains();
diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp
index 74b1075f70180..27fe45e42d7a1 100644
--- a/src/rt/rust_builtin.cpp
+++ b/src/rt/rust_builtin.cpp
@@ -391,12 +391,17 @@ task_yield(rust_task *task) {
 
 extern "C" CDECL void
 task_join(rust_task *task, rust_task *join_task) {
+    task->dom->scheduler_lock.lock();
     // If the other task is already dying, we don't have to wait for it.
     if (join_task->dead() == false) {
         join_task->tasks_waiting_to_join.push(task);
         task->block(join_task, "joining local task");
+        task->dom->scheduler_lock.unlock();
        task->yield(2);
     }
+    else {
+        task->dom->scheduler_lock.unlock();
+    }
 }
 
 /* Debug builtins for std.dbg. */
diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp
index fdc7f2704d315..cc03c227acda9 100644
--- a/src/rt/rust_chan.cpp
+++ b/src/rt/rust_chan.cpp
@@ -10,6 +10,7 @@ rust_chan::rust_chan(rust_task *task,
     task(task),
     port(port),
     buffer(task->dom, unit_sz) {
+    ++task->ref_count;
     if (port) {
         associate(port);
     }
@@ -23,6 +24,7 @@ rust_chan::~rust_chan() {
     A(task->dom, is_associated() == false,
       "Channel must be disassociated before being freed.");
+    --task->ref_count;
 }
 
 /**
@@ -34,6 +36,7 @@ void rust_chan::associate(maybe_proxy<rust_port> *port) {
         LOG(task, task,
             "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR,
             this, port);
+        ++this->ref_count;
         this->port->referent()->chans.push(this);
     }
 }
@@ -52,6 +55,7 @@ void rust_chan::disassociate() {
     LOG(task, task,
         "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR,
         this, port->referent());
+    --this->ref_count;
     port->referent()->chans.swap_delete(this);
 }
 
diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp
index 1a5e14635099f..ca3ee9c6a30e6 100644
--- a/src/rt/rust_dom.cpp
+++ b/src/rt/rust_dom.cpp
@@ -47,16 +47,14 @@ rust_dom::~rust_dom() {
 
 void
 rust_dom::activate(rust_task *task) {
-    curr_task = task;
-
     context ctx;
     task->ctx.next = &ctx;
     DLOG(this, task, "descheduling...");
+    scheduler_lock.unlock();
     task->ctx.swap(ctx);
+    scheduler_lock.lock();
     DLOG(this, task, "task has returned");
-
-    curr_task = NULL;
 }
 
 void
@@ -89,6 +87,7 @@ rust_dom::malloc(size_t size, memory_region::memory_region_type type) {
     } else if (type == memory_region::SYNCHRONIZED) {
         return synchronized_region.malloc(size);
     }
+    I(this, false);
     return NULL;
 }
 
@@ -168,9 +167,11 @@ rust_dom::number_of_live_tasks() {
  */
 void
 rust_dom::reap_dead_tasks() {
+    I(this, scheduler_lock.lock_held_by_current_thread());
     for (size_t i = 0; i < dead_tasks.length(); ) {
         rust_task *task = dead_tasks[i];
-        if (task->ref_count == 0) {
+        // Make sure this task isn't still running somewhere else...
+        if (task->ref_count == 0 && task->can_schedule()) {
             I(this, task->tasks_waiting_to_join.is_empty());
             dead_tasks.remove(task);
             DLOG(this, task,
@@ -211,10 +212,13 @@ rust_dom::schedule_task() {
     // FIXME: in the face of failing tasks, this is not always right.
     // I(this, n_live_tasks() > 0);
     if (running_tasks.length() > 0) {
-        size_t i = rand(&rctx);
-        i %= running_tasks.length();
-        if (running_tasks[i]->yield_timer.has_timed_out()) {
-            return (rust_task *)running_tasks[i];
+        size_t k = rand(&rctx);
+        // Look around for a runnable task, starting at k.
+        for(size_t j = 0; j < running_tasks.length(); ++j) {
+            size_t i = (j + k) % running_tasks.length();
+            if (running_tasks[i]->can_schedule()) {
+                return (rust_task *)running_tasks[i];
+            }
         }
     }
     return NULL;
@@ -261,15 +265,20 @@ rust_dom::log_state() {
  * drop to zero.
  */
 int
-rust_dom::start_main_loop() {
+rust_dom::start_main_loop(int id) {
+    scheduler_lock.lock();
+
     // Make sure someone is watching, to pull us out of infinite loops.
-    rust_timer timer(this);
+    //rust_timer timer(this);
 
-    DLOG(this, dom, "started domain loop");
+    DLOG(this, dom, "started domain loop %d", id);
 
     while (number_of_live_tasks() > 0) {
         A(this, kernel->is_deadlocked() == false, "deadlock");
 
+        DLOG(this, dom, "worker %d, number_of_live_tasks = %d",
+             id, number_of_live_tasks());
+
         drain_incoming_message_queue(true);
 
         rust_task *scheduled_task = schedule_task();
@@ -281,8 +290,11 @@ rust_dom::start_main_loop() {
         if (scheduled_task == NULL) {
             log_state();
             DLOG(this, task,
-                 "all tasks are blocked, scheduler yielding ...");
+                 "all tasks are blocked, scheduler id %d yielding ...",
+                 id);
+            scheduler_lock.unlock();
             sync::sleep(100);
+            scheduler_lock.lock();
             DLOG(this, task,
                  "scheduler resuming ...");
             continue;
@@ -303,15 +315,22 @@ rust_dom::start_main_loop() {
 
         interrupt_flag = 0;
 
+        DLOG(this, task,
+             "Running task %p on worker %d",
+             scheduled_task, id);
+        I(this, !scheduled_task->active);
+        scheduled_task->active = true;
         activate(scheduled_task);
+        scheduled_task->active = false;
 
         DLOG(this, task,
-             "returned from task %s @0x%" PRIxPTR
-             " in state '%s', sp=0x%" PRIxPTR,
-             scheduled_task->name,
-             (uintptr_t)scheduled_task,
-             scheduled_task->state->name,
-             scheduled_task->rust_sp);
+             "returned from task %s @0x%" PRIxPTR
+             " in state '%s', sp=0x%" PRIxPTR ", worker id=%d",
+             scheduled_task->name,
+             (uintptr_t)scheduled_task,
+             scheduled_task->state->name,
+             scheduled_task->rust_sp,
+             id);
 
         /*
         // These invariants are no longer valid, as rust_sp is not
@@ -334,17 +353,41 @@ rust_dom::start_main_loop() {
                  "scheduler yielding ...",
                  dead_tasks.length());
             log_state();
+            scheduler_lock.unlock();
             sync::yield();
+            scheduler_lock.lock();
         } else {
             drain_incoming_message_queue(true);
         }
         reap_dead_tasks();
     }
 
-    DLOG(this, dom, "finished main-loop (dom.rval = %d)", rval);
+    DLOG(this, dom, "finished main-loop %d (dom.rval = %d)", id, rval);
+
+    scheduler_lock.unlock();
 
     return rval;
 }
 
+int rust_dom::start_main_loops(int num_threads)
+{
+    dom_worker *worker = NULL;
+
+    // -1, because this thread will also be a worker.
+    for(int i = 0; i < num_threads - 1; ++i) {
+        worker = new dom_worker(i + 1, this);
+        worker->start();
+        threads.push(worker);
+    }
+
+    start_main_loop(0);
+
+    while(threads.pop(&worker)) {
+        worker->join();
+        delete worker;
+    }
+
+    return rval;
+}
 rust_crate_cache *
 rust_dom::get_cache() {
@@ -353,14 +396,26 @@ rust_dom::get_cache() {
 
 rust_task *
 rust_dom::create_task(rust_task *spawner, const char *name) {
+    //scheduler_lock.lock();
     rust_task *task =
         new (this) rust_task (this, &newborn_tasks, spawner, name);
     DLOG(this, task, "created task: " PTR ", spawner: %s, name: %s",
         task, spawner ? spawner->name : "null", name);
     newborn_tasks.append(task);
+    //scheduler_lock.unlock();
     return task;
 }
 
+rust_dom::dom_worker::dom_worker(int id, rust_dom *owner)
+    : id(id), owner(owner)
+{
+}
+
+void rust_dom::dom_worker::run()
+{
+    owner->start_main_loop(id);
+}
+
 //
 // Local Variables:
 // mode: C++
diff --git a/src/rt/rust_dom.h b/src/rt/rust_dom.h
index 2b88a2c713f71..7f9fa7a2901ed 100644
--- a/src/rt/rust_dom.h
+++ b/src/rt/rust_dom.h
@@ -96,11 +96,25 @@ struct rust_dom : public kernel_owned<rust_dom>, rc_base<rust_dom>
     void reap_dead_tasks();
     rust_task *schedule_task();
 
-    int start_main_loop();
+    int start_main_loop(int id);
+    int start_main_loops(int num_threads);
 
     void log_state();
 
     rust_task *create_task(rust_task *spawner, const char *name);
+
+    class dom_worker : public rust_thread {
+        int id;
+        rust_dom *owner;
+
+    public:
+        dom_worker(int id, rust_dom *owner);
+
+        virtual void run();
+    };
+
+    lock_and_signal scheduler_lock;
+    array_list<dom_worker *> threads;
 };
 
 inline rust_log &
diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp
index bb8261bb78a0d..754ff1edaf13b 100644
--- a/src/rt/rust_task.cpp
+++ b/src/rt/rust_task.cpp
@@ -14,9 +14,9 @@
 
 // FIXME (issue #151): This should be 0x300; the change here is for
 // practicality's sake until stack growth is working.
-static size_t const min_stk_bytes = 0x300000;
-// static size_t const min_stk_bytes = 0x10000;
-
+//static size_t const min_stk_bytes = 0x300000;
+//static size_t const min_stk_bytes = 0x10000;
+static size_t const min_stk_bytes = 0x100000;
 
 // Task stack segments. Heap allocated and chained together.
@@ -70,7 +70,8 @@ rust_task::rust_task(rust_dom *dom, rust_task_list *state,
     list_index(-1),
     rendezvous_ptr(0),
     alarm(this),
-    handle(NULL)
+    handle(NULL),
+    active(false)
 {
     LOGPTR(dom, "new task", (uintptr_t)this);
     DLOG(dom, task, "sizeof(task) = %d (0x%x)", sizeof *this, sizeof *this);
@@ -123,31 +124,31 @@ struct spawn_args {
                        uintptr_t, uintptr_t);
 };
 
-// TODO: rewrite this in LLVM assembly so we can be sure the calling
-// conventions will match.
 extern "C" CDECL void
 task_start_wrapper(spawn_args *a)
 {
     rust_task *task = a->task;
     int rval = 42;
 
-    // This is used by the context switching code. LLVM generates fastcall
-    // functions, but ucontext needs cdecl functions. This massages the
-    // calling conventions into the right form.
     a->f(&rval, task, a->a3, a->a4);
-
+
     LOG(task, task, "task exited with value %d", rval);
 
-    // TODO: the old exit glue does some magical argument copying stuff. This
-    // is probably still needed.
+    {
+        scoped_lock with(task->dom->scheduler_lock);
+
+        // TODO: the old exit glue does some magical argument copying
+        // stuff. This is probably still needed.
 
-    // This is duplicated from upcall_exit, which is probably dead code by
-    // now.
- LOG(task, task, "task ref_count: %d", task->ref_count); - A(task->dom, task->ref_count >= 0, - "Task ref_count should not be negative on exit!"); - task->die(); - task->notify_tasks_waiting_to_join(); + // This is duplicated from upcall_exit, which is probably dead code by + // now. + LOG(task, task, "task ref_count: %d", task->ref_count); + A(task->dom, task->ref_count >= 0, + "Task ref_count should not be negative on exit!"); + task->die(); + task->notify_tasks_waiting_to_join(); + + } task->yield(1); } @@ -158,6 +159,9 @@ rust_task::start(uintptr_t spawnee_fn, LOGPTR(dom, "from spawnee", spawnee_fn); I(dom, stk->data != NULL); + I(dom, !dom->scheduler_lock.lock_held_by_current_thread()); + + scoped_lock with(dom->scheduler_lock); char *sp = (char *)rust_sp; @@ -409,6 +413,7 @@ rust_task::free(void *p, bool is_gc) void rust_task::transition(rust_task_list *src, rust_task_list *dst) { + I(dom, dom->scheduler_lock.lock_held_by_current_thread()); DLOG(dom, task, "task %s " PTR " state change '%s' -> '%s' while in '%s'", name, (uintptr_t)this, src->name, dst->name, state->name); @@ -482,6 +487,11 @@ rust_task::get_handle() { return handle; } +bool rust_task::can_schedule() +{ + return yield_timer.has_timed_out() && !active; +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h index a022a348667ab..3f9a0660300c6 100644 --- a/src/rt/rust_task.h +++ b/src/rt/rust_task.h @@ -50,6 +50,10 @@ rust_task : public maybe_proxy, rust_handle *handle; context ctx; + + // This flag indicates that a worker is either currently running the task + // or is about to run this task. + volatile bool active; // Only a pointer to 'name' is kept, so it must live as long as this task. rust_task(rust_dom *dom, @@ -111,6 +115,8 @@ rust_task : public maybe_proxy, frame_glue_fns *get_frame_glue_fns(uintptr_t fp); rust_crate_cache * get_crate_cache(); + + bool can_schedule(); }; // diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index 7fb6bc4c84dc5..885f82ddac393 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -23,6 +23,7 @@ str_buf(rust_task *task, rust_str *s); extern "C" void upcall_grow_task(rust_task *task, size_t n_frame_bytes) { + I(task->dom, false); LOG_UPCALL_ENTRY(task); task->grow(n_frame_bytes); } @@ -74,6 +75,7 @@ extern "C" CDECL rust_port* upcall_new_port(rust_task *task, size_t unit_sz) { LOG_UPCALL_ENTRY(task); rust_dom *dom = task->dom; + scoped_lock with(dom->scheduler_lock); LOG(task, comm, "upcall_new_port(task=0x%" PRIxPTR " (%s), unit_sz=%d)", (uintptr_t) task, task->name, unit_sz); return new (dom) rust_port(task, unit_sz); @@ -82,6 +84,7 @@ upcall_new_port(rust_task *task, size_t unit_sz) { extern "C" CDECL void upcall_del_port(rust_task *task, rust_port *port) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); LOG(task, comm, "upcall del_port(0x%" PRIxPTR ")", (uintptr_t) port); I(task->dom, !port->ref_count); delete port; @@ -121,6 +124,7 @@ upcall_flush_chan(rust_task *task, rust_chan *chan) { extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); LOG(task, comm, "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan); @@ -143,7 +147,7 @@ void upcall_del_chan(rust_task *task, rust_chan *chan) { // here is that we can get ourselves in a deadlock if the // parent task tries to join us. // - // 2. We can leave the channel in a "dormnat" state by not freeing + // 2. 
     //    it and letting the receiver task delete it for us instead.
     if (chan->buffer.is_empty() == false) {
         return;
     }
@@ -162,6 +166,7 @@ extern "C" CDECL rust_chan *
 upcall_clone_chan(rust_task *task, maybe_proxy<rust_task> *target,
                   rust_chan *chan) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     size_t unit_sz = chan->buffer.unit_sz;
     maybe_proxy<rust_port> *port = chan->port;
     rust_task *target_task = NULL;
@@ -203,29 +208,34 @@ upcall_sleep(rust_task *task, size_t time_in_us) {
 
 extern "C" CDECL void
 upcall_send(rust_task *task, rust_chan *chan, void *sptr) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     chan->send(sptr);
     LOG(task, comm, "=== sent data ===>");
 }
 
 extern "C" CDECL void
 upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
-    LOG_UPCALL_ENTRY(task);
-    LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR
-        ", size: 0x%" PRIxPTR ", chan_no: %d",
-        (uintptr_t) port, (uintptr_t) dptr, port->unit_sz,
-        port->chans.length());
+    {
+        LOG_UPCALL_ENTRY(task);
+        scoped_lock with(task->dom->scheduler_lock);
 
-    if (port->receive(dptr)) {
-        return;
-    }
+        LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR
+            ", size: 0x%" PRIxPTR ", chan_no: %d",
+            (uintptr_t) port, (uintptr_t) dptr, port->unit_sz,
+            port->chans.length());
+
+        if (port->receive(dptr)) {
+            return;
+        }
 
-    // No data was buffered on any incoming channel, so block this task
-    // on the port. Remember the rendezvous location so that any sender
-    // task can write to it before waking up this task.
+        // No data was buffered on any incoming channel, so block this task
+        // on the port. Remember the rendezvous location so that any sender
+        // task can write to it before waking up this task.
 
-    LOG(task, comm, "<=== waiting for rendezvous data ===");
-    task->rendezvous_ptr = dptr;
-    task->block(port, "waiting for rendezvous data");
+        LOG(task, comm, "<=== waiting for rendezvous data ===");
+        task->rendezvous_ptr = dptr;
+        task->block(port, "waiting for rendezvous data");
+    }
     task->yield(3);
 }
@@ -245,6 +255,7 @@ upcall_fail(rust_task *task,
 extern "C" CDECL void
 upcall_kill(rust_task *task, maybe_proxy<rust_task> *target) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     if (target->is_proxy()) {
         notify_message::
         send(notify_message::KILL, "kill", task->get_handle(),
@@ -261,18 +272,22 @@ upcall_kill(rust_task *task, maybe_proxy<rust_task> *target) {
  */
 extern "C" CDECL void
 upcall_exit(rust_task *task) {
-    LOG_UPCALL_ENTRY(task);
-    LOG(task, task, "task ref_count: %d", task->ref_count);
-    A(task->dom, task->ref_count >= 0,
-      "Task ref_count should not be negative on exit!");
-    task->die();
-    task->notify_tasks_waiting_to_join();
+    {
+        LOG_UPCALL_ENTRY(task);
+        scoped_lock with(task->dom->scheduler_lock);
+        LOG(task, task, "task ref_count: %d", task->ref_count);
+        A(task->dom, task->ref_count >= 0,
+          "Task ref_count should not be negative on exit!");
+        task->die();
+        task->notify_tasks_waiting_to_join();
+    }
     task->yield(1);
 }
 
 extern "C" CDECL uintptr_t
 upcall_malloc(rust_task *task, size_t nbytes, type_desc *td) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
 
     LOG(task, mem,
         "upcall malloc(%" PRIdPTR ", 0x%" PRIxPTR ")"
@@ -293,6 +308,7 @@ upcall_malloc(rust_task *task, size_t nbytes, type_desc *td) {
 extern "C" CDECL void
 upcall_free(rust_task *task, void* ptr, uintptr_t is_gc) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     rust_dom *dom = task->dom;
     DLOG(dom, mem, "upcall free(0x%" PRIxPTR ", is_gc=%" PRIdPTR ")",
PRIxPTR ", is_gc=%" PRIdPTR ")", @@ -303,6 +319,7 @@ upcall_free(rust_task *task, void* ptr, uintptr_t is_gc) { extern "C" CDECL uintptr_t upcall_mark(rust_task *task, void* ptr) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); rust_dom *dom = task->dom; if (ptr) { @@ -333,6 +350,7 @@ rust_str *make_str(rust_task *task, char const *s, size_t fill) { extern "C" CDECL rust_str * upcall_new_str(rust_task *task, char const *s, size_t fill) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); return make_str(task, s, fill); } @@ -340,6 +358,7 @@ upcall_new_str(rust_task *task, char const *s, size_t fill) { extern "C" CDECL rust_str * upcall_dup_str(rust_task *task, rust_str *str) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); return make_str(task, (char const *)str->data, str->fill); } @@ -347,6 +366,7 @@ upcall_dup_str(rust_task *task, rust_str *str) { extern "C" CDECL rust_vec * upcall_new_vec(rust_task *task, size_t fill, type_desc *td) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); rust_dom *dom = task->dom; DLOG(dom, mem, "upcall new_vec(%" PRIdPTR ")", fill); size_t alloc = next_power_of_two(sizeof(rust_vec) + fill); @@ -451,6 +471,7 @@ upcall_vec_append(rust_task *task, type_desc *t, type_desc *elem_t, rust_vec **dst_ptr, rust_vec *src, bool skip_null) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); rust_vec *dst = *dst_ptr; uintptr_t need_copy; size_t n_src_bytes = skip_null ? src->fill - 1 : src->fill; @@ -480,6 +501,7 @@ upcall_get_type_desc(rust_task *task, size_t n_descs, type_desc const **descs) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); LOG(task, cache, "upcall get_type_desc with size=%" PRIdPTR ", align=%" PRIdPTR ", %" PRIdPTR " descs", size, align, n_descs); @@ -493,6 +515,7 @@ extern "C" CDECL rust_task * upcall_new_task(rust_task *spawner, rust_vec *name) { // name is a rust string structure. 
     LOG_UPCALL_ENTRY(spawner);
+    scoped_lock with(spawner->dom->scheduler_lock);
     rust_dom *dom = spawner->dom;
     rust_task *task = dom->create_task(spawner, (const char *)name->data);
     return task;
@@ -532,6 +555,7 @@ upcall_start_task(rust_task *spawner,
  */
 extern "C" CDECL maybe_proxy<rust_task> *
 upcall_new_thread(rust_task *task, const char *name) {
+    I(task->dom, false);
     LOG_UPCALL_ENTRY(task);
     rust_dom *parent_dom = task->dom;
     rust_kernel *kernel = parent_dom->kernel;
@@ -580,6 +604,7 @@ upcall_start_thread(rust_task *task,
                     rust_proxy<rust_task> *child_task_proxy,
                     uintptr_t spawnee_fn,
                     size_t callsz) {
+    I(task->dom, false);
     LOG_UPCALL_ENTRY(task);
 #if 0
     rust_dom *parenet_dom = task->dom;
@@ -612,6 +637,7 @@ extern "C" CDECL void
 upcall_ivec_resize(rust_task *task,
                    rust_ivec *v,
                    size_t newsz) {
+    scoped_lock with(task->dom->scheduler_lock);
     I(task->dom, !v->fill);
 
     size_t new_alloc = next_power_of_two(newsz);
@@ -630,6 +656,7 @@ extern "C" CDECL void
 upcall_ivec_spill(rust_task *task,
                   rust_ivec *v,
                   size_t newsz) {
+    scoped_lock with(task->dom->scheduler_lock);
     size_t new_alloc = next_power_of_two(newsz);
 
     rust_ivec_heap *heap_part = (rust_ivec_heap *)
diff --git a/src/rt/sync/lock_and_signal.cpp b/src/rt/sync/lock_and_signal.cpp
index 4f2622858261b..93275ca3e07a3 100644
--- a/src/rt/sync/lock_and_signal.cpp
+++ b/src/rt/sync/lock_and_signal.cpp
@@ -21,7 +21,9 @@ lock_and_signal::lock_and_signal() {
 }
 
 #else
-lock_and_signal::lock_and_signal() {
+lock_and_signal::lock_and_signal()
+    : _locked(false)
+{
     CHECKED(pthread_cond_init(&_cond, NULL));
     CHECKED(pthread_mutex_init(&_mutex, NULL));
 }
@@ -39,12 +41,16 @@ lock_and_signal::~lock_and_signal() {
 
 void lock_and_signal::lock() {
 #if defined(__WIN32__)
     EnterCriticalSection(&_cs);
+    _holding_thread = GetCurrentThreadId();
 #else
     CHECKED(pthread_mutex_lock(&_mutex));
+    _holding_thread = pthread_self();
 #endif
+    _locked = true;
 }
 
 void lock_and_signal::unlock() {
+    _locked = false;
 #if defined(__WIN32__)
     LeaveCriticalSection(&_cs);
 #else
@@ -100,6 +106,25 @@ void lock_and_signal::signal_all() {
 #endif
 }
 
+bool lock_and_signal::lock_held_by_current_thread()
+{
+#if defined(__WIN32__)
+    return _locked && _holding_thread == GetCurrentThreadId();
+#else
+    return _locked && _holding_thread == pthread_self();
+#endif
+}
+
+scoped_lock::scoped_lock(lock_and_signal &lock)
+    : lock(lock)
+{
+    lock.lock();
+}
+
+scoped_lock::~scoped_lock()
+{
+    lock.unlock();
+}
 
 //
 // Local Variables:
diff --git a/src/rt/sync/lock_and_signal.h b/src/rt/sync/lock_and_signal.h
index 1f1cfb4124b23..608c1fcbe3e8f 100644
--- a/src/rt/sync/lock_and_signal.h
+++ b/src/rt/sync/lock_and_signal.h
@@ -1,3 +1,4 @@
+// -*- c++ -*-
 #ifndef LOCK_AND_SIGNAL_H
 #define LOCK_AND_SIGNAL_H
 
@@ -5,10 +6,14 @@
 class lock_and_signal {
 #if defined(__WIN32__)
     HANDLE _event;
     CRITICAL_SECTION _cs;
+    DWORD _holding_thread;
 #else
     pthread_cond_t _cond;
     pthread_mutex_t _mutex;
+
+    pthread_t _holding_thread;
 #endif
+    bool _locked;
 public:
     lock_and_signal();
     virtual ~lock_and_signal();
@@ -19,6 +24,16 @@ class lock_and_signal {
     void timed_wait(size_t timeout_in_ns);
     void signal();
     void signal_all();
+
+    bool lock_held_by_current_thread();
+};
+
+class scoped_lock {
+    lock_and_signal &lock;
+
+public:
+    scoped_lock(lock_and_signal &lock);
+    ~scoped_lock();
 };
 
 #endif /* LOCK_AND_SIGNAL_H */
diff --git a/src/rt/test/rust_test_runtime.cpp b/src/rt/test/rust_test_runtime.cpp
index 0fed35dad8249..18a957edd603f 100644
--- a/src/rt/test/rust_test_runtime.cpp
+++ b/src/rt/test/rust_test_runtime.cpp
@@ -53,7 +53,7 @@ rust_task_test::worker::run()
{ kernel->create_domain("test"); rust_dom *domain = handle->referent(); domain->root_task->start((uintptr_t)&task_entry, (uintptr_t)NULL); - domain->start_main_loop(); + domain->start_main_loop(0); kernel->destroy_domain(domain); } diff --git a/src/rt/util/array_list.h b/src/rt/util/array_list.h index 9ad4b2080417f..495594cf7e68a 100644 --- a/src/rt/util/array_list.h +++ b/src/rt/util/array_list.h @@ -1,3 +1,4 @@ +// -*- c++ -*- #ifndef ARRAY_LIST_H #define ARRAY_LIST_H diff --git a/src/rt/util/indexed_list.h b/src/rt/util/indexed_list.h index 173e9ede2ba07..0c36604328fab 100644 --- a/src/rt/util/indexed_list.h +++ b/src/rt/util/indexed_list.h @@ -1,3 +1,4 @@ +// -*- c++ -*- #ifndef INDEXED_LIST_H #define INDEXED_LIST_H diff --git a/src/test/bench/shootout/pfib.rs b/src/test/bench/shootout/pfib.rs new file mode 100644 index 0000000000000..f7fda8b811025 --- /dev/null +++ b/src/test/bench/shootout/pfib.rs @@ -0,0 +1,41 @@ +// -*- rust -*- + +/* + A parallel version of fibonacci numbers. +*/ + +fn recv[T](&port[T] p) -> T { + let T x; + p |> x; + ret x; +} + +fn fib(int n) -> int { + fn pfib(chan[int] c, int n) { + if (n == 0) { + c <| 0; + } + else if (n <= 2) { + c <| 1; + } + else { + let port[int] p = port(); + + auto t1 = spawn pfib(chan(p), n - 1); + auto t2 = spawn pfib(chan(p), n - 2); + + c <| recv(p) + recv(p); + } + } + + let port[int] p = port(); + auto t = spawn pfib(chan(p), n); + ret recv(p); +} + +fn main() { + assert (fib(8) == 21); + assert (fib(15) == 610); + log fib(8); + log fib(15); +} diff --git a/src/test/run-pass/infinite-loops.rs b/src/test/run-pass/infinite-loops.rs new file mode 100644 index 0000000000000..be9fed9ea1c83 --- /dev/null +++ b/src/test/run-pass/infinite-loops.rs @@ -0,0 +1,29 @@ +/* + A simple way to make sure threading works. This should use all the + CPU cycles an any machines that we're likely to see for a while. +*/ + +// xfail-stage0 +// xfail-stage1 +// xfail-stage2 +// xfail-stage3 + +use std; +import std::task::join; + +fn loop(int n) { + let task t1; + let task t2; + + if(n > 0) { + t1 = spawn loop(n - 1); + t2 = spawn loop(n - 1); + } + + while(true) {} +} + +fn main() { + let task t = spawn loop(5); + join(t); +}