Skip to content

Commit 8b9dc55

Browse files
author
Tyler Neely
committed
Add initial Fuse implementation for Stream
1 parent 9b3e8b8 commit 8b9dc55

File tree

2 files changed

+84
-1
lines changed

2 files changed

+84
-1
lines changed

src/stream/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
pub use empty::{empty, Empty};
2626
pub use once::{once, Once};
2727
pub use repeat::{repeat, Repeat};
28-
pub use stream::{Stream, Take};
28+
pub use stream::{Fuse, Stream, Take};
2929

3030
mod empty;
3131
mod once;

src/stream/stream.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,38 @@ pub trait Stream {
114114
remaining: n,
115115
}
116116
}
117+
118+
/// Transforms this `Stream` into a "fused" `Stream`
119+
/// such that after the first time `poll` returns
120+
/// `Poll::Ready(None)`, all future calls to
121+
/// `poll` will also return `Poll::Ready(None)`.
122+
///
123+
/// # Examples
124+
///
125+
/// ```
126+
/// # #![feature(async_await)]
127+
/// # fn main() { async_std::task::block_on(async {
128+
/// #
129+
/// use async_std::prelude::*;
130+
/// use async_std::stream;
131+
///
132+
/// let mut s = stream::repeat(9).take(3);
133+
///
134+
/// while let Some(v) = s.next().await {
135+
/// assert_eq!(v, 9);
136+
/// }
137+
/// #
138+
/// # }) }
139+
/// ```
140+
fn fuse(self) -> Fuse<Self>
141+
where
142+
Self: Sized,
143+
{
144+
Fuse {
145+
stream: self,
146+
done: false,
147+
}
148+
}
117149
}
118150

119151
impl<T: futures::Stream + Unpin + ?Sized> Stream for T {
@@ -171,3 +203,54 @@ impl<S: futures::Stream> futures::Stream for Take<S> {
171203
}
172204
}
173205
}
206+
207+
/// A `Stream` that is permanently closed
208+
/// once a single call to `poll` results in
209+
/// `Poll::Ready(None)`, returning `Poll::Ready(None)`
210+
/// for all future calls to `poll`.
211+
#[derive(Clone, Debug)]
212+
pub struct Fuse<S> {
213+
stream: S,
214+
done: bool,
215+
}
216+
217+
impl<S: Unpin> Unpin for Fuse<S> {}
218+
219+
impl<S: futures::Stream> Fuse<S> {
220+
pin_utils::unsafe_pinned!(stream: S);
221+
pin_utils::unsafe_unpinned!(done: bool);
222+
223+
/// Returns `true` if the underlying stream is fused.
224+
///
225+
/// If this `Stream` is fused, all future calls to
226+
/// `poll` will return `Poll::Ready(None)`.
227+
pub fn is_done(&self) -> bool {
228+
self.done
229+
}
230+
231+
/// Consumes this `Fuse` and returns the inner
232+
/// `Stream`, unfusing it if it had become
233+
/// fused.
234+
pub fn into_inner(self) -> S
235+
where
236+
S: Sized,
237+
{
238+
self.stream
239+
}
240+
}
241+
242+
impl<S: futures::Stream> futures::Stream for Fuse<S> {
243+
type Item = S::Item;
244+
245+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
246+
if self.done {
247+
Poll::Ready(None)
248+
} else {
249+
let next = futures::ready!(self.as_mut().stream().poll_next(cx));
250+
if next.is_none() {
251+
*self.as_mut().done() = true;
252+
}
253+
Poll::Ready(next)
254+
}
255+
}
256+
}

0 commit comments

Comments
 (0)