Skip to content

Refactor TcpStream::connect into resolving loop and TcpStream::connect_to #119

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
1 commit merged into from
Aug 28, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 55 additions & 52 deletions src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,61 +78,10 @@ impl TcpStream {
/// # Ok(()) }) }
/// ```
pub async fn connect<A: ToSocketAddrs>(addrs: A) -> io::Result<TcpStream> {
enum State {
Waiting(TcpStream),
Error(io::Error),
Done,
}

let mut last_err = None;

for addr in addrs.to_socket_addrs()? {
let mut state = {
match mio::net::TcpStream::connect(&addr) {
Ok(mio_stream) => {
#[cfg(unix)]
let stream = TcpStream {
raw_fd: mio_stream.as_raw_fd(),
io_handle: IoHandle::new(mio_stream),
};

#[cfg(windows)]
let stream = TcpStream {
// raw_socket: mio_stream.as_raw_socket(),
io_handle: IoHandle::new(mio_stream),
};

State::Waiting(stream)
}
Err(err) => State::Error(err),
}
};

let res = future::poll_fn(|cx| {
match mem::replace(&mut state, State::Done) {
State::Waiting(stream) => {
// Once we've connected, wait for the stream to be writable as that's when
// the actual connection has been initiated. Once we're writable we check
// for `take_socket_error` to see if the connect actually hit an error or
// not.
//
// If all that succeeded then we ship everything on up.
if let Poll::Pending = stream.io_handle.poll_writable(cx)? {
state = State::Waiting(stream);
return Poll::Pending;
}

if let Some(err) = stream.io_handle.get_ref().take_error()? {
return Poll::Ready(Err(err));
}

Poll::Ready(Ok(stream))
}
State::Error(err) => Poll::Ready(Err(err)),
State::Done => panic!("`TcpStream::connect()` future polled after completion"),
}
})
.await;
let res = Self::connect_to(addr).await;

match res {
Ok(stream) => return Ok(stream),
Expand All @@ -148,6 +97,60 @@ impl TcpStream {
}))
}

/// Creates a new TCP stream connected to the specified address.
async fn connect_to(addr: SocketAddr) -> io::Result<TcpStream> {
let stream = mio::net::TcpStream::connect(&addr).map(|mio_stream| {
#[cfg(unix)]
let stream = TcpStream {
raw_fd: mio_stream.as_raw_fd(),
io_handle: IoHandle::new(mio_stream),
};

#[cfg(windows)]
let stream = TcpStream {
// raw_socket: mio_stream.as_raw_socket(),
io_handle: IoHandle::new(mio_stream),
};

stream
});

enum State {
Waiting(TcpStream),
Error(io::Error),
Done,
}
let mut state = match stream {
Ok(stream) => State::Waiting(stream),
Err(err) => State::Error(err),
};
future::poll_fn(|cx| {
match mem::replace(&mut state, State::Done) {
State::Waiting(stream) => {
// Once we've connected, wait for the stream to be writable as that's when
// the actual connection has been initiated. Once we're writable we check
// for `take_socket_error` to see if the connect actually hit an error or
// not.
//
// If all that succeeded then we ship everything on up.
if let Poll::Pending = stream.io_handle.poll_writable(cx)? {
state = State::Waiting(stream);
return Poll::Pending;
}

if let Some(err) = stream.io_handle.get_ref().take_error()? {
return Poll::Ready(Err(err));
}

Poll::Ready(Ok(stream))
}
State::Error(err) => Poll::Ready(Err(err)),
State::Done => panic!("`TcpStream::connect_to()` future polled after completion"),
}
})
.await
}

/// Returns the local address that this stream is connected to.
///
/// ## Examples
Expand Down