-
Notifications
You must be signed in to change notification settings - Fork 339
Add stream timeout #348
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add stream timeout #348
Changes from 2 commits
f00d32e
7a87dea
10f32ca
f1ed034
c3f6f96
0a40734
b58bd8d
b17af61
feeb3c1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -104,13 +104,16 @@ cfg_if! { | |
cfg_if! { | ||
if #[cfg(any(feature = "unstable", feature = "docs"))] { | ||
mod merge; | ||
mod timeout; | ||
|
||
use std::pin::Pin; | ||
use std::time::Duration; | ||
|
||
use crate::future::Future; | ||
use crate::stream::FromStream; | ||
|
||
pub use merge::Merge; | ||
pub use timeout::TimeoutStream; | ||
} | ||
} | ||
|
||
|
@@ -1044,6 +1047,40 @@ extension_trait! { | |
Skip::new(self, n) | ||
} | ||
|
||
#[doc=r#" | ||
Await a stream or times out after a duration of time. | ||
|
||
If you want to await an I/O future consider using | ||
[`io::timeout`](../io/fn.timeout.html) instead. | ||
|
||
# Examples | ||
|
||
``` | ||
# fn main() -> std::io::Result<()> { async_std::task::block_on(async { | ||
# | ||
use std::time::Duration; | ||
|
||
use async_std::stream; | ||
use async_std::prelude::*; | ||
|
||
let mut s = stream::repeat(1).take(3).timeout(Duration::from_secs(1)); | ||
|
||
while let Some(v) = s.next().await { | ||
assert_eq!(v, Ok(1)); | ||
} | ||
# | ||
# Ok(()) }) } | ||
``` | ||
"#] | ||
#[cfg(any(feature = "unstable", feature = "docs"))] | ||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))] | ||
fn timeout(self, dur: Duration) -> TimeoutStream<Self> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't add the "Stream" suffix to stream types so this should be just |
||
where | ||
Self: Stream + Sized, | ||
{ | ||
TimeoutStream::new(self, dur) | ||
} | ||
|
||
#[doc = r#" | ||
A combinator that applies a function as long as it returns successfully, producing a single, final value. | ||
Immediately returns the error when the function returns unsuccessfully. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
use std::error::Error; | ||
use std::fmt; | ||
use std::pin::Pin; | ||
use std::time::Duration; | ||
|
||
use futures_timer::Delay; | ||
|
||
use crate::future::Future; | ||
use crate::stream::Stream; | ||
use crate::task::{Context, Poll}; | ||
|
||
#[doc(hidden)] | ||
k-nasa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
#[derive(Debug)] | ||
pub struct TimeoutStream<S> { | ||
k-nasa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
stream: S, | ||
delay: Delay, | ||
} | ||
|
||
impl<S> TimeoutStream<S> { | ||
pin_utils::unsafe_pinned!(stream: S); | ||
pin_utils::unsafe_pinned!(delay: Delay); | ||
|
||
pub fn new(stream: S, dur: Duration) -> TimeoutStream<S> { | ||
let delay = Delay::new(dur); | ||
|
||
TimeoutStream { stream, delay } | ||
} | ||
} | ||
|
||
impl<S: Stream> Stream for TimeoutStream<S> { | ||
type Item = Result<S::Item, TimeoutError>; | ||
|
||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
match self.as_mut().stream().poll_next(cx) { | ||
Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))), | ||
Poll::Ready(None) => Poll::Ready(None), | ||
Poll::Pending => match self.delay().poll(cx) { | ||
Poll::Ready(_) => Poll::Ready(Some(Err(TimeoutError))), | ||
Poll::Pending => Poll::Pending, | ||
}, | ||
} | ||
} | ||
} | ||
|
||
/// An error returned when a stream times out. | ||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))] | ||
#[cfg(any(feature = "unstable", feature = "docs"))] | ||
#[derive(Clone, Copy, Debug, Eq, PartialEq)] | ||
pub struct TimeoutError; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we export this too? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, this type should not be constructible by end-users. To do prevent users from being able to create instances of pub struct TimeoutError {
_private: (),
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I fixed it. b6edd12 |
||
|
||
impl Error for TimeoutError {} | ||
|
||
impl fmt::Display for TimeoutError { | ||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
"stream has timed out".fmt(f) | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.