Skip to content

Commit d3b5cd4

Browse files
author
Stjepan Glavina
committed
Small refactor for v1.0
1 parent aa0dee8 commit d3b5cd4

File tree

6 files changed

+107
-47
lines changed

6 files changed

+107
-47
lines changed

README.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ This crate provides two tools:
1515

1616
* `Async`, an adapter for standard networking types (and [many other] types) to use in
1717
async programs.
18-
* `Timer`, a future that expires after a duration of time.
18+
* `Timer`, a future that expires at a point in time.
1919

2020
For concrete async networking types built on top of this crate, see [`async-net`].
2121

@@ -29,7 +29,8 @@ The purpose of this thread is to wait for I/O events reported by the operating s
2929
wake appropriate futures blocked on I/O or timers when they can be resumed.
3030

3131
To wait for the next I/O event, the "async-io" thread uses [epoll] on Linux/Android/illumos,
32-
[kqueue] on macOS/iOS/BSD, [event ports] on illumos/Solaris, and [wepoll] on Windows.
32+
[kqueue] on macOS/iOS/BSD, [event ports] on illumos/Solaris, and [wepoll] on Windows. That
33+
functionality is provided by the [`polling`] crate.
3334

3435
However, note that you can also process I/O events and wake futures manually if using the
3536
`parking` module. The "async-io" thread is therefore just a fallback mechanism processing I/O
@@ -41,6 +42,7 @@ See the `parking` module for more details.
4142
[kqueue]: https://en.wikipedia.org/wiki/Kqueue
4243
[event ports]: https://illumos.org/man/port_create
4344
[wepoll]: https://github.com/piscisaureus/wepoll
45+
[`polling`]: https://docs.rs/polling
4446

4547
## Examples
4648

@@ -56,7 +58,7 @@ use std::time::Duration;
5658
let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
5759

5860
let stream = Async::<TcpStream>::connect(addr).or(async {
59-
Timer::new(Duration::from_secs(10)).await;
61+
Timer::after(Duration::from_secs(10)).await;
6062
Err(io::ErrorKind::TimedOut.into())
6163
})
6264
.await?;

examples/block_on.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ use waker_fn::waker_fn;
1515

1616
fn block_on<T>(future: impl Future<Output = T>) -> T {
1717
let (p, u) = parking::pair();
18-
let waker = waker_fn(move || u.unpark());
18+
let waker = waker_fn(move || {
19+
u.unpark();
20+
});
1921
let cx = &mut Context::from_waker(&waker);
2022

2123
pin!(future);

src/lib.rs

Lines changed: 72 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
//!
55
//! * [`Async`], an adapter for standard networking types (and [many other] types) to use in
66
//! async programs.
7-
//! * [`Timer`], a future that expires after a duration of time.
7+
//! * [`Timer`], a future that expires at a point in time.
88
//!
99
//! For concrete async networking types built on top of this crate, see [`async-net`].
1010
//!
@@ -18,7 +18,8 @@
1818
//! wake appropriate futures blocked on I/O or timers when they can be resumed.
1919
//!
2020
//! To wait for the next I/O event, the "async-io" thread uses [epoll] on Linux/Android/illumos,
21-
//! [kqueue] on macOS/iOS/BSD, [event ports] on illumos/Solaris, and [wepoll] on Windows.
21+
//! [kqueue] on macOS/iOS/BSD, [event ports] on illumos/Solaris, and [wepoll] on Windows. That
22+
//! functionality is provided by the [`polling`] crate.
2223
//!
2324
//! However, note that you can also process I/O events and wake futures manually if using the
2425
//! [`parking`] module. The "async-io" thread is therefore just a fallback mechanism processing I/O
@@ -30,6 +31,7 @@
3031
//! [kqueue]: https://en.wikipedia.org/wiki/Kqueue
3132
//! [event ports]: https://illumos.org/man/port_create
3233
//! [wepoll]: https://github.com/piscisaureus/wepoll
34+
//! [`polling`]: https://docs.rs/polling
3335
//!
3436
//! # Examples
3537
//!
@@ -46,7 +48,7 @@
4648
//! let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
4749
//!
4850
//! let stream = Async::<TcpStream>::connect(addr).or(async {
49-
//! Timer::new(Duration::from_secs(10)).await;
51+
//! Timer::after(Duration::from_secs(10)).await;
5052
//! Err(io::ErrorKind::TimedOut.into())
5153
//! })
5254
//! .await?;
@@ -82,7 +84,7 @@ use crate::reactor::{Reactor, Source};
8284
pub mod parking;
8385
mod reactor;
8486

85-
/// A timer that expires after a duration of time.
87+
/// A future that expires at a point in time.
8688
///
8789
/// Timers are futures that output the [`Instant`] at which they fired.
8890
///
@@ -95,7 +97,7 @@ mod reactor;
9597
/// use std::time::Duration;
9698
///
9799
/// async fn sleep(dur: Duration) {
98-
/// Timer::new(dur).await;
100+
/// Timer::after(dur).await;
99101
/// }
100102
///
101103
/// # futures_lite::future::block_on(async {
@@ -114,15 +116,48 @@ pub struct Timer {
114116
}
115117

116118
impl Timer {
117-
/// Creates a timer that expires at the specified instant in time.
118-
pub fn at(when: Instant) -> Timer {
119+
/// Creates a timer that expires after the given duration of time.
120+
///
121+
/// # Examples
122+
///
123+
/// ```
124+
/// use async_io::Timer;
125+
/// use std::time::Duration;
126+
///
127+
/// # futures_lite::future::block_on(async {
128+
/// Timer::after(Duration::from_secs(1)).await;
129+
/// # });
130+
/// ```
131+
pub fn after(duration: Duration) -> Timer {
132+
Timer::at(Instant::now() + duration)
133+
}
134+
135+
/// Creates a timer that expires at the given time instant.
136+
///
137+
/// # Examples
138+
///
139+
/// ```
140+
/// use async_io::Timer;
141+
/// use std::time::{Duration, Instant};
142+
///
143+
/// # futures_lite::future::block_on(async {
144+
/// let now = Instant::now();
145+
/// let when = now + Duration::from_secs(1);
146+
/// Timer::at(when).await;
147+
/// # });
148+
/// ```
149+
pub fn at(instant: Instant) -> Timer {
119150
Timer {
120151
id_and_waker: None,
121-
when,
152+
when: instant,
122153
}
123154
}
124155

125-
/// Creates a timer that expires after the given duration of time.
156+
/// Sets the timer to expire after the new duration of time.
157+
///
158+
/// Note that resetting a timer is different from creating a new timer because
159+
/// [`set_after()`][`Timer::set_after()`] does not remove the waker associated with the task
160+
/// that is polling the timer.
126161
///
127162
/// # Examples
128163
///
@@ -131,38 +166,42 @@ impl Timer {
131166
/// use std::time::Duration;
132167
///
133168
/// # futures_lite::future::block_on(async {
134-
/// Timer::new(Duration::from_secs(1)).await;
169+
/// let mut t = Timer::after(Duration::from_secs(1));
170+
/// t.set_after(Duration::from_millis(100));
135171
/// # });
136172
/// ```
137-
pub fn new(dur: Duration) -> Timer {
138-
Timer::at(Instant::now() + dur)
139-
}
173+
pub fn set_after(&mut self, duration: Duration) {
174+
self.set_at(Instant::now() + duration);
175+
}
140176

141-
/// Resets the timer to expire after the new duration of time.
177+
/// Sets the timer to expire at the new time instant.
142178
///
143179
/// Note that resetting a timer is different from creating a new timer because
144-
/// [`reset()`][`Timer::reset()`] does not remove the waker associated with the task that is
145-
/// polling the timer.
180+
/// [`set_after()`][`Timer::set_after()`] does not remove the waker associated with the task
181+
/// that is polling the timer.
146182
///
147183
/// # Examples
148184
///
149185
/// ```
150186
/// use async_io::Timer;
151-
/// use std::time::Duration;
187+
/// use std::time::{Duration, Instant};
152188
///
153189
/// # futures_lite::future::block_on(async {
154-
/// let mut t = Timer::new(Duration::from_secs(1));
155-
/// t.reset(Duration::from_millis(100));
190+
/// let mut t = Timer::after(Duration::from_secs(1));
191+
///
192+
/// let now = Instant::now();
193+
/// let when = now + Duration::from_secs(1);
194+
/// t.set_at(when);
156195
/// # });
157196
/// ```
158-
pub fn reset(&mut self, dur: Duration) {
197+
pub fn set_at(&mut self, instant: Instant) {
159198
if let Some((id, _)) = self.id_and_waker.as_ref() {
160199
// Deregister the timer from the reactor.
161200
Reactor::get().remove_timer(self.when, *id);
162201
}
163202

164203
// Update the timeout.
165-
self.when = Instant::now() + dur;
204+
self.when = instant;
166205

167206
if let Some((id, waker)) = self.id_and_waker.as_mut() {
168207
// Re-register the timer with the new timeout.
@@ -213,14 +252,14 @@ impl Future for Timer {
213252
}
214253
}
215254

216-
/// Async I/O.
255+
/// Async adapter for I/O types.
217256
///
218-
/// This type converts a blocking I/O type into an async type, provided it is supported by
219-
/// [epoll]/[kqueue]/[event ports]/[wepoll].
257+
/// This type puts an I/O handle into non-blocking mode, registers it in
258+
/// [epoll]/[kqueue]/[event ports]/[wepoll], and then provides an async interface for it.
220259
///
221260
/// **NOTE:** Do not use this type with [`File`][`std::fs::File`], [`Stdin`][`std::io::Stdin`],
222-
/// [`Stdout`][`std::io::Stdout`], or [`Stderr`][`std::io::Stderr`] because they're not
223-
/// supported.
261+
/// [`Stdout`][`std::io::Stdout`], or [`Stderr`][`std::io::Stderr`] because all of the supported
262+
/// operating systems have issues with them when put in non-blocking mode.
224263
///
225264
/// [epoll]: https://en.wikipedia.org/wiki/Epoll
226265
/// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
@@ -245,7 +284,7 @@ impl Future for Timer {
245284
/// # std::io::Result::Ok(()) });
246285
/// ```
247286
///
248-
/// You can use predefined async methods or wrap blocking I/O operations in
287+
/// You can use either predefined async methods or wrap blocking I/O operations in
249288
/// [`Async::read_with()`], [`Async::read_with_mut()`], [`Async::write_with()`], and
250289
/// [`Async::write_with_mut()`]:
251290
///
@@ -387,7 +426,9 @@ impl<T> Async<T> {
387426
self.io.as_mut().unwrap()
388427
}
389428

390-
/// Unwraps the inner non-blocking I/O handle.
429+
/// Unwraps the inner I/O handle.
430+
///
431+
/// This method will **not** put the I/O handle back into blocking mode.
391432
///
392433
/// # Examples
393434
///
@@ -398,6 +439,9 @@ impl<T> Async<T> {
398439
/// # futures_lite::future::block_on(async {
399440
/// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
400441
/// let inner = listener.into_inner()?;
442+
///
443+
/// // Put the listener back into blocking mode.
444+
/// inner.set_nonblocking(false)?;
401445
/// # std::io::Result::Ok(()) });
402446
/// ```
403447
pub fn into_inner(mut self) -> io::Result<T> {

src/parking.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
//! // Blocks on a future to complete, processing I/O events when idle.
2828
//! fn block_on<T>(future: impl Future<Output = T>) -> T {
2929
//! let (p, u) = parking::pair();
30-
//! let waker = waker_fn(move || u.unpark());
30+
//! let waker = waker_fn(move || {
31+
//! u.unpark();
32+
//! });
3133
//! let cx = &mut Context::from_waker(&waker);
3234
//!
3335
//! pin!(future);
@@ -159,6 +161,9 @@ impl Parker {
159161

160162
/// Notifies the parker.
161163
///
164+
/// Returns `true` if this call is the first to notify the parker, or `false` if the parker was
165+
/// already notified.
166+
///
162167
/// # Examples
163168
///
164169
/// ```
@@ -177,10 +182,12 @@ impl Parker {
177182
/// // Wakes up when `u.unpark()` notifies and then goes back into unnotified state.
178183
/// p.park();
179184
/// ```
180-
pub fn unpark(&self) {
181-
if self.inner.unpark() && self.io.load(Ordering::SeqCst) {
185+
pub fn unpark(&self) -> bool {
186+
let unparked = self.inner.unpark();
187+
if unparked && self.io.load(Ordering::SeqCst) {
182188
Reactor::get().notify();
183189
}
190+
unparked
184191
}
185192

186193
/// Returns a handle for unparking.
@@ -302,6 +309,9 @@ pub struct Unparker {
302309
impl Unparker {
303310
/// Notifies the associated parker.
304311
///
312+
/// Returns `true` if this call is the first to notify the parker, or `false` if the parker was
313+
/// already notified.
314+
///
305315
/// # Examples
306316
///
307317
/// ```
@@ -320,9 +330,11 @@ impl Unparker {
320330
/// // Wakes up when `u.unpark()` notifies and then goes back into unnotified state.
321331
/// p.park();
322332
/// ```
323-
pub fn unpark(&self) {
324-
if self.inner.unpark() && self.io.load(Ordering::SeqCst) {
333+
pub fn unpark(&self) -> bool {
334+
let unparked = self.inner.unpark();
335+
if unparked && self.io.load(Ordering::SeqCst) {
325336
Reactor::get().notify();
326337
}
338+
unparked
327339
}
328340
}

tests/async.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ fn tcp_reader_hangup() -> io::Result<()> {
9393
let stream1 = task.await?.0;
9494

9595
let task = spawn(async move {
96-
Timer::new(Duration::from_secs(1)).await;
96+
Timer::after(Duration::from_secs(1)).await;
9797
drop(stream1);
9898
});
9999

@@ -115,7 +115,7 @@ fn tcp_writer_hangup() -> io::Result<()> {
115115
let stream1 = task.await?.0;
116116

117117
let task = spawn(async move {
118-
Timer::new(Duration::from_secs(1)).await;
118+
Timer::after(Duration::from_secs(1)).await;
119119
drop(stream1);
120120
});
121121

@@ -251,7 +251,7 @@ fn uds_reader_hangup() -> io::Result<()> {
251251
let (socket1, mut socket2) = Async::<UnixStream>::pair()?;
252252

253253
let task = spawn(async move {
254-
Timer::new(Duration::from_secs(1)).await;
254+
Timer::after(Duration::from_secs(1)).await;
255255
drop(socket1);
256256
});
257257

@@ -269,7 +269,7 @@ fn uds_writer_hangup() -> io::Result<()> {
269269
let (socket1, mut socket2) = Async::<UnixStream>::pair()?;
270270

271271
let task = spawn(async move {
272-
Timer::new(Duration::from_secs(1)).await;
272+
Timer::after(Duration::from_secs(1)).await;
273273
drop(socket1);
274274
});
275275

@@ -317,7 +317,7 @@ fn tcp_duplex() -> io::Result<()> {
317317
let w1 = spawn(do_write(stream1));
318318

319319
// Sleep a bit, so that reading and writing are both blocked.
320-
Timer::new(Duration::from_millis(5)).await;
320+
Timer::after(Duration::from_millis(5)).await;
321321

322322
// Start reading stream2, make stream1 writable.
323323
let r2 = spawn(do_read(stream2.clone()));

0 commit comments

Comments
 (0)