Skip to content

Commit 6be8467

Browse files
starsheriffStjepan Glavina
authored and
Stjepan Glavina
committed
impl Stream::take_while adapter (#332)
* impl take_while stream adapter * fmt * add comment * unindent where clauses
1 parent 529a58a commit 6be8467

File tree

3 files changed

+81
-1
lines changed

3 files changed

+81
-1
lines changed

src/stream/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ use cfg_if::cfg_if;
2626
pub use empty::{empty, Empty};
2727
pub use once::{once, Once};
2828
pub use repeat::{repeat, Repeat};
29-
pub use stream::{Chain, Filter, Fuse, Inspect, Scan, Skip, SkipWhile, StepBy, Stream, Take, Zip};
29+
pub use stream::{
30+
Chain, Filter, Fuse, Inspect, Scan, Skip, SkipWhile, StepBy, Stream, Take, TakeWhile, Zip,
31+
};
3032

3133
pub(crate) mod stream;
3234

src/stream/stream/mod.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ mod skip;
4343
mod skip_while;
4444
mod step_by;
4545
mod take;
46+
mod take_while;
4647
mod try_for_each;
4748
mod zip;
4849

@@ -70,6 +71,7 @@ pub use skip::Skip;
7071
pub use skip_while::SkipWhile;
7172
pub use step_by::StepBy;
7273
pub use take::Take;
74+
pub use take_while::TakeWhile;
7375
pub use zip::Zip;
7476

7577
use std::cmp::Ordering;
@@ -241,6 +243,35 @@ extension_trait! {
241243
}
242244
}
243245

246+
#[doc = r#"
247+
Creates a stream that yields elements based on a predicate.
248+
249+
# Examples
250+
```
251+
# fn main() { async_std::task::block_on(async {
252+
#
253+
use std::collections::VecDeque;
254+
255+
use async_std::prelude::*;
256+
257+
let s: VecDeque<usize> = vec![1, 2, 3, 4].into_iter().collect();
258+
let mut s = s.take_while(|x| x < &3 );
259+
260+
assert_eq!(s.next().await, Some(1));
261+
assert_eq!(s.next().await, Some(2));
262+
assert_eq!(s.next().await, None);
263+
264+
#
265+
# }) }
266+
"#]
267+
fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P, Self::Item>
268+
where
269+
Self: Sized,
270+
P: FnMut(&Self::Item) -> bool,
271+
{
272+
TakeWhile::new(self, predicate)
273+
}
274+
244275
#[doc = r#"
245276
Creates a stream that yields each `step`th element.
246277

src/stream/stream/take_while.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use std::marker::PhantomData;
2+
use std::pin::Pin;
3+
4+
use crate::stream::Stream;
5+
use crate::task::{Context, Poll};
6+
7+
/// A stream that yields elements based on a predicate.
8+
#[derive(Debug)]
9+
pub struct TakeWhile<S, P, T> {
10+
stream: S,
11+
predicate: P,
12+
__t: PhantomData<T>,
13+
}
14+
15+
impl<S, P, T> TakeWhile<S, P, T> {
16+
pin_utils::unsafe_pinned!(stream: S);
17+
pin_utils::unsafe_unpinned!(predicate: P);
18+
19+
pub(super) fn new(stream: S, predicate: P) -> Self {
20+
TakeWhile {
21+
stream,
22+
predicate,
23+
__t: PhantomData,
24+
}
25+
}
26+
}
27+
28+
impl<S, P> Stream for TakeWhile<S, P, S::Item>
29+
where
30+
S: Stream,
31+
P: FnMut(&S::Item) -> bool,
32+
{
33+
type Item = S::Item;
34+
35+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
36+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
37+
38+
match next {
39+
Some(v) if (self.as_mut().predicate())(&v) => Poll::Ready(Some(v)),
40+
Some(_) => {
41+
cx.waker().wake_by_ref();
42+
Poll::Pending
43+
}
44+
None => Poll::Ready(None),
45+
}
46+
}
47+
}

0 commit comments

Comments
 (0)