Skip to content

Commit 9972449

Browse files
bors[bot]montekki
andauthored
Merge #221
221: adds stream::filter combinator r=stjepang a=montekki Ref: #129 Stdlib: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.filter Co-authored-by: Fedor Sakharov <fedor.sakharov@gmail.com>
2 parents 47ce009 + e430851 commit 9972449

File tree

2 files changed

+79
-0
lines changed

2 files changed

+79
-0
lines changed

src/stream/stream/filter.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use std::marker::PhantomData;
2+
use std::pin::Pin;
3+
4+
use crate::stream::Stream;
5+
use crate::task::{Context, Poll};
6+
7+
/// A stream to filter elements of another stream with a predicate.
8+
#[derive(Debug)]
9+
pub struct Filter<S, P, T> {
10+
stream: S,
11+
predicate: P,
12+
__t: PhantomData<T>,
13+
}
14+
15+
impl<S, P, T> Filter<S, P, T> {
16+
pin_utils::unsafe_pinned!(stream: S);
17+
pin_utils::unsafe_unpinned!(predicate: P);
18+
19+
pub(super) fn new(stream: S, predicate: P) -> Self {
20+
Filter {
21+
stream,
22+
predicate,
23+
__t: PhantomData,
24+
}
25+
}
26+
}
27+
28+
impl<S, P> futures_core::stream::Stream for Filter<S, P, S::Item>
29+
where
30+
S: Stream,
31+
P: FnMut(&S::Item) -> bool,
32+
{
33+
type Item = S::Item;
34+
35+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
36+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
37+
38+
match next {
39+
Some(v) => match (self.as_mut().predicate())(&v) {
40+
true => Poll::Ready(Some(v)),
41+
false => {
42+
cx.waker().wake_by_ref();
43+
Poll::Pending
44+
}
45+
},
46+
None => Poll::Ready(None),
47+
}
48+
}
49+
}

src/stream/stream/mod.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
mod all;
2525
mod any;
2626
mod enumerate;
27+
mod filter;
2728
mod filter_map;
2829
mod find;
2930
mod find_map;
@@ -37,6 +38,7 @@ mod skip;
3738
mod take;
3839
mod zip;
3940

41+
pub use filter::Filter;
4042
pub use fuse::Fuse;
4143
pub use scan::Scan;
4244
pub use skip::Skip;
@@ -284,6 +286,34 @@ pub trait Stream {
284286
done: false,
285287
}
286288
}
289+
/// Creates a stream that uses a predicate to determine if an element
290+
/// should be yeilded.
291+
///
292+
/// # Examples
293+
///
294+
/// Basic usage:
295+
///
296+
///```
297+
/// # fn main() { async_std::task::block_on(async {
298+
/// #
299+
/// use std::collections::VecDeque;
300+
/// use async_std::stream::Stream;
301+
///
302+
/// let s: VecDeque<usize> = vec![1, 2, 3, 4].into_iter().collect();
303+
/// let mut s = s.filter(|i| i % 2 == 0);
304+
///
305+
/// assert_eq!(s.next().await, Some(2));
306+
/// assert_eq!(s.next().await, Some(4));
307+
/// assert_eq!(s.next().await, None);
308+
/// #
309+
/// # }) }
310+
fn filter<P>(self, predicate: P) -> Filter<Self, P, Self::Item>
311+
where
312+
Self: Sized,
313+
P: FnMut(&Self::Item) -> bool,
314+
{
315+
Filter::new(self, predicate)
316+
}
287317

288318
/// Both filters and maps a stream.
289319
///

0 commit comments

Comments
 (0)