Skip to content

Add support for multiple schedulers #1781

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 12 commits into from
Closed
171 changes: 166 additions & 5 deletions src/libcore/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export get_task;
export spawn;
export spawn_joinable;
export spawn_connected;
export spawn_sched;
export connected_fn;
export connected_task;
export currently_unwinding;
Expand All @@ -62,10 +63,15 @@ type rust_closure = {
#[link_name = "rustrt"]
#[abi = "cdecl"]
native mod rustrt {
fn rust_get_sched_id() -> sched_id;
fn rust_new_sched(num_threads: c::uintptr_t) -> sched_id;

fn get_task_id() -> task_id;
fn rust_get_task() -> *rust_task;

fn new_task() -> task_id;
fn rust_new_task_in_sched(id: sched_id) -> task_id;

fn drop_task(task_id: *rust_task);
fn get_task_pointer(id: task_id) -> *rust_task;

Expand All @@ -85,6 +91,7 @@ type rust_task =

resource rust_task_ptr(task: *rust_task) { rustrt::drop_task(task); }

type sched_id = int;
type task_id = int;

/*
Expand All @@ -111,14 +118,17 @@ Returns:
A handle to the new task
*/
fn spawn(+f: fn~()) -> task {
spawn_inner(f, none)
spawn_inner(f, none, new_task_in_this_sched)
}

fn spawn_inner(-f: fn~(),
notify: option<comm::chan<task_notification>>) -> task unsafe {
fn spawn_inner(
-f: fn~(),
notify: option<comm::chan<task_notification>>,
new_task: fn() -> task_id
) -> task unsafe {
let closure: *rust_closure = unsafe::reinterpret_cast(ptr::addr_of(f));
#debug("spawn: closure={%x,%x}", (*closure).fnptr, (*closure).envptr);
let id = rustrt::new_task();
let id = new_task();

// set up notifications if they are enabled.
option::may(notify) {|c|
Expand All @@ -132,6 +142,39 @@ fn spawn_inner(-f: fn~(),
ret id;
}

fn new_task_in_this_sched() -> task_id {
rustrt::new_task()
}

fn new_task_in_new_sched(num_threads: uint) -> task_id {
let sched_id = rustrt::rust_new_sched(num_threads);
rustrt::rust_new_task_in_sched(sched_id)
}

/*
Function: spawn_sched

Creates a new scheduler and executes a task on it. Tasks subsequently
spawned by that task will also execute on the new scheduler. When
there are no more tasks to execute the scheduler terminates.

Arguments:

num_threads - The number of OS threads to dedicate schedule tasks on
f - A unique closure to execute as a task on the new scheduler

Failure:

The number of threads must be greater than 0

*/
fn spawn_sched(num_threads: uint, +f: fn~()) -> task {
if num_threads < 1u {
fail "Can not create a scheduler with no threads";
}
spawn_inner(f, none, bind new_task_in_new_sched(num_threads))
}

/*
Type: joinable_task

Expand All @@ -142,7 +185,7 @@ type joinable_task = (task, comm::port<task_notification>);
fn spawn_joinable(+f: fn~()) -> joinable_task {
let notify_port = comm::port();
let notify_chan = comm::chan(notify_port);
let task = spawn_inner(f, some(notify_chan));
let task = spawn_inner(f, some(notify_chan), new_task_in_this_sched);
ret (task, notify_port);
/*
resource notify_rsrc(data: (comm::chan<task_notification>,
Expand Down Expand Up @@ -411,6 +454,124 @@ mod tests {
_ { fail; }
}
}

#[test]
#[should_fail]
#[ignore(cfg(target_os = "win32"))]
fn spawn_sched_no_threads() {
spawn_sched(0u) {|| };
}

#[test]
fn spawn_sched_1() {
let po = comm::port();
let ch = comm::chan(po);

fn f(i: int, ch: comm::chan<()>) {
let parent_sched_id = rustrt::rust_get_sched_id();

spawn_sched(1u) {||
let child_sched_id = rustrt::rust_get_sched_id();
assert parent_sched_id != child_sched_id;

if (i == 0) {
comm::send(ch, ());
} else {
f(i - 1, ch);
}
};

}
f(10, ch);
comm::recv(po);
}

#[test]
fn spawn_sched_childs_on_same_sched() {
let po = comm::port();
let ch = comm::chan(po);

spawn_sched(1u) {||
let parent_sched_id = rustrt::rust_get_sched_id();
spawn {||
let child_sched_id = rustrt::rust_get_sched_id();
// This should be on the same scheduler
assert parent_sched_id == child_sched_id;
comm::send(ch, ());
};
};

comm::recv(po);
}

#[nolink]
native mod rt {
fn rust_dbg_lock_create() -> *ctypes::void;
fn rust_dbg_lock_destroy(lock: *ctypes::void);
fn rust_dbg_lock_lock(lock: *ctypes::void);
fn rust_dbg_lock_unlock(lock: *ctypes::void);
fn rust_dbg_lock_wait(lock: *ctypes::void);
fn rust_dbg_lock_signal(lock: *ctypes::void);
}

#[test]
fn spawn_sched_blocking() {

// Testing that a task in one scheduler can block natively
// without affecting other schedulers
iter::repeat(20u) {||

let start_po = comm::port();
let start_ch = comm::chan(start_po);
let fin_po = comm::port();
let fin_ch = comm::chan(fin_po);

let lock = rt::rust_dbg_lock_create();

spawn_sched(1u) {||
rt::rust_dbg_lock_lock(lock);

comm::send(start_ch, ());

// Block the scheduler thread
rt::rust_dbg_lock_wait(lock);
rt::rust_dbg_lock_unlock(lock);

comm::send(fin_ch, ());
};

// Wait until the other task has its lock
comm::recv(start_po);

fn pingpong(po: comm::port<int>, ch: comm::chan<int>) {
let val = 20;
while val > 0 {
val = comm::recv(po);
comm::send(ch, val - 1);
}
}

let setup_po = comm::port();
let setup_ch = comm::chan(setup_po);
let parent_po = comm::port();
let parent_ch = comm::chan(parent_po);
spawn {||
let child_po = comm::port();
comm::send(setup_ch, comm::chan(child_po));
pingpong(child_po, parent_ch);
};

let child_ch = comm::recv(setup_po);
comm::send(child_ch, 20);
pingpong(parent_po, child_ch);
rt::rust_dbg_lock_lock(lock);
rt::rust_dbg_lock_signal(lock);
rt::rust_dbg_lock_unlock(lock);
comm::recv(fin_po);
rt::rust_dbg_lock_destroy(lock);
}
}

}


Expand Down
7 changes: 4 additions & 3 deletions src/rt/rust.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,9 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) {
check_claims = env->check_claims;

rust_srv *srv = new rust_srv(env);
rust_kernel *kernel = new rust_kernel(srv, env->num_sched_threads);
rust_scheduler *sched = kernel->get_default_scheduler();
rust_kernel *kernel = new rust_kernel(srv);
rust_sched_id sched_id = kernel->create_scheduler(env->num_sched_threads);
rust_scheduler *sched = kernel->get_scheduler_by_id(sched_id);
rust_task_id root_id = sched->create_task(NULL, "main", MAIN_STACK_SIZE);
rust_task *root_task = kernel->get_task_by_id(root_id);
I(kernel, root_task != NULL);
Expand All @@ -98,7 +99,7 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) {
root_task->deref();
root_task = NULL;

int ret = kernel->start_schedulers();
int ret = kernel->wait_for_schedulers();
delete args;
delete kernel;
delete srv;
Expand Down
69 changes: 68 additions & 1 deletion src/rt/rust_builtin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,16 +431,43 @@ nano_time(uint64_t *ns) {
*ns = t.time_ns();
}

extern "C" CDECL rust_sched_id
rust_get_sched_id() {
rust_task *task = rust_task_thread::get_task();
return task->sched->get_id();
}

extern "C" CDECL rust_sched_id
rust_new_sched(uintptr_t threads) {
rust_task *task = rust_task_thread::get_task();
A(task->thread, threads > 0,
"Can't create a scheduler with no threads, silly!");
return task->kernel->create_scheduler(threads);
}

extern "C" CDECL rust_task_id
get_task_id() {
rust_task *task = rust_task_thread::get_task();
return task->user.id;
}

static rust_task_id
new_task_common(rust_scheduler *sched, rust_task *parent) {
return sched->create_task(parent, NULL);
}

extern "C" CDECL rust_task_id
new_task() {
rust_task *task = rust_task_thread::get_task();
return task->sched->create_task(task, NULL);
return new_task_common(task->sched, task);
}

extern "C" CDECL rust_task_id
rust_new_task_in_sched(rust_sched_id id) {
rust_task *task = rust_task_thread::get_task();
rust_scheduler *sched = task->kernel->get_scheduler_by_id(id);
// FIXME: What if we didn't get the scheduler?
return new_task_common(sched, task);
}

extern "C" CDECL void
Expand Down Expand Up @@ -599,6 +626,46 @@ rust_log_console_off() {
log_console_off(task->kernel->env);
}

extern "C" CDECL lock_and_signal *
rust_dbg_lock_create() {
return new lock_and_signal();
}

extern "C" CDECL void
rust_dbg_lock_destroy(lock_and_signal *lock) {
rust_task *task = rust_task_thread::get_task();
I(task->thread, lock);
delete lock;
}

extern "C" CDECL void
rust_dbg_lock_lock(lock_and_signal *lock) {
rust_task *task = rust_task_thread::get_task();
I(task->thread, lock);
lock->lock();
}

extern "C" CDECL void
rust_dbg_lock_unlock(lock_and_signal *lock) {
rust_task *task = rust_task_thread::get_task();
I(task->thread, lock);
lock->unlock();
}

extern "C" CDECL void
rust_dbg_lock_wait(lock_and_signal *lock) {
rust_task *task = rust_task_thread::get_task();
I(task->thread, lock);
lock->wait();
}

extern "C" CDECL void
rust_dbg_lock_signal(lock_and_signal *lock) {
rust_task *task = rust_task_thread::get_task();
I(task->thread, lock);
lock->signal();
}

//
// Local Variables:
// mode: C++
Expand Down
18 changes: 9 additions & 9 deletions src/rt/rust_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ struct stk_seg;
struct type_desc;
struct frame_glue_fns;

typedef intptr_t rust_sched_id;
typedef intptr_t rust_task_id;
typedef intptr_t rust_port_id;

Expand Down Expand Up @@ -103,15 +104,14 @@ static size_t const BUF_BYTES = 2048;
void ref() { ++ref_count; } \
void deref() { if (--ref_count == 0) { dtor; } }

#define RUST_ATOMIC_REFCOUNT() \
private: \
intptr_t ref_count; \
public: \
void ref() { \
intptr_t old = sync::increment(ref_count); \
assert(old > 0); \
} \
void deref() { if(0 == sync::decrement(ref_count)) { delete this; } }
#define RUST_ATOMIC_REFCOUNT() \
public: \
intptr_t ref_count; \
void ref() { \
intptr_t old = sync::increment(ref_count); \
assert(old > 0); \
} \
void deref() { if(0 == sync::decrement(ref_count)) { delete_this(); } }

template <typename T> struct task_owned {
inline void *operator new(size_t size, rust_task *task, const char *tag);
Expand Down
Loading