From 56080c476712e478ffe4ef8d6d727c0e3d21cfd0 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 22 Jan 2014 19:32:16 -0800 Subject: [PATCH] Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere. --- src/libnative/io/file.rs | 47 +++++---- src/libnative/io/net.rs | 130 +++++++++++++++---------- src/librustuv/access.rs | 109 +++++++++++++++++++++ src/librustuv/homing.rs | 4 +- src/librustuv/lib.rs | 4 +- src/librustuv/net.rs | 62 ++++++++++-- src/librustuv/pipe.rs | 31 +++++- src/librustuv/rc.rs | 49 ++++++++++ src/libstd/io/net/tcp.rs | 181 ++++++++++++++++++++++++++++++++++- src/libstd/io/net/udp.rs | 117 ++++++++++++++++++++++ src/libstd/io/net/unix.rs | 96 +++++++++++++++++++ src/libstd/io/pipe.rs | 6 ++ src/libstd/libc.rs | 49 ++++++++++ src/libstd/option.rs | 1 - src/libstd/rt/rtio.rs | 4 + src/libstd/unstable/mutex.rs | 1 - src/libstd/util.rs | 1 - src/libstd/vec.rs | 2 +- 18 files changed, 812 insertions(+), 82 deletions(-) create mode 100644 src/librustuv/access.rs create mode 100644 src/librustuv/rc.rs diff --git a/src/libnative/io/file.rs b/src/libnative/io/file.rs index cc5b0770d4d20..25fb2809e764a 100644 --- a/src/libnative/io/file.rs +++ b/src/libnative/io/file.rs @@ -10,6 +10,7 @@ //! Blocking posix-based file I/O +use std::sync::arc::UnsafeArc; use std::c_str::CString; use std::io::IoError; use std::io; @@ -55,9 +56,13 @@ pub fn keep_going(data: &[u8], f: |*u8, uint| -> i64) -> i64 { pub type fd_t = libc::c_int; +struct Inner { + fd: fd_t, + close_on_drop: bool, +} + pub struct FileDesc { - priv fd: fd_t, - priv close_on_drop: bool, + priv inner: UnsafeArc } impl FileDesc { @@ -70,7 +75,10 @@ impl FileDesc { /// Note that all I/O operations done on this object will be *blocking*, but /// they do not require the runtime to be active. pub fn new(fd: fd_t, close_on_drop: bool) -> FileDesc { - FileDesc { fd: fd, close_on_drop: close_on_drop } + FileDesc { inner: UnsafeArc::new(Inner { + fd: fd, + close_on_drop: close_on_drop + }) } } // FIXME(#10465) these functions should not be public, but anything in @@ -80,7 +88,7 @@ impl FileDesc { #[cfg(windows)] type rlen = libc::c_uint; #[cfg(not(windows))] type rlen = libc::size_t; let ret = retry(|| unsafe { - libc::read(self.fd, + libc::read(self.fd(), buf.as_ptr() as *mut libc::c_void, buf.len() as rlen) as libc::c_int }); @@ -97,7 +105,7 @@ impl FileDesc { #[cfg(not(windows))] type wlen = libc::size_t; let ret = keep_going(buf, |buf, len| { unsafe { - libc::write(self.fd, buf as *libc::c_void, len as wlen) as i64 + libc::write(self.fd(), buf as *libc::c_void, len as wlen) as i64 } }); if ret < 0 { @@ -107,7 +115,11 @@ impl FileDesc { } } - pub fn fd(&self) -> fd_t { self.fd } + pub fn fd(&self) -> fd_t { + // This unsafety is fine because we're just reading off the file + // descriptor, no one is modifying this. + unsafe { (*self.inner.get()).fd } + } } impl io::Reader for FileDesc { @@ -130,7 +142,7 @@ impl rtio::RtioFileStream for FileDesc { self.inner_write(buf) } fn pread(&mut self, buf: &mut [u8], offset: u64) -> Result { - return os_pread(self.fd, buf.as_ptr(), buf.len(), offset); + return os_pread(self.fd(), buf.as_ptr(), buf.len(), offset); #[cfg(windows)] fn os_pread(fd: c_int, buf: *u8, amt: uint, offset: u64) -> IoResult { @@ -162,7 +174,7 @@ impl rtio::RtioFileStream for FileDesc { } } fn pwrite(&mut self, buf: &[u8], offset: u64) -> Result<(), IoError> { - return os_pwrite(self.fd, buf.as_ptr(), buf.len(), offset); + return os_pwrite(self.fd(), buf.as_ptr(), buf.len(), offset); #[cfg(windows)] fn os_pwrite(fd: c_int, buf: *u8, amt: uint, offset: u64) -> IoResult<()> { @@ -197,7 +209,7 @@ impl rtio::RtioFileStream for FileDesc { io::SeekCur => libc::FILE_CURRENT, }; unsafe { - let handle = libc::get_osfhandle(self.fd) as libc::HANDLE; + let handle = libc::get_osfhandle(self.fd()) as libc::HANDLE; let mut newpos = 0; match libc::SetFilePointerEx(handle, pos, &mut newpos, whence) { 0 => Err(super::last_error()), @@ -212,7 +224,7 @@ impl rtio::RtioFileStream for FileDesc { io::SeekEnd => libc::SEEK_END, io::SeekCur => libc::SEEK_CUR, }; - let n = unsafe { libc::lseek(self.fd, pos as libc::off_t, whence) }; + let n = unsafe { libc::lseek(self.fd(), pos as libc::off_t, whence) }; if n < 0 { Err(super::last_error()) } else { @@ -220,7 +232,7 @@ impl rtio::RtioFileStream for FileDesc { } } fn tell(&self) -> Result { - let n = unsafe { libc::lseek(self.fd, 0, libc::SEEK_CUR) }; + let n = unsafe { libc::lseek(self.fd(), 0, libc::SEEK_CUR) }; if n < 0 { Err(super::last_error()) } else { @@ -228,7 +240,7 @@ impl rtio::RtioFileStream for FileDesc { } } fn fsync(&mut self) -> Result<(), IoError> { - return os_fsync(self.fd); + return os_fsync(self.fd()); #[cfg(windows)] fn os_fsync(fd: c_int) -> IoResult<()> { @@ -247,7 +259,7 @@ impl rtio::RtioFileStream for FileDesc { #[cfg(not(windows))] fn datasync(&mut self) -> Result<(), IoError> { - return super::mkerr_libc(os_datasync(self.fd)); + return super::mkerr_libc(os_datasync(self.fd())); #[cfg(target_os = "macos")] fn os_datasync(fd: c_int) -> c_int { @@ -270,7 +282,7 @@ impl rtio::RtioFileStream for FileDesc { Ok(_) => {}, Err(e) => return Err(e), }; let ret = unsafe { - let handle = libc::get_osfhandle(self.fd) as libc::HANDLE; + let handle = libc::get_osfhandle(self.fd()) as libc::HANDLE; match libc::SetEndOfFile(handle) { 0 => Err(super::last_error()), _ => Ok(()) @@ -282,7 +294,7 @@ impl rtio::RtioFileStream for FileDesc { #[cfg(unix)] fn truncate(&mut self, offset: i64) -> Result<(), IoError> { super::mkerr_libc(retry(|| unsafe { - libc::ftruncate(self.fd, offset as libc::off_t) + libc::ftruncate(self.fd(), offset as libc::off_t) })) } } @@ -294,6 +306,9 @@ impl rtio::RtioPipe for FileDesc { fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { self.inner_write(buf) } + fn clone(&self) -> ~rtio::RtioPipe { + ~FileDesc { inner: self.inner.clone() } as ~rtio::RtioPipe + } } impl rtio::RtioTTY for FileDesc { @@ -312,7 +327,7 @@ impl rtio::RtioTTY for FileDesc { fn isatty(&self) -> bool { false } } -impl Drop for FileDesc { +impl Drop for Inner { fn drop(&mut self) { // closing stdio file handles makes no sense, so never do it. Also, note // that errors are ignored when closing a file descriptor. The reason diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index dd916c8f3c4b9..32cd6337f993d 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -14,6 +14,7 @@ use std::io; use std::libc; use std::mem; use std::rt::rtio; +use std::sync::arc::UnsafeArc; use std::unstable::intrinsics; use super::{IoResult, retry}; @@ -108,10 +109,27 @@ fn setsockopt(fd: sock_t, opt: libc::c_int, val: libc::c_int, let ret = libc::setsockopt(fd, opt, val, payload, mem::size_of::() as libc::socklen_t); - super::mkerr_libc(ret) + if ret != 0 { + Err(last_error()) + } else { + Ok(()) + } } } +#[cfg(windows)] +fn last_error() -> io::IoError { + extern "system" { + fn WSAGetLastError() -> libc::c_int; + } + super::translate_error(unsafe { WSAGetLastError() }, true) +} + +#[cfg(not(windows))] +fn last_error() -> io::IoError { + super::last_error() +} + #[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); } #[cfg(unix)] unsafe fn close(sock: sock_t) { let _ = libc::close(sock); } @@ -128,7 +146,7 @@ fn sockname(fd: sock_t, storage as *mut libc::sockaddr, &mut len as *mut libc::socklen_t); if ret != 0 { - return Err(super::last_error()) + return Err(last_error()) } } return sockaddr_to_addr(&storage, len as uint); @@ -222,7 +240,11 @@ pub fn init() { //////////////////////////////////////////////////////////////////////////////// pub struct TcpStream { - priv fd: sock_t, + priv inner: UnsafeArc, +} + +struct Inner { + fd: sock_t, } impl TcpStream { @@ -231,27 +253,31 @@ impl TcpStream { socket(addr, libc::SOCK_STREAM).and_then(|fd| { let (addr, len) = addr_to_sockaddr(addr); let addrp = &addr as *libc::sockaddr_storage; - let ret = TcpStream { fd: fd }; + let inner = Inner { fd: fd }; + let ret = TcpStream { inner: UnsafeArc::new(inner) }; match retry(|| { libc::connect(fd, addrp as *libc::sockaddr, len as libc::socklen_t) }) { - -1 => Err(super::last_error()), + -1 => Err(last_error()), _ => Ok(ret), } }) } } - pub fn fd(&self) -> sock_t { self.fd } + pub fn fd(&self) -> sock_t { + // This unsafety is fine because it's just a read-only arc + unsafe { (*self.inner.get()).fd } + } fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> { - setsockopt(self.fd, libc::IPPROTO_TCP, libc::TCP_NODELAY, + setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY, nodelay as libc::c_int) } fn set_keepalive(&mut self, seconds: Option) -> IoResult<()> { - let ret = setsockopt(self.fd, libc::SOL_SOCKET, libc::SO_KEEPALIVE, + let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE, seconds.is_some() as libc::c_int); match seconds { Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)), @@ -261,12 +287,12 @@ impl TcpStream { #[cfg(target_os = "macos")] fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> { - setsockopt(self.fd, libc::IPPROTO_TCP, libc::TCP_KEEPALIVE, + setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE, seconds as libc::c_int) } #[cfg(target_os = "freebsd")] fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> { - setsockopt(self.fd, libc::IPPROTO_TCP, libc::TCP_KEEPIDLE, + setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPIDLE, seconds as libc::c_int) } #[cfg(not(target_os = "macos"), not(target_os = "freebsd"))] @@ -282,7 +308,7 @@ impl rtio::RtioTcpStream for TcpStream { fn read(&mut self, buf: &mut [u8]) -> IoResult { let ret = retry(|| { unsafe { - libc::recv(self.fd, + libc::recv(self.fd(), buf.as_ptr() as *mut libc::c_void, buf.len() as wrlen, 0) as libc::c_int @@ -291,7 +317,7 @@ impl rtio::RtioTcpStream for TcpStream { if ret == 0 { Err(io::standard_error(io::EndOfFile)) } else if ret < 0 { - Err(super::last_error()) + Err(last_error()) } else { Ok(ret as uint) } @@ -299,20 +325,20 @@ impl rtio::RtioTcpStream for TcpStream { fn write(&mut self, buf: &[u8]) -> IoResult<()> { let ret = keep_going(buf, |buf, len| { unsafe { - libc::send(self.fd, + libc::send(self.fd(), buf as *mut libc::c_void, len as wrlen, 0) as i64 } }); if ret < 0 { - Err(super::last_error()) + Err(last_error()) } else { Ok(()) } } fn peer_name(&mut self) -> IoResult { - sockname(self.fd, libc::getpeername) + sockname(self.fd(), libc::getpeername) } fn control_congestion(&mut self) -> IoResult<()> { self.set_nodelay(false) @@ -326,15 +352,19 @@ impl rtio::RtioTcpStream for TcpStream { fn letdie(&mut self) -> IoResult<()> { self.set_keepalive(None) } + + fn clone(&self) -> ~rtio::RtioTcpStream { + ~TcpStream { inner: self.inner.clone() } as ~rtio::RtioTcpStream + } } impl rtio::RtioSocket for TcpStream { fn socket_name(&mut self) -> IoResult { - sockname(self.fd, libc::getsockname) + sockname(self.fd(), libc::getsockname) } } -impl Drop for TcpStream { +impl Drop for Inner { fn drop(&mut self) { unsafe { close(self.fd); } } } @@ -343,7 +373,7 @@ impl Drop for TcpStream { //////////////////////////////////////////////////////////////////////////////// pub struct TcpListener { - priv fd: sock_t, + priv inner: UnsafeArc, } impl TcpListener { @@ -352,7 +382,8 @@ impl TcpListener { socket(addr, libc::SOCK_STREAM).and_then(|fd| { let (addr, len) = addr_to_sockaddr(addr); let addrp = &addr as *libc::sockaddr_storage; - let ret = TcpListener { fd: fd }; + let inner = Inner { fd: fd }; + let ret = TcpListener { inner: UnsafeArc::new(inner) }; // On platforms with Berkeley-derived sockets, this allows // to quickly rebind a socket, without needing to wait for // the OS to clean up the previous one. @@ -366,18 +397,21 @@ impl TcpListener { } match libc::bind(fd, addrp as *libc::sockaddr, len as libc::socklen_t) { - -1 => Err(super::last_error()), + -1 => Err(last_error()), _ => Ok(ret), } }) } } - pub fn fd(&self) -> sock_t { self.fd } + pub fn fd(&self) -> sock_t { + // This is just a read-only arc so the unsafety is fine + unsafe { (*self.inner.get()).fd } + } pub fn native_listen(self, backlog: int) -> IoResult { - match unsafe { libc::listen(self.fd, backlog as libc::c_int) } { - -1 => Err(super::last_error()), + match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { + -1 => Err(last_error()), _ => Ok(TcpAcceptor { listener: self }) } } @@ -391,20 +425,16 @@ impl rtio::RtioTcpListener for TcpListener { impl rtio::RtioSocket for TcpListener { fn socket_name(&mut self) -> IoResult { - sockname(self.fd, libc::getsockname) + sockname(self.fd(), libc::getsockname) } } -impl Drop for TcpListener { - fn drop(&mut self) { unsafe { close(self.fd); } } -} - pub struct TcpAcceptor { priv listener: TcpListener, } impl TcpAcceptor { - pub fn fd(&self) -> sock_t { self.listener.fd } + pub fn fd(&self) -> sock_t { self.listener.fd() } pub fn native_accept(&mut self) -> IoResult { unsafe { @@ -417,8 +447,8 @@ impl TcpAcceptor { storagep as *mut libc::sockaddr, &mut size as *mut libc::socklen_t) as libc::c_int }) as sock_t { - -1 => Err(super::last_error()), - fd => Ok(TcpStream { fd: fd }) + -1 => Err(last_error()), + fd => Ok(TcpStream { inner: UnsafeArc::new(Inner { fd: fd })}) } } } @@ -444,7 +474,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor { //////////////////////////////////////////////////////////////////////////////// pub struct UdpSocket { - priv fd: sock_t, + priv inner: UnsafeArc, } impl UdpSocket { @@ -453,25 +483,29 @@ impl UdpSocket { socket(addr, libc::SOCK_DGRAM).and_then(|fd| { let (addr, len) = addr_to_sockaddr(addr); let addrp = &addr as *libc::sockaddr_storage; - let ret = UdpSocket { fd: fd }; + let inner = Inner { fd: fd }; + let ret = UdpSocket { inner: UnsafeArc::new(inner) }; match libc::bind(fd, addrp as *libc::sockaddr, len as libc::socklen_t) { - -1 => Err(super::last_error()), + -1 => Err(last_error()), _ => Ok(ret), } }) } } - pub fn fd(&self) -> sock_t { self.fd } + pub fn fd(&self) -> sock_t { + // unsafety is fine because it's just a read-only arc + unsafe { (*self.inner.get()).fd } + } pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> { - setsockopt(self.fd, libc::SOL_SOCKET, libc::SO_BROADCAST, + setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST, on as libc::c_int) } pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> { - setsockopt(self.fd, libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP, + setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP, on as libc::c_int) } @@ -484,14 +518,14 @@ impl UdpSocket { // interface == INADDR_ANY imr_interface: libc::in_addr { s_addr: 0x0 }, }; - setsockopt(self.fd, libc::IPPROTO_IP, opt, mreq) + setsockopt(self.fd(), libc::IPPROTO_IP, opt, mreq) } In6Addr(addr) => { let mreq = libc::ip6_mreq { ipv6mr_multiaddr: addr, ipv6mr_interface: 0, }; - setsockopt(self.fd, libc::IPPROTO_IPV6, opt, mreq) + setsockopt(self.fd(), libc::IPPROTO_IPV6, opt, mreq) } } } @@ -514,14 +548,14 @@ impl rtio::RtioUdpSocket for UdpSocket { let mut addrlen: libc::socklen_t = mem::size_of::() as libc::socklen_t; let ret = retry(|| { - libc::recvfrom(self.fd, + libc::recvfrom(self.fd(), buf.as_ptr() as *mut libc::c_void, buf.len() as msglen_t, 0, storagep as *mut libc::sockaddr, &mut addrlen) as libc::c_int }); - if ret < 0 { return Err(super::last_error()) } + if ret < 0 { return Err(last_error()) } sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| { Ok((ret as uint, addr)) }) @@ -532,7 +566,7 @@ impl rtio::RtioUdpSocket for UdpSocket { let dstp = &dst as *libc::sockaddr_storage; unsafe { let ret = retry(|| { - libc::sendto(self.fd, + libc::sendto(self.fd(), buf.as_ptr() as *libc::c_void, buf.len() as msglen_t, 0, @@ -540,7 +574,7 @@ impl rtio::RtioUdpSocket for UdpSocket { len as libc::socklen_t) as libc::c_int }); match ret { - -1 => Err(super::last_error()), + -1 => Err(last_error()), n if n as uint != buf.len() => { Err(io::IoError { kind: io::OtherIoError, @@ -582,11 +616,11 @@ impl rtio::RtioUdpSocket for UdpSocket { } fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> { - setsockopt(self.fd, libc::IPPROTO_IP, libc::IP_MULTICAST_TTL, + setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL, ttl as libc::c_int) } fn time_to_live(&mut self, ttl: int) -> IoResult<()> { - setsockopt(self.fd, libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int) + setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int) } fn hear_broadcasts(&mut self) -> IoResult<()> { @@ -595,8 +629,8 @@ impl rtio::RtioUdpSocket for UdpSocket { fn ignore_broadcasts(&mut self) -> IoResult<()> { self.set_broadcast(false) } -} -impl Drop for UdpSocket { - fn drop(&mut self) { unsafe { close(self.fd) } } + fn clone(&self) -> ~rtio::RtioUdpSocket { + ~UdpSocket { inner: self.inner.clone() } as ~rtio::RtioUdpSocket + } } diff --git a/src/librustuv/access.rs b/src/librustuv/access.rs new file mode 100644 index 0000000000000..9d06593a6eafd --- /dev/null +++ b/src/librustuv/access.rs @@ -0,0 +1,109 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/// An exclusive access primitive +/// +/// This primitive is used to gain exclusive access to read() and write() in uv. +/// It is assumed that all invocations of this struct happen on the same thread +/// (the uv event loop). + +use std::cast; +use std::sync::arc::UnsafeArc; +use std::rt::task::{BlockedTask, Task}; +use std::rt::local::Local; + +use homing::HomingMissile; + +pub struct Access { + priv inner: UnsafeArc, +} + +pub struct Guard<'a> { + priv access: &'a mut Access, + priv missile: Option, +} + +struct Inner { + queue: ~[BlockedTask], + held: bool, +} + +impl Access { + pub fn new() -> Access { + Access { + inner: UnsafeArc::new(Inner { + queue: ~[], + held: false, + }) + } + } + + pub fn grant<'a>(&'a mut self, missile: HomingMissile) -> Guard<'a> { + // This unsafety is actually OK because the homing missile argument + // guarantees that we're on the same event loop as all the other objects + // attempting to get access granted. + let inner: &mut Inner = unsafe { cast::transmute(self.inner.get()) }; + + if inner.held { + let t: ~Task = Local::take(); + t.deschedule(1, |task| { + inner.queue.push(task); + Ok(()) + }); + assert!(inner.held); + } else { + inner.held = true; + } + + Guard { access: self, missile: Some(missile) } + } +} + +impl Clone for Access { + fn clone(&self) -> Access { + Access { inner: self.inner.clone() } + } +} + +#[unsafe_destructor] +impl<'a> Drop for Guard<'a> { + fn drop(&mut self) { + // This guard's homing missile is still armed, so we're guaranteed to be + // on the same I/O event loop, so this unsafety should be ok. + assert!(self.missile.is_some()); + let inner: &mut Inner = unsafe { + cast::transmute(self.access.inner.get()) + }; + + match inner.queue.shift() { + // Here we have found a task that was waiting for access, and we + // current have the "access lock" we need to relinquish access to + // this sleeping task. + // + // To do so, we first drop out homing missile and we then reawaken + // the task. In reawakening the task, it will be immediately + // scheduled on this scheduler. Because we might be woken up on some + // other scheduler, we drop our homing missile before we reawaken + // the task. + Some(task) => { + drop(self.missile.take()); + let _ = task.wake().map(|t| t.reawaken()); + } + None => { inner.held = false; } + } + } +} + +impl Drop for Inner { + fn drop(&mut self) { + assert!(!self.held); + assert_eq!(self.queue.len(), 0); + } +} diff --git a/src/librustuv/homing.rs b/src/librustuv/homing.rs index a2f3457a9430c..25c929c995de7 100644 --- a/src/librustuv/homing.rs +++ b/src/librustuv/homing.rs @@ -125,8 +125,8 @@ pub trait HomingIO { /// After a homing operation has been completed, this will return the current /// task back to its appropriate home (if applicable). The field is used to /// assert that we are where we think we are. -struct HomingMissile { - io_home: uint, +pub struct HomingMissile { + priv io_home: uint, } impl HomingMissile { diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs index 39d6f851e1722..b463bb7fd733d 100644 --- a/src/librustuv/lib.rs +++ b/src/librustuv/lib.rs @@ -68,8 +68,10 @@ pub use self::tty::TtyWatcher; mod macros; -mod queue; +mod access; mod homing; +mod queue; +mod rc; /// The implementation of `rtio` for libuv pub mod uvio; diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index 5461fc6272d35..7660d2c4f2b3e 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -19,7 +19,9 @@ use std::rt::rtio; use std::rt::task::BlockedTask; use std::unstable::intrinsics; +use access::Access; use homing::{HomingIO, HomeHandle}; +use rc::Refcount; use stream::StreamWatcher; use super::{Loop, Request, UvError, Buf, status_to_io_result, uv_error_to_io_error, UvHandle, slice_to_uv_buf, @@ -152,6 +154,14 @@ pub struct TcpWatcher { handle: *uvll::uv_tcp_t, stream: StreamWatcher, home: HomeHandle, + priv refcount: Refcount, + + // libuv can't support concurrent reads and concurrent writes of the same + // stream object, so we use these access guards in order to arbitrate among + // multiple concurrent reads and writes. Note that libuv *can* read and + // write simultaneously, it just can't read and read simultaneously. + priv read_access: Access, + priv write_access: Access, } pub struct TcpListener { @@ -183,6 +193,9 @@ impl TcpWatcher { home: home, handle: handle, stream: StreamWatcher::new(handle), + refcount: Refcount::new(), + read_access: Access::new(), + write_access: Access::new(), } } @@ -238,12 +251,14 @@ impl rtio::RtioSocket for TcpWatcher { impl rtio::RtioTcpStream for TcpWatcher { fn read(&mut self, buf: &mut [u8]) -> Result { - let _m = self.fire_homing_missile(); + let m = self.fire_homing_missile(); + let _g = self.read_access.grant(m); self.stream.read(buf).map_err(uv_error_to_io_error) } fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { - let _m = self.fire_homing_missile(); + let m = self.fire_homing_missile(); + let _g = self.write_access.grant(m); self.stream.write(buf).map_err(uv_error_to_io_error) } @@ -280,6 +295,17 @@ impl rtio::RtioTcpStream for TcpWatcher { uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint) }) } + + fn clone(&self) -> ~rtio::RtioTcpStream { + ~TcpWatcher { + handle: self.handle, + stream: StreamWatcher::new(self.handle), + home: self.home.clone(), + refcount: self.refcount.clone(), + write_access: self.write_access.clone(), + read_access: self.read_access.clone(), + } as ~rtio::RtioTcpStream + } } impl UvHandle for TcpWatcher { @@ -289,7 +315,9 @@ impl UvHandle for TcpWatcher { impl Drop for TcpWatcher { fn drop(&mut self) { let _m = self.fire_homing_missile(); - self.close(); + if self.refcount.decrement() { + self.close(); + } } } @@ -415,6 +443,11 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor { pub struct UdpWatcher { handle: *uvll::uv_udp_t, home: HomeHandle, + + // See above for what these fields are + priv refcount: Refcount, + priv read_access: Access, + priv write_access: Access, } impl UdpWatcher { @@ -423,6 +456,9 @@ impl UdpWatcher { let udp = UdpWatcher { handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) }, home: io.make_handle(), + refcount: Refcount::new(), + read_access: Access::new(), + write_access: Access::new(), }; assert_eq!(unsafe { uvll::uv_udp_init(io.uv_loop(), udp.handle) @@ -463,7 +499,8 @@ impl rtio::RtioUdpSocket for UdpWatcher { buf: Option, result: Option<(ssize_t, Option)>, } - let _m = self.fire_homing_missile(); + let m = self.fire_homing_missile(); + let _g = self.read_access.grant(m); let a = match unsafe { uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb) @@ -533,7 +570,8 @@ impl rtio::RtioUdpSocket for UdpWatcher { fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> { struct Ctx { task: Option, result: c_int } - let _m = self.fire_homing_missile(); + let m = self.fire_homing_missile(); + let _g = self.write_access.grant(m); let mut req = Request::new(uvll::UV_UDP_SEND); let buf = slice_to_uv_buf(buf); @@ -636,13 +674,25 @@ impl rtio::RtioUdpSocket for UdpWatcher { 0 as c_int) }) } + + fn clone(&self) -> ~rtio::RtioUdpSocket { + ~UdpWatcher { + handle: self.handle, + home: self.home.clone(), + refcount: self.refcount.clone(), + write_access: self.write_access.clone(), + read_access: self.read_access.clone(), + } as ~rtio::RtioUdpSocket + } } impl Drop for UdpWatcher { fn drop(&mut self) { // Send ourselves home to close this handle (blocking while doing so). let _m = self.fire_homing_missile(); - self.close(); + if self.refcount.decrement() { + self.close(); + } } } diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs index a021a13e2d98d..c312f112d28b4 100644 --- a/src/librustuv/pipe.rs +++ b/src/librustuv/pipe.rs @@ -14,7 +14,9 @@ use std::libc; use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor}; use std::rt::task::BlockedTask; +use access::Access; use homing::{HomingIO, HomeHandle}; +use rc::Refcount; use stream::StreamWatcher; use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error, wait_until_woken_after, wakeup}; @@ -25,6 +27,11 @@ pub struct PipeWatcher { stream: StreamWatcher, home: HomeHandle, priv defused: bool, + priv refcount: Refcount, + + // see comments in TcpWatcher for why these exist + priv write_access: Access, + priv read_access: Access, } pub struct PipeListener { @@ -61,6 +68,9 @@ impl PipeWatcher { stream: StreamWatcher::new(handle), home: home, defused: false, + refcount: Refcount::new(), + read_access: Access::new(), + write_access: Access::new(), } } @@ -118,14 +128,27 @@ impl PipeWatcher { impl RtioPipe for PipeWatcher { fn read(&mut self, buf: &mut [u8]) -> Result { - let _m = self.fire_homing_missile(); + let m = self.fire_homing_missile(); + let _g = self.read_access.grant(m); self.stream.read(buf).map_err(uv_error_to_io_error) } fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { - let _m = self.fire_homing_missile(); + let m = self.fire_homing_missile(); + let _g = self.write_access.grant(m); self.stream.write(buf).map_err(uv_error_to_io_error) } + + fn clone(&self) -> ~RtioPipe { + ~PipeWatcher { + stream: StreamWatcher::new(self.stream.handle), + defused: false, + home: self.home.clone(), + refcount: self.refcount.clone(), + read_access: self.read_access.clone(), + write_access: self.write_access.clone(), + } as ~RtioPipe + } } impl HomingIO for PipeWatcher { @@ -138,8 +161,8 @@ impl UvHandle for PipeWatcher { impl Drop for PipeWatcher { fn drop(&mut self) { - if !self.defused { - let _m = self.fire_homing_missile(); + let _m = self.fire_homing_missile(); + if !self.defused && self.refcount.decrement() { self.close(); } } diff --git a/src/librustuv/rc.rs b/src/librustuv/rc.rs new file mode 100644 index 0000000000000..f43cf72236109 --- /dev/null +++ b/src/librustuv/rc.rs @@ -0,0 +1,49 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/// Simple refcount structure for cloning handles +/// +/// This is meant to be an unintrusive solution to cloning handles in rustuv. +/// The handles themselves shouldn't be sharing memory because there are bits of +/// state in the rust objects which shouldn't be shared across multiple users of +/// the same underlying uv object, hence Rc is not used and this simple counter +/// should suffice. + +use std::sync::arc::UnsafeArc; + +pub struct Refcount { + priv rc: UnsafeArc, +} + +impl Refcount { + /// Creates a new refcount of 1 + pub fn new() -> Refcount { + Refcount { rc: UnsafeArc::new(1) } + } + + fn increment(&self) { + unsafe { *self.rc.get() += 1; } + } + + /// Returns whether the refcount just hit 0 or not + pub fn decrement(&self) -> bool { + unsafe { + *self.rc.get() -= 1; + *self.rc.get() == 0 + } + } +} + +impl Clone for Refcount { + fn clone(&self) -> Refcount { + self.increment(); + Refcount { rc: self.rc.clone() } + } +} diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs index a0bdc193d980c..66ceb03082f4b 100644 --- a/src/libstd/io/net/tcp.rs +++ b/src/libstd/io/net/tcp.rs @@ -8,11 +8,42 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +//! TCP network connections +//! +//! This module contains the ability to open a TCP stream to a socket address, +//! as well as creating a socket server to accept incoming connections. The +//! destination and binding addresses can either be an IPv4 or IPv6 address. +//! +//! A TCP connection implements the `Reader` and `Writer` traits, while the TCP +//! listener (socket server) implements the `Listener` and `Acceptor` traits. + +#[deny(missing_doc)]; + +use clone::Clone; use io::net::ip::SocketAddr; -use io::{Reader, Writer, Listener, Acceptor, IoResult}; +use io::{Reader, Writer, Listener, Acceptor}; +use io::IoResult; use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener}; use rt::rtio::{RtioTcpAcceptor, RtioTcpStream}; +/// A structure which represents a TCP stream between a local socket and a +/// remote socket. +/// +/// # Example +/// +/// ```rust +/// # #[allow(unused_must_use)]; +/// use std::io::net::tcp::TcpStream; +/// use std::io::net::ip::{Ipv4Addr, SocketAddr}; +/// +/// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 34254 }; +/// let mut stream = TcpStream::connect(addr); +/// +/// stream.write([1]); +/// let mut buf = [0]; +/// stream.read(buf); +/// drop(stream); // close the connection +/// ``` pub struct TcpStream { priv obj: ~RtioTcpStream } @@ -22,21 +53,40 @@ impl TcpStream { TcpStream { obj: s } } + /// Creates a TCP connection to a remote socket address. + /// + /// If no error is encountered, then `Ok(stream)` is returned. pub fn connect(addr: SocketAddr) -> IoResult { LocalIo::maybe_raise(|io| { io.tcp_connect(addr).map(TcpStream::new) }) } + /// Returns the socket address of the remote peer of this TCP connection. pub fn peer_name(&mut self) -> IoResult { self.obj.peer_name() } + /// Returns the socket address of the local half of this TCP connection. pub fn socket_name(&mut self) -> IoResult { self.obj.socket_name() } } +impl Clone for TcpStream { + /// Creates a new handle to this TCP stream, allowing for simultaneous reads + /// and writes of this connection. + /// + /// The underlying TCP stream will not be closed until all handles to the + /// stream have been deallocated. All handles will also follow the same + /// stream, but two concurrent reads will not receive the same data. + /// Instead, the first read will receive the first packet received, and the + /// second read will receive the second packet. + fn clone(&self) -> TcpStream { + TcpStream { obj: self.obj.clone() } + } +} + impl Reader for TcpStream { fn read(&mut self, buf: &mut [u8]) -> IoResult { self.obj.read(buf) } } @@ -45,17 +95,56 @@ impl Writer for TcpStream { fn write(&mut self, buf: &[u8]) -> IoResult<()> { self.obj.write(buf) } } +/// A structure representing a socket server. This listener is used to create a +/// `TcpAcceptor` which can be used to accept sockets on a local port. +/// +/// # Example +/// +/// ```rust +/// # fn main() {} +/// # fn foo() { +/// # #[allow(unused_must_use, dead_code)]; +/// use std::io::net::tcp::TcpListener; +/// use std::io::net::ip::{Ipv4Addr, SocketAddr}; +/// use std::io::{Acceptor, Listener}; +/// +/// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 80 }; +/// let listener = TcpListener::bind(addr); +/// +/// // bind the listener to the specified address +/// let mut acceptor = listener.listen(); +/// +/// // accept connections and process them +/// # fn handle_client(_: T) {} +/// for stream in acceptor.incoming() { +/// spawn(proc() { +/// handle_client(stream); +/// }); +/// } +/// +/// // close the socket server +/// drop(acceptor); +/// # } +/// ``` pub struct TcpListener { priv obj: ~RtioTcpListener } impl TcpListener { + /// Creates a new `TcpListener` which will be bound to the specified local + /// socket address. This listener is not ready for accepting connections, + /// `listen` must be called on it before that's possible. + /// + /// Binding with a port number of 0 will request that the OS assigns a port + /// to this listener. The port allocated can be queried via the + /// `socket_name` function. pub fn bind(addr: SocketAddr) -> IoResult { LocalIo::maybe_raise(|io| { io.tcp_bind(addr).map(|l| TcpListener { obj: l }) }) } + /// Returns the local socket address of this listener. pub fn socket_name(&mut self) -> IoResult { self.obj.socket_name() } @@ -67,6 +156,9 @@ impl Listener for TcpListener { } } +/// The accepting half of a TCP socket server. This structure is created through +/// a `TcpListener`'s `listen` method, and this object can be used to accept new +/// `TcpStream` instances. pub struct TcpAcceptor { priv obj: ~RtioTcpAcceptor } @@ -573,4 +665,91 @@ mod test { } let _listener = TcpListener::bind(addr); }) + + iotest!(fn tcp_clone_smoke() { + let addr = next_test_ip4(); + let mut acceptor = TcpListener::bind(addr).listen(); + + spawn(proc() { + let mut s = TcpStream::connect(addr); + let mut buf = [0, 0]; + assert_eq!(s.read(buf), Ok(1)); + assert_eq!(buf[0], 1); + s.write([2]).unwrap(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p1, c1) = Chan::new(); + let (p2, c2) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + p1.recv(); + s2.write([1]).unwrap(); + c2.send(()); + }); + c1.send(()); + let mut buf = [0, 0]; + assert_eq!(s1.read(buf), Ok(1)); + p2.recv(); + }) + + iotest!(fn tcp_clone_two_read() { + let addr = next_test_ip6(); + let mut acceptor = TcpListener::bind(addr).listen(); + let (p, c) = SharedChan::new(); + let c2 = c.clone(); + + spawn(proc() { + let mut s = TcpStream::connect(addr); + s.write([1]).unwrap(); + p.recv(); + s.write([2]).unwrap(); + p.recv(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p, done) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + let mut buf = [0, 0]; + s2.read(buf).unwrap(); + c2.send(()); + done.send(()); + }); + let mut buf = [0, 0]; + s1.read(buf).unwrap(); + c.send(()); + + p.recv(); + }) + + iotest!(fn tcp_clone_two_write() { + let addr = next_test_ip4(); + let mut acceptor = TcpListener::bind(addr).listen(); + + spawn(proc() { + let mut s = TcpStream::connect(addr); + let mut buf = [0, 1]; + s.read(buf).unwrap(); + s.read(buf).unwrap(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p, done) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + s2.write([1]).unwrap(); + done.send(()); + }); + s1.write([2]).unwrap(); + + p.recv(); + }) } + diff --git a/src/libstd/io/net/udp.rs b/src/libstd/io/net/udp.rs index 0ef62648afcb7..3c02f56384792 100644 --- a/src/libstd/io/net/udp.rs +++ b/src/libstd/io/net/udp.rs @@ -8,6 +8,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use clone::Clone; use result::{Ok, Err}; use io::net::ip::SocketAddr; use io::{Reader, Writer, IoResult}; @@ -41,6 +42,19 @@ impl UdpSocket { } } +impl Clone for UdpSocket { + /// Creates a new handle to this UDP socket, allowing for simultaneous reads + /// and writes of the socket. + /// + /// The underlying UDP socket will not be closed until all handles to the + /// socket have been deallocated. Two concurrent reads will not receive the + /// same data. Instead, the first read will receive the first packet + /// received, and the second read will receive the second packet. + fn clone(&self) -> UdpSocket { + UdpSocket { obj: self.obj.clone() } + } +} + pub struct UdpStream { priv socket: UdpSocket, priv connectedTo: SocketAddr @@ -250,4 +264,107 @@ mod test { iotest!(fn socket_name_ip6() { socket_name(next_test_ip6()); }) + + iotest!(fn udp_clone_smoke() { + let addr1 = next_test_ip4(); + let addr2 = next_test_ip4(); + let mut sock1 = UdpSocket::bind(addr1).unwrap(); + let sock2 = UdpSocket::bind(addr2).unwrap(); + + spawn(proc() { + let mut sock2 = sock2; + let mut buf = [0, 0]; + assert_eq!(sock2.recvfrom(buf), Ok((1, addr1))); + assert_eq!(buf[0], 1); + sock2.sendto([2], addr1).unwrap(); + }); + + let sock3 = sock1.clone(); + + let (p1, c1) = Chan::new(); + let (p2, c2) = Chan::new(); + spawn(proc() { + let mut sock3 = sock3; + p1.recv(); + sock3.sendto([1], addr2).unwrap(); + c2.send(()); + }); + c1.send(()); + let mut buf = [0, 0]; + assert_eq!(sock1.recvfrom(buf), Ok((1, addr2))); + p2.recv(); + }) + + iotest!(fn udp_clone_two_read() { + let addr1 = next_test_ip4(); + let addr2 = next_test_ip4(); + let mut sock1 = UdpSocket::bind(addr1).unwrap(); + let sock2 = UdpSocket::bind(addr2).unwrap(); + let (p, c) = SharedChan::new(); + let c2 = c.clone(); + + spawn(proc() { + let mut sock2 = sock2; + sock2.sendto([1], addr1).unwrap(); + p.recv(); + sock2.sendto([2], addr1).unwrap(); + p.recv(); + }); + + let sock3 = sock1.clone(); + + let (p, done) = Chan::new(); + spawn(proc() { + let mut sock3 = sock3; + let mut buf = [0, 0]; + sock3.recvfrom(buf).unwrap(); + c2.send(()); + done.send(()); + }); + let mut buf = [0, 0]; + sock1.recvfrom(buf).unwrap(); + c.send(()); + + p.recv(); + }) + + iotest!(fn udp_clone_two_write() { + let addr1 = next_test_ip4(); + let addr2 = next_test_ip4(); + let mut sock1 = UdpSocket::bind(addr1).unwrap(); + let sock2 = UdpSocket::bind(addr2).unwrap(); + + let (p, c) = SharedChan::new(); + + spawn(proc() { + let mut sock2 = sock2; + let mut buf = [0, 1]; + + for _ in p.iter() { + match sock2.recvfrom(buf) { + Ok(..) => {} + Err(e) => fail!("failed receive: {}", e), + } + } + }); + + let sock3 = sock1.clone(); + + let (p, done) = Chan::new(); + let c2 = c.clone(); + spawn(proc() { + let mut sock3 = sock3; + match sock3.sendto([1], addr2) { + Ok(..) => c2.send(()), + Err(..) => {} + } + done.send(()); + }); + match sock1.sendto([2], addr2) { + Ok(..) => c.send(()), + Err(..) => {} + } + + p.recv(); + }) } diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index ce95b987663f7..3c7db9c868618 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -25,6 +25,7 @@ instances as clients. use prelude::*; use c_str::ToCStr; +use clone::Clone; use rt::rtio::{IoFactory, LocalIo, RtioUnixListener}; use rt::rtio::{RtioUnixAcceptor, RtioPipe}; use io::pipe::PipeStream; @@ -62,6 +63,12 @@ impl UnixStream { } } +impl Clone for UnixStream { + fn clone(&self) -> UnixStream { + UnixStream { obj: self.obj.clone() } + } +} + impl Reader for UnixStream { fn read(&mut self, buf: &mut [u8]) -> IoResult { self.obj.read(buf) } } @@ -228,4 +235,93 @@ mod tests { let _acceptor = UnixListener::bind(&path).listen(); assert!(path.exists()); } + + #[test] + fn unix_clone_smoke() { + let addr = next_test_unix(); + let mut acceptor = UnixListener::bind(&addr).listen(); + + spawn(proc() { + let mut s = UnixStream::connect(&addr); + let mut buf = [0, 0]; + assert_eq!(s.read(buf), Ok(1)); + assert_eq!(buf[0], 1); + s.write([2]).unwrap(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p1, c1) = Chan::new(); + let (p2, c2) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + p1.recv(); + s2.write([1]).unwrap(); + c2.send(()); + }); + c1.send(()); + let mut buf = [0, 0]; + assert_eq!(s1.read(buf), Ok(1)); + p2.recv(); + } + + #[test] + fn unix_clone_two_read() { + let addr = next_test_unix(); + let mut acceptor = UnixListener::bind(&addr).listen(); + let (p, c) = SharedChan::new(); + let c2 = c.clone(); + + spawn(proc() { + let mut s = UnixStream::connect(&addr); + s.write([1]).unwrap(); + p.recv(); + s.write([2]).unwrap(); + p.recv(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p, done) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + let mut buf = [0, 0]; + s2.read(buf).unwrap(); + c2.send(()); + done.send(()); + }); + let mut buf = [0, 0]; + s1.read(buf).unwrap(); + c.send(()); + + p.recv(); + } + + #[test] + fn unix_clone_two_write() { + let addr = next_test_unix(); + let mut acceptor = UnixListener::bind(&addr).listen(); + + spawn(proc() { + let mut s = UnixStream::connect(&addr); + let mut buf = [0, 1]; + s.read(buf).unwrap(); + s.read(buf).unwrap(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p, done) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + s2.write([1]).unwrap(); + done.send(()); + }); + s1.write([2]).unwrap(); + + p.recv(); + } } diff --git a/src/libstd/io/pipe.rs b/src/libstd/io/pipe.rs index ca85707149b92..83250bdae7361 100644 --- a/src/libstd/io/pipe.rs +++ b/src/libstd/io/pipe.rs @@ -51,6 +51,12 @@ impl PipeStream { } } +impl Clone for PipeStream { + fn clone(&self) -> PipeStream { + PipeStream { obj: self.obj.clone() } + } +} + impl Reader for PipeStream { fn read(&mut self, buf: &mut [u8]) -> IoResult { self.obj.read(buf) } } diff --git a/src/libstd/libc.rs b/src/libstd/libc.rs index 11a7b5dd19171..057d618f44490 100644 --- a/src/libstd/libc.rs +++ b/src/libstd/libc.rs @@ -960,6 +960,8 @@ pub mod types { } pub mod extra { use ptr; + use libc::consts::os::extra::{MAX_PROTOCOL_CHAIN, + WSAPROTOCOL_LEN}; use libc::types::common::c95::c_void; use libc::types::os::arch::c95::{c_char, c_int, c_uint, size_t}; use libc::types::os::arch::c95::{c_long, c_ulong}; @@ -1106,6 +1108,47 @@ pub mod types { } pub type LPFILETIME = *mut FILETIME; + + pub struct GUID { + Data1: DWORD, + Data2: DWORD, + Data3: DWORD, + Data4: [BYTE, ..8], + } + + struct WSAPROTOCOLCHAIN { + ChainLen: c_int, + ChainEntries: [DWORD, ..MAX_PROTOCOL_CHAIN], + } + + pub type LPWSAPROTOCOLCHAIN = *mut WSAPROTOCOLCHAIN; + + pub struct WSAPROTOCOL_INFO { + dwServiceFlags1: DWORD, + dwServiceFlags2: DWORD, + dwServiceFlags3: DWORD, + dwServiceFlags4: DWORD, + dwProviderFlags: DWORD, + ProviderId: GUID, + dwCatalogEntryId: DWORD, + ProtocolChain: WSAPROTOCOLCHAIN, + iVersion: c_int, + iAddressFamily: c_int, + iMaxSockAddr: c_int, + iMinSockAddr: c_int, + iSocketType: c_int, + iProtocol: c_int, + iProtocolMaxOffset: c_int, + iNetworkByteOrder: c_int, + iSecurityScheme: c_int, + dwMessageSize: DWORD, + dwProviderReserved: DWORD, + szProtocol: [u8, ..WSAPROTOCOL_LEN+1], + } + + pub type LPWSAPROTOCOL_INFO = *mut WSAPROTOCOL_INFO; + + pub type GROUP = c_uint; } } } @@ -1721,6 +1764,10 @@ pub mod consts { pub static FILE_BEGIN: DWORD = 0; pub static FILE_CURRENT: DWORD = 1; pub static FILE_END: DWORD = 2; + + pub static MAX_PROTOCOL_CHAIN: DWORD = 7; + pub static WSAPROTOCOL_LEN: DWORD = 255; + pub static INVALID_SOCKET: DWORD = !0; } pub mod sysconf { } @@ -4098,6 +4145,8 @@ pub mod funcs { lpFrequency: *mut LARGE_INTEGER) -> BOOL; pub fn QueryPerformanceCounter( lpPerformanceCount: *mut LARGE_INTEGER) -> BOOL; + + pub fn GetCurrentProcessId() -> DWORD; } } diff --git a/src/libstd/option.rs b/src/libstd/option.rs index 39b516aeb12a7..7bb29fdfacf65 100644 --- a/src/libstd/option.rs +++ b/src/libstd/option.rs @@ -480,7 +480,6 @@ mod tests { use iter::range; use str::StrSlice; - use util; use kinds::marker; use vec::ImmutableVector; diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 35b1e21df0677..8d02048d55cf0 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -203,6 +203,7 @@ pub trait RtioTcpStream : RtioSocket { fn nodelay(&mut self) -> Result<(), IoError>; fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError>; fn letdie(&mut self) -> Result<(), IoError>; + fn clone(&self) -> ~RtioTcpStream; } pub trait RtioSocket { @@ -224,6 +225,8 @@ pub trait RtioUdpSocket : RtioSocket { fn hear_broadcasts(&mut self) -> Result<(), IoError>; fn ignore_broadcasts(&mut self) -> Result<(), IoError>; + + fn clone(&self) -> ~RtioUdpSocket; } pub trait RtioTimer { @@ -253,6 +256,7 @@ pub trait RtioProcess { pub trait RtioPipe { fn read(&mut self, buf: &mut [u8]) -> Result; fn write(&mut self, buf: &[u8]) -> Result<(), IoError>; + fn clone(&self) -> ~RtioPipe; } pub trait RtioUnixListener { diff --git a/src/libstd/unstable/mutex.rs b/src/libstd/unstable/mutex.rs index 4804de756876f..82957cd93ceb8 100644 --- a/src/libstd/unstable/mutex.rs +++ b/src/libstd/unstable/mutex.rs @@ -380,7 +380,6 @@ mod test { use super::{Mutex, MUTEX_INIT}; use rt::thread::Thread; - use task; #[test] fn somke_lock() { diff --git a/src/libstd/util.rs b/src/libstd/util.rs index c075f9b4ba84f..715a10b9112f9 100644 --- a/src/libstd/util.rs +++ b/src/libstd/util.rs @@ -69,7 +69,6 @@ impl Void { mod tests { use super::*; use prelude::*; - use mem::size_of; #[test] fn identity_crisis() { diff --git a/src/libstd/vec.rs b/src/libstd/vec.rs index 4a6a4d54ae3e4..d53c2dceba248 100644 --- a/src/libstd/vec.rs +++ b/src/libstd/vec.rs @@ -4253,7 +4253,7 @@ mod tests { let h = x.mut_last(); assert_eq!(*h.unwrap(), 5); - let mut y: &mut [int] = []; + let y: &mut [int] = []; assert!(y.mut_last().is_none()); } }