Simple M:N task to OS thread support (Issue #508) #566

Closed · wants to merge 7 commits
19 changes: 18 additions & 1 deletion src/rt/rust.cpp
@@ -71,6 +71,19 @@ command_line_args : public dom_owned<command_line_args>
    }
};

int get_num_threads()
{
    char *env = getenv("RUST_THREADS");
    if(env) {
        int num = atoi(env);
        if(num > 0)
            return num;
    }
    // TODO: in this case, determine the number of CPUs present on the
    // machine.
    return 1;
}

/**
* Main entry point into the Rust runtime. Here we create a Rust service,
* initialize the kernel, create the root domain and run it.
@@ -95,7 +108,11 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) {

    dom->root_task->start(main_fn, (uintptr_t)args->args);

    int ret = dom->start_main_loop();
    int num_threads = get_num_threads();

    DLOG(dom, dom, "Using %d worker threads.", num_threads);

    int ret = dom->start_main_loops(num_threads);
    delete args;
    kernel->destroy_domain(dom);
    kernel->join_all_domains();
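Note on the TODO in get_num_threads above: the CPU-count fallback is left unimplemented in this patch. A minimal sketch of one way it could be filled in on a POSIX host, assuming sysconf(_SC_NPROCESSORS_ONLN) is available (illustrative only, not part of the diff):

    #include <unistd.h>   // sysconf, _SC_NPROCESSORS_ONLN (POSIX only)
    #include <cstdlib>    // getenv, atoi

    // Hypothetical fallback for the TODO above: ask the OS how many CPUs
    // are online and use that when RUST_THREADS is unset or invalid.
    static int detect_num_threads() {
        char *env = getenv("RUST_THREADS");
        if (env) {
            int num = atoi(env);
            if (num > 0)
                return num;
        }
    #ifdef _SC_NPROCESSORS_ONLN
        long ncpus = sysconf(_SC_NPROCESSORS_ONLN);
        if (ncpus > 0)
            return (int)ncpus;
    #endif
        return 1;
    }

On platforms without this sysconf query a different mechanism would be needed, which is presumably why the patch simply defaults to one thread.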
5 changes: 5 additions & 0 deletions src/rt/rust_builtin.cpp
@@ -391,12 +391,17 @@ task_yield(rust_task *task) {

extern "C" CDECL void
task_join(rust_task *task, rust_task *join_task) {
    task->dom->scheduler_lock.lock();
    // If the other task is already dying, we don't have to wait for it.
    if (join_task->dead() == false) {
        join_task->tasks_waiting_to_join.push(task);
        task->block(join_task, "joining local task");
        task->dom->scheduler_lock.unlock();
        task->yield(2);
    }
    else {
        task->dom->scheduler_lock.unlock();
    }
}

/* Debug builtins for std.dbg. */
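Note on the task_join change above: the scheduler lock is taken to inspect the other task's state and register as a waiter, but it is always released before the joining task actually yields, so other worker threads can keep scheduling. A generic sketch of that discipline, using std::mutex purely for illustration (these names are hypothetical, not runtime APIs):

    #include <mutex>
    #include <thread>

    struct waiter_example {
        std::mutex scheduler_lock;
        bool other_done;

        waiter_example() : other_done(false) {}

        void join_other() {
            scheduler_lock.lock();
            if (!other_done) {
                // ...register ourselves as a waiter while holding the lock...
                scheduler_lock.unlock();   // never yield while holding the lock
                std::this_thread::yield();
            } else {
                scheduler_lock.unlock();
            }
        }
    };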
4 changes: 4 additions & 0 deletions src/rt/rust_chan.cpp
@@ -10,6 +10,7 @@ rust_chan::rust_chan(rust_task *task,
    task(task),
    port(port),
    buffer(task->dom, unit_sz) {
    ++task->ref_count;
    if (port) {
        associate(port);
    }
@@ -23,6 +24,7 @@ rust_chan::~rust_chan() {

    A(task->dom, is_associated() == false,
      "Channel must be disassociated before being freed.");
    --task->ref_count;
}

/**
@@ -34,6 +36,7 @@ void rust_chan::associate(maybe_proxy<rust_port> *port) {
        LOG(task, task,
            "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR,
            this, port);
        ++this->ref_count;
        this->port->referent()->chans.push(this);
    }
}
@@ -52,6 +55,7 @@ void rust_chan::disassociate() {
        LOG(task, task,
            "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR,
            this, port->referent());
        --this->ref_count;
        port->referent()->chans.swap_delete(this);
}

95 changes: 75 additions & 20 deletions src/rt/rust_dom.cpp
@@ -47,16 +47,14 @@ rust_dom::~rust_dom() {

void
rust_dom::activate(rust_task *task) {
    curr_task = task;

    context ctx;

    task->ctx.next = &ctx;
    DLOG(this, task, "descheduling...");
    scheduler_lock.unlock();
    task->ctx.swap(ctx);
    scheduler_lock.lock();
    DLOG(this, task, "task has returned");

    curr_task = NULL;
}

void
@@ -89,6 +87,7 @@ rust_dom::malloc(size_t size, memory_region::memory_region_type type) {
    } else if (type == memory_region::SYNCHRONIZED) {
        return synchronized_region.malloc(size);
    }
    I(this, false);
    return NULL;
}

@@ -168,9 +167,11 @@ rust_dom::number_of_live_tasks() {
 */
void
rust_dom::reap_dead_tasks() {
    I(this, scheduler_lock.lock_held_by_current_thread());
    for (size_t i = 0; i < dead_tasks.length(); ) {
        rust_task *task = dead_tasks[i];
        if (task->ref_count == 0) {
        // Make sure this task isn't still running somewhere else...
        if (task->ref_count == 0 && task->can_schedule()) {
            I(this, task->tasks_waiting_to_join.is_empty());
            dead_tasks.remove(task);
            DLOG(this, task,
@@ -211,10 +212,13 @@ rust_dom::schedule_task() {
    // FIXME: in the face of failing tasks, this is not always right.
    // I(this, n_live_tasks() > 0);
    if (running_tasks.length() > 0) {
        size_t i = rand(&rctx);
        i %= running_tasks.length();
        if (running_tasks[i]->yield_timer.has_timed_out()) {
            return (rust_task *)running_tasks[i];
        size_t k = rand(&rctx);
        // Look around for a runnable task, starting at k.
        for(size_t j = 0; j < running_tasks.length(); ++j) {
            size_t i = (j + k) % running_tasks.length();
            if (running_tasks[i]->can_schedule()) {
                return (rust_task *)running_tasks[i];
            }
        }
    }
    return NULL;
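Note on the schedule_task change above: the old code probed one randomly chosen task and gave up if its yield timer had not expired; the new code starts at a random offset and scans every running task once, returning the first one that can_schedule(). A standalone sketch of that selection policy (pick_task and is_ready are hypothetical names, not runtime APIs):

    #include <cstdlib>
    #include <vector>

    // Start at a random offset so workers don't all contend for the same
    // task, then probe every slot exactly once.
    template <typename T, typename Pred>
    T *pick_task(std::vector<T *> &tasks, Pred is_ready) {
        if (tasks.empty())
            return NULL;
        size_t k = (size_t)rand();
        for (size_t j = 0; j < tasks.size(); ++j) {
            size_t i = (j + k) % tasks.size();
            if (is_ready(tasks[i]))
                return tasks[i];
        }
        return NULL;   // nothing runnable right now
    }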
@@ -261,15 +265,20 @@ rust_dom::log_state() {
 * drop to zero.
 */
int
rust_dom::start_main_loop() {
rust_dom::start_main_loop(int id) {
    scheduler_lock.lock();

    // Make sure someone is watching, to pull us out of infinite loops.
    rust_timer timer(this);
    //rust_timer timer(this);

    DLOG(this, dom, "started domain loop");
    DLOG(this, dom, "started domain loop %d", id);

    while (number_of_live_tasks() > 0) {
        A(this, kernel->is_deadlocked() == false, "deadlock");

        DLOG(this, dom, "worker %d, number_of_live_tasks = %d",
             id, number_of_live_tasks());

        drain_incoming_message_queue(true);

        rust_task *scheduled_task = schedule_task();
@@ -281,8 +290,11 @@ rust_dom::start_main_loop() {
        if (scheduled_task == NULL) {
            log_state();
            DLOG(this, task,
                 "all tasks are blocked, scheduler yielding ...");
                 "all tasks are blocked, scheduler id %d yielding ...",
                 id);
            scheduler_lock.unlock();
            sync::sleep(100);
            scheduler_lock.lock();
            DLOG(this, task,
                 "scheduler resuming ...");
            continue;
@@ -303,15 +315,22 @@ rust_dom::start_main_loop() {

        interrupt_flag = 0;

        DLOG(this, task,
             "Running task %p on worker %d",
             scheduled_task, id);
        I(this, !scheduled_task->active);
        scheduled_task->active = true;
        activate(scheduled_task);
        scheduled_task->active = false;

        DLOG(this, task,
             "returned from task %s @0x%" PRIxPTR
             " in state '%s', sp=0x%" PRIxPTR,
             scheduled_task->name,
             (uintptr_t)scheduled_task,
             scheduled_task->state->name,
             scheduled_task->rust_sp);
             "returned from task %s @0x%" PRIxPTR
             " in state '%s', sp=0x%x, worker id=%d" PRIxPTR,
             scheduled_task->name,
             (uintptr_t)scheduled_task,
             scheduled_task->state->name,
             scheduled_task->rust_sp,
             id);

/*
// These invariants are no longer valid, as rust_sp is not
@@ -334,17 +353,41 @@ rust_dom::start_main_loop() {
                 "scheduler yielding ...",
                 dead_tasks.length());
            log_state();
            scheduler_lock.unlock();
            sync::yield();
            scheduler_lock.lock();
        } else {
            drain_incoming_message_queue(true);
        }
        reap_dead_tasks();
    }

DLOG(this, dom, "finished main-loop (dom.rval = %d)", rval);
DLOG(this, dom, "finished main-loop %d (dom.rval = %d)", id, rval);

scheduler_lock.unlock();
return rval;
}

int rust_dom::start_main_loops(int num_threads)
{
    dom_worker *worker = NULL;

    // -1, because this thread will also be a worker.
    for(int i = 0; i < num_threads - 1; ++i) {
        worker = new dom_worker(i + 1, this);
        worker->start();
        threads.push(worker);
    }

    start_main_loop(0);

    while(threads.pop(&worker)) {
        worker->join();
        delete worker;
    }

    return rval;
}

rust_crate_cache *
rust_dom::get_cache() {
@@ -353,14 +396,26 @@ rust_dom::get_cache() {

rust_task *
rust_dom::create_task(rust_task *spawner, const char *name) {
    //scheduler_lock.lock();
    rust_task *task =
        new (this) rust_task (this, &newborn_tasks, spawner, name);
    DLOG(this, task, "created task: " PTR ", spawner: %s, name: %s",
         task, spawner ? spawner->name : "null", name);
    newborn_tasks.append(task);
    //scheduler_lock.unlock();
    return task;
}

rust_dom::dom_worker::dom_worker(int id, rust_dom *owner)
    : id(id), owner(owner)
{
}

void rust_dom::dom_worker::run()
{
    owner->start_main_loop(id);
}

//
// Local Variables:
// mode: C++
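Note: dom_worker derives from rust_thread, which is not part of this diff. A minimal sketch of the start()/join()/run() interface the new code relies on, written against pthreads for illustration (the real rust_thread lives elsewhere in the runtime and may differ):

    #include <pthread.h>

    class example_thread {
        pthread_t thread;

        static void *thread_start(void *self) {
            ((example_thread *)self)->run();
            return NULL;
        }

    public:
        example_thread() : thread() {}
        virtual ~example_thread() {}

        // start() launches an OS thread that calls the virtual run();
        // join() blocks until that thread has finished.
        void start() { pthread_create(&thread, NULL, thread_start, this); }
        void join()  { pthread_join(thread, NULL); }

        virtual void run() = 0;
    };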
16 changes: 15 additions & 1 deletion src/rt/rust_dom.h
@@ -96,11 +96,25 @@ struct rust_dom : public kernel_owned<rust_dom>, rc_base<rust_dom>
    void reap_dead_tasks();
    rust_task *schedule_task();

    int start_main_loop();
    int start_main_loop(int id);
    int start_main_loops(int num_threads);

    void log_state();

    rust_task *create_task(rust_task *spawner, const char *name);

    class dom_worker : public rust_thread {
        int id;
        rust_dom *owner;

    public:
        dom_worker(int id, rust_dom *owner);

        virtual void run();
    };

    lock_and_signal scheduler_lock;
    array_list<dom_worker *> threads;
};

inline rust_log &