From f00d32ee7d5c6afd1abc9db5a0e60081af7296ea Mon Sep 17 00:00:00 2001 From: k-nasa Date: Wed, 16 Oct 2019 15:30:52 +0900 Subject: [PATCH 1/7] Add TimeoutStream struct --- src/stream/stream/timeout.rs | 57 ++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 src/stream/stream/timeout.rs diff --git a/src/stream/stream/timeout.rs b/src/stream/stream/timeout.rs new file mode 100644 index 000000000..7a8cf477b --- /dev/null +++ b/src/stream/stream/timeout.rs @@ -0,0 +1,57 @@ +use std::error::Error; +use std::fmt; +use std::pin::Pin; +use std::time::Duration; + +use futures_timer::Delay; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[derive(Debug)] +pub struct TimeoutStream { + stream: S, + delay: Delay, +} + +impl TimeoutStream { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_pinned!(delay: Delay); + + pub fn new(stream: S, dur: Duration) -> TimeoutStream { + let delay = Delay::new(dur); + + TimeoutStream { stream, delay } + } +} + +impl Stream for TimeoutStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.as_mut().stream().poll_next(cx) { + Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => match self.delay().poll(cx) { + Poll::Ready(_) => Poll::Ready(Some(Err(TimeoutError))), + Poll::Pending => Poll::Pending, + }, + } + } +} + +/// An error returned when a stream times out. +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[cfg(any(feature = "unstable", feature = "docs"))] +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct TimeoutError; + +impl Error for TimeoutError {} + +impl fmt::Display for TimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "stream has timed out".fmt(f) + } +} From 7a87dea085884eebbe7472be5b4658218b58cd32 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Wed, 16 Oct 2019 15:31:07 +0900 Subject: [PATCH 2/7] feat: Add Stream::timeout --- src/stream/stream/mod.rs | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index d582d700e..c44917d10 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -104,13 +104,16 @@ cfg_if! { cfg_if! { if #[cfg(any(feature = "unstable", feature = "docs"))] { mod merge; + mod timeout; use std::pin::Pin; + use std::time::Duration; use crate::future::Future; use crate::stream::FromStream; pub use merge::Merge; + pub use timeout::TimeoutStream; } } @@ -1044,6 +1047,40 @@ extension_trait! { Skip::new(self, n) } + #[doc=r#" + Await a stream or times out after a duration of time. + + If you want to await an I/O future consider using + [`io::timeout`](../io/fn.timeout.html) instead. + + # Examples + + ``` + # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + # + use std::time::Duration; + + use async_std::stream; + use async_std::prelude::*; + + let mut s = stream::repeat(1).take(3).timeout(Duration::from_secs(1)); + + while let Some(v) = s.next().await { + assert_eq!(v, Ok(1)); + } + # + # Ok(()) }) } + ``` + "#] + #[cfg(any(feature = "unstable", feature = "docs"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn timeout(self, dur: Duration) -> TimeoutStream + where + Self: Stream + Sized, + { + TimeoutStream::new(self, dur) + } + #[doc = r#" A combinator that applies a function as long as it returns successfully, producing a single, final value. Immediately returns the error when the function returns unsuccessfully. From 10f32ca817551565d6602911a79ad6a9736fde95 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Wed, 16 Oct 2019 19:49:07 +0900 Subject: [PATCH 3/7] Fix TimeoutError --- src/stream/stream/timeout.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/stream/stream/timeout.rs b/src/stream/stream/timeout.rs index 7a8cf477b..f73ae8715 100644 --- a/src/stream/stream/timeout.rs +++ b/src/stream/stream/timeout.rs @@ -35,7 +35,7 @@ impl Stream for TimeoutStream { Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => match self.delay().poll(cx) { - Poll::Ready(_) => Poll::Ready(Some(Err(TimeoutError))), + Poll::Ready(_) => Poll::Ready(Some(Err(TimeoutError { _private: () }))), Poll::Pending => Poll::Pending, }, } @@ -46,7 +46,9 @@ impl Stream for TimeoutStream { #[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg(any(feature = "unstable", feature = "docs"))] #[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub struct TimeoutError; +pub struct TimeoutError { + _private: (), +} impl Error for TimeoutError {} From f1ed034600f17eb6ab2756e22e2fdc0f2944dd87 Mon Sep 17 00:00:00 2001 From: nasa Date: Wed, 16 Oct 2019 22:21:32 +0900 Subject: [PATCH 4/7] Update src/stream/stream/mod.rs Co-Authored-By: Yoshua Wuyts --- src/stream/stream/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index c44917d10..47b3f835a 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -113,7 +113,7 @@ cfg_if! { use crate::stream::FromStream; pub use merge::Merge; - pub use timeout::TimeoutStream; + pub use timeout::{TimeoutError, TimeoutStream}; } } From c3f6f969c51de4717ae4ef6639bb7f1ad916a6e8 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Wed, 16 Oct 2019 22:56:17 +0900 Subject: [PATCH 5/7] fix: Rename TimeoutStream to Timeout --- src/stream/stream/mod.rs | 6 +++--- src/stream/stream/timeout.rs | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index c44917d10..a881a7aec 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -113,7 +113,7 @@ cfg_if! { use crate::stream::FromStream; pub use merge::Merge; - pub use timeout::TimeoutStream; + pub use timeout::Timeout; } } @@ -1074,11 +1074,11 @@ extension_trait! { "#] #[cfg(any(feature = "unstable", feature = "docs"))] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] - fn timeout(self, dur: Duration) -> TimeoutStream + fn timeout(self, dur: Duration) -> Timeout where Self: Stream + Sized, { - TimeoutStream::new(self, dur) + Timeout::new(self, dur) } #[doc = r#" diff --git a/src/stream/stream/timeout.rs b/src/stream/stream/timeout.rs index f73ae8715..042dc12b0 100644 --- a/src/stream/stream/timeout.rs +++ b/src/stream/stream/timeout.rs @@ -11,23 +11,23 @@ use crate::task::{Context, Poll}; #[doc(hidden)] #[derive(Debug)] -pub struct TimeoutStream { +pub struct Timeout { stream: S, delay: Delay, } -impl TimeoutStream { +impl Timeout { pin_utils::unsafe_pinned!(stream: S); pin_utils::unsafe_pinned!(delay: Delay); - pub fn new(stream: S, dur: Duration) -> TimeoutStream { + pub fn new(stream: S, dur: Duration) -> Timeout { let delay = Delay::new(dur); - TimeoutStream { stream, delay } + Timeout { stream, delay } } } -impl Stream for TimeoutStream { +impl Stream for Timeout { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { From 0a4073449beac09b0bb2492a3754b57171147d5f Mon Sep 17 00:00:00 2001 From: k-nasa Date: Wed, 16 Oct 2019 22:56:48 +0900 Subject: [PATCH 6/7] doc: Add Stream::Timeout doc --- src/stream/stream/timeout.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/timeout.rs b/src/stream/stream/timeout.rs index 042dc12b0..7e0270e3b 100644 --- a/src/stream/stream/timeout.rs +++ b/src/stream/stream/timeout.rs @@ -9,7 +9,7 @@ use crate::future::Future; use crate::stream::Stream; use crate::task::{Context, Poll}; -#[doc(hidden)] +/// A stream with timeout time set #[derive(Debug)] pub struct Timeout { stream: S, From feeb3c10df6725b35f67bc9c8cc73e4a68e46e4d Mon Sep 17 00:00:00 2001 From: k-nasa Date: Thu, 24 Oct 2019 08:41:01 +0900 Subject: [PATCH 7/7] fix: Remove Pin API related unsafe code --- src/stream/stream/timeout.rs | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/src/stream/stream/timeout.rs b/src/stream/stream/timeout.rs index 7e0270e3b..3c14811fb 100644 --- a/src/stream/stream/timeout.rs +++ b/src/stream/stream/timeout.rs @@ -4,22 +4,24 @@ use std::pin::Pin; use std::time::Duration; use futures_timer::Delay; +use pin_project_lite::pin_project; use crate::future::Future; use crate::stream::Stream; use crate::task::{Context, Poll}; -/// A stream with timeout time set -#[derive(Debug)] -pub struct Timeout { - stream: S, - delay: Delay, +pin_project! { + /// A stream with timeout time set + #[derive(Debug)] + pub struct Timeout { + #[pin] + stream: S, + #[pin] + delay: Delay, + } } impl Timeout { - pin_utils::unsafe_pinned!(stream: S); - pin_utils::unsafe_pinned!(delay: Delay); - pub fn new(stream: S, dur: Duration) -> Timeout { let delay = Delay::new(dur); @@ -30,11 +32,13 @@ impl Timeout { impl Stream for Timeout { type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.as_mut().stream().poll_next(cx) { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + match this.stream.poll_next(cx) { Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))), Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => match self.delay().poll(cx) { + Poll::Pending => match this.delay.poll(cx) { Poll::Ready(_) => Poll::Ready(Some(Err(TimeoutError { _private: () }))), Poll::Pending => Poll::Pending, },