diff --git a/src/libstd/rt/io/net/ip.rs b/src/libstd/rt/io/net/ip.rs index 77176088801de..3b3ea80eafa8f 100644 --- a/src/libstd/rt/io/net/ip.rs +++ b/src/libstd/rt/io/net/ip.rs @@ -17,7 +17,7 @@ use option::{Option, None, Some}; type Port = u16; -#[deriving(Eq, TotalEq)] +#[deriving(Eq, TotalEq, Clone)] pub enum IpAddr { Ipv4Addr(u8, u8, u8, u8), Ipv6Addr(u16, u16, u16, u16, u16, u16, u16, u16) @@ -62,7 +62,7 @@ impl ToStr for IpAddr { } } -#[deriving(Eq, TotalEq)] +#[deriving(Eq, TotalEq, Clone)] pub struct SocketAddr { ip: IpAddr, port: Port, diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs index 27222542e087d..746fa5668a5f0 100644 --- a/src/libstd/rt/io/net/tcp.rs +++ b/src/libstd/rt/io/net/tcp.rs @@ -88,9 +88,7 @@ impl Writer for TcpStream { fn write(&mut self, buf: &[u8]) { match (**self).write(buf) { Ok(_) => (), - Err(ioerr) => { - io_error::cond.raise(ioerr); - } + Err(ioerr) => io_error::cond.raise(ioerr), } } @@ -129,9 +127,7 @@ impl TcpListener { impl Listener for TcpListener { fn accept(&mut self) -> Option { match (**self).accept() { - Ok(s) => { - Some(TcpStream::new(s)) - } + Ok(s) => Some(TcpStream::new(s)), Err(ioerr) => { io_error::cond.raise(ioerr); return None; diff --git a/src/libstd/rt/io/timer.rs b/src/libstd/rt/io/timer.rs index c7820ebf6238b..bfd1ed48ac180 100644 --- a/src/libstd/rt/io/timer.rs +++ b/src/libstd/rt/io/timer.rs @@ -41,7 +41,7 @@ impl Timer { } impl RtioTimer for Timer { - fn sleep(&self, msecs: u64) { + fn sleep(&mut self, msecs: u64) { (**self).sleep(msecs); } } @@ -50,15 +50,11 @@ impl RtioTimer for Timer { mod test { use super::*; use rt::test::*; - use option::{Some, None}; #[test] fn test_io_timer_sleep_simple() { do run_in_newsched_task { let timer = Timer::new(); - match timer { - Some(t) => t.sleep(1), - None => assert!(false) - } + do timer.map_move |mut t| { t.sleep(1) }; } } -} \ No newline at end of file +} diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index a7c794fb5f142..36eb37a3630fb 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -88,5 +88,5 @@ pub trait RtioUdpSocket : RtioSocket { } pub trait RtioTimer { - fn sleep(&self, msecs: u64); + fn sleep(&mut self, msecs: u64); } diff --git a/src/libstd/rt/uv/net.rs b/src/libstd/rt/uv/net.rs index c8b3d41a78d79..e8d0296e543a4 100644 --- a/src/libstd/rt/uv/net.rs +++ b/src/libstd/rt/uv/net.rs @@ -190,9 +190,10 @@ impl StreamWatcher { extern fn close_cb(handle: *uvll::uv_stream_t) { let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle); - stream_watcher.get_watcher_data().close_cb.take_unwrap()(); + let cb = stream_watcher.get_watcher_data().close_cb.take_unwrap(); stream_watcher.drop_watcher_data(); unsafe { free_handle(handle as *c_void) } + cb(); } } } @@ -411,9 +412,10 @@ impl UdpWatcher { extern fn close_cb(handle: *uvll::uv_udp_t) { let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle); - udp_watcher.get_watcher_data().close_cb.take_unwrap()(); + let cb = udp_watcher.get_watcher_data().close_cb.take_unwrap(); udp_watcher.drop_watcher_data(); unsafe { free_handle(handle as *c_void) } + cb(); } } } diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index d4794da9b0f28..1250a4512f713 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -23,7 +23,7 @@ use rt::io::net::ip::{SocketAddr, IpAddr}; use rt::io::{standard_error, OtherIoError}; use rt::local::Local; use rt::rtio::*; -use rt::sched::Scheduler; +use rt::sched::{Scheduler, SchedHandle}; use rt::tube::Tube; use rt::uv::*; use rt::uv::idle::IdleWatcher; @@ -37,6 +37,49 @@ use unstable::sync::Exclusive; run_in_newsched_task}; #[cfg(test)] use iterator::{Iterator, range}; +// XXX we should not be calling uvll functions in here. + +trait HomingIO { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle; + /* XXX This will move pinned tasks to do IO on the proper scheduler + * and then move them back to their home. + */ + fn home_for_io(&mut self, io: &fn(&mut Self) -> A) -> A { + use rt::sched::{PinnedTask, TaskFromFriend}; + // go home + let old_home = Cell::new_empty(); + let old_home_ptr = &old_home; + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + // get the old home first + do task.wake().map_move |mut task| { + old_home_ptr.put_back(task.take_unwrap_home()); + self.home().send(PinnedTask(task)); + }; + } + + // do IO + let a = io(self); + + // unhome home + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |scheduler, task| { + do task.wake().map_move |mut task| { + task.give_home(old_home.take()); + scheduler.make_handle().send(TaskFromFriend(task)); + }; + } + + // return the result of the IO + a + } +} + +// get a handle for the current scheduler +macro_rules! get_handle_to_current_scheduler( + () => (do Local::borrow:: |sched| { sched.make_handle() }) +) + enum SocketNameKind { TcpPeer, Tcp, @@ -45,12 +88,10 @@ enum SocketNameKind { fn socket_name>(sk: SocketNameKind, handle: U) -> Result { - #[fixed_stack_segment]; #[inline(never)]; - let getsockname = match sk { - TcpPeer => uvll::rust_uv_tcp_getpeername, - Tcp => uvll::rust_uv_tcp_getsockname, - Udp => uvll::rust_uv_udp_getsockname + TcpPeer => uvll::tcp_getpeername, + Tcp => uvll::tcp_getsockname, + Udp => uvll::udp_getsockname, }; // Allocate a sockaddr_storage @@ -80,6 +121,7 @@ fn socket_name>(sk: SocketNameKind, } +// Obviously an Event Loop is always home. pub struct UvEventLoop { uvio: UvIoFactory } @@ -149,6 +191,7 @@ fn test_callback_run_once() { } } +// The entire point of async is to call into a loop from other threads so it does not need to home. pub struct UvRemoteCallback { // The uv async handle for triggering the callback async: AsyncWatcher, @@ -251,40 +294,38 @@ impl IoFactory for UvIoFactory { let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell> = &result_cell; - let scheduler = Local::take::(); - // Block this task and take ownership, switch to scheduler context + let scheduler = Local::take::(); do scheduler.deschedule_running_task_and_then |_, task| { - rtdebug!("connect: entered scheduler context"); - let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); + let mut tcp = TcpWatcher::new(self.uv_loop()); let task_cell = Cell::new(task); // Wait for a connection - do tcp_watcher.connect(addr) |stream_watcher, status| { - rtdebug!("connect: in connect callback"); - if status.is_none() { - rtdebug!("status is none"); - let tcp_watcher = - NativeHandle::from_native_handle(stream_watcher.native_handle()); - let res = Ok(~UvTcpStream(tcp_watcher)); - - // Store the stream in the task's stack - unsafe { (*result_cell_ptr).put_back(res); } - - // Context switch - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } else { - rtdebug!("status is some"); - let task_cell = Cell::new(task_cell.take()); - do stream_watcher.close { - let res = Err(uv_error_to_io_error(status.unwrap())); + do tcp.connect(addr) |stream, status| { + match status { + None => { + let tcp = NativeHandle::from_native_handle(stream.native_handle()); + let home = get_handle_to_current_scheduler!(); + let res = Ok(~UvTcpStream { watcher: tcp, home: home }); + + // Store the stream in the task's stack unsafe { (*result_cell_ptr).put_back(res); } + + // Context switch let scheduler = Local::take::(); scheduler.resume_blocked_task_immediately(task_cell.take()); } - }; + Some(_) => { + let task_cell = Cell::new(task_cell.take()); + do stream.close { + let res = Err(uv_error_to_io_error(status.unwrap())); + unsafe { (*result_cell_ptr).put_back(res); } + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + } } } @@ -295,7 +336,10 @@ impl IoFactory for UvIoFactory { fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoError> { let mut watcher = TcpWatcher::new(self.uv_loop()); match watcher.bind(addr) { - Ok(_) => Ok(~UvTcpListener::new(watcher)), + Ok(_) => { + let home = get_handle_to_current_scheduler!(); + Ok(~UvTcpListener::new(watcher, home)) + } Err(uverr) => { let scheduler = Local::take::(); do scheduler.deschedule_running_task_and_then |_, task| { @@ -313,7 +357,10 @@ impl IoFactory for UvIoFactory { fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> { let mut watcher = UdpWatcher::new(self.uv_loop()); match watcher.bind(addr) { - Ok(_) => Ok(~UvUdpSocket(watcher)), + Ok(_) => { + let home = get_handle_to_current_scheduler!(); + Ok(~UvUdpSocket { watcher: watcher, home: home }) + } Err(uverr) => { let scheduler = Local::take::(); do scheduler.deschedule_running_task_and_then |_, task| { @@ -329,22 +376,30 @@ impl IoFactory for UvIoFactory { } fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> { - Ok(~UvTimer(TimerWatcher::new(self.uv_loop()))) + let watcher = TimerWatcher::new(self.uv_loop()); + let home = get_handle_to_current_scheduler!(); + Ok(~UvTimer::new(watcher, home)) } } pub struct UvTcpListener { watcher: TcpWatcher, listening: bool, - incoming_streams: Tube> + incoming_streams: Tube>, + home: SchedHandle, +} + +impl HomingIO for UvTcpListener { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } } impl UvTcpListener { - fn new(watcher: TcpWatcher) -> UvTcpListener { + fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener { UvTcpListener { watcher: watcher, listening: false, - incoming_streams: Tube::new() + incoming_streams: Tube::new(), + home: home, } } @@ -353,13 +408,16 @@ impl UvTcpListener { impl Drop for UvTcpListener { fn drop(&self) { - let watcher = self.watcher(); - let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do watcher.as_stream().close { - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); + // XXX need mutable finalizer + let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) }; + do self_.home_for_io |self_| { + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do self_.watcher().as_stream().close { + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } } } } @@ -367,83 +425,92 @@ impl Drop for UvTcpListener { impl RtioSocket for UvTcpListener { fn socket_name(&mut self) -> Result { - socket_name(Tcp, self.watcher) + do self.home_for_io |self_| { + socket_name(Tcp, self_.watcher) + } } } impl RtioTcpListener for UvTcpListener { fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> { - rtdebug!("entering listen"); - - if self.listening { - return self.incoming_streams.recv(); - } - - self.listening = true; - - let server_tcp_watcher = self.watcher(); - let incoming_streams_cell = Cell::new(self.incoming_streams.clone()); - - let incoming_streams_cell = Cell::new(incoming_streams_cell.take()); - let mut server_tcp_watcher = server_tcp_watcher; - do server_tcp_watcher.listen |mut server_stream_watcher, status| { - let maybe_stream = if status.is_none() { - let mut loop_ = server_stream_watcher.event_loop(); - let client_tcp_watcher = TcpWatcher::new(&mut loop_); - // XXX: Need's to be surfaced in interface - server_stream_watcher.accept(client_tcp_watcher.as_stream()); - Ok(~UvTcpStream(client_tcp_watcher)) - } else { - Err(standard_error(OtherIoError)) - }; + do self.home_for_io |self_| { + + if !self_.listening { + self_.listening = true; + + let incoming_streams_cell = Cell::new(self_.incoming_streams.clone()); + + do self_.watcher().listen |mut server, status| { + let stream = match status { + Some(_) => Err(standard_error(OtherIoError)), + None => { + let client = TcpWatcher::new(&server.event_loop()); + // XXX: needs to be surfaced in interface + server.accept(client.as_stream()); + let home = get_handle_to_current_scheduler!(); + Ok(~UvTcpStream { watcher: client, home: home }) + } + }; + + let mut incoming_streams = incoming_streams_cell.take(); + incoming_streams.send(stream); + incoming_streams_cell.put_back(incoming_streams); + } - let mut incoming_streams = incoming_streams_cell.take(); - incoming_streams.send(maybe_stream); - incoming_streams_cell.put_back(incoming_streams); + } + self_.incoming_streams.recv() } - - return self.incoming_streams.recv(); } fn accept_simultaneously(&mut self) -> Result<(), IoError> { - #[fixed_stack_segment]; #[inline(never)]; - - let r = unsafe { - uvll::rust_uv_tcp_simultaneous_accepts(self.watcher.native_handle(), 1 as c_int) - }; + do self.home_for_io |self_| { + let r = unsafe { + uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 1 as c_int) + }; - match status_to_maybe_uv_error(self.watcher, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher(), r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> { - #[fixed_stack_segment]; #[inline(never)]; - - let r = unsafe { - uvll::rust_uv_tcp_simultaneous_accepts(self.watcher.native_handle(), 0 as c_int) - }; + do self.home_for_io |self_| { + let r = unsafe { + uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 0 as c_int) + }; - match status_to_maybe_uv_error(self.watcher, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher(), r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } } -pub struct UvTcpStream(TcpWatcher); +pub struct UvTcpStream { + watcher: TcpWatcher, + home: SchedHandle, +} + +impl HomingIO for UvTcpStream { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } +} impl Drop for UvTcpStream { fn drop(&self) { - rtdebug!("closing tcp stream"); - let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do self.as_stream().close { - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); + // XXX need mutable finalizer + let this = unsafe { transmute::<&UvTcpStream, &mut UvTcpStream>(self) }; + do this.home_for_io |self_| { + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do self_.watcher.as_stream().close { + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } } } } @@ -451,148 +518,161 @@ impl Drop for UvTcpStream { impl RtioSocket for UvTcpStream { fn socket_name(&mut self) -> Result { - socket_name(Tcp, **self) + do self.home_for_io |self_| { + socket_name(Tcp, self_.watcher) + } } } impl RtioTcpStream for UvTcpStream { fn read(&mut self, buf: &mut [u8]) -> Result { - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - - let scheduler = Local::take::(); - let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |_sched, task| { - rtdebug!("read: entered scheduler context"); - let task_cell = Cell::new(task); - // XXX: We shouldn't reallocate these callbacks every - // call to read - let alloc: AllocCallback = |_| unsafe { - slice_to_uv_buf(*buf_ptr) - }; - let mut watcher = self.as_stream(); - do watcher.read_start(alloc) |mut watcher, nread, _buf, status| { - - // Stop reading so that no read callbacks are - // triggered before the user calls `read` again. - // XXX: Is there a performance impact to calling - // stop here? - watcher.read_stop(); - - let result = if status.is_none() { - assert!(nread >= 0); - Ok(nread as uint) - } else { - Err(uv_error_to_io_error(status.unwrap())) + do self.home_for_io |self_| { + let result_cell = Cell::new_empty(); + let result_cell_ptr: *Cell> = &result_cell; + + let scheduler = Local::take::(); + let buf_ptr: *&mut [u8] = &buf; + do scheduler.deschedule_running_task_and_then |_sched, task| { + let task_cell = Cell::new(task); + // XXX: We shouldn't reallocate these callbacks every + // call to read + let alloc: AllocCallback = |_| unsafe { + slice_to_uv_buf(*buf_ptr) }; + let mut watcher = self_.watcher.as_stream(); + do watcher.read_start(alloc) |mut watcher, nread, _buf, status| { - unsafe { (*result_cell_ptr).put_back(result); } + // Stop reading so that no read callbacks are + // triggered before the user calls `read` again. + // XXX: Is there a performance impact to calling + // stop here? + watcher.read_stop(); - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); + let result = if status.is_none() { + assert!(nread >= 0); + Ok(nread as uint) + } else { + Err(uv_error_to_io_error(status.unwrap())) + }; + + unsafe { (*result_cell_ptr).put_back(result); } + + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } } - } - assert!(!result_cell.is_empty()); - return result_cell.take(); + assert!(!result_cell.is_empty()); + result_cell.take() + } } fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - let scheduler = Local::take::(); - let buf_ptr: *&[u8] = &buf; - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; - let mut watcher = self.as_stream(); - do watcher.write(buf) |_watcher, status| { - let result = if status.is_none() { - Ok(()) - } else { - Err(uv_error_to_io_error(status.unwrap())) - }; - - unsafe { (*result_cell_ptr).put_back(result); } + do self.home_for_io |self_| { + let result_cell = Cell::new_empty(); + let result_cell_ptr: *Cell> = &result_cell; + let scheduler = Local::take::(); + let buf_ptr: *&[u8] = &buf; + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; + let mut watcher = self_.watcher.as_stream(); + do watcher.write(buf) |_watcher, status| { + let result = if status.is_none() { + Ok(()) + } else { + Err(uv_error_to_io_error(status.unwrap())) + }; + + unsafe { (*result_cell_ptr).put_back(result); } - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } } - } - assert!(!result_cell.is_empty()); - return result_cell.take(); + assert!(!result_cell.is_empty()); + result_cell.take() + } } fn peer_name(&mut self) -> Result { - socket_name(TcpPeer, **self) + do self.home_for_io |self_| { + socket_name(TcpPeer, self_.watcher) + } } fn control_congestion(&mut self) -> Result<(), IoError> { - #[fixed_stack_segment]; #[inline(never)]; + do self.home_for_io |self_| { + let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 0 as c_int) }; - let r = unsafe { - uvll::rust_uv_tcp_nodelay(self.native_handle(), 0 as c_int) - }; - - match status_to_maybe_uv_error(**self, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } fn nodelay(&mut self) -> Result<(), IoError> { - #[fixed_stack_segment]; #[inline(never)]; - - let r = unsafe { - uvll::rust_uv_tcp_nodelay(self.native_handle(), 1 as c_int) - }; + do self.home_for_io |self_| { + let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 1 as c_int) }; - match status_to_maybe_uv_error(**self, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> { - #[fixed_stack_segment]; #[inline(never)]; - - let r = unsafe { - uvll::rust_uv_tcp_keepalive(self.native_handle(), 1 as c_int, - delay_in_seconds as c_uint) - }; + do self.home_for_io |self_| { + let r = unsafe { + uvll::tcp_keepalive(self_.watcher.native_handle(), 1 as c_int, + delay_in_seconds as c_uint) + }; - match status_to_maybe_uv_error(**self, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } fn letdie(&mut self) -> Result<(), IoError> { - #[fixed_stack_segment]; #[inline(never)]; - - let r = unsafe { - uvll::rust_uv_tcp_keepalive(self.native_handle(), 0 as c_int, 0 as c_uint) - }; + do self.home_for_io |self_| { + let r = unsafe { + uvll::tcp_keepalive(self_.watcher.native_handle(), 0 as c_int, 0 as c_uint) + }; - match status_to_maybe_uv_error(**self, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } } -pub struct UvUdpSocket(UdpWatcher); +pub struct UvUdpSocket { + watcher: UdpWatcher, + home: SchedHandle, +} + +impl HomingIO for UvUdpSocket { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } +} impl Drop for UvUdpSocket { fn drop(&self) { - rtdebug!("closing udp socket"); - let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do self.close { - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); + // XXX need mutable finalizer + let this = unsafe { transmute::<&UvUdpSocket, &mut UvUdpSocket>(self) }; + do this.home_for_io |_| { + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do this.watcher.close { + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } } } } @@ -600,203 +680,240 @@ impl Drop for UvUdpSocket { impl RtioSocket for UvUdpSocket { fn socket_name(&mut self) -> Result { - socket_name(Udp, **self) + do self.home_for_io |self_| { + socket_name(Udp, self_.watcher) + } } } impl RtioUdpSocket for UvUdpSocket { fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> { - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - - let scheduler = Local::take::(); - let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |_sched, task| { - rtdebug!("recvfrom: entered scheduler context"); - let task_cell = Cell::new(task); - let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) }; - do self.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| { - let _ = flags; // XXX add handling for partials? - - watcher.recv_stop(); - - let result = match status { - None => { - assert!(nread >= 0); - Ok((nread as uint, addr)) - } - Some(err) => Err(uv_error_to_io_error(err)) - }; - - unsafe { (*result_cell_ptr).put_back(result); } + do self.home_for_io |self_| { + let result_cell = Cell::new_empty(); + let result_cell_ptr: *Cell> = &result_cell; + + let scheduler = Local::take::(); + let buf_ptr: *&mut [u8] = &buf; + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) }; + do self_.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| { + let _ = flags; // /XXX add handling for partials? + + watcher.recv_stop(); + + let result = match status { + None => { + assert!(nread >= 0); + Ok((nread as uint, addr)) + } + Some(err) => Err(uv_error_to_io_error(err)), + }; + + unsafe { (*result_cell_ptr).put_back(result); } - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } } - } - assert!(!result_cell.is_empty()); - return result_cell.take(); + assert!(!result_cell.is_empty()); + result_cell.take() + } } fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> { - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - let scheduler = Local::take::(); - let buf_ptr: *&[u8] = &buf; - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; - do self.send(buf, dst) |_watcher, status| { + do self.home_for_io |self_| { + let result_cell = Cell::new_empty(); + let result_cell_ptr: *Cell> = &result_cell; + let scheduler = Local::take::(); + let buf_ptr: *&[u8] = &buf; + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; + do self_.watcher.send(buf, dst) |_watcher, status| { + + let result = match status { + None => Ok(()), + Some(err) => Err(uv_error_to_io_error(err)), + }; + + unsafe { (*result_cell_ptr).put_back(result); } - let result = match status { - None => Ok(()), - Some(err) => Err(uv_error_to_io_error(err)), - }; - - unsafe { (*result_cell_ptr).put_back(result); } - - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } } - } - assert!(!result_cell.is_empty()); - return result_cell.take(); + assert!(!result_cell.is_empty()); + result_cell.take() + } } fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> { - let r = unsafe { - do multi.to_str().with_c_str |m_addr| { - uvll::udp_set_membership(self.native_handle(), m_addr, - ptr::null(), uvll::UV_JOIN_GROUP) - } - }; + do self.home_for_io |self_| { + let r = unsafe { + do multi.to_str().with_c_str |m_addr| { + uvll::udp_set_membership(self_.watcher.native_handle(), m_addr, + ptr::null(), uvll::UV_JOIN_GROUP) + } + }; - match status_to_maybe_uv_error(**self, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> { - let r = unsafe { - do multi.to_str().with_c_str |m_addr| { - uvll::udp_set_membership(self.native_handle(), m_addr, - ptr::null(), uvll::UV_LEAVE_GROUP) - } - }; + do self.home_for_io |self_| { + let r = unsafe { + do multi.to_str().with_c_str |m_addr| { + uvll::udp_set_membership(self_.watcher.native_handle(), m_addr, + ptr::null(), uvll::UV_LEAVE_GROUP) + } + }; - match status_to_maybe_uv_error(**self, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } fn loop_multicast_locally(&mut self) -> Result<(), IoError> { - let r = unsafe { - uvll::udp_set_multicast_loop(self.native_handle(), 1 as c_int) - }; + do self.home_for_io |self_| { + + let r = unsafe { + uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 1 as c_int) + }; - match status_to_maybe_uv_error(**self, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> { - let r = unsafe { - uvll::udp_set_multicast_loop(self.native_handle(), 0 as c_int) - }; + do self.home_for_io |self_| { + + let r = unsafe { + uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 0 as c_int) + }; - match status_to_maybe_uv_error(**self, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> { - let r = unsafe { - uvll::udp_set_multicast_ttl(self.native_handle(), ttl as c_int) - }; + do self.home_for_io |self_| { - match status_to_maybe_uv_error(**self, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + let r = unsafe { + uvll::udp_set_multicast_ttl(self_.watcher.native_handle(), ttl as c_int) + }; + + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> { - let r = unsafe { - uvll::udp_set_ttl(self.native_handle(), ttl as c_int) - }; + do self.home_for_io |self_| { + + let r = unsafe { + uvll::udp_set_ttl(self_.watcher.native_handle(), ttl as c_int) + }; - match status_to_maybe_uv_error(**self, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } fn hear_broadcasts(&mut self) -> Result<(), IoError> { - let r = unsafe { - uvll::udp_set_broadcast(self.native_handle(), 1 as c_int) - }; + do self.home_for_io |self_| { + + let r = unsafe { + uvll::udp_set_broadcast(self_.watcher.native_handle(), 1 as c_int) + }; - match status_to_maybe_uv_error(**self, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } fn ignore_broadcasts(&mut self) -> Result<(), IoError> { - let r = unsafe { - uvll::udp_set_broadcast(self.native_handle(), 0 as c_int) - }; + do self.home_for_io |self_| { - match status_to_maybe_uv_error(**self, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + let r = unsafe { + uvll::udp_set_broadcast(self_.watcher.native_handle(), 0 as c_int) + }; + + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } } -pub struct UvTimer(timer::TimerWatcher); +pub struct UvTimer { + watcher: timer::TimerWatcher, + home: SchedHandle, +} + +impl HomingIO for UvTimer { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } +} impl UvTimer { - fn new(w: timer::TimerWatcher) -> UvTimer { - UvTimer(w) + fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer { + UvTimer { watcher: w, home: home } } } impl Drop for UvTimer { fn drop(&self) { - rtdebug!("closing UvTimer"); - let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do self.close { - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); + let self_ = unsafe { transmute::<&UvTimer, &mut UvTimer>(self) }; + do self_.home_for_io |self_| { + rtdebug!("closing UvTimer"); + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do self_.watcher.close { + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } } } } } impl RtioTimer for UvTimer { - fn sleep(&self, msecs: u64) { - let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |_sched, task| { - rtdebug!("sleep: entered scheduler context"); - let task_cell = Cell::new(task); - let mut watcher = **self; - do watcher.start(msecs, 0) |_, status| { - assert!(status.is_none()); - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); + fn sleep(&mut self, msecs: u64) { + do self.home_for_io |self_| { + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_sched, task| { + rtdebug!("sleep: entered scheduler context"); + let task_cell = Cell::new(task); + do self_.watcher.start(msecs, 0) |_, status| { + assert!(status.is_none()); + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } } + self_.watcher.stop(); } - let mut w = **self; - w.stop(); } } @@ -824,6 +941,152 @@ fn test_simple_udp_io_bind_only() { } } +#[test] +fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() { + use rt::sleeper_list::SleeperList; + use rt::work_queue::WorkQueue; + use rt::thread::Thread; + use rt::task::Task; + use rt::sched::{Shutdown, TaskFromFriend}; + do run_in_bare_thread { + let sleepers = SleeperList::new(); + let work_queue1 = WorkQueue::new(); + let work_queue2 = WorkQueue::new(); + let queues = ~[work_queue1.clone(), work_queue2.clone()]; + + let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(), + sleepers.clone()); + let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(), + sleepers.clone()); + + let handle1 = Cell::new(sched1.make_handle()); + let handle2 = Cell::new(sched2.make_handle()); + let tasksFriendHandle = Cell::new(sched2.make_handle()); + + let on_exit: ~fn(bool) = |exit_status| { + handle1.take().send(Shutdown); + handle2.take().send(Shutdown); + rtassert!(exit_status); + }; + + let test_function: ~fn() = || { + let io = unsafe { Local::unsafe_borrow::() }; + let addr = next_test_ip4(); + let maybe_socket = unsafe { (*io).udp_bind(addr) }; + // this socket is bound to this event loop + assert!(maybe_socket.is_ok()); + + // block self on sched1 + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + // unblock task + do task.wake().map_move |task| { + // send self to sched2 + tasksFriendHandle.take().send(TaskFromFriend(task)); + }; + // sched1 should now sleep since it has nothing else to do + } + // sched2 will wake up and get the task + // as we do nothing else, the function ends and the socket goes out of scope + // sched2 will start to run the destructor + // the destructor will first block the task, set it's home as sched1, then enqueue it + // sched2 will dequeue the task, see that it has a home, and send it to sched1 + // sched1 will wake up, exec the close function on the correct loop, and then we're done + }; + + let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, test_function); + main_task.death.on_exit = Some(on_exit); + let main_task = Cell::new(main_task); + + let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, None) || {}); + + let sched1 = Cell::new(sched1); + let sched2 = Cell::new(sched2); + + let thread1 = do Thread::start { + sched1.take().bootstrap(main_task.take()); + }; + let thread2 = do Thread::start { + sched2.take().bootstrap(null_task.take()); + }; + + thread1.join(); + thread2.join(); + } +} + +#[test] +fn test_simple_homed_udp_io_bind_then_move_handle_then_home_and_close() { + use rt::sleeper_list::SleeperList; + use rt::work_queue::WorkQueue; + use rt::thread::Thread; + use rt::task::Task; + use rt::comm::oneshot; + use rt::sched::Shutdown; + do run_in_bare_thread { + let sleepers = SleeperList::new(); + let work_queue1 = WorkQueue::new(); + let work_queue2 = WorkQueue::new(); + let queues = ~[work_queue1.clone(), work_queue2.clone()]; + + let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(), + sleepers.clone()); + let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(), + sleepers.clone()); + + let handle1 = Cell::new(sched1.make_handle()); + let handle2 = Cell::new(sched2.make_handle()); + + let (port, chan) = oneshot(); + let port = Cell::new(port); + let chan = Cell::new(chan); + + let body1: ~fn() = || { + let io = unsafe { Local::unsafe_borrow::() }; + let addr = next_test_ip4(); + let socket = unsafe { (*io).udp_bind(addr) }; + assert!(socket.is_ok()); + chan.take().send(socket); + }; + + let body2: ~fn() = || { + let socket = port.take().recv(); + assert!(socket.is_ok()); + /* The socket goes out of scope and the destructor is called. + * The destructor: + * - sends itself back to sched1 + * - frees the socket + * - resets the home of the task to whatever it was previously + */ + }; + + let on_exit: ~fn(bool) = |exit| { + handle1.take().send(Shutdown); + handle2.take().send(Shutdown); + rtassert!(exit); + }; + + let task1 = Cell::new(~Task::new_root(&mut sched1.stack_pool, None, body1)); + + let mut task2 = ~Task::new_root(&mut sched2.stack_pool, None, body2); + task2.death.on_exit = Some(on_exit); + let task2 = Cell::new(task2); + + let sched1 = Cell::new(sched1); + let sched2 = Cell::new(sched2); + + let thread1 = do Thread::start { + sched1.take().bootstrap(task1.take()); + }; + let thread2 = do Thread::start { + sched2.take().bootstrap(task2.take()); + }; + + thread1.join(); + thread2.join(); + } +} + #[test] fn test_simple_tcp_server_and_client() { do run_in_newsched_task { @@ -855,6 +1118,85 @@ fn test_simple_tcp_server_and_client() { } } +#[test] +fn test_simple_tcp_server_and_client_on_diff_threads() { + use rt::sleeper_list::SleeperList; + use rt::work_queue::WorkQueue; + use rt::thread::Thread; + use rt::task::Task; + use rt::sched::{Shutdown}; + do run_in_bare_thread { + let sleepers = SleeperList::new(); + + let server_addr = next_test_ip4(); + let client_addr = server_addr.clone(); + + let server_work_queue = WorkQueue::new(); + let client_work_queue = WorkQueue::new(); + let queues = ~[server_work_queue.clone(), client_work_queue.clone()]; + + let mut server_sched = ~Scheduler::new(~UvEventLoop::new(), server_work_queue, + queues.clone(), sleepers.clone()); + let mut client_sched = ~Scheduler::new(~UvEventLoop::new(), client_work_queue, + queues.clone(), sleepers.clone()); + + let server_handle = Cell::new(server_sched.make_handle()); + let client_handle = Cell::new(client_sched.make_handle()); + + let server_on_exit: ~fn(bool) = |exit_status| { + server_handle.take().send(Shutdown); + rtassert!(exit_status); + }; + + let client_on_exit: ~fn(bool) = |exit_status| { + client_handle.take().send(Shutdown); + rtassert!(exit_status); + }; + + let server_fn: ~fn() = || { + let io = unsafe { Local::unsafe_borrow::() }; + let mut listener = unsafe { (*io).tcp_bind(server_addr).unwrap() }; + let mut stream = listener.accept().unwrap(); + let mut buf = [0, .. 2048]; + let nread = stream.read(buf).unwrap(); + assert_eq!(nread, 8); + for i in range(0u, nread) { + assert_eq!(buf[i], i as u8); + } + }; + + let client_fn: ~fn() = || { + let io = unsafe { Local::unsafe_borrow::() }; + let mut stream = unsafe { (*io).tcp_connect(client_addr) }; + while stream.is_err() { + stream = unsafe { (*io).tcp_connect(client_addr) }; + } + stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]); + }; + + let mut server_task = ~Task::new_root(&mut server_sched.stack_pool, None, server_fn); + server_task.death.on_exit = Some(server_on_exit); + let server_task = Cell::new(server_task); + + let mut client_task = ~Task::new_root(&mut client_sched.stack_pool, None, client_fn); + client_task.death.on_exit = Some(client_on_exit); + let client_task = Cell::new(client_task); + + let server_sched = Cell::new(server_sched); + let client_sched = Cell::new(client_sched); + + let server_thread = do Thread::start { + server_sched.take().bootstrap(server_task.take()); + }; + let client_thread = do Thread::start { + client_sched.take().bootstrap(client_task.take()); + }; + + server_thread.join(); + client_thread.join(); + } +} + #[test] fn test_simple_udp_server_and_client() { do run_in_newsched_task { @@ -1071,19 +1413,13 @@ fn test_udp_many_read() { } } -fn test_timer_sleep_simple_impl() { - unsafe { - let io = Local::unsafe_borrow::(); - let timer = (*io).timer_init(); - match timer { - Ok(t) => t.sleep(1), - Err(_) => assert!(false) - } - } -} #[test] fn test_timer_sleep_simple() { do run_in_newsched_task { - test_timer_sleep_simple_impl(); + unsafe { + let io = Local::unsafe_borrow::(); + let timer = (*io).timer_init(); + do timer.map_move |mut t| { t.sleep(1) }; + } } } diff --git a/src/libstd/rt/uv/uvll.rs b/src/libstd/rt/uv/uvll.rs index 65c0cffe5a073..0ea2175336ab0 100644 --- a/src/libstd/rt/uv/uvll.rs +++ b/src/libstd/rt/uv/uvll.rs @@ -172,6 +172,7 @@ fn request_sanity_check() { } } +// XXX Event loops ignore SIGPIPE by default. pub unsafe fn loop_new() -> *c_void { #[fixed_stack_segment]; #[inline(never)]; @@ -287,7 +288,7 @@ pub unsafe fn get_udp_handle_from_send_req(send_req: *uv_udp_send_t) -> *uv_udp_ return rust_uv_get_udp_handle_from_send_req(send_req); } -pub unsafe fn udp_get_sockname(handle: *uv_udp_t, name: *sockaddr_storage) -> c_int { +pub unsafe fn udp_getsockname(handle: *uv_udp_t, name: *sockaddr_storage) -> c_int { #[fixed_stack_segment]; #[inline(never)]; return rust_uv_udp_getsockname(handle, name); diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp index 0462789af9ff9..b5d6e02b46a4c 100644 --- a/src/rt/rust_uv.cpp +++ b/src/rt/rust_uv.cpp @@ -13,12 +13,21 @@ #include #endif +#ifndef __WIN32__ +// for signal +#include +#endif + #include "uv.h" #include "rust_globals.h" extern "C" void* rust_uv_loop_new() { +// XXX libuv doesn't always ignore SIGPIPE even though we don't need it. +#ifndef __WIN32__ + signal(SIGPIPE, SIG_IGN); +#endif return (void*)uv_loop_new(); }