From 80989d648a71c354e28fe46ab8b6d47c36f278b1 Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Tue, 15 Oct 2019 07:07:34 +0200 Subject: [PATCH 1/4] impl take_while stream adapter --- src/stream/mod.rs | 2 +- src/stream/stream/mod.rs | 31 ++++++++++++++++++++++ src/stream/stream/take_while.rs | 47 +++++++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 src/stream/stream/take_while.rs diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 9f5097d0b..fc2d0ed6b 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -26,7 +26,7 @@ use cfg_if::cfg_if; pub use empty::{empty, Empty}; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; -pub use stream::{Chain, Filter, Fuse, Inspect, Scan, Skip, SkipWhile, StepBy, Stream, Take, Zip}; +pub use stream::{Chain, Filter, Fuse, Inspect, Scan, Skip, SkipWhile, StepBy, Stream, Take, TakeWhile, Zip}; pub(crate) mod stream; diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 27539af30..959622655 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -43,6 +43,7 @@ mod skip; mod skip_while; mod step_by; mod take; +mod take_while; mod try_for_each; mod zip; @@ -70,6 +71,7 @@ pub use skip::Skip; pub use skip_while::SkipWhile; pub use step_by::StepBy; pub use take::Take; +pub use take_while::TakeWhile; pub use zip::Zip; use std::cmp::Ordering; @@ -241,6 +243,35 @@ extension_trait! { } } + #[doc = r#" + Creates a stream that yields elements based on a predicate. + + # Examples + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + + use async_std::prelude::*; + + let s: VecDeque = vec![1, 2, 3, 4].into_iter().collect(); + let mut s = s.take_while(|x| x < &3 ); + + assert_eq!(s.next().await, Some(1)); + assert_eq!(s.next().await, Some(2)); + assert_eq!(s.next().await, None); + + # + # }) } + "#] + fn take_while

(self, predicate: P) -> TakeWhile + where + Self: Sized, + P: FnMut(&Self::Item) -> bool, + { + TakeWhile::new(self, predicate) + } + #[doc = r#" Creates a stream that yields each `step`th element. diff --git a/src/stream/stream/take_while.rs b/src/stream/stream/take_while.rs new file mode 100644 index 000000000..25ecc610f --- /dev/null +++ b/src/stream/stream/take_while.rs @@ -0,0 +1,47 @@ +use std::marker::PhantomData; +use std::pin::Pin; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[derive(Debug)] +pub struct TakeWhile { + stream: S, + predicate: P, + __t: PhantomData, +} + +impl TakeWhile { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(predicate: P); + + pub(super) fn new(stream: S, predicate: P) -> Self { + TakeWhile { + stream, + predicate, + __t: PhantomData, + } + } +} + + +impl Stream for TakeWhile +where + S: Stream, + P: FnMut(&S::Item) -> bool, +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(v) if (self.as_mut().predicate())(&v) => Poll::Ready(Some(v)), + Some(_) => { + cx.waker().wake_by_ref(); + Poll::Pending + } + None => Poll::Ready(None), + } + } +} From 8e556c6ef3ef66f97f2f5943a60669e17ab44b92 Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Tue, 15 Oct 2019 07:37:00 +0200 Subject: [PATCH 2/4] fmt --- src/stream/mod.rs | 4 +++- src/stream/stream/take_while.rs | 1 - 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index fc2d0ed6b..6372d8186 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -26,7 +26,9 @@ use cfg_if::cfg_if; pub use empty::{empty, Empty}; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; -pub use stream::{Chain, Filter, Fuse, Inspect, Scan, Skip, SkipWhile, StepBy, Stream, Take, TakeWhile, Zip}; +pub use stream::{ + Chain, Filter, Fuse, Inspect, Scan, Skip, SkipWhile, StepBy, Stream, Take, TakeWhile, Zip, +}; pub(crate) mod stream; diff --git a/src/stream/stream/take_while.rs b/src/stream/stream/take_while.rs index 25ecc610f..a078e5dd0 100644 --- a/src/stream/stream/take_while.rs +++ b/src/stream/stream/take_while.rs @@ -24,7 +24,6 @@ impl TakeWhile { } } - impl Stream for TakeWhile where S: Stream, From ef2e1eb3ff581d7bf06406461c93e026ac401729 Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Tue, 15 Oct 2019 07:49:22 +0200 Subject: [PATCH 3/4] add comment --- src/stream/stream/take_while.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/stream/stream/take_while.rs b/src/stream/stream/take_while.rs index a078e5dd0..6f3cc8f2d 100644 --- a/src/stream/stream/take_while.rs +++ b/src/stream/stream/take_while.rs @@ -4,6 +4,7 @@ use std::pin::Pin; use crate::stream::Stream; use crate::task::{Context, Poll}; +/// A stream that yields elements based on a predicate. #[derive(Debug)] pub struct TakeWhile { stream: S, From c4752d6e6cc2715b1ad726d700518bd67594a0c8 Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Tue, 15 Oct 2019 09:37:40 +0200 Subject: [PATCH 4/4] unindent where clauses --- src/stream/stream/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 959622655..0e563f6d3 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -265,9 +265,9 @@ extension_trait! { # }) } "#] fn take_while

(self, predicate: P) -> TakeWhile - where - Self: Sized, - P: FnMut(&Self::Item) -> bool, + where + Self: Sized, + P: FnMut(&Self::Item) -> bool, { TakeWhile::new(self, predicate) }