diff --git a/src/stream/stream/count.rs b/src/stream/stream/count.rs new file mode 100644 index 000000000..998375ebe --- /dev/null +++ b/src/stream/stream/count.rs @@ -0,0 +1,47 @@ +use std::pin::Pin; +use std::future::Future; + +use pin_project_lite::pin_project; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct CountFuture { + #[pin] + stream: S, + count: usize, + } +} + +impl CountFuture { + pub(super) fn new(stream: S) -> Self { + Self { + stream, + count: 0, + } + } +} + +impl Future for CountFuture +where + S: Stream, +{ + type Output = usize; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let next = futures_core::ready!(this.stream.poll_next(cx)); + + match next { + Some(_) => { + cx.waker().wake_by_ref(); + *this.count += 1; + Poll::Pending + } + None => Poll::Ready(*this.count), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 5756a21e6..9d6b9f9b2 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -27,6 +27,7 @@ mod chain; mod cloned; mod cmp; mod copied; +mod count; mod cycle; mod enumerate; mod eq; @@ -67,6 +68,7 @@ mod zip; use all::AllFuture; use any::AnyFuture; use cmp::CmpFuture; +use count::CountFuture; use cycle::Cycle; use enumerate::Enumerate; use eq::EqFuture; @@ -912,6 +914,35 @@ extension_trait! { MinByFuture::new(self, compare) } + #[doc = r#" + Counting the number of elements and returning it. + + # Examples + + ```ignore + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::stream; + + let s = stream::from_iter(vec![1, 2, 3]); + + let count = s.count().await; + assert_eq!(count, 3); + + # + # }) } + ``` + "#] + fn count( + self, + ) -> impl Future [CountFuture] + where + Self: Sized, + { + CountFuture::new(self) + } + #[doc = r#" Returns the element that gives the minimum value. If several elements are equally minimum, the first element is returned. If the stream is empty, `None` is returned.