diff --git a/src/stream/stream/last.rs b/src/stream/stream/last.rs new file mode 100644 index 000000000..c58dd66a9 --- /dev/null +++ b/src/stream/stream/last.rs @@ -0,0 +1,42 @@ +use std::pin::Pin; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct LastFuture { + stream: S, + last: Option, +} + +impl LastFuture { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(last: Option); + + pub(crate) fn new(stream: S) -> Self { + LastFuture { stream, last: None } + } +} + +impl Future for LastFuture +where + S: Stream + Unpin + Sized, + S::Item: Copy, +{ + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(new) => { + cx.waker().wake_by_ref(); + *self.as_mut().last() = Some(new); + Poll::Pending + } + None => Poll::Ready(self.last), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index d582d700e..764dc9782 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -36,6 +36,7 @@ mod fuse; mod ge; mod gt; mod inspect; +mod last; mod le; mod lt; mod map; @@ -64,6 +65,7 @@ use fold::FoldFuture; use for_each::ForEachFuture; use ge::GeFuture; use gt::GtFuture; +use last::LastFuture; use le::LeFuture; use lt::LtFuture; use min_by::MinByFuture; @@ -456,6 +458,54 @@ extension_trait! { Inspect::new(self, f) } + #[doc = r#" + Returns the last element of the stream. + + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + + use async_std::prelude::*; + + let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + + let last = s.last().await; + assert_eq!(last, Some(3)); + # + # }) } + ``` + + An empty stream will return `None: + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + + use async_std::prelude::*; + + let s: VecDeque = vec![].into_iter().collect(); + + let last = s.last().await; + assert_eq!(last, None); + # + # }) } + ``` + + "#] + fn last( + self, + ) -> impl Future> [LastFuture] + where + Self: Sized, + { + LastFuture::new(self) + } + #[doc = r#" Transforms this `Stream` into a "fused" `Stream` such that after the first time `poll` returns `Poll::Ready(None)`, all future calls to `poll` will also return