diff --git a/src/libstd/io/net/pipe.rs b/src/libstd/io/net/pipe.rs index ec997b71986cc..f9e9dc4b86225 100644 --- a/src/libstd/io/net/pipe.rs +++ b/src/libstd/io/net/pipe.rs @@ -26,6 +26,8 @@ use io::{Listener, Acceptor, IoResult, TimedOut, standard_error}; use time::Duration; use sys::pipe::UnixStream as UnixStreamImp; +use sys::pipe::UnixReadStream as UnixReadStreamImp; +use sys::pipe::UnixWriteStream as UnixWriteStreamImp; use sys::pipe::UnixListener as UnixListenerImp; use sys::pipe::UnixAcceptor as UnixAcceptorImp; @@ -36,8 +38,87 @@ pub struct UnixStream { inner: UnixStreamImp, } -impl UnixStream { +/// A stream which can read from a named pipe. +pub struct UnixReadStream { + inner: UnixReadStreamImp, +} + +/// A stream which can write to a named pipe. +pub struct UnixWriteStream { + inner: UnixWriteStreamImp, +} + +pub trait ReadStream : Reader { + /// Closes the reading part of this connection. + /// + /// This method will close the reading portion of this connection, causing + /// all pending and future reads to immediately return with an error. + /// + /// Note that this method affects all cloned handles associated with this + /// stream, not just this one handle. + fn close_read(self) -> IoResult; + + /// Sets the read timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + #[experimental = "the timeout argument may change in type and value"] + fn set_read_timeout(&mut self, timeout_ms: Option); +} + +pub trait WriteStream : Writer { + /// Closes the writing part of this connection. + /// + /// This method will close the writing portion of this connection, causing + /// all pending and future writes to immediately return with an error. + /// + /// Note that this method affects all cloned handles associated with this + /// stream, not just this one handle. + fn close_write(self) -> IoResult; + + /// Sets the write timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + #[experimental = "the timeout argument may change in type and value"] + fn set_write_timeout(&mut self, timeout_ms: Option); +} + +impl ReadStream<()> for UnixReadStream { + fn close_read(self) -> IoResult<()> { + self.inner.close_read() + } + fn set_read_timeout(&mut self, timeout_ms: Option) { + self.inner.set_read_timeout(timeout_ms) + } +} + +impl WriteStream<()> for UnixWriteStream { + fn close_write(self) -> IoResult<()> { + self.inner.close_write() + } + fn set_write_timeout(&mut self, timeout_ms: Option) { + self.inner.set_write_timeout(timeout_ms) + } +} + +impl ReadStream for UnixStream { + fn close_read(self) -> IoResult { + self.inner.close_read().map(|v| UnixWriteStream { inner : v }) + } + fn set_read_timeout(&mut self, timeout_ms: Option) { + self.inner.set_read_timeout(timeout_ms) + } +} + +impl WriteStream for UnixStream { + fn close_write(self) -> IoResult { + self.inner.close_write().map(|v| UnixReadStream { inner : v }) + } + fn set_write_timeout(&mut self, timeout_ms: Option) { + self.inner.set_write_timeout(timeout_ms) + } +} +impl UnixStream { /// Connect to a pipe named by `path`. This will attempt to open a /// connection to the underlying socket. /// @@ -77,29 +158,6 @@ impl UnixStream { .map(|inner| UnixStream { inner: inner }) } - - /// Closes the reading half of this connection. - /// - /// This method will close the reading portion of this connection, causing - /// all pending and future reads to immediately return with an error. - /// - /// Note that this method affects all cloned handles associated with this - /// stream, not just this one handle. - pub fn close_read(&mut self) -> IoResult<()> { - self.inner.close_read() - } - - /// Closes the writing half of this connection. - /// - /// This method will close the writing portion of this connection, causing - /// all pending and future writes to immediately return with an error. - /// - /// Note that this method affects all cloned handles associated with this - /// stream, not just this one handle. - pub fn close_write(&mut self) -> IoResult<()> { - self.inner.close_write() - } - /// Sets the read/write timeout for this socket. /// /// For more information, see `TcpStream::set_timeout` @@ -107,22 +165,6 @@ impl UnixStream { pub fn set_timeout(&mut self, timeout_ms: Option) { self.inner.set_timeout(timeout_ms) } - - /// Sets the read timeout for this socket. - /// - /// For more information, see `TcpStream::set_timeout` - #[experimental = "the timeout argument may change in type and value"] - pub fn set_read_timeout(&mut self, timeout_ms: Option) { - self.inner.set_read_timeout(timeout_ms) - } - - /// Sets the write timeout for this socket. - /// - /// For more information, see `TcpStream::set_timeout` - #[experimental = "the timeout argument may change in type and value"] - pub fn set_write_timeout(&mut self, timeout_ms: Option) { - self.inner.set_write_timeout(timeout_ms) - } } impl Clone for UnixStream { @@ -143,6 +185,30 @@ impl Writer for UnixStream { } } +impl Clone for UnixReadStream { + fn clone(&self) -> UnixReadStream { + UnixReadStream { inner: self.inner.clone() } + } +} + +impl Reader for UnixReadStream { + fn read(&mut self, buf: &mut [u8]) -> IoResult { + self.inner.read(buf) + } +} + +impl Clone for UnixWriteStream { + fn clone(&self) -> UnixWriteStream { + UnixWriteStream { inner: self.inner.clone() } + } +} + +impl Writer for UnixWriteStream { + fn write(&mut self, buf: &[u8]) -> IoResult<()> { + self.inner.write(buf) + } +} + impl sys_common::AsInner for UnixStream { fn as_inner(&self) -> &UnixStreamImp { &self.inner @@ -599,29 +665,21 @@ mod tests { }); let mut b = [0]; - let mut s = UnixStream::connect(&addr).unwrap(); + let s = UnixStream::connect(&addr).unwrap(); let mut s2 = s.clone(); - // closing should prevent reads/writes - s.close_write().unwrap(); - assert!(s.write(&[0]).is_err()); - s.close_read().unwrap(); - assert!(s.read(&mut b).is_err()); + let s_read : UnixReadStream = s.close_write().unwrap(); + let mut s3 = s_read.clone(); + let _ : () = s_read.close_read().unwrap(); // closing should affect previous handles assert!(s2.write(&[0]).is_err()); assert!(s2.read(&mut b).is_err()); - - // closing should affect new handles - let mut s3 = s.clone(); - assert!(s3.write(&[0]).is_err()); assert!(s3.read(&mut b).is_err()); // make sure these don't die - let _ = s2.close_read(); - let _ = s2.close_write(); - let _ = s3.close_read(); - let _ = s3.close_write(); + let _ : () = s2.close_read().unwrap().close_write().unwrap(); + let _ : () = s3.close_read().unwrap(); } #[test] @@ -635,7 +693,7 @@ mod tests { let _ = rx.recv_opt(); }); - let mut s = UnixStream::connect(&addr).unwrap(); + let s = UnixStream::connect(&addr).unwrap(); let s2 = s.clone(); let (tx, rx) = channel(); spawn(proc() { diff --git a/src/libstd/sys/unix/pipe.rs b/src/libstd/sys/unix/pipe.rs index 3f70fb5c1a56c..ecd2dfd990476 100644 --- a/src/libstd/sys/unix/pipe.rs +++ b/src/libstd/sys/unix/pipe.rs @@ -117,6 +117,42 @@ pub struct UnixStream { write_deadline: u64, } +pub struct UnixReadStream { + thing : UnixStream +} + +pub struct UnixWriteStream { + thing : UnixStream +} + +impl UnixReadStream { + pub fn read(&mut self, buf: &mut [u8]) -> IoResult { + self.thing.read(buf) + } + + pub fn close_read(self) -> IoResult<()> { + self.thing.close_read().map(|_| ()) + } + + pub fn set_read_timeout(&mut self, timeout: Option) { + self.thing.set_read_timeout(timeout) + } +} + +impl UnixWriteStream { + pub fn write(&mut self, buf: &[u8]) -> IoResult<()> { + self.thing.write(buf) + } + + pub fn close_write(self) -> IoResult<()> { + self.thing.close_read().map(|_| ()) + } + + pub fn set_write_timeout(&mut self, timeout: Option) { + self.thing.set_write_timeout(timeout) + } +} + impl UnixStream { pub fn connect(addr: &CString, timeout: Option) -> IoResult { @@ -177,12 +213,14 @@ impl UnixStream { } } - pub fn close_write(&mut self) -> IoResult<()> { - mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) }) + pub fn close_read(self) -> IoResult { + mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) }) + .map(|()| UnixWriteStream {thing: self.clone()}) } - pub fn close_read(&mut self) -> IoResult<()> { - mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) }) + pub fn close_write(self) -> IoResult { + mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) }) + .map(|()| UnixReadStream {thing: self.clone()}) } pub fn set_timeout(&mut self, timeout: Option) { @@ -206,6 +244,18 @@ impl Clone for UnixStream { } } +impl Clone for UnixReadStream { + fn clone(&self) -> UnixReadStream { + UnixReadStream {thing: self.thing.clone() } + } +} + +impl Clone for UnixWriteStream { + fn clone(&self) -> UnixWriteStream { + UnixWriteStream {thing: self.thing.clone() } + } +} + //////////////////////////////////////////////////////////////////////////////// // Unix Listener //////////////////////////////////////////////////////////////////////////////// diff --git a/src/libstd/sys/windows/pipe.rs b/src/libstd/sys/windows/pipe.rs index ca7985aa35bf8..f12987a052966 100644 --- a/src/libstd/sys/windows/pipe.rs +++ b/src/libstd/sys/windows/pipe.rs @@ -215,6 +215,42 @@ pub struct UnixStream { write_deadline: u64, } +pub struct UnixReadStream { + thing : UnixStream +} + +pub struct UnixWriteStream { + thing : UnixStream +} + +impl UnixReadStream { + pub fn read(&mut self, buf: &mut [u8]) -> IoResult { + self.thing.read(buf) + } + + pub fn close_read(self) -> IoResult<()> { + self.thing.close_read().map(|_| ()) + } + + pub fn set_read_timeout(&mut self, timeout: Option) { + self.thing.set_read_timeout(timeout) + } +} + +impl UnixWriteStream { + pub fn write(&mut self, buf: &[u8]) -> IoResult<()> { + self.thing.write(buf) + } + + pub fn close_write(self) -> IoResult<()> { + self.thing.close_read().map(|_| ()) + } + + pub fn set_write_timeout(&mut self, timeout: Option) { + self.thing.set_write_timeout(timeout) + } +} + impl UnixStream { fn try_connect(p: *const u16) -> Option { // Note that most of this is lifted from the libuv implementation. @@ -496,7 +532,7 @@ impl UnixStream { Ok(()) } - pub fn close_read(&mut self) -> IoResult<()> { + pub fn close_read(self) -> IoResult { // On windows, there's no actual shutdown() method for pipes, so we're // forced to emulate the behavior manually at the application level. To // do this, we need to both cancel any pending requests, as well as @@ -516,14 +552,14 @@ impl UnixStream { // no thread will erroneously sit in a read forever. let _guard = unsafe { self.inner.lock.lock() }; self.inner.read_closed.store(true, atomic::SeqCst); - self.cancel_io() + self.cancel_io().map(|()| UnixWriteStream { thing: self.clone() }) } - pub fn close_write(&mut self) -> IoResult<()> { + pub fn close_write(self) -> IoResult { // see comments in close_read() for why this lock is necessary let _guard = unsafe { self.inner.lock.lock() }; self.inner.write_closed.store(true, atomic::SeqCst); - self.cancel_io() + self.cancel_io().map(|()| UnixReadStream { thing: self.clone() }) } pub fn set_timeout(&mut self, timeout: Option) { @@ -551,6 +587,18 @@ impl Clone for UnixStream { } } +impl Clone for UnixReadStream { + fn clone(&self) -> UnixReadStream { + UnixReadStream {thing: self.thing.clone() } + } +} + +impl Clone for UnixWriteStream { + fn clone(&self) -> UnixWriteStream { + UnixWriteStream {thing: self.thing.clone() } + } +} + //////////////////////////////////////////////////////////////////////////////// // Unix Listener ////////////////////////////////////////////////////////////////////////////////