From 570329b17648727a2800b3ecb1bf21dfd13bb320 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Sat, 21 Sep 2019 14:40:25 +0300 Subject: [PATCH 1/3] adds stream::skip combinator --- src/stream/stream/mod.rs | 27 ++++++++++++++++++++++++++ src/stream/stream/skip.rs | 41 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+) create mode 100644 src/stream/stream/skip.rs diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 8ddab2126..5f4010ca6 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -33,6 +33,7 @@ mod min_by; mod next; mod nth; mod scan; +mod skip; mod take; mod zip; @@ -51,6 +52,7 @@ use fold::FoldFuture; use min_by::MinByFuture; use next::NextFuture; use nth::NthFuture; +use skip::Skip; use std::cmp::Ordering; use std::marker::PhantomData; @@ -661,6 +663,31 @@ pub trait Stream { Scan::new(self, initial_state, f) } + /// Creates a combinator that skips the first `n` elements. + /// + /// ## Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::collections::VecDeque; + /// use async_std::stream::Stream; + /// + /// let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + /// let mut skipped = s.skip(2); + /// + /// assert_eq!(skipped.next().await, Some(3)); + /// assert_eq!(skipped.next().await, None); + /// # + /// # }) } + /// ``` + fn skip(self, n: usize) -> Skip + where + Self: Sized, + { + Skip::new(self, n) + } + /// 'Zips up' two streams into a single stream of pairs. /// /// `zip()` returns a new stream that will iterate over two other streams, returning a tuple diff --git a/src/stream/stream/skip.rs b/src/stream/stream/skip.rs new file mode 100644 index 000000000..09f5cab8f --- /dev/null +++ b/src/stream/stream/skip.rs @@ -0,0 +1,41 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::stream::Stream; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct Skip { + stream: S, + n: usize, +} + +impl Skip { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(n: usize); + + pub(crate) fn new(stream: S, n: usize) -> Self { + Skip { stream, n } + } +} + +impl Stream for Skip +where + S: Stream, +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(v) => match self.n { + 0 => return Poll::Ready(Some(v)), + _ => *self.as_mut().n() -= 1, + }, + None => return Poll::Ready(None), + } + } + } +} From 75da138696b495b65775aab356ad8146f1c2b8e8 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Sat, 21 Sep 2019 16:37:30 +0300 Subject: [PATCH 2/3] export Skip type --- src/stream/stream/mod.rs | 2 +- src/stream/stream/skip.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 5f4010ca6..6444651ab 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -39,6 +39,7 @@ mod zip; pub use fuse::Fuse; pub use scan::Scan; +use skip::Skip; pub use take::Take; pub use zip::Zip; @@ -52,7 +53,6 @@ use fold::FoldFuture; use min_by::MinByFuture; use next::NextFuture; use nth::NthFuture; -use skip::Skip; use std::cmp::Ordering; use std::marker::PhantomData; diff --git a/src/stream/stream/skip.rs b/src/stream/stream/skip.rs index 09f5cab8f..8a2d966d8 100644 --- a/src/stream/stream/skip.rs +++ b/src/stream/stream/skip.rs @@ -3,8 +3,8 @@ use std::task::{Context, Poll}; use crate::stream::Stream; -#[doc(hidden)] -#[allow(missing_debug_implementations)] +/// A stream to skip first n elements of another stream. +#[derive(Debug)] pub struct Skip { stream: S, n: usize, From fdd81e1b2a04d174fb8f4a0f1f7e170dcffa501d Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Sat, 21 Sep 2019 16:40:58 +0300 Subject: [PATCH 3/3] Actually export Skip --- src/stream/stream/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 6444651ab..6284062d2 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -39,7 +39,7 @@ mod zip; pub use fuse::Fuse; pub use scan::Scan; -use skip::Skip; +pub use skip::Skip; pub use take::Take; pub use zip::Zip;