Skip to content

Add stream timeout #348

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Oct 28, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,16 @@ cfg_if! {
cfg_if! {
if #[cfg(any(feature = "unstable", feature = "docs"))] {
mod merge;
mod timeout;

use std::pin::Pin;
use std::time::Duration;

use crate::future::Future;
use crate::stream::FromStream;

pub use merge::Merge;
pub use timeout::TimeoutStream;
}
}

Expand Down Expand Up @@ -1044,6 +1047,40 @@ extension_trait! {
Skip::new(self, n)
}

#[doc=r#"
Await a stream or times out after a duration of time.

If you want to await an I/O future consider using
[`io::timeout`](../io/fn.timeout.html) instead.

# Examples

```
# fn main() -> std::io::Result<()> { async_std::task::block_on(async {
#
use std::time::Duration;

use async_std::stream;
use async_std::prelude::*;

let mut s = stream::repeat(1).take(3).timeout(Duration::from_secs(1));

while let Some(v) = s.next().await {
assert_eq!(v, Ok(1));
}
#
# Ok(()) }) }
```
"#]
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn timeout(self, dur: Duration) -> TimeoutStream<Self>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't add the "Stream" suffix to stream types so this should be just Timeout<Self>. See other similar stream types for reference (e.g. Empty, Repeat, Chain).

where
Self: Stream + Sized,
{
TimeoutStream::new(self, dur)
}

#[doc = r#"
A combinator that applies a function as long as it returns successfully, producing a single, final value.
Immediately returns the error when the function returns unsuccessfully.
Expand Down
57 changes: 57 additions & 0 deletions src/stream/stream/timeout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::error::Error;
use std::fmt;
use std::pin::Pin;
use std::time::Duration;

use futures_timer::Delay;

use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};

#[doc(hidden)]
#[derive(Debug)]
pub struct TimeoutStream<S> {
stream: S,
delay: Delay,
}

impl<S> TimeoutStream<S> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_pinned!(delay: Delay);

pub fn new(stream: S, dur: Duration) -> TimeoutStream<S> {
let delay = Delay::new(dur);

TimeoutStream { stream, delay }
}
}

impl<S: Stream> Stream for TimeoutStream<S> {
type Item = Result<S::Item, TimeoutError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.as_mut().stream().poll_next(cx) {
Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => match self.delay().poll(cx) {
Poll::Ready(_) => Poll::Ready(Some(Err(TimeoutError))),
Poll::Pending => Poll::Pending,
},
}
}
}

/// An error returned when a stream times out.
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[cfg(any(feature = "unstable", feature = "docs"))]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TimeoutError;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we export this too?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, this type should not be constructible by end-users. To do prevent users from being able to create instances of TimeoutError, do:

pub struct TimeoutError {
    _private: (),
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fixed it. b6edd12


impl Error for TimeoutError {}

impl fmt::Display for TimeoutError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
"stream has timed out".fmt(f)
}
}