From 504ef07ce95514740e2d8b5bf0cd2588df276720 Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Wed, 16 Oct 2019 07:24:39 +0200 Subject: [PATCH 1/8] add stream::LastFuture (not compiling) Struggling with the associated type, pinning and how to move/copy LastFuture.last. --- src/stream/stream/last.rs | 52 +++++++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 34 +++++++++++++++++++++++++ 2 files changed, 86 insertions(+) create mode 100644 src/stream/stream/last.rs diff --git a/src/stream/stream/last.rs b/src/stream/stream/last.rs new file mode 100644 index 000000000..1e7917125 --- /dev/null +++ b/src/stream/stream/last.rs @@ -0,0 +1,52 @@ +use std::pin::Pin; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct LastFuture<'a, S> +where + S: Stream +{ + stream: &'a mut S, + last: Option, +} + +impl Unpin for LastFuture<'_, S> +where + S: Stream +{} + +impl<'a, S> LastFuture<'a, S> +where + S: Stream +{ + //pin_utils::unsafe_pinned!(stream: S); + + pub(crate) fn new(stream: &'a mut S) -> Self + { + LastFuture { stream, last: None } + } +} + +impl<'a, S> Future for LastFuture<'a, S> +where + S: Stream + Unpin + Sized, +{ + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx)); + + match next { + Some(v) => { + self.last = Some(v); + cx.waker().wake_by_ref(); + return Poll::Pending; + } + None => return Poll::Ready(self.last), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 0e563f6d3..a3bbdc1fc 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -33,6 +33,7 @@ mod fold; mod for_each; mod fuse; mod inspect; +mod last; mod map; mod min_by; mod next; @@ -55,6 +56,7 @@ use find::FindFuture; use find_map::FindMapFuture; use fold::FoldFuture; use for_each::ForEachFuture; +use last::LastFuture; use min_by::MinByFuture; use next::NextFuture; use nth::NthFuture; @@ -442,6 +444,38 @@ extension_trait! { Inspect::new(self, f) } + #[doc = r#" + Returns the last element of the stream. + + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + + use async_std::prelude::*; + + let mut s: VecDeque = vec![1, 2, 3].into_iter().collect(); + + let second = s.nth(1).await; + assert_eq!(second, Some(2)); + # + # }) } + ``` + + "#] + fn last( + &mut self, + ) -> impl Future> + '_ [LastFuture<'_, Self>] + where + Self: Sized, + { + LastFuture::new(self) + } + #[doc = r#" Transforms this `Stream` into a "fused" `Stream` such that after the first time `poll` returns `Poll::Ready(None)`, all future calls to `poll` will also return From d120c580cb85f854d8619a8e661cf2e14b56c5eb Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Wed, 16 Oct 2019 08:05:34 +0200 Subject: [PATCH 2/8] fix type signature -> still cannot assign still problems assigning the new value to self.last --- src/stream/stream/last.rs | 36 ++++++++++++++++-------------------- src/stream/stream/mod.rs | 4 ++-- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/src/stream/stream/last.rs b/src/stream/stream/last.rs index 1e7917125..4eb6821c0 100644 --- a/src/stream/stream/last.rs +++ b/src/stream/stream/last.rs @@ -6,47 +6,43 @@ use crate::task::{Context, Poll}; #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct LastFuture<'a, S> +pub struct LastFuture where - S: Stream + S: Stream, { - stream: &'a mut S, - last: Option, + stream: S, + last: Option, } -impl Unpin for LastFuture<'_, S> +impl LastFuture where - S: Stream -{} - -impl<'a, S> LastFuture<'a, S> -where - S: Stream + S: Stream, { - //pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_pinned!(last: Option); - pub(crate) fn new(stream: &'a mut S) -> Self - { + pub(crate) fn new(stream: S) -> Self { LastFuture { stream, last: None } } } -impl<'a, S> Future for LastFuture<'a, S> +impl Future for LastFuture where S: Stream + Unpin + Sized, + S::Item: Copy, { type Output = Option; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx)); + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); match next { - Some(v) => { - self.last = Some(v); + Some(new) => { cx.waker().wake_by_ref(); - return Poll::Pending; + *self.as_mut().last() = Some(new); + Poll::Pending } - None => return Poll::Ready(self.last), + None => Poll::Ready(self.last), } } } diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 2248cba75..2c4884b69 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -482,8 +482,8 @@ extension_trait! { "#] fn last( - &mut self, - ) -> impl Future> + '_ [LastFuture<'_, Self>] + self, + ) -> impl Future> + '_ [LastFuture] where Self: Sized, { From 54aaf296aedd32eae2ed41ecba52c44d2c0db49a Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Wed, 16 Oct 2019 08:08:50 +0200 Subject: [PATCH 3/8] remove unused bound --- src/stream/stream/last.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/stream/stream/last.rs b/src/stream/stream/last.rs index 4eb6821c0..5c46ecbd9 100644 --- a/src/stream/stream/last.rs +++ b/src/stream/stream/last.rs @@ -6,18 +6,12 @@ use crate::task::{Context, Poll}; #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct LastFuture -where - S: Stream, -{ +pub struct LastFuture { stream: S, last: Option, } -impl LastFuture -where - S: Stream, -{ +impl LastFuture { pin_utils::unsafe_pinned!(stream: S); pin_utils::unsafe_pinned!(last: Option); From 50558215fa5f4648921d6cc32e2c79f491357ec1 Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Wed, 16 Oct 2019 08:22:11 +0200 Subject: [PATCH 4/8] add doctest --- src/stream/stream/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 2c4884b69..13d55e5d8 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -474,8 +474,8 @@ extension_trait! { let mut s: VecDeque = vec![1, 2, 3].into_iter().collect(); - let second = s.nth(1).await; - assert_eq!(second, Some(2)); + let last = s.last().await; + assert_eq!(last, Some(3)); # # }) } ``` From cd5e439ef3bb7645d1ad3bc002ff0037e5796665 Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Wed, 16 Oct 2019 12:13:07 +0200 Subject: [PATCH 5/8] unpin LastFuture.last --- src/stream/stream/last.rs | 2 +- src/stream/stream/mod.rs | 18 +++++++++++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/stream/stream/last.rs b/src/stream/stream/last.rs index 5c46ecbd9..c58dd66a9 100644 --- a/src/stream/stream/last.rs +++ b/src/stream/stream/last.rs @@ -13,7 +13,7 @@ pub struct LastFuture { impl LastFuture { pin_utils::unsafe_pinned!(stream: S); - pin_utils::unsafe_pinned!(last: Option); + pin_utils::unsafe_unpinned!(last: Option); pub(crate) fn new(stream: S) -> Self { LastFuture { stream, last: None } diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 13d55e5d8..eb96cfeae 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -472,13 +472,29 @@ extension_trait! { use async_std::prelude::*; - let mut s: VecDeque = vec![1, 2, 3].into_iter().collect(); + let s: VecDeque = vec![1, 2, 3].into_iter().collect(); let last = s.last().await; assert_eq!(last, Some(3)); # # }) } ``` + + An empty stream will return `None`: + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + + use async_std::prelude::*; + + let s: VecDeque = vec![].into_iter().collect(); + + let last = s.last().await; + assert_eq!(last, None); + # + # }) } + ``` "#] fn last( From 5445b2b530798f84474ecacd5934b044f65efc9a Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Wed, 16 Oct 2019 12:37:36 +0200 Subject: [PATCH 6/8] RustFmt --- src/stream/stream/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index eb96cfeae..8d875a7ec 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -63,9 +63,9 @@ use find::FindFuture; use find_map::FindMapFuture; use fold::FoldFuture; use for_each::ForEachFuture; -use last::LastFuture; use ge::GeFuture; use gt::GtFuture; +use last::LastFuture; use le::LeFuture; use lt::LtFuture; use min_by::MinByFuture; @@ -480,7 +480,7 @@ extension_trait! { # }) } ``` - An empty stream will return `None`: + An empty stream will return `None: ``` # fn main() { async_std::task::block_on(async { # From 6b9043a97d494880093ee587584084a2d743d162 Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Wed, 16 Oct 2019 12:49:08 +0200 Subject: [PATCH 7/8] add static lifetime --- src/stream/stream/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 8d875a7ec..a55b4ba38 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -499,7 +499,7 @@ extension_trait! { "#] fn last( self, - ) -> impl Future> + '_ [LastFuture] + ) -> impl Future> + 'static [LastFuture] where Self: Sized, { From 4b6065952a6ab5811f7e83011390e6397800f382 Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Wed, 16 Oct 2019 13:04:58 +0200 Subject: [PATCH 8/8] remove redundant lifetime --- src/stream/stream/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index a55b4ba38..764dc9782 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -499,7 +499,7 @@ extension_trait! { "#] fn last( self, - ) -> impl Future> + 'static [LastFuture] + ) -> impl Future> [LastFuture] where Self: Sized, {