Skip to content

Commit ea080e7

Browse files
committed
Merge branch 'master' into fs-stream-skip-while
2 parents 93463e8 + 9972449 commit ea080e7

File tree

4 files changed

+150
-8
lines changed

4 files changed

+150
-8
lines changed

src/fs/metadata.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ cfg_if! {
7070
/// ```no_run
7171
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
7272
/// #
73-
/// use std::fs;
73+
/// use async_std::fs;
7474
///
7575
/// let metadata = fs::metadata("a.txt").await?;
7676
/// println!("{:?}", metadata.file_type());
@@ -90,7 +90,7 @@ cfg_if! {
9090
/// ```no_run
9191
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
9292
/// #
93-
/// use std::fs;
93+
/// use async_std::fs;
9494
///
9595
/// let metadata = fs::metadata(".").await?;
9696
/// println!("{:?}", metadata.is_dir());
@@ -110,7 +110,7 @@ cfg_if! {
110110
/// ```no_run
111111
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
112112
/// #
113-
/// use std::fs;
113+
/// use async_std::fs;
114114
///
115115
/// let metadata = fs::metadata("a.txt").await?;
116116
/// println!("{:?}", metadata.is_file());
@@ -128,7 +128,7 @@ cfg_if! {
128128
/// ```no_run
129129
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
130130
/// #
131-
/// use std::fs;
131+
/// use async_std::fs;
132132
///
133133
/// let metadata = fs::metadata("a.txt").await?;
134134
/// println!("{}", metadata.len());
@@ -146,7 +146,7 @@ cfg_if! {
146146
/// ```no_run
147147
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
148148
/// #
149-
/// use std::fs;
149+
/// use async_std::fs;
150150
///
151151
/// let metadata = fs::metadata("a.txt").await?;
152152
/// println!("{:?}", metadata.permissions());
@@ -169,7 +169,7 @@ cfg_if! {
169169
/// ```no_run
170170
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
171171
/// #
172-
/// use std::fs;
172+
/// use async_std::fs;
173173
///
174174
/// let metadata = fs::metadata("a.txt").await?;
175175
/// println!("{:?}", metadata.modified());
@@ -192,7 +192,7 @@ cfg_if! {
192192
/// ```no_run
193193
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
194194
/// #
195-
/// use std::fs;
195+
/// use async_std::fs;
196196
///
197197
/// let metadata = fs::metadata("a.txt").await?;
198198
/// println!("{:?}", metadata.accessed());
@@ -215,7 +215,7 @@ cfg_if! {
215215
/// ```no_run
216216
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
217217
/// #
218-
/// use std::fs;
218+
/// use async_std::fs;
219219
///
220220
/// let metadata = fs::metadata("a.txt").await?;
221221
/// println!("{:?}", metadata.created());

src/stream/stream/filter.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use std::marker::PhantomData;
2+
use std::pin::Pin;
3+
4+
use crate::stream::Stream;
5+
use crate::task::{Context, Poll};
6+
7+
/// A stream to filter elements of another stream with a predicate.
8+
#[derive(Debug)]
9+
pub struct Filter<S, P, T> {
10+
stream: S,
11+
predicate: P,
12+
__t: PhantomData<T>,
13+
}
14+
15+
impl<S, P, T> Filter<S, P, T> {
16+
pin_utils::unsafe_pinned!(stream: S);
17+
pin_utils::unsafe_unpinned!(predicate: P);
18+
19+
pub(super) fn new(stream: S, predicate: P) -> Self {
20+
Filter {
21+
stream,
22+
predicate,
23+
__t: PhantomData,
24+
}
25+
}
26+
}
27+
28+
impl<S, P> futures_core::stream::Stream for Filter<S, P, S::Item>
29+
where
30+
S: Stream,
31+
P: FnMut(&S::Item) -> bool,
32+
{
33+
type Item = S::Item;
34+
35+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
36+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
37+
38+
match next {
39+
Some(v) => match (self.as_mut().predicate())(&v) {
40+
true => Poll::Ready(Some(v)),
41+
false => {
42+
cx.waker().wake_by_ref();
43+
Poll::Pending
44+
}
45+
},
46+
None => Poll::Ready(None),
47+
}
48+
}
49+
}

src/stream/stream/mod.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
mod all;
2525
mod any;
2626
mod enumerate;
27+
mod filter;
2728
mod filter_map;
2829
mod find;
2930
mod find_map;
@@ -33,12 +34,15 @@ mod min_by;
3334
mod next;
3435
mod nth;
3536
mod scan;
37+
mod skip;
3638
mod skip_while;
3739
mod take;
3840
mod zip;
3941

42+
pub use filter::Filter;
4043
pub use fuse::Fuse;
4144
pub use scan::Scan;
45+
pub use skip::Skip;
4246
pub use skip_while::SkipWhile;
4347
pub use take::Take;
4448
pub use zip::Zip;
@@ -284,6 +288,34 @@ pub trait Stream {
284288
done: false,
285289
}
286290
}
291+
/// Creates a stream that uses a predicate to determine if an element
292+
/// should be yeilded.
293+
///
294+
/// # Examples
295+
///
296+
/// Basic usage:
297+
///
298+
///```
299+
/// # fn main() { async_std::task::block_on(async {
300+
/// #
301+
/// use std::collections::VecDeque;
302+
/// use async_std::stream::Stream;
303+
///
304+
/// let s: VecDeque<usize> = vec![1, 2, 3, 4].into_iter().collect();
305+
/// let mut s = s.filter(|i| i % 2 == 0);
306+
///
307+
/// assert_eq!(s.next().await, Some(2));
308+
/// assert_eq!(s.next().await, Some(4));
309+
/// assert_eq!(s.next().await, None);
310+
/// #
311+
/// # }) }
312+
fn filter<P>(self, predicate: P) -> Filter<Self, P, Self::Item>
313+
where
314+
Self: Sized,
315+
P: FnMut(&Self::Item) -> bool,
316+
{
317+
Filter::new(self, predicate)
318+
}
287319

288320
/// Both filters and maps a stream.
289321
///
@@ -672,6 +704,7 @@ pub trait Stream {
672704
/// elements in the strem are yeilded.
673705
///
674706
/// ## Examples
707+
///
675708
/// ```
676709
/// # fn main() { async_std::task::block_on(async {
677710
/// #
@@ -695,6 +728,25 @@ pub trait Stream {
695728
SkipWhile::new(self, predicate)
696729
}
697730

731+
/// Creates a combinator that skips the first `n` elements.
732+
///
733+
/// ## Examples
734+
///
735+
/// let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
736+
/// let mut skipped = s.skip(2);
737+
///
738+
/// assert_eq!(skipped.next().await, Some(3));
739+
/// assert_eq!(skipped.next().await, None);
740+
/// #
741+
/// # }) }
742+
/// ```
743+
fn skip(self, n: usize) -> Skip<Self>
744+
where
745+
Self: Sized,
746+
{
747+
Skip::new(self, n)
748+
}
749+
698750
/// 'Zips up' two streams into a single stream of pairs.
699751
///
700752
/// `zip()` returns a new stream that will iterate over two other streams, returning a tuple

src/stream/stream/skip.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
use std::pin::Pin;
2+
use std::task::{Context, Poll};
3+
4+
use crate::stream::Stream;
5+
6+
/// A stream to skip first n elements of another stream.
7+
#[derive(Debug)]
8+
pub struct Skip<S> {
9+
stream: S,
10+
n: usize,
11+
}
12+
13+
impl<S> Skip<S> {
14+
pin_utils::unsafe_pinned!(stream: S);
15+
pin_utils::unsafe_unpinned!(n: usize);
16+
17+
pub(crate) fn new(stream: S, n: usize) -> Self {
18+
Skip { stream, n }
19+
}
20+
}
21+
22+
impl<S> Stream for Skip<S>
23+
where
24+
S: Stream,
25+
{
26+
type Item = S::Item;
27+
28+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
29+
loop {
30+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
31+
32+
match next {
33+
Some(v) => match self.n {
34+
0 => return Poll::Ready(Some(v)),
35+
_ => *self.as_mut().n() -= 1,
36+
},
37+
None => return Poll::Ready(None),
38+
}
39+
}
40+
}
41+
}

0 commit comments

Comments
 (0)