diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs index 0f2a6cd7ef9ef..b1227af5f4c33 100644 --- a/src/libcore/rt/mod.rs +++ b/src/libcore/rt/mod.rs @@ -32,6 +32,7 @@ macro_rules! rtdebug ( ($( $arg:expr),+) => ( $(let _ = $arg)*; ) ) +#[path = "sched/mod.rs"] mod sched; mod rtio; pub mod uvll; diff --git a/src/libcore/rt/sched/local.rs b/src/libcore/rt/sched/local.rs new file mode 100644 index 0000000000000..d800101111464 --- /dev/null +++ b/src/libcore/rt/sched/local.rs @@ -0,0 +1,100 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Access to the thread-local Scheduler + +use ptr::mut_null; +use libc::c_void; +use cast::transmute; + +use super::Scheduler; +use tls = super::super::thread_local_storage; +#[cfg(test)] use super::super::uvio::UvEventLoop; + +/// Give the Scheduler to thread-local storage +pub fn put(sched: ~Scheduler) { + unsafe { + let key = tls_key(); + let void_sched: *mut c_void = transmute::<~Scheduler, *mut c_void>(sched); + tls::set(key, void_sched); + } +} + +/// Take ownership of the Scheduler from thread-local storage +pub fn take() -> ~Scheduler { + unsafe { + let key = tls_key(); + let void_sched: *mut c_void = tls::get(key); + assert!(void_sched.is_not_null()); + let sched = transmute::<*mut c_void, ~Scheduler>(void_sched); + tls::set(key, mut_null()); + return sched; + } +} + +/// Borrow a mutable reference to the thread-local Scheduler +/// # Safety Note +/// Because this leaves the Scheduler in thread-local storage it is possible +/// For the Scheduler pointer to be aliased +pub unsafe fn borrow() -> &mut Scheduler { + unsafe { + let key = tls_key(); + let mut void_sched: *mut c_void = tls::get(key); + assert!(void_sched.is_not_null()); + { + let void_sched_ptr = &mut void_sched; + let sched: &mut ~Scheduler = { + transmute::<&mut *mut c_void, &mut ~Scheduler>(void_sched_ptr) + }; + let sched: &mut Scheduler = &mut **sched; + return sched; + } + } +} + +fn tls_key() -> tls::Key { + unsafe { + let key: *mut c_void = rust_get_sched_tls_key(); + let key: &mut tls::Key = transmute(key); + return *key; + } +} + +extern { + fn rust_get_sched_tls_key() -> *mut c_void; +} + +#[test] +fn thread_local_scheduler_smoke_test() { + let scheduler = ~UvEventLoop::new_scheduler(); + put(scheduler); + let _scheduler = take(); +} + +#[test] +fn thread_local_scheduler_two_instances() { + let scheduler = ~UvEventLoop::new_scheduler(); + put(scheduler); + let _scheduler = take(); + let scheduler = ~UvEventLoop::new_scheduler(); + put(scheduler); + let _scheduler = take(); +} + +#[test] +fn borrow_smoke_test() { + let scheduler = ~UvEventLoop::new_scheduler(); + put(scheduler); + unsafe { + let _scheduler = borrow(); + } + let _scheduler = take(); +} + diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched/mod.rs similarity index 50% rename from src/libcore/rt/sched.rs rename to src/libcore/rt/sched/mod.rs index 25f446fb86d19..f157e6a80e0c4 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched/mod.rs @@ -11,18 +11,18 @@ use option::*; use sys; use cast::transmute; -use libc::c_void; -use ptr::mut_null; use super::work_queue::WorkQueue; use super::stack::{StackPool, StackSegment}; -use super::rtio::{EventLoop, EventLoopObject}; +use super::rtio::{EventLoop, EventLoopObject, IoFactoryObject}; use super::context::Context; -use tls = super::thread_local_storage; #[cfg(test)] use super::uvio::UvEventLoop; #[cfg(test)] use unstable::run_in_bare_thread; #[cfg(test)] use int; +#[cfg(test)] use cell::Cell; + +mod local; /// The Scheduler is responsible for coordinating execution of Tasks /// on a single thread. When the scheduler is running it is owned by @@ -38,31 +38,25 @@ pub struct Scheduler { priv saved_context: Context, /// The currently executing task priv current_task: Option<~Task>, - /// A queue of jobs to perform immediately upon return from task - /// context to scheduler context. - /// XXX: This probably should be a single cleanup action and it - /// should run after a context switch, not on return from the - /// scheduler - priv cleanup_jobs: ~[CleanupJob] + /// An action performed after a context switch on behalf of the + /// code running before the context switch + priv cleanup_job: Option } // XXX: Some hacks to put a &fn in Scheduler without borrowck // complaining type UnsafeTaskReceiver = sys::Closure; -trait HackAroundBorrowCk { - fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self; - fn to_fn(self) -> &fn(&mut Scheduler, ~Task); +trait ClosureConverter { + fn from_fn(&fn(~Task)) -> Self; + fn to_fn(self) -> &fn(~Task); } -impl HackAroundBorrowCk for UnsafeTaskReceiver { - fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver { - unsafe { transmute(f) } - } - fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { - unsafe { transmute(self) } - } +impl ClosureConverter for UnsafeTaskReceiver { + fn from_fn(f: &fn(~Task)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } + fn to_fn(self) -> &fn(~Task) { unsafe { transmute(self) } } } enum CleanupJob { + DoNothing, RescheduleTask(~Task), RecycleTask(~Task), GiveTask(~Task, UnsafeTaskReceiver) @@ -84,7 +78,7 @@ pub impl Scheduler { stack_pool: StackPool::new(), saved_context: Context::empty(), current_task: None, - cleanup_jobs: ~[] + cleanup_job: None } } @@ -96,39 +90,52 @@ pub impl Scheduler { assert!(!self.in_task_context()); // Give ownership of the scheduler (self) to the thread - do self.install |scheduler| { - fn run_scheduler_once() { - do Scheduler::local |scheduler| { - if scheduler.resume_task_from_queue() { - // Ok, a task ran. Nice! We'll do it again later - scheduler.event_loop.callback(run_scheduler_once); - } + local::put(self); + + let scheduler = unsafe { local::borrow() }; + fn run_scheduler_once() { + let scheduler = Scheduler::take_local(); + if scheduler.resume_task_from_queue() { + // Ok, a task ran. Nice! We'll do it again later + do Scheduler::borrow_local |scheduler| { + scheduler.event_loop.callback(run_scheduler_once); } } - - scheduler.event_loop.callback(run_scheduler_once); - scheduler.event_loop.run(); } + + scheduler.event_loop.callback(run_scheduler_once); + scheduler.event_loop.run(); + + return local::take(); } - fn install(~self, f: &fn(&mut Scheduler)) -> ~Scheduler { - let mut tlsched = ThreadLocalScheduler::new(); - tlsched.put_scheduler(self); - { - let sched = tlsched.get_scheduler(); - f(sched); + /// Get a mutable pointer to the thread-local I/O + /// # Safety Note + /// This allows other mutable aliases to the scheduler, both in the current + /// execution context and other execution contexts. + unsafe fn borrow_local_io() -> &mut IoFactoryObject { + unsafe { + let io = local::borrow().event_loop.io().unwrap(); + transmute::<&mut IoFactoryObject, &mut IoFactoryObject>(io) } - return tlsched.take_scheduler(); } - fn local(f: &fn(&mut Scheduler)) { - let mut tlsched = ThreadLocalScheduler::new(); - f(tlsched.get_scheduler()); + /// Borrow the thread-local scheduler from thread-local storage. + /// While the scheduler is borrowed it is not available in TLS. + fn borrow_local(f: &fn(&mut Scheduler)) { + let mut sched = local::take(); + f(sched); + local::put(sched); + } + + /// Take ownership of the scheduler from thread local storage + fn take_local() -> ~Scheduler { + local::take() } // * Scheduler-context operations - fn resume_task_from_queue(&mut self) -> bool { + fn resume_task_from_queue(~self) -> bool { assert!(!self.in_task_context()); let mut self = self; @@ -139,24 +146,38 @@ pub impl Scheduler { } None => { rtdebug!("no tasks in queue"); + local::put(self); return false; } } } - fn resume_task_immediately(&mut self, task: ~Task) { + fn resume_task_immediately(~self, task: ~Task) { + let mut self = self; assert!(!self.in_task_context()); rtdebug!("scheduling a task"); // Store the task in the scheduler so it can be grabbed later self.current_task = Some(task); - self.swap_in_task(); + self.enqueue_cleanup_job(DoNothing); + + local::put(self); + + // Take pointers to both the task and scheduler's saved registers. + let sched = unsafe { local::borrow() }; + let (sched_context, _, next_task_context) = sched.get_contexts(); + let next_task_context = next_task_context.unwrap(); + // Context switch to the task, restoring it's registers + // and saving the scheduler's + Context::swap(sched_context, next_task_context); + + let sched = unsafe { local::borrow() }; // The running task should have passed ownership elsewhere - assert!(self.current_task.is_none()); + assert!(sched.current_task.is_none()); // Running tasks may have asked us to do some cleanup - self.run_cleanup_jobs(); + sched.run_cleanup_job(); } @@ -164,15 +185,23 @@ pub impl Scheduler { /// Called by a running task to end execution, after which it will /// be recycled by the scheduler for reuse in a new task. - fn terminate_current_task(&mut self) { + fn terminate_current_task(~self) { + let mut self = self; assert!(self.in_task_context()); rtdebug!("ending running task"); let dead_task = self.current_task.swap_unwrap(); self.enqueue_cleanup_job(RecycleTask(dead_task)); - let dead_task = self.task_from_last_cleanup_job(); - self.swap_out_task(dead_task); + + local::put(self); + + let sched = unsafe { local::borrow() }; + let (sched_context, last_task_context, _) = sched.get_contexts(); + let last_task_context = last_task_context.unwrap(); + Context::swap(last_task_context, sched_context); + + // Control never reaches here } /// Block a running task, context switch to the scheduler, then pass the @@ -183,127 +212,120 @@ pub impl Scheduler { /// The closure here is a *stack* closure that lives in the /// running task. It gets transmuted to the scheduler's lifetime /// and called while the task is blocked. - fn block_running_task_and_then(&mut self, f: &fn(&mut Scheduler, ~Task)) { + fn deschedule_running_task_and_then(~self, f: &fn(~Task)) { + let mut self = self; assert!(self.in_task_context()); rtdebug!("blocking task"); let blocked_task = self.current_task.swap_unwrap(); - let f_fake_region = unsafe { - transmute::<&fn(&mut Scheduler, ~Task), &fn(&mut Scheduler, ~Task)>(f) - }; - let f_opaque = HackAroundBorrowCk::from_fn(f_fake_region); + let f_fake_region = unsafe { transmute::<&fn(~Task), &fn(~Task)>(f) }; + let f_opaque = ClosureConverter::from_fn(f_fake_region); self.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); - let blocked_task = self.task_from_last_cleanup_job(); - self.swap_out_task(blocked_task); + local::put(self); + + let sched = unsafe { local::borrow() }; + let (sched_context, last_task_context, _) = sched.get_contexts(); + let last_task_context = last_task_context.unwrap(); + Context::swap(last_task_context, sched_context); + + // We could be executing in a different thread now + let sched = unsafe { local::borrow() }; + sched.run_cleanup_job(); } /// Switch directly to another task, without going through the scheduler. /// You would want to think hard about doing this, e.g. if there are /// pending I/O events it would be a bad idea. - fn resume_task_from_running_task_direct(&mut self, next_task: ~Task) { + fn resume_task_from_running_task_direct(~self, next_task: ~Task) { + let mut self = self; assert!(self.in_task_context()); rtdebug!("switching tasks"); let old_running_task = self.current_task.swap_unwrap(); self.enqueue_cleanup_job(RescheduleTask(old_running_task)); - let old_running_task = self.task_from_last_cleanup_job(); - self.current_task = Some(next_task); - self.swap_in_task_from_running_task(old_running_task); - } + local::put(self); - // * Context switching + let sched = unsafe { local::borrow() }; + let (_, last_task_context, next_task_context) = sched.get_contexts(); + let last_task_context = last_task_context.unwrap(); + let next_task_context = next_task_context.unwrap(); + Context::swap(last_task_context, next_task_context); - // NB: When switching to a task callers are expected to first set - // self.running_task. When switching away from a task likewise move - // out of the self.running_task - - priv fn swap_in_task(&mut self) { - // Take pointers to both the task and scheduler's saved registers. - let running_task: &~Task = self.current_task.get_ref(); - let task_context = &running_task.saved_context; - let scheduler_context = &mut self.saved_context; - - // Context switch to the task, restoring it's registers - // and saving the scheduler's - Context::swap(scheduler_context, task_context); - } - - priv fn swap_out_task(&mut self, running_task: &mut Task) { - let task_context = &mut running_task.saved_context; - let scheduler_context = &self.saved_context; - Context::swap(task_context, scheduler_context); + // We could be executing in a different thread now + let sched = unsafe { local::borrow() }; + sched.run_cleanup_job(); } - priv fn swap_in_task_from_running_task(&mut self, running_task: &mut Task) { - let running_task_context = &mut running_task.saved_context; - let next_context = &self.current_task.get_ref().saved_context; - Context::swap(running_task_context, next_context); - } - - // * Other stuff fn in_task_context(&self) -> bool { self.current_task.is_some() } fn enqueue_cleanup_job(&mut self, job: CleanupJob) { - self.cleanup_jobs.unshift(job); + assert!(self.cleanup_job.is_none()); + self.cleanup_job = Some(job); } - fn run_cleanup_jobs(&mut self) { - assert!(!self.in_task_context()); - rtdebug!("running cleanup jobs"); + fn run_cleanup_job(&mut self) { + rtdebug!("running cleanup job"); - while !self.cleanup_jobs.is_empty() { - match self.cleanup_jobs.pop() { - RescheduleTask(task) => { - // NB: Pushing to the *front* of the queue - self.task_queue.push_front(task); - } - RecycleTask(task) => task.recycle(&mut self.stack_pool), - GiveTask(task, f) => (f.to_fn())(self, task) + assert!(self.cleanup_job.is_some()); + + let cleanup_job = self.cleanup_job.swap_unwrap(); + match cleanup_job { + DoNothing => { } + RescheduleTask(task) => { + // NB: Pushing to the *front* of the queue + self.task_queue.push_front(task); } + RecycleTask(task) => task.recycle(&mut self.stack_pool), + GiveTask(task, f) => (f.to_fn())(task) } } - // XXX: Hack. This should return &'self mut but I don't know how to - // make the borrowcheck happy - #[cfg(stage0)] - fn task_from_last_cleanup_job(&mut self) -> &mut Task { - assert!(!self.cleanup_jobs.is_empty()); - let last_job: &'self mut CleanupJob = &mut self.cleanup_jobs[0]; - let last_task: &'self Task = match last_job { - &RescheduleTask(~ref task) => task, - &RecycleTask(~ref task) => task, - &GiveTask(~ref task, _) => task, - }; - // XXX: Pattern matching mutable pointers above doesn't work - // because borrowck thinks the three patterns are conflicting - // borrows - return unsafe { transmute::<&Task, &mut Task>(last_task) }; - } - - // XXX: Hack. This should return &'self mut but I don't know how to - // make the borrowcheck happy - #[cfg(stage1)] - #[cfg(stage2)] - #[cfg(stage3)] - fn task_from_last_cleanup_job<'a>(&'a mut self) -> &mut Task { - assert!(!self.cleanup_jobs.is_empty()); - let last_job: &'a mut CleanupJob = &mut self.cleanup_jobs[0]; - let last_task: &'a Task = match last_job { - &RescheduleTask(~ref task) => task, - &RecycleTask(~ref task) => task, - &GiveTask(~ref task, _) => task, + /// Get mutable references to all the contexts that may be involved in a + /// context switch. + /// + /// Returns (the scheduler context, the optional context of the + /// task in the cleanup list, the optional context of the task in + /// the current task slot). When context switching to a task, + /// callers should first arrange for that task to be located in the + /// Scheduler's current_task slot and set up the + /// post-context-switch cleanup job. + fn get_contexts<'a>(&'a mut self) -> (&'a mut Context, + Option<&'a mut Context>, + Option<&'a mut Context>) { + let last_task = match self.cleanup_job { + Some(RescheduleTask(~ref task)) | + Some(RecycleTask(~ref task)) | + Some(GiveTask(~ref task, _)) => { + Some(task) + } + Some(DoNothing) => { + None + } + None => fail!(fmt!("all context switches should have a cleanup job")) }; // XXX: Pattern matching mutable pointers above doesn't work // because borrowck thinks the three patterns are conflicting // borrows - return unsafe { transmute::<&Task, &mut Task>(last_task) }; + unsafe { + let last_task = transmute::, Option<&mut Task>>(last_task); + let last_task_context = match last_task { + Some(ref t) => Some(&mut t.saved_context), None => None + }; + let next_task_context = match self.current_task { + Some(ref mut t) => Some(&mut t.saved_context), None => None + }; + // XXX: These transmutes can be removed after snapshot + return (transmute(&mut self.saved_context), + last_task_context, + transmute(next_task_context)); + } } } @@ -333,10 +355,15 @@ pub impl Task { priv fn build_start_wrapper(start: ~fn()) -> ~fn() { // XXX: The old code didn't have this extra allocation let wrapper: ~fn() = || { + // This is the first code to execute after the initial + // context switch to the task. The previous context may + // have asked us to do some cleanup. + let sched = unsafe { local::borrow() }; + sched.run_cleanup_job(); + start(); - let mut sched = ThreadLocalScheduler::new(); - let sched = sched.get_scheduler(); + let sched = Scheduler::take_local(); sched.terminate_current_task(); }; return wrapper; @@ -352,112 +379,6 @@ pub impl Task { } } -// NB: This is a type so we can use make use of the &self region. -struct ThreadLocalScheduler(tls::Key); - -impl ThreadLocalScheduler { - fn new() -> ThreadLocalScheduler { - unsafe { - // NB: This assumes that the TLS key has been created prior. - // Currently done in rust_start. - let key: *mut c_void = rust_get_sched_tls_key(); - let key: &mut tls::Key = transmute(key); - ThreadLocalScheduler(*key) - } - } - - fn put_scheduler(&mut self, scheduler: ~Scheduler) { - unsafe { - let key = match self { &ThreadLocalScheduler(key) => key }; - let value: *mut c_void = transmute::<~Scheduler, *mut c_void>(scheduler); - tls::set(key, value); - } - } - - #[cfg(stage0)] - fn get_scheduler(&mut self) -> &'self mut Scheduler { - unsafe { - let key = match self { &ThreadLocalScheduler(key) => key }; - let mut value: *mut c_void = tls::get(key); - assert!(value.is_not_null()); - { - let value_ptr = &mut value; - let sched: &mut ~Scheduler = { - transmute::<&mut *mut c_void, &mut ~Scheduler>(value_ptr) - }; - let sched: &mut Scheduler = &mut **sched; - return sched; - } - } - } - - #[cfg(stage1)] - #[cfg(stage2)] - #[cfg(stage3)] - fn get_scheduler<'a>(&'a mut self) -> &'a mut Scheduler { - unsafe { - let key = match self { &ThreadLocalScheduler(key) => key }; - let mut value: *mut c_void = tls::get(key); - assert!(value.is_not_null()); - { - let value_ptr = &mut value; - let sched: &mut ~Scheduler = { - transmute::<&mut *mut c_void, &mut ~Scheduler>(value_ptr) - }; - let sched: &mut Scheduler = &mut **sched; - return sched; - } - } - } - - fn take_scheduler(&mut self) -> ~Scheduler { - unsafe { - let key = match self { &ThreadLocalScheduler(key) => key }; - let value: *mut c_void = tls::get(key); - assert!(value.is_not_null()); - let sched = transmute(value); - tls::set(key, mut_null()); - return sched; - } - } -} - -extern { - fn rust_get_sched_tls_key() -> *mut c_void; -} - -#[test] -fn thread_local_scheduler_smoke_test() { - let scheduler = ~UvEventLoop::new_scheduler(); - let mut tls_scheduler = ThreadLocalScheduler::new(); - tls_scheduler.put_scheduler(scheduler); - { - let _scheduler = tls_scheduler.get_scheduler(); - } - let _scheduler = tls_scheduler.take_scheduler(); -} - -#[test] -fn thread_local_scheduler_two_instances() { - let scheduler = ~UvEventLoop::new_scheduler(); - let mut tls_scheduler = ThreadLocalScheduler::new(); - tls_scheduler.put_scheduler(scheduler); - { - - let _scheduler = tls_scheduler.get_scheduler(); - } - { - let scheduler = tls_scheduler.take_scheduler(); - tls_scheduler.put_scheduler(scheduler); - } - - let mut tls_scheduler = ThreadLocalScheduler::new(); - { - let _scheduler = tls_scheduler.get_scheduler(); - } - let _scheduler = tls_scheduler.take_scheduler(); -} - #[test] fn test_simple_scheduling() { do run_in_bare_thread { @@ -502,13 +423,12 @@ fn test_swap_tasks() { let mut sched = ~UvEventLoop::new_scheduler(); let task1 = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } - do Scheduler::local |sched| { - let task2 = ~do Task::new(&mut sched.stack_pool) { - unsafe { *count_ptr = *count_ptr + 1; } - }; - // Context switch directly to the new task - sched.resume_task_from_running_task_direct(task2); - } + let mut sched = Scheduler::take_local(); + let task2 = ~do Task::new(&mut sched.stack_pool) { + unsafe { *count_ptr = *count_ptr + 1; } + }; + // Context switch directly to the new task + sched.resume_task_from_running_task_direct(task2); unsafe { *count_ptr = *count_ptr + 1; } }; sched.task_queue.push_back(task1); @@ -535,7 +455,7 @@ fn test_run_a_lot_of_tasks_queued() { assert!(count == MAX); fn run_task(count_ptr: *mut int) { - do Scheduler::local |sched| { + do Scheduler::borrow_local |sched| { let task = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; @@ -568,18 +488,17 @@ fn test_run_a_lot_of_tasks_direct() { assert!(count == MAX); fn run_task(count_ptr: *mut int) { - do Scheduler::local |sched| { - let task = ~do Task::new(&mut sched.stack_pool) { - unsafe { - *count_ptr = *count_ptr + 1; - if *count_ptr != MAX { - run_task(count_ptr); - } + let mut sched = Scheduler::take_local(); + let task = ~do Task::new(&mut sched.stack_pool) { + unsafe { + *count_ptr = *count_ptr + 1; + if *count_ptr != MAX { + run_task(count_ptr); } - }; - // Context switch directly to the new task - sched.resume_task_from_running_task_direct(task); - } + } + }; + // Context switch directly to the new task + sched.resume_task_from_running_task_direct(task); }; } } @@ -589,11 +508,13 @@ fn test_block_task() { do run_in_bare_thread { let mut sched = ~UvEventLoop::new_scheduler(); let task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::local |sched| { - assert!(sched.in_task_context()); - do sched.block_running_task_and_then() |sched, task| { + let sched = Scheduler::take_local(); + assert!(sched.in_task_context()); + do sched.deschedule_running_task_and_then() |task| { + let task = Cell(task); + do Scheduler::borrow_local |sched| { assert!(!sched.in_task_context()); - sched.task_queue.push_back(task); + sched.task_queue.push_back(task.take()); } } }; diff --git a/src/libcore/rt/uvio.rs b/src/libcore/rt/uvio.rs index 7162ed27a9d77..fe7b0a71dbfe6 100644 --- a/src/libcore/rt/uvio.rs +++ b/src/libcore/rt/uvio.rs @@ -120,37 +120,37 @@ impl IoFactory for UvIoFactory { let result_cell = empty_cell(); let result_cell_ptr: *Cell> = &result_cell; - do Scheduler::local |scheduler| { - assert!(scheduler.in_task_context()); + let scheduler = Scheduler::take_local(); + assert!(scheduler.in_task_context()); - // Block this task and take ownership, switch to scheduler context - do scheduler.block_running_task_and_then |scheduler, task| { + // Block this task and take ownership, switch to scheduler context + do scheduler.deschedule_running_task_and_then |task| { - rtdebug!("connect: entered scheduler context"); + rtdebug!("connect: entered scheduler context"); + do Scheduler::borrow_local |scheduler| { assert!(!scheduler.in_task_context()); - let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); - let task_cell = Cell(task); - - // Wait for a connection - do tcp_watcher.connect(addr) |stream_watcher, status| { - rtdebug!("connect: in connect callback"); - let maybe_stream = if status.is_none() { - rtdebug!("status is none"); - Some(~UvStream(stream_watcher)) - } else { - rtdebug!("status is some"); - stream_watcher.close(||()); - None - }; - - // Store the stream in the task's stack - unsafe { (*result_cell_ptr).put_back(maybe_stream); } - - // Context switch - do Scheduler::local |scheduler| { - scheduler.resume_task_immediately(task_cell.take()); - } - } + } + let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); + let task_cell = Cell(task); + + // Wait for a connection + do tcp_watcher.connect(addr) |stream_watcher, status| { + rtdebug!("connect: in connect callback"); + let maybe_stream = if status.is_none() { + rtdebug!("status is none"); + Some(~UvStream(stream_watcher)) + } else { + rtdebug!("status is some"); + stream_watcher.close(||()); + None + }; + + // Store the stream in the task's stack + unsafe { (*result_cell_ptr).put_back(maybe_stream); } + + // Context switch + let scheduler = Scheduler::take_local(); + scheduler.resume_task_immediately(task_cell.take()); } } @@ -194,33 +194,31 @@ impl TcpListener for UvTcpListener { let server_tcp_watcher = self.watcher(); - do Scheduler::local |scheduler| { - assert!(scheduler.in_task_context()); - - do scheduler.block_running_task_and_then |_, task| { - let task_cell = Cell(task); - let mut server_tcp_watcher = server_tcp_watcher; - do server_tcp_watcher.listen |server_stream_watcher, status| { - let maybe_stream = if status.is_none() { - let mut server_stream_watcher = server_stream_watcher; - let mut loop_ = loop_from_watcher(&server_stream_watcher); - let mut client_tcp_watcher = TcpWatcher::new(&mut loop_); - let mut client_tcp_watcher = client_tcp_watcher.as_stream(); - // XXX: Need's to be surfaced in interface - server_stream_watcher.accept(client_tcp_watcher); - Some(~UvStream::new(client_tcp_watcher)) - } else { - None - }; - - unsafe { (*result_cell_ptr).put_back(maybe_stream); } - - rtdebug!("resuming task from listen"); - // Context switch - do Scheduler::local |scheduler| { - scheduler.resume_task_immediately(task_cell.take()); - } - } + let scheduler = Scheduler::take_local(); + assert!(scheduler.in_task_context()); + + do scheduler.deschedule_running_task_and_then |task| { + let task_cell = Cell(task); + let mut server_tcp_watcher = server_tcp_watcher; + do server_tcp_watcher.listen |server_stream_watcher, status| { + let maybe_stream = if status.is_none() { + let mut server_stream_watcher = server_stream_watcher; + let mut loop_ = loop_from_watcher(&server_stream_watcher); + let mut client_tcp_watcher = TcpWatcher::new(&mut loop_); + let mut client_tcp_watcher = client_tcp_watcher.as_stream(); + // XXX: Need's to be surfaced in interface + server_stream_watcher.accept(client_tcp_watcher); + Some(~UvStream::new(client_tcp_watcher)) + } else { + None + }; + + unsafe { (*result_cell_ptr).put_back(maybe_stream); } + + rtdebug!("resuming task from listen"); + // Context switch + let scheduler = Scheduler::take_local(); + scheduler.resume_task_immediately(task_cell.take()); } } @@ -259,42 +257,42 @@ impl Stream for UvStream { let result_cell = empty_cell(); let result_cell_ptr: *Cell> = &result_cell; - do Scheduler::local |scheduler| { - assert!(scheduler.in_task_context()); - let watcher = self.watcher(); - let buf_ptr: *&mut [u8] = &buf; - do scheduler.block_running_task_and_then |scheduler, task| { - rtdebug!("read: entered scheduler context"); + let scheduler = Scheduler::take_local(); + assert!(scheduler.in_task_context()); + let watcher = self.watcher(); + let buf_ptr: *&mut [u8] = &buf; + do scheduler.deschedule_running_task_and_then |task| { + rtdebug!("read: entered scheduler context"); + do Scheduler::borrow_local |scheduler| { assert!(!scheduler.in_task_context()); + } + let mut watcher = watcher; + let task_cell = Cell(task); + // XXX: We shouldn't reallocate these callbacks every + // call to read + let alloc: AllocCallback = |_| unsafe { + slice_to_uv_buf(*buf_ptr) + }; + do watcher.read_start(alloc) |watcher, nread, _buf, status| { + + // Stop reading so that no read callbacks are + // triggered before the user calls `read` again. + // XXX: Is there a performance impact to calling + // stop here? let mut watcher = watcher; - let task_cell = Cell(task); - // XXX: We shouldn't reallocate these callbacks every - // call to read - let alloc: AllocCallback = |_| unsafe { - slice_to_uv_buf(*buf_ptr) + watcher.read_stop(); + + let result = if status.is_none() { + assert!(nread >= 0); + Ok(nread as uint) + } else { + Err(()) }; - do watcher.read_start(alloc) |watcher, nread, _buf, status| { - - // Stop reading so that no read callbacks are - // triggered before the user calls `read` again. - // XXX: Is there a performance impact to calling - // stop here? - let mut watcher = watcher; - watcher.read_stop(); - - let result = if status.is_none() { - assert!(nread >= 0); - Ok(nread as uint) - } else { - Err(()) - }; - - unsafe { (*result_cell_ptr).put_back(result); } - - do Scheduler::local |scheduler| { - scheduler.resume_task_immediately(task_cell.take()); - } - } + + unsafe { (*result_cell_ptr).put_back(result); } + + let scheduler = Scheduler::take_local(); + scheduler.resume_task_immediately(task_cell.take()); } } @@ -305,29 +303,27 @@ impl Stream for UvStream { fn write(&mut self, buf: &[u8]) -> Result<(), ()> { let result_cell = empty_cell(); let result_cell_ptr: *Cell> = &result_cell; - do Scheduler::local |scheduler| { - assert!(scheduler.in_task_context()); - let watcher = self.watcher(); - let buf_ptr: *&[u8] = &buf; - do scheduler.block_running_task_and_then |_, task| { - let mut watcher = watcher; - let task_cell = Cell(task); - let buf = unsafe { &*buf_ptr }; - // XXX: OMGCOPIES - let buf = buf.to_vec(); - do watcher.write(buf) |_watcher, status| { - let result = if status.is_none() { - Ok(()) - } else { - Err(()) - }; - - unsafe { (*result_cell_ptr).put_back(result); } - - do Scheduler::local |scheduler| { - scheduler.resume_task_immediately(task_cell.take()); - } - } + let scheduler = Scheduler::take_local(); + assert!(scheduler.in_task_context()); + let watcher = self.watcher(); + let buf_ptr: *&[u8] = &buf; + do scheduler.deschedule_running_task_and_then |task| { + let mut watcher = watcher; + let task_cell = Cell(task); + let buf = unsafe { &*buf_ptr }; + // XXX: OMGCOPIES + let buf = buf.to_vec(); + do watcher.write(buf) |_watcher, status| { + let result = if status.is_none() { + Ok(()) + } else { + Err(()) + }; + + unsafe { (*result_cell_ptr).put_back(result); } + + let scheduler = Scheduler::take_local(); + scheduler.resume_task_immediately(task_cell.take()); } } @@ -342,12 +338,10 @@ fn test_simple_io_no_connect() { do run_in_bare_thread { let mut sched = ~UvEventLoop::new_scheduler(); let task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::local |sched| { - let io = sched.event_loop.io().unwrap(); - let addr = Ipv4(127, 0, 0, 1, 2926); - let maybe_chan = io.connect(addr); - assert!(maybe_chan.is_none()); - } + let io = unsafe { Scheduler::borrow_local_io() }; + let addr = Ipv4(127, 0, 0, 1, 2926); + let maybe_chan = io.connect(addr); + assert!(maybe_chan.is_none()); }; sched.task_queue.push_back(task); sched.run(); @@ -362,29 +356,25 @@ fn test_simple_tcp_server_and_client() { let addr = Ipv4(127, 0, 0, 1, 2929); let client_task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::local |sched| { - let io = sched.event_loop.io().unwrap(); - let mut stream = io.connect(addr).unwrap(); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.close(); - } + let io = unsafe { Scheduler::borrow_local_io() }; + let mut stream = io.connect(addr).unwrap(); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.close(); }; let server_task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::local |sched| { - let io = sched.event_loop.io().unwrap(); - let mut listener = io.bind(addr).unwrap(); - let mut stream = listener.listen().unwrap(); - let mut buf = [0, .. 2048]; - let nread = stream.read(buf).unwrap(); - assert!(nread == 8); - for uint::range(0, nread) |i| { - rtdebug!("%u", buf[i] as uint); - assert!(buf[i] == i as u8); - } - stream.close(); - listener.close(); + let io = unsafe { Scheduler::borrow_local_io() }; + let mut listener = io.bind(addr).unwrap(); + let mut stream = listener.listen().unwrap(); + let mut buf = [0, .. 2048]; + let nread = stream.read(buf).unwrap(); + assert!(nread == 8); + for uint::range(0, nread) |i| { + rtdebug!("%u", buf[i] as uint); + assert!(buf[i] == i as u8); } + stream.close(); + listener.close(); }; // Start the server first so it listens before the client connects @@ -401,53 +391,51 @@ fn test_read_and_block() { let addr = Ipv4(127, 0, 0, 1, 2930); let client_task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::local |sched| { - let io = sched.event_loop.io().unwrap(); - let mut stream = io.connect(addr).unwrap(); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.close(); - } + let io = unsafe { Scheduler::borrow_local_io() }; + let mut stream = io.connect(addr).unwrap(); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.close(); }; let server_task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::local |sched| { - let io = sched.event_loop.io().unwrap(); - let mut listener = io.bind(addr).unwrap(); - let mut stream = listener.listen().unwrap(); - let mut buf = [0, .. 2048]; - - let expected = 32; - let mut current = 0; - let mut reads = 0; - - while current < expected { - let nread = stream.read(buf).unwrap(); - for uint::range(0, nread) |i| { - let val = buf[i] as uint; - assert!(val == current % 8); - current += 1; - } - reads += 1; - - do Scheduler::local |scheduler| { - // Yield to the other task in hopes that it - // will trigger a read callback while we are - // not ready for it - do scheduler.block_running_task_and_then |scheduler, task| { - scheduler.task_queue.push_back(task); - } + let io = unsafe { Scheduler::borrow_local_io() }; + let mut listener = io.bind(addr).unwrap(); + let mut stream = listener.listen().unwrap(); + let mut buf = [0, .. 2048]; + + let expected = 32; + let mut current = 0; + let mut reads = 0; + + while current < expected { + let nread = stream.read(buf).unwrap(); + for uint::range(0, nread) |i| { + let val = buf[i] as uint; + assert!(val == current % 8); + current += 1; + } + reads += 1; + + let scheduler = Scheduler::take_local(); + // Yield to the other task in hopes that it + // will trigger a read callback while we are + // not ready for it + do scheduler.deschedule_running_task_and_then |task| { + let task = Cell(task); + do Scheduler::borrow_local |scheduler| { + scheduler.task_queue.push_back(task.take()); } } + } - // Make sure we had multiple reads - assert!(reads > 1); + // Make sure we had multiple reads + assert!(reads > 1); - stream.close(); - listener.close(); - } + stream.close(); + listener.close(); }; // Start the server first so it listens before the client connects @@ -464,19 +452,17 @@ fn test_read_read_read() { let addr = Ipv4(127, 0, 0, 1, 2931); let client_task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::local |sched| { - let io = sched.event_loop.io().unwrap(); - let mut stream = io.connect(addr).unwrap(); - let mut buf = [0, .. 2048]; - let mut total_bytes_read = 0; - while total_bytes_read < 500000000 { - let nread = stream.read(buf).unwrap(); - rtdebug!("read %u bytes", nread as uint); - total_bytes_read += nread; - } - rtdebug_!("read %u bytes total", total_bytes_read as uint); - stream.close(); + let io = unsafe { Scheduler::borrow_local_io() }; + let mut stream = io.connect(addr).unwrap(); + let mut buf = [0, .. 2048]; + let mut total_bytes_read = 0; + while total_bytes_read < 500000000 { + let nread = stream.read(buf).unwrap(); + rtdebug!("read %u bytes", nread as uint); + total_bytes_read += nread; } + rtdebug_!("read %u bytes total", total_bytes_read as uint); + stream.close(); }; sched.task_queue.push_back(client_task);