diff --git a/src/libstd/rt/io/timer.rs b/src/libstd/rt/io/timer.rs index 19b33feacbd86..2494238c38042 100644 --- a/src/libstd/rt/io/timer.rs +++ b/src/libstd/rt/io/timer.rs @@ -8,12 +8,17 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use comm; +use kinds::Send; use option::{Option, Some, None}; use result::{Ok, Err}; use rt::io::{io_error}; use rt::rtio::{IoFactory, IoFactoryObject, RtioTimer, RtioTimerObject}; use rt::local::Local; +use rt::select::SelectInner; +use rt::sched::Scheduler; +use rt::kill::BlockedTask; pub struct Timer(~RtioTimerObject); @@ -24,6 +29,13 @@ pub fn sleep(msecs: u64) { timer.sleep(msecs) } +/// Sleep the current task for `msecs` milliseconds. +pub fn sleep_uv(msecs: u64) { + let mut timer = Timer::new().expect("timer::sleep: could not create a Timer"); + + timer.sleep_uv(msecs) +} + impl Timer { pub fn new() -> Option { @@ -46,24 +58,139 @@ impl Timer { pub fn sleep(&mut self, msecs: u64) { (**self).sleep(msecs); } + + pub fn sleep_uv(&mut self, msecs: u64) { + (**self).sleep_uv(msecs, true); + } +} + +impl SelectInner for Timer { + + fn optimistic_check(&mut self) -> bool { + (**self).optimistic_check() + } + + fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool { + (**self).block_on(sched, task) + } + + fn unblock_from(&mut self) -> bool {false} +} + +trait TimedPort { + + /** + * This implementation adds the required + * API for recv_timeout with an approximate + * but not safe behavior. + * + * Current implementations of this Trait for + * both PortOne and Port poll on the port every + * second to check for new messages. A correct + * implementation for recv_timeout should implement + * `SelectInner` and Select for UvTimer and re-write + * the sleep method around that. + * + * FIXME: (flaper87) #9195 + */ + fn recv_timeout(self, msecs: u64) -> Option; +} + +impl TimedPort for comm::PortOne { + + fn recv_timeout(self, msecs: u64) -> Option { + let mut tout = msecs; + let mut timer = Timer::new().unwrap(); + + while tout > 0 { + if self.peek() { return Some(self.recv()); } + timer.sleep(1000); + tout -= 1000; + } + + None + } +} + + +impl TimedPort for comm::Port { + + fn recv_timeout(self, msecs: u64) -> Option { + let mut tout = msecs; + let mut timer = Timer::new().unwrap(); + + while tout > 0 { + if self.peek() { return Some(self.recv()); } + timer.sleep_uv(1000); + tout -= 1000; + } + + None + } } #[cfg(test)] mod test { use super::*; use rt::test::*; + use task; + use comm; + #[test] fn test_io_timer_sleep_simple() { do run_in_mt_newsched_task { let timer = Timer::new(); - do timer.map_move |mut t| { t.sleep(1) }; + do timer.map_move |mut t| { t.sleep(1000) }; } } #[test] fn test_io_timer_sleep_standalone() { do run_in_mt_newsched_task { - sleep(1) + sleep(1000) + } + } + + #[test] + fn test_io_timer_sleep_uv_simple() { + do run_in_mt_newsched_task { + let timer = Timer::new(); + do timer.map_move |mut t| { t.sleep_uv(1) }; + } + } + + #[test] + fn test_io_timer_sleep_uv_standalone() { + do run_in_mt_newsched_task { + sleep_uv(1) + } + } + + #[test] + fn test_recv_timeout() { + do run_in_newsched_task { + let (p, c) = comm::stream::(); + do task::spawn { + let mut t = Timer::new().unwrap(); + t.sleep_uv(1000); + c.send(1); + } + + assert!(p.recv_timeout(2000).unwrap() == 1); + } + } + + #[test] + fn test_recv_timeout_expire() { + do run_in_newsched_task { + let (p, c) = comm::stream::(); + do task::spawn { + let mut t = Timer::new().unwrap(); + t.sleep_uv(3000); + c.send(1); + } + + assert!(p.recv_timeout(1000).is_none()); } } } diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index ca521c792dc73..e8a76358680a2 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -132,6 +132,8 @@ pub trait RtioUdpSocket : RtioSocket { pub trait RtioTimer { fn sleep(&mut self, msecs: u64); + fn sleep_uv(&mut self, msecs: u64, deschedule: bool); + fn sleep_then(&mut self, msecs: u64, cb: &fn(&mut Self)); } pub trait RtioFileStream { diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index f9b71db704347..a45a937150ae8 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -19,20 +19,22 @@ use option::*; use ptr; use str; use result::*; +use rt::kill::BlockedTask; use rt::io::IoError; use rt::io::net::ip::{SocketAddr, IpAddr}; use rt::io::{standard_error, OtherIoError, SeekStyle, SeekSet, SeekCur, SeekEnd}; use rt::io::process::ProcessConfig; -use rt::kill::BlockedTask; use rt::local::Local; use rt::rtio::*; use rt::sched::{Scheduler, SchedHandle}; +use rt::select::SelectInner; use rt::tube::Tube; use rt::task::SchedHome; use rt::uv::*; use rt::uv::idle::IdleWatcher; use rt::uv::net::{UvIpv4SocketAddr, UvIpv6SocketAddr, accum_sockaddrs}; use rt::uv::addrinfo::GetAddrInfoRequest; +use unstable::atomics::{AtomicUint, Acquire, SeqCst}; use unstable::sync::Exclusive; use path::Path; use super::super::io::support::PathLike; @@ -41,6 +43,8 @@ use libc::{lseek, off_t, O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY, use rt::io::{FileMode, FileAccess, OpenOrCreate, Open, Create, CreateOrTruncate, Append, Truncate, Read, Write, ReadWrite, FileStat}; + +use select::Select; use task; #[cfg(test)] use container::Container; @@ -1346,9 +1350,17 @@ impl RtioUdpSocket for UvUdpSocket { } } +static TIMER_NOT_RUNNING: uint = 0; +static TIMER_RUNNING: uint = 0; +static TIMER_FINISHED: uint = 0; + + pub struct UvTimer { watcher: timer::TimerWatcher, home: SchedHandle, + + // Holds the timer state + priv state: AtomicUint, } impl HomingIO for UvTimer { @@ -1357,14 +1369,16 @@ impl HomingIO for UvTimer { impl UvTimer { fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer { - UvTimer { watcher: w, home: home } + UvTimer { watcher: w, home: home, + state: AtomicUint::new(TIMER_NOT_RUNNING)} } } impl Drop for UvTimer { fn drop(&mut self) { + rtdebug!("UvTimer: dropping timer"); do self.home_for_io_with_sched |self_, scheduler| { - rtdebug!("closing UvTimer"); + rtdebug!("UvTimer: closing timer"); do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); do self_.watcher.close { @@ -1377,22 +1391,120 @@ impl Drop for UvTimer { } impl RtioTimer for UvTimer { + fn sleep(&mut self, msecs: u64) { do self.home_for_io_with_sched |self_, scheduler| { do scheduler.deschedule_running_task_and_then |_sched, task| { - rtdebug!("sleep: entered scheduler context"); + rtdebug!("UvTimer: entered scheduler context"); + let task_cell = Cell::new(task); + let state_cell = Cell::new((*self).state); do self_.watcher.start(msecs, 0) |_, status| { assert!(status.is_none()); - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); + + state_cell.take().swap(TIMER_FINISHED, SeqCst); + do Local::borrow |sched: &mut Scheduler| { + sched.enqueue_blocked_task(task_cell.take()); + } } } + + self_.watcher.stop(); + } + } + + // NOTE(flaper87) This will be removed + fn sleep_uv(&mut self, _msecs: u64, _deschedule: bool) {} + + + /// This method starts the timer and then calls the provided + /// callback. The callback is responsible for blocking - if + /// needed - and wait for the timer to complete. + /// + /// For example: + /// do t.sleep_then(1000) |timer| { + /// let scheduler: ~Scheduler = Local::take(); + /// do scheduler.deschedule_running_task_and_then |sched, task| { + /// timer.block_on(sched, task); + /// }; + /// } + /// + fn sleep_then(&mut self, msecs: u64, cb: &fn(&mut UvTimer)) { + + do self.home_for_io_with_sched |self_, _| { + rtdebug!("UvTimer: About to sleep"); + + let state_cell = Cell::new((*self).state); + do self_.watcher.start(msecs, 0) |_, status| { + rtdebug!("UvTimer: Timer Callback"); + assert!(status.is_none()); + + let oldstate = state_cell.take().swap(TIMER_FINISHED, SeqCst); + unsafe { + match oldstate { + TIMER_RUNNING => { + rtdebug!("No task to wake up"); + } + task_as_state => { + do Local::borrow |sched: &mut Scheduler| { + rtdebug!("UvTimer: Waking up task: {}", task_as_state); + let task = BlockedTask::cast_from_uint(task_as_state); + sched.enqueue_blocked_task(task); + } + } + } + } + } + + rtdebug!("UvTimer: Sleep callback"); + cb(self_); + + rtdebug!("UvTimer: Stop the timer"); self_.watcher.stop(); } } } +impl SelectInner for UvTimer { + fn optimistic_check(&mut self) -> bool { + self.state.load(Acquire) == TIMER_FINISHED + } + + fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool { + + unsafe { + let task_as_state = task.cast_to_uint(); + let oldstate = (*self).state.compare_and_swap(TIMER_NOT_RUNNING, + task_as_state, SeqCst); + + match oldstate { + TIMER_FINISHED => { + rtdebug!("UvTimer: Timer ran out"); + + let _task = BlockedTask::cast_from_uint(task_as_state); + sched.enqueue_blocked_task(_task); + true + } + _ => { + rtdebug!("UvTimer: Timer is still running or not running yet."); + false + } + } + } + } + + fn unblock_from(&mut self) -> bool { + rtdebug!("timer: unblock_from"); + + // Not much to do here. No tasks to + // be rescheduled, nor memory to be + // freed. Just return the timer state. + self.optimistic_check() + } +} + +impl Select for UvTimer { } + pub struct UvFileStream { loop_: Loop, fd: c_int, @@ -2154,12 +2266,33 @@ fn test_udp_many_read() { } #[test] -fn test_timer_sleep_simple() { +fn test_timer_sleep_uv_simple() { do run_in_mt_newsched_task { unsafe { let io: *mut IoFactoryObject = Local::unsafe_borrow(); let timer = (*io).timer_init(); - do timer.map_move |mut t| { t.sleep(1) }; + do timer.map_move |mut t| { t.sleep_uv(1, true) }; + } + } +} + +#[test] +fn test_timer_sleep_uv_no_deschedule() { + do run_in_mt_newsched_task { + unsafe { + let io: *mut IoFactoryObject = Local::unsafe_borrow(); + let timer = (*io).timer_init(); + do timer.map_move |mut t| { + rtdebug!("Before Sleep"); + do t.sleep_then(1000) |self_| { + let scheduler: ~Scheduler = Local::take(); + do scheduler.deschedule_running_task_and_then |sched, task| { + rtdebug!("Descheduling test task"); + self_.block_on(sched, task); + }; + } + rtdebug!("After Sleep"); + }; } } }