Skip to content

Change the UnixStream interface so that close_write on a variable preven... #19251

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 113 additions & 55 deletions src/libstd/io/net/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use io::{Listener, Acceptor, IoResult, TimedOut, standard_error};
use time::Duration;

use sys::pipe::UnixStream as UnixStreamImp;
use sys::pipe::UnixReadStream as UnixReadStreamImp;
use sys::pipe::UnixWriteStream as UnixWriteStreamImp;
use sys::pipe::UnixListener as UnixListenerImp;
use sys::pipe::UnixAcceptor as UnixAcceptorImp;

Expand All @@ -36,8 +38,87 @@ pub struct UnixStream {
inner: UnixStreamImp,
}

impl UnixStream {
/// A stream which can read from a named pipe.
pub struct UnixReadStream {
inner: UnixReadStreamImp,
}

/// A stream which can write to a named pipe.
pub struct UnixWriteStream {
inner: UnixWriteStreamImp,
}

pub trait ReadStream<T> : Reader {
/// Closes the reading part of this connection.
///
/// This method will close the reading portion of this connection, causing
/// all pending and future reads to immediately return with an error.
///
/// Note that this method affects all cloned handles associated with this
/// stream, not just this one handle.
fn close_read(self) -> IoResult<T>;

/// Sets the read timeout for this socket.
///
/// For more information, see `TcpStream::set_timeout`
#[experimental = "the timeout argument may change in type and value"]
fn set_read_timeout(&mut self, timeout_ms: Option<u64>);
}

pub trait WriteStream<T> : Writer {
/// Closes the writing part of this connection.
///
/// This method will close the writing portion of this connection, causing
/// all pending and future writes to immediately return with an error.
///
/// Note that this method affects all cloned handles associated with this
/// stream, not just this one handle.
fn close_write(self) -> IoResult<T>;

/// Sets the write timeout for this socket.
///
/// For more information, see `TcpStream::set_timeout`
#[experimental = "the timeout argument may change in type and value"]
fn set_write_timeout(&mut self, timeout_ms: Option<u64>);
}

impl ReadStream<()> for UnixReadStream {
fn close_read(self) -> IoResult<()> {
self.inner.close_read()
}
fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
self.inner.set_read_timeout(timeout_ms)
}
}

impl WriteStream<()> for UnixWriteStream {
fn close_write(self) -> IoResult<()> {
self.inner.close_write()
}
fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
self.inner.set_write_timeout(timeout_ms)
}
}

impl ReadStream<UnixWriteStream> for UnixStream {
fn close_read(self) -> IoResult<UnixWriteStream> {
self.inner.close_read().map(|v| UnixWriteStream { inner : v })
}
fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
self.inner.set_read_timeout(timeout_ms)
}
}

impl WriteStream<UnixReadStream> for UnixStream {
fn close_write(self) -> IoResult<UnixReadStream> {
self.inner.close_write().map(|v| UnixReadStream { inner : v })
}
fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
self.inner.set_write_timeout(timeout_ms)
}
}

impl UnixStream {
/// Connect to a pipe named by `path`. This will attempt to open a
/// connection to the underlying socket.
///
Expand Down Expand Up @@ -77,52 +158,13 @@ impl UnixStream {
.map(|inner| UnixStream { inner: inner })
}


/// Closes the reading half of this connection.
///
/// This method will close the reading portion of this connection, causing
/// all pending and future reads to immediately return with an error.
///
/// Note that this method affects all cloned handles associated with this
/// stream, not just this one handle.
pub fn close_read(&mut self) -> IoResult<()> {
self.inner.close_read()
}

/// Closes the writing half of this connection.
///
/// This method will close the writing portion of this connection, causing
/// all pending and future writes to immediately return with an error.
///
/// Note that this method affects all cloned handles associated with this
/// stream, not just this one handle.
pub fn close_write(&mut self) -> IoResult<()> {
self.inner.close_write()
}

/// Sets the read/write timeout for this socket.
///
/// For more information, see `TcpStream::set_timeout`
#[experimental = "the timeout argument may change in type and value"]
pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
self.inner.set_timeout(timeout_ms)
}

/// Sets the read timeout for this socket.
///
/// For more information, see `TcpStream::set_timeout`
#[experimental = "the timeout argument may change in type and value"]
pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
self.inner.set_read_timeout(timeout_ms)
}

/// Sets the write timeout for this socket.
///
/// For more information, see `TcpStream::set_timeout`
#[experimental = "the timeout argument may change in type and value"]
pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
self.inner.set_write_timeout(timeout_ms)
}
}

impl Clone for UnixStream {
Expand All @@ -143,6 +185,30 @@ impl Writer for UnixStream {
}
}

impl Clone for UnixReadStream {
fn clone(&self) -> UnixReadStream {
UnixReadStream { inner: self.inner.clone() }
}
}

impl Reader for UnixReadStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
self.inner.read(buf)
}
}

impl Clone for UnixWriteStream {
fn clone(&self) -> UnixWriteStream {
UnixWriteStream { inner: self.inner.clone() }
}
}

impl Writer for UnixWriteStream {
fn write(&mut self, buf: &[u8]) -> IoResult<()> {
self.inner.write(buf)
}
}

impl sys_common::AsInner<UnixStreamImp> for UnixStream {
fn as_inner(&self) -> &UnixStreamImp {
&self.inner
Expand Down Expand Up @@ -599,29 +665,21 @@ mod tests {
});

let mut b = [0];
let mut s = UnixStream::connect(&addr).unwrap();
let s = UnixStream::connect(&addr).unwrap();
let mut s2 = s.clone();

// closing should prevent reads/writes
s.close_write().unwrap();
assert!(s.write(&[0]).is_err());
s.close_read().unwrap();
assert!(s.read(&mut b).is_err());
let s_read : UnixReadStream = s.close_write().unwrap();
let mut s3 = s_read.clone();
let _ : () = s_read.close_read().unwrap();

// closing should affect previous handles
assert!(s2.write(&[0]).is_err());
assert!(s2.read(&mut b).is_err());

// closing should affect new handles
let mut s3 = s.clone();
assert!(s3.write(&[0]).is_err());
assert!(s3.read(&mut b).is_err());

// make sure these don't die
let _ = s2.close_read();
let _ = s2.close_write();
let _ = s3.close_read();
let _ = s3.close_write();
let _ : () = s2.close_read().unwrap().close_write().unwrap();
let _ : () = s3.close_read().unwrap();
}

#[test]
Expand All @@ -635,7 +693,7 @@ mod tests {
let _ = rx.recv_opt();
});

let mut s = UnixStream::connect(&addr).unwrap();
let s = UnixStream::connect(&addr).unwrap();
let s2 = s.clone();
let (tx, rx) = channel();
spawn(proc() {
Expand Down
58 changes: 54 additions & 4 deletions src/libstd/sys/unix/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,42 @@ pub struct UnixStream {
write_deadline: u64,
}

pub struct UnixReadStream {
thing : UnixStream
}

pub struct UnixWriteStream {
thing : UnixStream
}

impl UnixReadStream {
pub fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
self.thing.read(buf)
}

pub fn close_read(self) -> IoResult<()> {
self.thing.close_read().map(|_| ())
}

pub fn set_read_timeout(&mut self, timeout: Option<u64>) {
self.thing.set_read_timeout(timeout)
}
}

impl UnixWriteStream {
pub fn write(&mut self, buf: &[u8]) -> IoResult<()> {
self.thing.write(buf)
}

pub fn close_write(self) -> IoResult<()> {
self.thing.close_read().map(|_| ())
}

pub fn set_write_timeout(&mut self, timeout: Option<u64>) {
self.thing.set_write_timeout(timeout)
}
}

impl UnixStream {
pub fn connect(addr: &CString,
timeout: Option<u64>) -> IoResult<UnixStream> {
Expand Down Expand Up @@ -177,12 +213,14 @@ impl UnixStream {
}
}

pub fn close_write(&mut self) -> IoResult<()> {
mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
pub fn close_read(self) -> IoResult<UnixWriteStream> {
mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
.map(|()| UnixWriteStream {thing: self.clone()})
}

pub fn close_read(&mut self) -> IoResult<()> {
mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
pub fn close_write(self) -> IoResult<UnixReadStream> {
mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
.map(|()| UnixReadStream {thing: self.clone()})
}

pub fn set_timeout(&mut self, timeout: Option<u64>) {
Expand All @@ -206,6 +244,18 @@ impl Clone for UnixStream {
}
}

impl Clone for UnixReadStream {
fn clone(&self) -> UnixReadStream {
UnixReadStream {thing: self.thing.clone() }
}
}

impl Clone for UnixWriteStream {
fn clone(&self) -> UnixWriteStream {
UnixWriteStream {thing: self.thing.clone() }
}
}

////////////////////////////////////////////////////////////////////////////////
// Unix Listener
////////////////////////////////////////////////////////////////////////////////
Expand Down
56 changes: 52 additions & 4 deletions src/libstd/sys/windows/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,42 @@ pub struct UnixStream {
write_deadline: u64,
}

pub struct UnixReadStream {
thing : UnixStream
}

pub struct UnixWriteStream {
thing : UnixStream
}

impl UnixReadStream {
pub fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
self.thing.read(buf)
}

pub fn close_read(self) -> IoResult<()> {
self.thing.close_read().map(|_| ())
}

pub fn set_read_timeout(&mut self, timeout: Option<u64>) {
self.thing.set_read_timeout(timeout)
}
}

impl UnixWriteStream {
pub fn write(&mut self, buf: &[u8]) -> IoResult<()> {
self.thing.write(buf)
}

pub fn close_write(self) -> IoResult<()> {
self.thing.close_read().map(|_| ())
}

pub fn set_write_timeout(&mut self, timeout: Option<u64>) {
self.thing.set_write_timeout(timeout)
}
}

impl UnixStream {
fn try_connect(p: *const u16) -> Option<libc::HANDLE> {
// Note that most of this is lifted from the libuv implementation.
Expand Down Expand Up @@ -496,7 +532,7 @@ impl UnixStream {
Ok(())
}

pub fn close_read(&mut self) -> IoResult<()> {
pub fn close_read(self) -> IoResult<UnixWriteStream> {
// On windows, there's no actual shutdown() method for pipes, so we're
// forced to emulate the behavior manually at the application level. To
// do this, we need to both cancel any pending requests, as well as
Expand All @@ -516,14 +552,14 @@ impl UnixStream {
// no thread will erroneously sit in a read forever.
let _guard = unsafe { self.inner.lock.lock() };
self.inner.read_closed.store(true, atomic::SeqCst);
self.cancel_io()
self.cancel_io().map(|()| UnixWriteStream { thing: self.clone() })
}

pub fn close_write(&mut self) -> IoResult<()> {
pub fn close_write(self) -> IoResult<UnixReadStream> {
// see comments in close_read() for why this lock is necessary
let _guard = unsafe { self.inner.lock.lock() };
self.inner.write_closed.store(true, atomic::SeqCst);
self.cancel_io()
self.cancel_io().map(|()| UnixReadStream { thing: self.clone() })
}

pub fn set_timeout(&mut self, timeout: Option<u64>) {
Expand Down Expand Up @@ -551,6 +587,18 @@ impl Clone for UnixStream {
}
}

impl Clone for UnixReadStream {
fn clone(&self) -> UnixReadStream {
UnixReadStream {thing: self.thing.clone() }
}
}

impl Clone for UnixWriteStream {
fn clone(&self) -> UnixWriteStream {
UnixWriteStream {thing: self.thing.clone() }
}
}

////////////////////////////////////////////////////////////////////////////////
// Unix Listener
////////////////////////////////////////////////////////////////////////////////
Expand Down