Skip to content

Commit 7a87dea

Browse files
committed
feat: Add Stream::timeout
1 parent f00d32e commit 7a87dea

File tree

1 file changed

+37
-0
lines changed

1 file changed

+37
-0
lines changed

src/stream/stream/mod.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,13 +104,16 @@ cfg_if! {
104104
cfg_if! {
105105
if #[cfg(any(feature = "unstable", feature = "docs"))] {
106106
mod merge;
107+
mod timeout;
107108

108109
use std::pin::Pin;
110+
use std::time::Duration;
109111

110112
use crate::future::Future;
111113
use crate::stream::FromStream;
112114

113115
pub use merge::Merge;
116+
pub use timeout::TimeoutStream;
114117
}
115118
}
116119

@@ -1044,6 +1047,40 @@ extension_trait! {
10441047
Skip::new(self, n)
10451048
}
10461049

1050+
#[doc=r#"
1051+
Await a stream or times out after a duration of time.
1052+
1053+
If you want to await an I/O future consider using
1054+
[`io::timeout`](../io/fn.timeout.html) instead.
1055+
1056+
# Examples
1057+
1058+
```
1059+
# fn main() -> std::io::Result<()> { async_std::task::block_on(async {
1060+
#
1061+
use std::time::Duration;
1062+
1063+
use async_std::stream;
1064+
use async_std::prelude::*;
1065+
1066+
let mut s = stream::repeat(1).take(3).timeout(Duration::from_secs(1));
1067+
1068+
while let Some(v) = s.next().await {
1069+
assert_eq!(v, Ok(1));
1070+
}
1071+
#
1072+
# Ok(()) }) }
1073+
```
1074+
"#]
1075+
#[cfg(any(feature = "unstable", feature = "docs"))]
1076+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1077+
fn timeout(self, dur: Duration) -> TimeoutStream<Self>
1078+
where
1079+
Self: Stream + Sized,
1080+
{
1081+
TimeoutStream::new(self, dur)
1082+
}
1083+
10471084
#[doc = r#"
10481085
A combinator that applies a function as long as it returns successfully, producing a single, final value.
10491086
Immediately returns the error when the function returns unsuccessfully.

0 commit comments

Comments
 (0)