Skip to content

Commit 36d24cd

Browse files
author
Stjepan Glavina
committed
New scheduler resilient to blocking
1 parent 9311fd7 commit 36d24cd

22 files changed

+623
-456
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ default = [
2626
"async-task",
2727
"crossbeam-channel",
2828
"crossbeam-deque",
29+
"crossbeam-queue",
2930
"futures-timer",
3031
"kv-log-macro",
3132
"log",
@@ -56,6 +57,7 @@ async-task = { version = "1.0.0", optional = true }
5657
broadcaster = { version = "0.2.6", optional = true, default-features = false, features = ["default-channels"] }
5758
crossbeam-channel = { version = "0.4.0", optional = true }
5859
crossbeam-deque = { version = "0.7.2", optional = true }
60+
crossbeam-queue = { version = "0.2.0", optional = true }
5961
crossbeam-utils = { version = "0.7.0", optional = true }
6062
futures-core = { version = "0.3.1", optional = true }
6163
futures-io = { version = "0.3.1", optional = true }

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,8 @@ cfg_std! {
246246
pub mod stream;
247247
pub mod sync;
248248
pub mod task;
249+
250+
pub(crate) mod rt;
249251
}
250252

251253
cfg_default! {

src/net/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,5 @@ pub use tcp::{Incoming, TcpListener, TcpStream};
6666
pub use udp::UdpSocket;
6767

6868
mod addr;
69-
pub(crate) mod driver;
7069
mod tcp;
7170
mod udp;

src/net/tcp/listener.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::pin::Pin;
44

55
use crate::future;
66
use crate::io;
7-
use crate::net::driver::Watcher;
7+
use crate::rt::Watcher;
88
use crate::net::{TcpStream, ToSocketAddrs};
99
use crate::stream::Stream;
1010
use crate::task::{Context, Poll};

src/net/tcp/stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::pin::Pin;
44

55
use crate::future;
66
use crate::io::{self, Read, Write};
7-
use crate::net::driver::Watcher;
7+
use crate::rt::Watcher;
88
use crate::net::ToSocketAddrs;
99
use crate::task::{spawn_blocking, Context, Poll};
1010
use crate::utils::Context as _;

src/net/udp/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ use std::net::SocketAddr;
33
use std::net::{Ipv4Addr, Ipv6Addr};
44

55
use crate::future;
6-
use crate::net::driver::Watcher;
76
use crate::net::ToSocketAddrs;
7+
use crate::rt::Watcher;
88
use crate::utils::Context as _;
99

1010
/// A UDP socket.
@@ -102,7 +102,7 @@ impl UdpSocket {
102102
/// ```no_run
103103
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
104104
/// #
105-
/// use async_std::net::UdpSocket;
105+
/// use async_std::net::UdpSocket;
106106
///
107107
/// let socket = UdpSocket::bind("127.0.0.1:0").await?;
108108
/// let addr = socket.local_addr()?;

src/os/unix/net/datagram.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use mio_uds;
88
use super::SocketAddr;
99
use crate::future;
1010
use crate::io;
11-
use crate::net::driver::Watcher;
11+
use crate::rt::Watcher;
1212
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
1313
use crate::path::Path;
1414
use crate::task::spawn_blocking;

src/os/unix/net/listener.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use super::SocketAddr;
1010
use super::UnixStream;
1111
use crate::future;
1212
use crate::io;
13-
use crate::net::driver::Watcher;
13+
use crate::rt::Watcher;
1414
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
1515
use crate::path::Path;
1616
use crate::stream::Stream;

src/os/unix/net/stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use mio_uds;
99

1010
use super::SocketAddr;
1111
use crate::io::{self, Read, Write};
12-
use crate::net::driver::Watcher;
12+
use crate::rt::Watcher;
1313
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
1414
use crate::path::Path;
1515
use crate::task::{spawn_blocking, Context, Poll};

src/rt/mod.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
//! The runtime.
2+
3+
use std::thread;
4+
5+
use once_cell::sync::Lazy;
6+
7+
use crate::utils::abort_on_panic;
8+
9+
pub use reactor::{Reactor, Watcher};
10+
pub use runtime::Runtime;
11+
12+
mod reactor;
13+
mod runtime;
14+
15+
/// The global runtime.
16+
pub static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
17+
thread::Builder::new()
18+
.name("async-std/runtime".to_string())
19+
.spawn(|| abort_on_panic(|| RUNTIME.run()))
20+
.expect("cannot start a runtime thread");
21+
22+
Runtime::new()
23+
});

src/net/driver/mod.rs renamed to src/rt/reactor.rs

Lines changed: 40 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
use std::fmt;
22
use std::sync::{Arc, Mutex};
3+
use std::time::Duration;
34

45
use mio::{self, Evented};
5-
use once_cell::sync::Lazy;
66
use slab::Slab;
77

88
use crate::io;
9+
use crate::rt::RUNTIME;
910
use crate::task::{Context, Poll, Waker};
10-
use crate::utils::abort_on_panic;
1111

1212
/// Data associated with a registered I/O handle.
1313
#[derive(Debug)]
@@ -18,15 +18,18 @@ struct Entry {
1818
/// Tasks that are blocked on reading from this I/O handle.
1919
readers: Mutex<Vec<Waker>>,
2020

21-
/// Thasks that are blocked on writing to this I/O handle.
21+
/// Tasks that are blocked on writing to this I/O handle.
2222
writers: Mutex<Vec<Waker>>,
2323
}
2424

2525
/// The state of a networking driver.
26-
struct Reactor {
26+
pub struct Reactor {
2727
/// A mio instance that polls for new events.
2828
poller: mio::Poll,
2929

30+
/// A list into which mio stores events.
31+
events: Mutex<mio::Events>,
32+
3033
/// A collection of registered I/O handles.
3134
entries: Mutex<Slab<Arc<Entry>>>,
3235

@@ -39,12 +42,13 @@ struct Reactor {
3942

4043
impl Reactor {
4144
/// Creates a new reactor for polling I/O events.
42-
fn new() -> io::Result<Reactor> {
45+
pub fn new() -> io::Result<Reactor> {
4346
let poller = mio::Poll::new()?;
4447
let notify_reg = mio::Registration::new2();
4548

4649
let mut reactor = Reactor {
4750
poller,
51+
events: Mutex::new(mio::Events::with_capacity(1000)),
4852
entries: Mutex::new(Slab::new()),
4953
notify_reg,
5054
notify_token: mio::Token(0),
@@ -92,72 +96,60 @@ impl Reactor {
9296
Ok(())
9397
}
9498

95-
// fn notify(&self) {
96-
// self.notify_reg
97-
// .1
98-
// .set_readiness(mio::Ready::readable())
99-
// .unwrap();
100-
// }
101-
}
99+
/// Notifies the reactor so that polling stops blocking.
100+
pub fn notify(&self) -> io::Result<()> {
101+
self.notify_reg.1.set_readiness(mio::Ready::readable())
102+
}
103+
104+
/// Waits on the poller for new events and wakes up tasks blocked on I/O handles.
105+
///
106+
/// Returns `Ok(true)` if at least one new task was woken.
107+
pub fn poll(&self, timeout: Option<Duration>) -> io::Result<bool> {
108+
let mut events = self.events.lock().unwrap();
102109

103-
/// The state of the global networking driver.
104-
static REACTOR: Lazy<Reactor> = Lazy::new(|| {
105-
// Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O
106-
// handles.
107-
std::thread::Builder::new()
108-
.name("async-std/net".to_string())
109-
.spawn(move || {
110-
// If the driver thread panics, there's not much we can do. It is not a
111-
// recoverable error and there is no place to propagate it into so we just abort.
112-
abort_on_panic(|| {
113-
main_loop().expect("async networking thread has panicked");
114-
})
115-
})
116-
.expect("cannot start a thread driving blocking tasks");
117-
118-
Reactor::new().expect("cannot initialize reactor")
119-
});
120-
121-
/// Waits on the poller for new events and wakes up tasks blocked on I/O handles.
122-
fn main_loop() -> io::Result<()> {
123-
let reactor = &REACTOR;
124-
let mut events = mio::Events::with_capacity(1000);
125-
126-
loop {
127110
// Block on the poller until at least one new event comes in.
128-
reactor.poller.poll(&mut events, None)?;
111+
self.poller.poll(&mut events, timeout)?;
129112

130113
// Lock the entire entry table while we're processing new events.
131-
let entries = reactor.entries.lock().unwrap();
114+
let entries = self.entries.lock().unwrap();
115+
116+
// The number of woken tasks.
117+
let mut progress = false;
132118

133119
for event in events.iter() {
134120
let token = event.token();
135121

136-
if token == reactor.notify_token {
122+
if token == self.notify_token {
137123
// If this is the notification token, we just need the notification state.
138-
reactor.notify_reg.1.set_readiness(mio::Ready::empty())?;
124+
self.notify_reg.1.set_readiness(mio::Ready::empty())?;
139125
} else {
140126
// Otherwise, look for the entry associated with this token.
141127
if let Some(entry) = entries.get(token.0) {
142128
// Set the readiness flags from this I/O event.
143129
let readiness = event.readiness();
144130

145131
// Wake up reader tasks blocked on this I/O handle.
146-
if !(readiness & reader_interests()).is_empty() {
132+
let reader_interests = mio::Ready::all() - mio::Ready::writable();
133+
if !(readiness & reader_interests).is_empty() {
147134
for w in entry.readers.lock().unwrap().drain(..) {
148135
w.wake();
136+
progress = true;
149137
}
150138
}
151139

152140
// Wake up writer tasks blocked on this I/O handle.
153-
if !(readiness & writer_interests()).is_empty() {
141+
let writer_interests = mio::Ready::all() - mio::Ready::readable();
142+
if !(readiness & writer_interests).is_empty() {
154143
for w in entry.writers.lock().unwrap().drain(..) {
155144
w.wake();
145+
progress = true;
156146
}
157147
}
158148
}
159149
}
160150
}
151+
152+
Ok(progress)
161153
}
162154
}
163155

@@ -180,7 +172,8 @@ impl<T: Evented> Watcher<T> {
180172
/// lifetime of the returned I/O handle.
181173
pub fn new(source: T) -> Watcher<T> {
182174
Watcher {
183-
entry: REACTOR
175+
entry: RUNTIME
176+
.reactor()
184177
.register(&source)
185178
.expect("cannot register an I/O event source"),
186179
source: Some(source),
@@ -264,7 +257,8 @@ impl<T: Evented> Watcher<T> {
264257
#[allow(dead_code)]
265258
pub fn into_inner(mut self) -> T {
266259
let source = self.source.take().unwrap();
267-
REACTOR
260+
RUNTIME
261+
.reactor()
268262
.deregister(&source, &self.entry)
269263
.expect("cannot deregister I/O event source");
270264
source
@@ -274,7 +268,8 @@ impl<T: Evented> Watcher<T> {
274268
impl<T: Evented> Drop for Watcher<T> {
275269
fn drop(&mut self) {
276270
if let Some(ref source) = self.source {
277-
REACTOR
271+
RUNTIME
272+
.reactor()
278273
.deregister(source, &self.entry)
279274
.expect("cannot deregister I/O event source");
280275
}
@@ -289,27 +284,3 @@ impl<T: Evented + fmt::Debug> fmt::Debug for Watcher<T> {
289284
.finish()
290285
}
291286
}
292-
293-
/// Returns a mask containing flags that interest tasks reading from I/O handles.
294-
#[inline]
295-
fn reader_interests() -> mio::Ready {
296-
mio::Ready::all() - mio::Ready::writable()
297-
}
298-
299-
/// Returns a mask containing flags that interest tasks writing into I/O handles.
300-
#[inline]
301-
fn writer_interests() -> mio::Ready {
302-
mio::Ready::writable() | hup()
303-
}
304-
305-
/// Returns a flag containing the hangup status.
306-
#[inline]
307-
fn hup() -> mio::Ready {
308-
#[cfg(unix)]
309-
let ready = mio::unix::UnixReady::hup().into();
310-
311-
#[cfg(not(unix))]
312-
let ready = mio::Ready::empty();
313-
314-
ready
315-
}

0 commit comments

Comments
 (0)