Skip to content

Commit 3b80165

Browse files
author
Shady Khalifa
committed
add stream::all method
1 parent 374f0c9 commit 3b80165

File tree

1 file changed

+108
-0
lines changed

1 file changed

+108
-0
lines changed

src/stream/stream.rs

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use cfg_if::cfg_if;
2727

2828
use crate::future::Future;
2929
use crate::task::{Context, Poll};
30+
use std::marker::PhantomData;
3031

3132
cfg_if! {
3233
if #[cfg(feature = "docs")] {
@@ -111,6 +112,71 @@ pub trait Stream {
111112
remaining: n,
112113
}
113114
}
115+
116+
/// Tests if every element of the stream matches a predicate.
117+
///
118+
/// `all()` takes a closure that returns `true` or `false`. It applies
119+
/// this closure to each element of the stream, and if they all return
120+
/// `true`, then so does `all()`. If any of them return `false`, it
121+
/// returns `false`.
122+
///
123+
/// `all()` is short-circuiting; in other words, it will stop processing
124+
/// as soon as it finds a `false`, given that no matter what else happens,
125+
/// the result will also be `false`.
126+
///
127+
/// An empty stream returns `true`.
128+
///
129+
/// # Examples
130+
///
131+
/// Basic usage:
132+
///
133+
/// ```
134+
/// # fn main() { async_std::task::block_on(async {
135+
/// #
136+
/// use async_std::prelude::*;
137+
/// use async_std::stream;
138+
///
139+
/// let mut s = stream::repeat(9).take(3);
140+
///
141+
/// while let Some(v) = s.next().await {
142+
/// assert_eq!(v, 9);
143+
/// }
144+
/// #
145+
/// # }) }
146+
/// ```
147+
/// ```
148+
/// let a = [1, 2, 3];
149+
///
150+
/// assert!(a.iter().all(|&x| x > 0));
151+
///
152+
/// assert!(!a.iter().all(|&x| x > 2));
153+
/// ```
154+
///
155+
/// Stopping at the first `false`:
156+
///
157+
/// ```
158+
/// let a = [1, 2, 3];
159+
///
160+
/// let mut iter = a.iter();
161+
///
162+
/// assert!(!iter.all(|&x| x != 2));
163+
///
164+
/// // we can still use `iter`, as there are more elements.
165+
/// assert_eq!(iter.next(), Some(&3));
166+
/// ```
167+
#[inline]
168+
fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F, Self::Item>
169+
where
170+
Self: Sized,
171+
F: FnMut(Self::Item) -> bool,
172+
{
173+
AllFuture {
174+
stream: self,
175+
result: true,
176+
__item: PhantomData,
177+
f,
178+
}
179+
}
114180
}
115181

116182
impl<T: futures::Stream + Unpin + ?Sized> Stream for T {
@@ -168,3 +234,45 @@ impl<S: futures::Stream> futures::Stream for Take<S> {
168234
}
169235
}
170236
}
237+
238+
pub struct AllFuture<'a, S, F, T>
239+
where
240+
S: ?Sized,
241+
F: FnMut(T) -> bool,
242+
{
243+
stream: &'a mut S,
244+
f: F,
245+
result: bool,
246+
__item: PhantomData<T>,
247+
}
248+
249+
impl<'a, S, F, T> AllFuture<'a, S, F, T>
250+
where
251+
S: ?Sized,
252+
F: FnMut(T) -> bool,
253+
{
254+
pin_utils::unsafe_pinned!(stream: &'a mut S);
255+
pin_utils::unsafe_unpinned!(result: bool);
256+
pin_utils::unsafe_unpinned!(f: F);
257+
}
258+
259+
impl<S, F> Future for AllFuture<'_, S, F, S::Item>
260+
where
261+
S: futures::Stream + Unpin + ?Sized,
262+
F: FnMut(S::Item) -> bool,
263+
{
264+
type Output = bool;
265+
266+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
267+
use futures::Stream;
268+
let next = futures::ready!(self.as_mut().stream().poll_next(cx));
269+
match next {
270+
Some(v) => {
271+
// me: *screams*
272+
*self.as_mut().result() = (self.as_mut().f())(v);
273+
Poll::Pending
274+
}
275+
None => Poll::Ready(self.result),
276+
}
277+
}
278+
}

0 commit comments

Comments
 (0)