From 80870af3bb2e72d81eecc245e7a350fe5370febb Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 8 Apr 2020 12:55:24 +0200 Subject: [PATCH 1/6] mechanical first pass at updating to mio 0.7 --- Cargo.toml | 7 ++-- src/net/tcp/listener.rs | 12 +++--- src/net/tcp/stream.rs | 6 +-- src/net/udp/mod.rs | 10 ++--- src/os/unix/net/datagram.rs | 22 +++++------ src/os/unix/net/listener.rs | 44 ++++++++-------------- src/os/unix/net/stream.rs | 39 ++++++++++--------- src/rt/reactor.rs | 74 ++++++++++++++++++------------------- 8 files changed, 99 insertions(+), 115 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d49eb957b..d5bff22d6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,9 @@ default = [ "kv-log-macro", "log", "mio", - "mio-uds", + "mio/tcp", + "mio/udp", + "mio/uds", "num_cpus", "pin-project-lite", ] @@ -67,8 +69,7 @@ futures-timer = { version = "3.0.2", optional = true } kv-log-macro = { version = "1.0.4", optional = true } log = { version = "0.4.8", features = ["kv_unstable"], optional = true } memchr = { version = "2.3.3", optional = true } -mio = { version = "0.6.19", optional = true } -mio-uds = { version = "0.6.7", optional = true } +mio = { version = "0.7.0", optional = true } num_cpus = { version = "1.12.0", optional = true } once_cell = { version = "1.3.1", optional = true } pin-project-lite = { version = "0.1.4", optional = true } diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index 9e15d40f6..dc0b9172d 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -5,8 +5,8 @@ use std::sync::Arc; use crate::future; use crate::io; -use crate::rt::Watcher; use crate::net::{TcpStream, ToSocketAddrs}; +use crate::rt::Watcher; use crate::stream::Stream; use crate::task::{Context, Poll}; @@ -79,7 +79,7 @@ impl TcpListener { let addrs = addrs.to_socket_addrs().await?; for addr in addrs { - match mio::net::TcpListener::bind(&addr) { + match mio::net::TcpListener::bind(addr) { Ok(mio_listener) => { return Ok(TcpListener { watcher: Watcher::new(mio_listener), @@ -114,11 +114,9 @@ impl TcpListener { /// # Ok(()) }) } /// ``` pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { - let (io, addr) = - future::poll_fn(|cx| self.watcher.poll_read_with(cx, |inner| inner.accept_std())) - .await?; + let (mio_stream, addr) = + future::poll_fn(|cx| self.watcher.poll_read_with(cx, |inner| inner.accept())).await?; - let mio_stream = mio::net::TcpStream::from_stream(io)?; let stream = TcpStream { watcher: Arc::new(Watcher::new(mio_stream)), }; @@ -206,7 +204,7 @@ impl<'a> Stream for Incoming<'a> { impl From for TcpListener { /// Converts a `std::net::TcpListener` into its asynchronous equivalent. fn from(listener: std::net::TcpListener) -> TcpListener { - let mio_listener = mio::net::TcpListener::from_std(listener).unwrap(); + let mio_listener = mio::net::TcpListener::from_std(listener); TcpListener { watcher: Watcher::new(mio_listener), } diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 1f50e8f1e..e1bb067b4 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -5,8 +5,8 @@ use std::sync::Arc; use crate::future; use crate::io::{self, Read, Write}; -use crate::rt::Watcher; use crate::net::ToSocketAddrs; +use crate::rt::Watcher; use crate::task::{Context, Poll}; /// A TCP stream between a local and a remote socket. @@ -79,7 +79,7 @@ impl TcpStream { // when it returns with `Ok`. We therefore wait for write readiness to // be sure the connection has either been established or there was an // error which we check for afterwards. - let watcher = match mio::net::TcpStream::connect(&addr) { + let watcher = match mio::net::TcpStream::connect(addr) { Ok(s) => Watcher::new(s), Err(e) => { last_err = Some(e); @@ -370,7 +370,7 @@ impl Write for &TcpStream { impl From for TcpStream { /// Converts a `std::net::TcpStream` into its asynchronous equivalent. fn from(stream: std::net::TcpStream) -> TcpStream { - let mio_stream = mio::net::TcpStream::from_stream(stream).unwrap(); + let mio_stream = mio::net::TcpStream::from_std(stream); TcpStream { watcher: Arc::new(Watcher::new(mio_stream)), } diff --git a/src/net/udp/mod.rs b/src/net/udp/mod.rs index 774478d3b..b1a702593 100644 --- a/src/net/udp/mod.rs +++ b/src/net/udp/mod.rs @@ -69,12 +69,10 @@ impl UdpSocket { /// ``` pub async fn bind(addrs: A) -> io::Result { let mut last_err = None; - let addrs = addrs - .to_socket_addrs() - .await?; + let addrs = addrs.to_socket_addrs().await?; for addr in addrs { - match mio::net::UdpSocket::bind(&addr) { + match mio::net::UdpSocket::bind(addr) { Ok(mio_socket) => { return Ok(UdpSocket { watcher: Watcher::new(mio_socket), @@ -155,7 +153,7 @@ impl UdpSocket { future::poll_fn(|cx| { self.watcher - .poll_write_with(cx, |inner| inner.send_to(buf, &addr)) + .poll_write_with(cx, |inner| inner.send_to(buf, addr)) }) .await .context(|| format!("could not send packet to {}", addr)) @@ -498,7 +496,7 @@ impl UdpSocket { impl From for UdpSocket { /// Converts a `std::net::UdpSocket` into its asynchronous equivalent. fn from(socket: std::net::UdpSocket) -> UdpSocket { - let mio_socket = mio::net::UdpSocket::from_socket(socket).unwrap(); + let mio_socket = mio::net::UdpSocket::from_std(socket); UdpSocket { watcher: Watcher::new(mio_socket), } diff --git a/src/os/unix/net/datagram.rs b/src/os/unix/net/datagram.rs index 5a2d6ec91..20b1046a0 100644 --- a/src/os/unix/net/datagram.rs +++ b/src/os/unix/net/datagram.rs @@ -3,14 +3,12 @@ use std::fmt; use std::net::Shutdown; -use mio_uds; - use super::SocketAddr; use crate::future; use crate::io; -use crate::rt::Watcher; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::path::Path; +use crate::rt::Watcher; use crate::task::spawn_blocking; /// A Unix datagram socket. @@ -42,11 +40,11 @@ use crate::task::spawn_blocking; /// # Ok(()) }) } /// ``` pub struct UnixDatagram { - watcher: Watcher, + watcher: Watcher, } impl UnixDatagram { - fn new(socket: mio_uds::UnixDatagram) -> UnixDatagram { + fn new(socket: mio::net::UnixDatagram) -> UnixDatagram { UnixDatagram { watcher: Watcher::new(socket), } @@ -67,7 +65,7 @@ impl UnixDatagram { /// ``` pub async fn bind>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - let socket = spawn_blocking(move || mio_uds::UnixDatagram::bind(path)).await?; + let socket = spawn_blocking(move || mio::net::UnixDatagram::bind(path)).await?; Ok(UnixDatagram::new(socket)) } @@ -85,7 +83,7 @@ impl UnixDatagram { /// # Ok(()) }) } /// ``` pub fn unbound() -> io::Result { - let socket = mio_uds::UnixDatagram::unbound()?; + let socket = mio::net::UnixDatagram::unbound()?; Ok(UnixDatagram::new(socket)) } @@ -105,7 +103,7 @@ impl UnixDatagram { /// # Ok(()) }) } /// ``` pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> { - let (a, b) = mio_uds::UnixDatagram::pair()?; + let (a, b) = mio::net::UnixDatagram::pair()?; let a = UnixDatagram::new(a); let b = UnixDatagram::new(b); Ok((a, b)) @@ -152,7 +150,7 @@ impl UnixDatagram { /// # /// # Ok(()) }) } /// ``` - pub fn local_addr(&self) -> io::Result { + pub fn local_addr(&self) -> io::Result { self.watcher.get_ref().local_addr() } @@ -175,7 +173,7 @@ impl UnixDatagram { /// # /// # Ok(()) }) } /// ``` - pub fn peer_addr(&self) -> io::Result { + pub fn peer_addr(&self) -> io::Result { self.watcher.get_ref().peer_addr() } @@ -196,7 +194,7 @@ impl UnixDatagram { /// # /// # Ok(()) }) } /// ``` - pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, mio::net::SocketAddr)> { future::poll_fn(|cx| { self.watcher .poll_read_with(cx, |inner| inner.recv_from(buf)) @@ -315,7 +313,7 @@ impl fmt::Debug for UnixDatagram { impl From for UnixDatagram { /// Converts a `std::os::unix::net::UnixDatagram` into its asynchronous equivalent. fn from(datagram: std::os::unix::net::UnixDatagram) -> UnixDatagram { - let mio_datagram = mio_uds::UnixDatagram::from_datagram(datagram).unwrap(); + let mio_datagram = mio::net::UnixDatagram::from_std(datagram); UnixDatagram { watcher: Watcher::new(mio_datagram), } diff --git a/src/os/unix/net/listener.rs b/src/os/unix/net/listener.rs index 9f6bdcbc5..ecced860c 100644 --- a/src/os/unix/net/listener.rs +++ b/src/os/unix/net/listener.rs @@ -1,18 +1,16 @@ //! Unix-specific networking extensions. use std::fmt; -use std::pin::Pin; use std::future::Future; - -use mio_uds; +use std::pin::Pin; use super::SocketAddr; use super::UnixStream; use crate::future; use crate::io; -use crate::rt::Watcher; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::path::Path; +use crate::rt::Watcher; use crate::stream::Stream; use crate::task::{spawn_blocking, Context, Poll}; @@ -50,7 +48,7 @@ use crate::task::{spawn_blocking, Context, Poll}; /// # Ok(()) }) } /// ``` pub struct UnixListener { - watcher: Watcher, + watcher: Watcher, } impl UnixListener { @@ -69,7 +67,7 @@ impl UnixListener { /// ``` pub async fn bind>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - let listener = spawn_blocking(move || mio_uds::UnixListener::bind(path)).await?; + let listener = spawn_blocking(move || mio::net::UnixListener::bind(path)).await?; Ok(UnixListener { watcher: Watcher::new(listener), @@ -92,28 +90,16 @@ impl UnixListener { /// # /// # Ok(()) }) } /// ``` - pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { + pub async fn accept(&self) -> io::Result<(UnixStream, mio::net::SocketAddr)> { future::poll_fn(|cx| { - let res = futures_core::ready!(self.watcher.poll_read_with(cx, |inner| { - match inner.accept_std() { - // Converting to `WouldBlock` so that the watcher will - // add the waker of this task to a list of readers. - Ok(None) => Err(io::ErrorKind::WouldBlock.into()), - res => res, - } - })); - - match res? { - Some((io, addr)) => { - let mio_stream = mio_uds::UnixStream::from_stream(io)?; - let stream = UnixStream { - watcher: Watcher::new(mio_stream), - }; - Poll::Ready(Ok((stream, addr))) - } - // This should never happen since `None` is converted to `WouldBlock` - None => unreachable!(), - } + let res = + futures_core::ready!(self.watcher.poll_read_with(cx, |inner| { inner.accept() })); + + let (mio_stream, addr) = res?; + let stream = UnixStream { + watcher: Watcher::new(mio_stream), + }; + Poll::Ready(Ok((stream, addr))) }) .await } @@ -162,7 +148,7 @@ impl UnixListener { /// # /// # Ok(()) }) } /// ``` - pub fn local_addr(&self) -> io::Result { + pub fn local_addr(&self) -> io::Result { self.watcher.get_ref().local_addr() } } @@ -209,7 +195,7 @@ impl Stream for Incoming<'_> { impl From for UnixListener { /// Converts a `std::os::unix::net::UnixListener` into its asynchronous equivalent. fn from(listener: std::os::unix::net::UnixListener) -> UnixListener { - let mio_listener = mio_uds::UnixListener::from_listener(listener).unwrap(); + let mio_listener = mio::net::UnixListener::from_std(listener); UnixListener { watcher: Watcher::new(mio_listener), } diff --git a/src/os/unix/net/stream.rs b/src/os/unix/net/stream.rs index a1c83f1b9..169ad22d2 100644 --- a/src/os/unix/net/stream.rs +++ b/src/os/unix/net/stream.rs @@ -5,14 +5,13 @@ use std::io::{Read as _, Write as _}; use std::net::Shutdown; use std::pin::Pin; -use mio_uds; - use super::SocketAddr; +use crate::future; use crate::io::{self, Read, Write}; -use crate::rt::Watcher; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::path::Path; -use crate::task::{spawn_blocking, Context, Poll}; +use crate::rt::Watcher; +use crate::task::{Context, Poll}; /// A Unix stream socket. /// @@ -38,7 +37,7 @@ use crate::task::{spawn_blocking, Context, Poll}; /// # Ok(()) }) } /// ``` pub struct UnixStream { - pub(super) watcher: Watcher, + pub(super) watcher: Watcher, } impl UnixStream { @@ -58,14 +57,20 @@ impl UnixStream { pub async fn connect>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - spawn_blocking(move || { - let std_stream = std::os::unix::net::UnixStream::connect(path)?; - let mio_stream = mio_uds::UnixStream::from_stream(std_stream)?; - Ok(UnixStream { - watcher: Watcher::new(mio_stream), - }) - }) - .await + // mio's UnixStream::connect is non-blocking and may just be in progress + // when it returns with `Ok`. We therefore wait for write readiness to + // be sure the connection has either been established or there was an + // error which we check for afterwards. + let mio_stream = mio::net::UnixStream::connect(path)?; + let watcher = Watcher::new(mio_stream); + + future::poll_fn(|cx| watcher.poll_write_ready(cx)).await; + + match watcher.get_ref().take_error() { + Ok(None) => Ok(UnixStream { watcher }), + Ok(Some(e)) => Err(e), + Err(e) => Err(e), + } } /// Creates an unnamed pair of connected sockets. @@ -84,7 +89,7 @@ impl UnixStream { /// # Ok(()) }) } /// ``` pub fn pair() -> io::Result<(UnixStream, UnixStream)> { - let (a, b) = mio_uds::UnixStream::pair()?; + let (a, b) = mio::net::UnixStream::pair()?; let a = UnixStream { watcher: Watcher::new(a), }; @@ -108,7 +113,7 @@ impl UnixStream { /// # /// # Ok(()) }) } /// ``` - pub fn local_addr(&self) -> io::Result { + pub fn local_addr(&self) -> io::Result { self.watcher.get_ref().local_addr() } @@ -126,7 +131,7 @@ impl UnixStream { /// # /// # Ok(()) }) } /// ``` - pub fn peer_addr(&self) -> io::Result { + pub fn peer_addr(&self) -> io::Result { self.watcher.get_ref().peer_addr() } @@ -230,7 +235,7 @@ impl fmt::Debug for UnixStream { impl From for UnixStream { /// Converts a `std::os::unix::net::UnixStream` into its asynchronous equivalent. fn from(stream: std::os::unix::net::UnixStream) -> UnixStream { - let mio_stream = mio_uds::UnixStream::from_stream(stream).unwrap(); + let mio_stream = mio::net::UnixStream::from_std(stream); UnixStream { watcher: Watcher::new(mio_stream), } diff --git a/src/rt/reactor.rs b/src/rt/reactor.rs index 2a35b72c5..e03749fef 100644 --- a/src/rt/reactor.rs +++ b/src/rt/reactor.rs @@ -2,7 +2,7 @@ use std::fmt; use std::sync::{Arc, Mutex}; use std::time::Duration; -use mio::{self, Evented}; +use mio::{self, event::Source, Interest}; use slab::Slab; use crate::io; @@ -64,25 +64,26 @@ impl Reactor { /// Creates a new reactor for polling I/O events. pub fn new() -> io::Result { let poller = mio::Poll::new()?; - let notify_reg = mio::Registration::new2(); - - let mut reactor = Reactor { - poller, - events: Mutex::new(mio::Events::with_capacity(1000)), - entries: Mutex::new(Slab::new()), - notify_reg, - notify_token: mio::Token(0), - }; - - // Register a dummy I/O handle for waking up the polling thread. - let entry = reactor.register(&reactor.notify_reg.0)?; - reactor.notify_token = entry.token; - - Ok(reactor) + todo!(); + // let notify_reg = mio::Registration::new2(); + + // let mut reactor = Reactor { + // poller, + // events: Mutex::new(mio::Events::with_capacity(1000)), + // entries: Mutex::new(Slab::new()), + // notify_reg, + // notify_token: mio::Token(0), + // }; + + // // Register a dummy I/O handle for waking up the polling thread. + // let entry = reactor.register(&reactor.notify_reg.0)?; + // reactor.notify_token = entry.token; + + // Ok(reactor) } /// Registers an I/O event source and returns its associated entry. - fn register(&self, source: &dyn Evented) -> io::Result> { + fn register(&self, source: &mut dyn Source) -> io::Result> { let mut entries = self.entries.lock().unwrap(); // Reserve a vacant spot in the slab and use its key as the token value. @@ -104,17 +105,17 @@ impl Reactor { vacant.insert(entry.clone()); // Register the I/O event source in the poller. - let interest = mio::Ready::all(); - let opts = mio::PollOpt::edge(); - self.poller.register(source, token, interest, opts)?; + // TODO: ADD AIO and LIO? + const interest: Interest = Interest::READABLE.add(Interest::WRITABLE); + self.poller.registry().register(source, token, interest)?; Ok(entry) } /// Deregisters an I/O event source associated with an entry. - fn deregister(&self, source: &dyn Evented, entry: &Entry) -> io::Result<()> { + fn deregister(&self, source: &mut dyn Source, entry: &Entry) -> io::Result<()> { // Deregister the I/O object from the mio instance. - self.poller.deregister(source)?; + self.poller.registry().deregister(source)?; // Remove the entry associated with the I/O object. self.entries.lock().unwrap().remove(entry.token.0); @@ -124,7 +125,8 @@ impl Reactor { /// Notifies the reactor so that polling stops blocking. pub fn notify(&self) -> io::Result<()> { - self.notify_reg.1.set_readiness(mio::Ready::readable()) + todo!() + // self.notify_reg.1.set_readiness(mio::Ready::readable()) } /// Waits on the poller for new events and wakes up tasks blocked on I/O handles. @@ -147,16 +149,13 @@ impl Reactor { if token == self.notify_token { // If this is the notification token, we just need the notification state. - self.notify_reg.1.set_readiness(mio::Ready::empty())?; + todo!() + // self.notify_reg.1.set_readiness(mio::Ready::empty())?; } else { // Otherwise, look for the entry associated with this token. if let Some(entry) = entries.get(token.0) { - // Set the readiness flags from this I/O event. - let readiness = event.readiness(); - // Wake up reader tasks blocked on this I/O handle. - let reader_interests = mio::Ready::all() - mio::Ready::writable(); - if !(readiness & reader_interests).is_empty() { + if event.is_readable() { let mut readers = entry.readers.lock().unwrap(); readers.ready = true; for w in readers.wakers.drain(..) { @@ -166,8 +165,7 @@ impl Reactor { } // Wake up writer tasks blocked on this I/O handle. - let writer_interests = mio::Ready::all() - mio::Ready::readable(); - if !(readiness & writer_interests).is_empty() { + if event.is_writable() { let mut writers = entry.writers.lock().unwrap(); writers.ready = true; for w in writers.wakers.drain(..) { @@ -187,7 +185,7 @@ impl Reactor { /// /// This handle wraps an I/O event source and exposes a "futurized" interface on top of it, /// implementing traits `AsyncRead` and `AsyncWrite`. -pub struct Watcher { +pub struct Watcher { /// Data associated with the I/O handle. entry: Arc, @@ -195,7 +193,7 @@ pub struct Watcher { source: Option, } -impl Watcher { +impl Watcher { /// Creates a new I/O handle. /// /// The provided I/O event source will be kept registered inside the reactor's poller for the @@ -204,7 +202,7 @@ impl Watcher { Watcher { entry: RUNTIME .reactor() - .register(&source) + .register(&mut source) .expect("cannot register an I/O event source"), source: Some(source), } @@ -327,15 +325,15 @@ impl Watcher { let source = self.source.take().unwrap(); RUNTIME .reactor() - .deregister(&source, &self.entry) + .deregister(&mut source, &self.entry) .expect("cannot deregister I/O event source"); source } } -impl Drop for Watcher { +impl Drop for Watcher { fn drop(&mut self) { - if let Some(ref source) = self.source { + if let Some(ref mut source) = self.source { RUNTIME .reactor() .deregister(source, &self.entry) @@ -344,7 +342,7 @@ impl Drop for Watcher { } } -impl fmt::Debug for Watcher { +impl fmt::Debug for Watcher { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Watcher") .field("entry", &self.entry) From 9d0f2addd3cd6be2481e893a71a4c726158e1d7a Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 8 Apr 2020 13:21:05 +0200 Subject: [PATCH 2/6] get reactor compiling --- Cargo.toml | 1 + src/rt/reactor.rs | 61 ++++++++++++++++++++++++----------------------- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d5bff22d6..c07911a1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ default = [ "kv-log-macro", "log", "mio", + "mio/os-poll", "mio/tcp", "mio/udp", "mio/uds", diff --git a/src/rt/reactor.rs b/src/rt/reactor.rs index e03749fef..1ea52ca92 100644 --- a/src/rt/reactor.rs +++ b/src/rt/reactor.rs @@ -9,6 +9,9 @@ use crate::io; use crate::rt::RUNTIME; use crate::task::{Context, Poll, Waker}; +// TODO: ADD AIO and LIO? +const INTEREST_ALL: Interest = Interest::READABLE.add(Interest::WRITABLE); + /// Data associated with a registered I/O handle. #[derive(Debug)] struct Entry { @@ -25,7 +28,7 @@ struct Entry { /// The state of a networking driver. pub struct Reactor { /// A mio instance that polls for new events. - poller: mio::Poll, + poller: Mutex, /// A list into which mio stores events. events: Mutex, @@ -33,8 +36,8 @@ pub struct Reactor { /// A collection of registered I/O handles. entries: Mutex>>, - /// Dummy I/O handle that is only used to wake up the polling thread. - notify_reg: (mio::Registration, mio::SetReadiness), + /// Mio waker that is only used to wake up the polling thread. + notify_waker: mio::Waker, /// An identifier for the notification handle. notify_token: mio::Token, @@ -64,22 +67,20 @@ impl Reactor { /// Creates a new reactor for polling I/O events. pub fn new() -> io::Result { let poller = mio::Poll::new()?; - todo!(); - // let notify_reg = mio::Registration::new2(); - - // let mut reactor = Reactor { - // poller, - // events: Mutex::new(mio::Events::with_capacity(1000)), - // entries: Mutex::new(Slab::new()), - // notify_reg, - // notify_token: mio::Token(0), - // }; - - // // Register a dummy I/O handle for waking up the polling thread. - // let entry = reactor.register(&reactor.notify_reg.0)?; - // reactor.notify_token = entry.token; - - // Ok(reactor) + + // Register a waker for waking up the polling thread. + let notify_token = mio::Token(0); // TODO: is this being 0 okay? + let notify_waker = mio::Waker::new(poller.registry(), notify_token)?; + + let reactor = Reactor { + poller: Mutex::new(poller), + events: Mutex::new(mio::Events::with_capacity(1000)), + entries: Mutex::new(Slab::new()), + notify_waker, + notify_token, + }; + + Ok(reactor) } /// Registers an I/O event source and returns its associated entry. @@ -105,9 +106,11 @@ impl Reactor { vacant.insert(entry.clone()); // Register the I/O event source in the poller. - // TODO: ADD AIO and LIO? - const interest: Interest = Interest::READABLE.add(Interest::WRITABLE); - self.poller.registry().register(source, token, interest)?; + self.poller + .lock() + .unwrap() + .registry() + .register(source, token, INTEREST_ALL)?; Ok(entry) } @@ -115,7 +118,7 @@ impl Reactor { /// Deregisters an I/O event source associated with an entry. fn deregister(&self, source: &mut dyn Source, entry: &Entry) -> io::Result<()> { // Deregister the I/O object from the mio instance. - self.poller.registry().deregister(source)?; + self.poller.lock().unwrap().registry().deregister(source)?; // Remove the entry associated with the I/O object. self.entries.lock().unwrap().remove(entry.token.0); @@ -125,8 +128,7 @@ impl Reactor { /// Notifies the reactor so that polling stops blocking. pub fn notify(&self) -> io::Result<()> { - todo!() - // self.notify_reg.1.set_readiness(mio::Ready::readable()) + self.notify_waker.wake() } /// Waits on the poller for new events and wakes up tasks blocked on I/O handles. @@ -136,7 +138,7 @@ impl Reactor { let mut events = self.events.lock().unwrap(); // Block on the poller until at least one new event comes in. - self.poller.poll(&mut events, timeout)?; + self.poller.lock().unwrap().poll(&mut events, timeout)?; // Lock the entire entry table while we're processing new events. let entries = self.entries.lock().unwrap(); @@ -149,8 +151,7 @@ impl Reactor { if token == self.notify_token { // If this is the notification token, we just need the notification state. - todo!() - // self.notify_reg.1.set_readiness(mio::Ready::empty())?; + self.notify_waker.wake()?; } else { // Otherwise, look for the entry associated with this token. if let Some(entry) = entries.get(token.0) { @@ -198,7 +199,7 @@ impl Watcher { /// /// The provided I/O event source will be kept registered inside the reactor's poller for the /// lifetime of the returned I/O handle. - pub fn new(source: T) -> Watcher { + pub fn new(mut source: T) -> Watcher { Watcher { entry: RUNTIME .reactor() @@ -322,7 +323,7 @@ impl Watcher { /// This method is typically used to convert `Watcher`s to raw file descriptors/handles. #[allow(dead_code)] pub fn into_inner(mut self) -> T { - let source = self.source.take().unwrap(); + let mut source = self.source.take().unwrap(); RUNTIME .reactor() .deregister(&mut source, &self.entry) From 121bc7d726ac8938e9f98e895f2241b7737092df Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 8 Apr 2020 13:45:39 +0200 Subject: [PATCH 3/6] set nonblocking mode on conversion from std --- src/net/tcp/listener.rs | 5 +++++ src/net/tcp/stream.rs | 5 +++++ src/net/udp/mod.rs | 5 +++++ src/os/unix/net/datagram.rs | 5 +++++ src/os/unix/net/listener.rs | 5 +++++ src/os/unix/net/stream.rs | 5 +++++ 6 files changed, 30 insertions(+) diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index dc0b9172d..fc99d9f69 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -204,6 +204,11 @@ impl<'a> Stream for Incoming<'a> { impl From for TcpListener { /// Converts a `std::net::TcpListener` into its asynchronous equivalent. fn from(listener: std::net::TcpListener) -> TcpListener { + // Make sure we are in nonblocking mode. + listener + .set_nonblocking(true) + .expect("failed to set nonblocking mode"); + let mio_listener = mio::net::TcpListener::from_std(listener); TcpListener { watcher: Watcher::new(mio_listener), diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index e1bb067b4..7952964f8 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -370,6 +370,11 @@ impl Write for &TcpStream { impl From for TcpStream { /// Converts a `std::net::TcpStream` into its asynchronous equivalent. fn from(stream: std::net::TcpStream) -> TcpStream { + // Make sure we are in nonblocking mode. + stream + .set_nonblocking(true) + .expect("failed to set nonblocking mode"); + let mio_stream = mio::net::TcpStream::from_std(stream); TcpStream { watcher: Arc::new(Watcher::new(mio_stream)), diff --git a/src/net/udp/mod.rs b/src/net/udp/mod.rs index b1a702593..4698b45ad 100644 --- a/src/net/udp/mod.rs +++ b/src/net/udp/mod.rs @@ -496,6 +496,11 @@ impl UdpSocket { impl From for UdpSocket { /// Converts a `std::net::UdpSocket` into its asynchronous equivalent. fn from(socket: std::net::UdpSocket) -> UdpSocket { + // Make sure we are in nonblocking mode. + socket + .set_nonblocking(true) + .expect("failed to set nonblocking mode"); + let mio_socket = mio::net::UdpSocket::from_std(socket); UdpSocket { watcher: Watcher::new(mio_socket), diff --git a/src/os/unix/net/datagram.rs b/src/os/unix/net/datagram.rs index 20b1046a0..c7c9403aa 100644 --- a/src/os/unix/net/datagram.rs +++ b/src/os/unix/net/datagram.rs @@ -313,6 +313,11 @@ impl fmt::Debug for UnixDatagram { impl From for UnixDatagram { /// Converts a `std::os::unix::net::UnixDatagram` into its asynchronous equivalent. fn from(datagram: std::os::unix::net::UnixDatagram) -> UnixDatagram { + // Make sure we are in nonblocking mode. + datagram + .set_nonblocking(true) + .expect("failed to set nonblocking mode"); + let mio_datagram = mio::net::UnixDatagram::from_std(datagram); UnixDatagram { watcher: Watcher::new(mio_datagram), diff --git a/src/os/unix/net/listener.rs b/src/os/unix/net/listener.rs index ecced860c..e77d42393 100644 --- a/src/os/unix/net/listener.rs +++ b/src/os/unix/net/listener.rs @@ -195,6 +195,11 @@ impl Stream for Incoming<'_> { impl From for UnixListener { /// Converts a `std::os::unix::net::UnixListener` into its asynchronous equivalent. fn from(listener: std::os::unix::net::UnixListener) -> UnixListener { + // Make sure we are in nonblocking mode. + listener + .set_nonblocking(true) + .expect("failed to set nonblocking mode"); + let mio_listener = mio::net::UnixListener::from_std(listener); UnixListener { watcher: Watcher::new(mio_listener), diff --git a/src/os/unix/net/stream.rs b/src/os/unix/net/stream.rs index 169ad22d2..3cd8fc94a 100644 --- a/src/os/unix/net/stream.rs +++ b/src/os/unix/net/stream.rs @@ -235,6 +235,11 @@ impl fmt::Debug for UnixStream { impl From for UnixStream { /// Converts a `std::os::unix::net::UnixStream` into its asynchronous equivalent. fn from(stream: std::os::unix::net::UnixStream) -> UnixStream { + // Make sure we are in nonblocking mode. + stream + .set_nonblocking(true) + .expect("failed to set nonblocking mode"); + let mio_stream = mio::net::UnixStream::from_std(stream); UnixStream { watcher: Watcher::new(mio_stream), From 56769d8b6c39f560229ec354452ba802aefc6755 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 8 Apr 2020 17:08:44 +0200 Subject: [PATCH 4/6] fix token handling --- src/os/unix/net/datagram.rs | 5 ++--- src/os/unix/net/listener.rs | 11 +++++------ src/os/unix/net/stream.rs | 1 + src/rt/reactor.rs | 18 ++++++++++++++++-- 4 files changed, 24 insertions(+), 11 deletions(-) diff --git a/src/os/unix/net/datagram.rs b/src/os/unix/net/datagram.rs index c7c9403aa..1c4ea7d5e 100644 --- a/src/os/unix/net/datagram.rs +++ b/src/os/unix/net/datagram.rs @@ -9,7 +9,6 @@ use crate::io; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::path::Path; use crate::rt::Watcher; -use crate::task::spawn_blocking; /// A Unix datagram socket. /// @@ -65,8 +64,8 @@ impl UnixDatagram { /// ``` pub async fn bind>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - let socket = spawn_blocking(move || mio::net::UnixDatagram::bind(path)).await?; - Ok(UnixDatagram::new(socket)) + let mio_socket = mio::net::UnixDatagram::bind(path)?; + Ok(UnixDatagram::new(mio_socket)) } /// Creates a Unix datagram which is not bound to any address. diff --git a/src/os/unix/net/listener.rs b/src/os/unix/net/listener.rs index e77d42393..579180858 100644 --- a/src/os/unix/net/listener.rs +++ b/src/os/unix/net/listener.rs @@ -12,7 +12,7 @@ use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::path::Path; use crate::rt::Watcher; use crate::stream::Stream; -use crate::task::{spawn_blocking, Context, Poll}; +use crate::task::{Context, Poll}; /// A Unix domain socket server, listening for connections. /// @@ -67,10 +67,10 @@ impl UnixListener { /// ``` pub async fn bind>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - let listener = spawn_blocking(move || mio::net::UnixListener::bind(path)).await?; + let mio_listener = mio::net::UnixListener::bind(path)?; Ok(UnixListener { - watcher: Watcher::new(listener), + watcher: Watcher::new(mio_listener), }) } @@ -92,10 +92,9 @@ impl UnixListener { /// ``` pub async fn accept(&self) -> io::Result<(UnixStream, mio::net::SocketAddr)> { future::poll_fn(|cx| { - let res = - futures_core::ready!(self.watcher.poll_read_with(cx, |inner| { inner.accept() })); + let (mio_stream, addr) = + futures_core::ready!(self.watcher.poll_read_with(cx, |inner| { inner.accept() }))?; - let (mio_stream, addr) = res?; let stream = UnixStream { watcher: Watcher::new(mio_stream), }; diff --git a/src/os/unix/net/stream.rs b/src/os/unix/net/stream.rs index 3cd8fc94a..86063c235 100644 --- a/src/os/unix/net/stream.rs +++ b/src/os/unix/net/stream.rs @@ -211,6 +211,7 @@ impl Write for &UnixStream { } fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + self.shutdown(std::net::Shutdown::Write)?; Poll::Ready(Ok(())) } } diff --git a/src/rt/reactor.rs b/src/rt/reactor.rs index 1ea52ca92..afef99e5c 100644 --- a/src/rt/reactor.rs +++ b/src/rt/reactor.rs @@ -67,15 +67,29 @@ impl Reactor { /// Creates a new reactor for polling I/O events. pub fn new() -> io::Result { let poller = mio::Poll::new()?; + let mut entries = Slab::new(); // Register a waker for waking up the polling thread. - let notify_token = mio::Token(0); // TODO: is this being 0 okay? + let vacant = entries.vacant_entry(); + let notify_token = mio::Token(vacant.key()); let notify_waker = mio::Waker::new(poller.registry(), notify_token)?; + // dumy entry to avoid reusing the same token + vacant.insert(Arc::new(Entry { + token: notify_token.clone(), + readers: Mutex::new(Readers { + ready: false, + wakers: Vec::new(), + }), + writers: Mutex::new(Writers { + ready: false, + wakers: Vec::new(), + }), + })); let reactor = Reactor { poller: Mutex::new(poller), events: Mutex::new(mio::Events::with_capacity(1000)), - entries: Mutex::new(Slab::new()), + entries: Mutex::new(entries), notify_waker, notify_token, }; From 1e99e3453fc3657d147ef8cc55f592ca9f2b076c Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 8 Apr 2020 18:53:18 +0200 Subject: [PATCH 5/6] always call shutdown --- src/net/tcp/stream.rs | 1 + src/os/unix/net/stream.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 7952964f8..ef9bc500e 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -343,6 +343,7 @@ impl Write for TcpStream { } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.shutdown(std::net::Shutdown::Write)?; Pin::new(&mut &*self).poll_close(cx) } } diff --git a/src/os/unix/net/stream.rs b/src/os/unix/net/stream.rs index 86063c235..eb4cd4ec2 100644 --- a/src/os/unix/net/stream.rs +++ b/src/os/unix/net/stream.rs @@ -192,6 +192,7 @@ impl Write for UnixStream { } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.shutdown(std::net::Shutdown::Write)?; Pin::new(&mut &*self).poll_close(cx) } } From 4fd8fe03f3bea6a37804178a78d22f6f1faef0db Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 8 Apr 2020 18:54:06 +0200 Subject: [PATCH 6/6] cleanup imports --- src/os/unix/net/datagram.rs | 1 - src/os/unix/net/listener.rs | 1 - src/os/unix/net/stream.rs | 1 - 3 files changed, 3 deletions(-) diff --git a/src/os/unix/net/datagram.rs b/src/os/unix/net/datagram.rs index 1c4ea7d5e..ab51c6430 100644 --- a/src/os/unix/net/datagram.rs +++ b/src/os/unix/net/datagram.rs @@ -3,7 +3,6 @@ use std::fmt; use std::net::Shutdown; -use super::SocketAddr; use crate::future; use crate::io; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; diff --git a/src/os/unix/net/listener.rs b/src/os/unix/net/listener.rs index 579180858..029670846 100644 --- a/src/os/unix/net/listener.rs +++ b/src/os/unix/net/listener.rs @@ -4,7 +4,6 @@ use std::fmt; use std::future::Future; use std::pin::Pin; -use super::SocketAddr; use super::UnixStream; use crate::future; use crate::io; diff --git a/src/os/unix/net/stream.rs b/src/os/unix/net/stream.rs index eb4cd4ec2..897360671 100644 --- a/src/os/unix/net/stream.rs +++ b/src/os/unix/net/stream.rs @@ -5,7 +5,6 @@ use std::io::{Read as _, Write as _}; use std::net::Shutdown; use std::pin::Pin; -use super::SocketAddr; use crate::future; use crate::io::{self, Read, Write}; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};