From c401474d2d265ded20d2419e44536b50d48fcaf2 Mon Sep 17 00:00:00 2001 From: unknown Date: Sun, 23 Nov 2014 13:11:33 -0800 Subject: [PATCH] Change the UnixStream interface so that close_write on a variable prevents (at compile time) future attempts to write to that variable. This is also true for close_read. Note that UnixStreams cloned from a variable on which close_* is called are not affected by this change. They have the same interface as before, in which reads and writes may fail due to the underlying stream being closed for that operation. --- src/libstd/io/net/pipe.rs | 168 ++++++++++++++++++++++----------- src/libstd/sys/unix/pipe.rs | 58 +++++++++++- src/libstd/sys/windows/pipe.rs | 56 ++++++++++- 3 files changed, 219 insertions(+), 63 deletions(-) diff --git a/src/libstd/io/net/pipe.rs b/src/libstd/io/net/pipe.rs index 8e934d221d22c..1768af7fcd97f 100644 --- a/src/libstd/io/net/pipe.rs +++ b/src/libstd/io/net/pipe.rs @@ -30,6 +30,8 @@ use io::{Listener, Acceptor, IoResult, TimedOut, standard_error}; use time::Duration; use sys::pipe::UnixStream as UnixStreamImp; +use sys::pipe::UnixReadStream as UnixReadStreamImp; +use sys::pipe::UnixWriteStream as UnixWriteStreamImp; use sys::pipe::UnixListener as UnixListenerImp; use sys::pipe::UnixAcceptor as UnixAcceptorImp; @@ -38,8 +40,87 @@ pub struct UnixStream { inner: UnixStreamImp, } -impl UnixStream { +/// A stream which can read from a named pipe. +pub struct UnixReadStream { + inner: UnixReadStreamImp, +} + +/// A stream which can write to a named pipe. +pub struct UnixWriteStream { + inner: UnixWriteStreamImp, +} + +pub trait ReadStream : Reader { + /// Closes the reading part of this connection. + /// + /// This method will close the reading portion of this connection, causing + /// all pending and future reads to immediately return with an error. + /// + /// Note that this method affects all cloned handles associated with this + /// stream, not just this one handle. + fn close_read(self) -> IoResult; + + /// Sets the read timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + #[experimental = "the timeout argument may change in type and value"] + fn set_read_timeout(&mut self, timeout_ms: Option); +} + +pub trait WriteStream : Writer { + /// Closes the writing part of this connection. + /// + /// This method will close the writing portion of this connection, causing + /// all pending and future writes to immediately return with an error. + /// + /// Note that this method affects all cloned handles associated with this + /// stream, not just this one handle. + fn close_write(self) -> IoResult; + + /// Sets the write timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + #[experimental = "the timeout argument may change in type and value"] + fn set_write_timeout(&mut self, timeout_ms: Option); +} + +impl ReadStream<()> for UnixReadStream { + fn close_read(self) -> IoResult<()> { + self.inner.close_read() + } + fn set_read_timeout(&mut self, timeout_ms: Option) { + self.inner.set_read_timeout(timeout_ms) + } +} + +impl WriteStream<()> for UnixWriteStream { + fn close_write(self) -> IoResult<()> { + self.inner.close_write() + } + fn set_write_timeout(&mut self, timeout_ms: Option) { + self.inner.set_write_timeout(timeout_ms) + } +} + +impl ReadStream for UnixStream { + fn close_read(self) -> IoResult { + self.inner.close_read().map(|v| UnixWriteStream { inner : v }) + } + fn set_read_timeout(&mut self, timeout_ms: Option) { + self.inner.set_read_timeout(timeout_ms) + } +} + +impl WriteStream for UnixStream { + fn close_write(self) -> IoResult { + self.inner.close_write().map(|v| UnixReadStream { inner : v }) + } + fn set_write_timeout(&mut self, timeout_ms: Option) { + self.inner.set_write_timeout(timeout_ms) + } +} +impl UnixStream { /// Connect to a pipe named by `path`. This will attempt to open a /// connection to the underlying socket. /// @@ -79,29 +160,6 @@ impl UnixStream { .map(|inner| UnixStream { inner: inner }) } - - /// Closes the reading half of this connection. - /// - /// This method will close the reading portion of this connection, causing - /// all pending and future reads to immediately return with an error. - /// - /// Note that this method affects all cloned handles associated with this - /// stream, not just this one handle. - pub fn close_read(&mut self) -> IoResult<()> { - self.inner.close_read() - } - - /// Closes the writing half of this connection. - /// - /// This method will close the writing portion of this connection, causing - /// all pending and future writes to immediately return with an error. - /// - /// Note that this method affects all cloned handles associated with this - /// stream, not just this one handle. - pub fn close_write(&mut self) -> IoResult<()> { - self.inner.close_write() - } - /// Sets the read/write timeout for this socket. /// /// For more information, see `TcpStream::set_timeout` @@ -109,22 +167,6 @@ impl UnixStream { pub fn set_timeout(&mut self, timeout_ms: Option) { self.inner.set_timeout(timeout_ms) } - - /// Sets the read timeout for this socket. - /// - /// For more information, see `TcpStream::set_timeout` - #[experimental = "the timeout argument may change in type and value"] - pub fn set_read_timeout(&mut self, timeout_ms: Option) { - self.inner.set_read_timeout(timeout_ms) - } - - /// Sets the write timeout for this socket. - /// - /// For more information, see `TcpStream::set_timeout` - #[experimental = "the timeout argument may change in type and value"] - pub fn set_write_timeout(&mut self, timeout_ms: Option) { - self.inner.set_write_timeout(timeout_ms) - } } impl Clone for UnixStream { @@ -145,6 +187,30 @@ impl Writer for UnixStream { } } +impl Clone for UnixReadStream { + fn clone(&self) -> UnixReadStream { + UnixReadStream { inner: self.inner.clone() } + } +} + +impl Reader for UnixReadStream { + fn read(&mut self, buf: &mut [u8]) -> IoResult { + self.inner.read(buf) + } +} + +impl Clone for UnixWriteStream { + fn clone(&self) -> UnixWriteStream { + UnixWriteStream { inner: self.inner.clone() } + } +} + +impl Writer for UnixWriteStream { + fn write(&mut self, buf: &[u8]) -> IoResult<()> { + self.inner.write(buf) + } +} + /// A value that can listen for incoming named pipe connection requests. pub struct UnixListener { /// The internal, opaque runtime Unix listener. @@ -583,29 +649,21 @@ mod tests { }); let mut b = [0]; - let mut s = UnixStream::connect(&addr).unwrap(); + let s = UnixStream::connect(&addr).unwrap(); let mut s2 = s.clone(); - // closing should prevent reads/writes - s.close_write().unwrap(); - assert!(s.write(&[0]).is_err()); - s.close_read().unwrap(); - assert!(s.read(&mut b).is_err()); + let s_read : UnixReadStream = s.close_write().unwrap(); + let mut s3 = s_read.clone(); + let _ : () = s_read.close_read().unwrap(); // closing should affect previous handles assert!(s2.write(&[0]).is_err()); assert!(s2.read(&mut b).is_err()); - - // closing should affect new handles - let mut s3 = s.clone(); - assert!(s3.write(&[0]).is_err()); assert!(s3.read(&mut b).is_err()); // make sure these don't die - let _ = s2.close_read(); - let _ = s2.close_write(); - let _ = s3.close_read(); - let _ = s3.close_write(); + let _ : () = s2.close_read().unwrap().close_write().unwrap(); + let _ : () = s3.close_read().unwrap(); } #[test] @@ -619,7 +677,7 @@ mod tests { let _ = rx.recv_opt(); }); - let mut s = UnixStream::connect(&addr).unwrap(); + let s = UnixStream::connect(&addr).unwrap(); let s2 = s.clone(); let (tx, rx) = channel(); spawn(proc() { diff --git a/src/libstd/sys/unix/pipe.rs b/src/libstd/sys/unix/pipe.rs index 4d3469a9c24a8..30ec3dc62e807 100644 --- a/src/libstd/sys/unix/pipe.rs +++ b/src/libstd/sys/unix/pipe.rs @@ -117,6 +117,42 @@ pub struct UnixStream { write_deadline: u64, } +pub struct UnixReadStream { + thing : UnixStream +} + +pub struct UnixWriteStream { + thing : UnixStream +} + +impl UnixReadStream { + pub fn read(&mut self, buf: &mut [u8]) -> IoResult { + self.thing.read(buf) + } + + pub fn close_read(self) -> IoResult<()> { + self.thing.close_read().map(|_| ()) + } + + pub fn set_read_timeout(&mut self, timeout: Option) { + self.thing.set_read_timeout(timeout) + } +} + +impl UnixWriteStream { + pub fn write(&mut self, buf: &[u8]) -> IoResult<()> { + self.thing.write(buf) + } + + pub fn close_write(self) -> IoResult<()> { + self.thing.close_read().map(|_| ()) + } + + pub fn set_write_timeout(&mut self, timeout: Option) { + self.thing.set_write_timeout(timeout) + } +} + impl UnixStream { pub fn connect(addr: &CString, timeout: Option) -> IoResult { @@ -177,12 +213,14 @@ impl UnixStream { } } - pub fn close_write(&mut self) -> IoResult<()> { - mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) }) + pub fn close_read(self) -> IoResult { + mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) }) + .map(|()| UnixWriteStream {thing: self.clone()}) } - pub fn close_read(&mut self) -> IoResult<()> { - mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) }) + pub fn close_write(self) -> IoResult { + mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) }) + .map(|()| UnixReadStream {thing: self.clone()}) } pub fn set_timeout(&mut self, timeout: Option) { @@ -206,6 +244,18 @@ impl Clone for UnixStream { } } +impl Clone for UnixReadStream { + fn clone(&self) -> UnixReadStream { + UnixReadStream {thing: self.thing.clone() } + } +} + +impl Clone for UnixWriteStream { + fn clone(&self) -> UnixWriteStream { + UnixWriteStream {thing: self.thing.clone() } + } +} + //////////////////////////////////////////////////////////////////////////////// // Unix Listener //////////////////////////////////////////////////////////////////////////////// diff --git a/src/libstd/sys/windows/pipe.rs b/src/libstd/sys/windows/pipe.rs index a623c2cd8e297..f7498eef29ffb 100644 --- a/src/libstd/sys/windows/pipe.rs +++ b/src/libstd/sys/windows/pipe.rs @@ -215,6 +215,42 @@ pub struct UnixStream { write_deadline: u64, } +pub struct UnixReadStream { + thing : UnixStream +} + +pub struct UnixWriteStream { + thing : UnixStream +} + +impl UnixReadStream { + pub fn read(&mut self, buf: &mut [u8]) -> IoResult { + self.thing.read(buf) + } + + pub fn close_read(self) -> IoResult<()> { + self.thing.close_read().map(|_| ()) + } + + pub fn set_read_timeout(&mut self, timeout: Option) { + self.thing.set_read_timeout(timeout) + } +} + +impl UnixWriteStream { + pub fn write(&mut self, buf: &[u8]) -> IoResult<()> { + self.thing.write(buf) + } + + pub fn close_write(self) -> IoResult<()> { + self.thing.close_read().map(|_| ()) + } + + pub fn set_write_timeout(&mut self, timeout: Option) { + self.thing.set_write_timeout(timeout) + } +} + impl UnixStream { fn try_connect(p: *const u16) -> Option { // Note that most of this is lifted from the libuv implementation. @@ -496,7 +532,7 @@ impl UnixStream { Ok(()) } - pub fn close_read(&mut self) -> IoResult<()> { + pub fn close_read(self) -> IoResult { // On windows, there's no actual shutdown() method for pipes, so we're // forced to emulate the behavior manually at the application level. To // do this, we need to both cancel any pending requests, as well as @@ -516,14 +552,14 @@ impl UnixStream { // no thread will erroneously sit in a read forever. let _guard = unsafe { self.inner.lock.lock() }; self.inner.read_closed.store(true, atomic::SeqCst); - self.cancel_io() + self.cancel_io().map(|()| UnixWriteStream { thing: self.clone() }) } - pub fn close_write(&mut self) -> IoResult<()> { + pub fn close_write(self) -> IoResult { // see comments in close_read() for why this lock is necessary let _guard = unsafe { self.inner.lock.lock() }; self.inner.write_closed.store(true, atomic::SeqCst); - self.cancel_io() + self.cancel_io().map(|()| UnixReadStream { thing: self.clone() }) } pub fn set_timeout(&mut self, timeout: Option) { @@ -551,6 +587,18 @@ impl Clone for UnixStream { } } +impl Clone for UnixReadStream { + fn clone(&self) -> UnixReadStream { + UnixReadStream {thing: self.thing.clone() } + } +} + +impl Clone for UnixWriteStream { + fn clone(&self) -> UnixWriteStream { + UnixWriteStream {thing: self.thing.clone() } + } +} + //////////////////////////////////////////////////////////////////////////////// // Unix Listener ////////////////////////////////////////////////////////////////////////////////