Skip to content

Commit 2acc070

Browse files
bors[bot]montekki
andauthored
Merge #223
223: adds stream::skip_while combinator r=stjepang a=montekki Ref: #129 Stdlib: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.skip_while Co-authored-by: Fedor Sakharov <fedor.sakharov@gmail.com>
2 parents 9972449 + ea080e7 commit 2acc070

File tree

2 files changed

+84
-1
lines changed

2 files changed

+84
-1
lines changed

src/stream/stream/mod.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,15 @@ mod next;
3535
mod nth;
3636
mod scan;
3737
mod skip;
38+
mod skip_while;
3839
mod take;
3940
mod zip;
4041

4142
pub use filter::Filter;
4243
pub use fuse::Fuse;
4344
pub use scan::Scan;
4445
pub use skip::Skip;
46+
pub use skip_while::SkipWhile;
4547
pub use take::Take;
4648
pub use zip::Zip;
4749

@@ -693,7 +695,13 @@ pub trait Stream {
693695
Scan::new(self, initial_state, f)
694696
}
695697

696-
/// Creates a combinator that skips the first `n` elements.
698+
/// Combinator that `skip`s elements based on a predicate.
699+
///
700+
/// Takes a closure argument. It will call this closure on every element in
701+
/// the stream and ignore elements until it returns `false`.
702+
///
703+
/// After `false` is returned, `SkipWhile`'s job is over and all further
704+
/// elements in the strem are yeilded.
697705
///
698706
/// ## Examples
699707
///
@@ -703,6 +711,27 @@ pub trait Stream {
703711
/// use std::collections::VecDeque;
704712
/// use async_std::stream::Stream;
705713
///
714+
/// let a: VecDeque<_> = vec![-1i32, 0, 1].into_iter().collect();
715+
/// let mut s = a.skip_while(|x| x.is_negative());
716+
///
717+
/// assert_eq!(s.next().await, Some(0));
718+
/// assert_eq!(s.next().await, Some(1));
719+
/// assert_eq!(s.next().await, None);
720+
/// #
721+
/// # }) }
722+
/// ```
723+
fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P, Self::Item>
724+
where
725+
Self: Sized,
726+
P: FnMut(&Self::Item) -> bool,
727+
{
728+
SkipWhile::new(self, predicate)
729+
}
730+
731+
/// Creates a combinator that skips the first `n` elements.
732+
///
733+
/// ## Examples
734+
///
706735
/// let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
707736
/// let mut skipped = s.skip(2);
708737
///

src/stream/stream/skip_while.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use std::marker::PhantomData;
2+
use std::pin::Pin;
3+
4+
use crate::stream::Stream;
5+
use crate::task::{Context, Poll};
6+
7+
/// A stream to skip elements of another stream based on a predicate.
8+
#[derive(Debug)]
9+
pub struct SkipWhile<S, P, T> {
10+
stream: S,
11+
predicate: Option<P>,
12+
__t: PhantomData<T>,
13+
}
14+
15+
impl<S, P, T> SkipWhile<S, P, T> {
16+
pin_utils::unsafe_pinned!(stream: S);
17+
pin_utils::unsafe_unpinned!(predicate: Option<P>);
18+
19+
pub(crate) fn new(stream: S, predicate: P) -> Self {
20+
SkipWhile {
21+
stream,
22+
predicate: Some(predicate),
23+
__t: PhantomData,
24+
}
25+
}
26+
}
27+
28+
impl<S, P> Stream for SkipWhile<S, P, S::Item>
29+
where
30+
S: Stream,
31+
P: FnMut(&S::Item) -> bool,
32+
{
33+
type Item = S::Item;
34+
35+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
36+
loop {
37+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
38+
39+
match next {
40+
Some(v) => match self.as_mut().predicate() {
41+
Some(p) => match p(&v) {
42+
true => (),
43+
false => {
44+
*self.as_mut().predicate() = None;
45+
return Poll::Ready(Some(v));
46+
}
47+
},
48+
None => return Poll::Ready(Some(v)),
49+
},
50+
None => return Poll::Ready(None),
51+
}
52+
}
53+
}
54+
}

0 commit comments

Comments
 (0)