From 9f604f97135c2b859afbd1dbe92f771506242e71 Mon Sep 17 00:00:00 2001 From: arheard Date: Mon, 30 Sep 2019 22:19:44 -0400 Subject: [PATCH 01/14] Adds partial_cmp.rs file and partial_cmp signature to mod.rs --- src/stream/stream/mod.rs | 45 ++++++++++++++++ src/stream/stream/partial_cmp.rs | 92 ++++++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+) create mode 100644 src/stream/stream/partial_cmp.rs diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 054d81b63..858faafc2 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -37,6 +37,7 @@ mod map; mod min_by; mod next; mod nth; +mod partial_cmp; mod scan; mod skip; mod skip_while; @@ -56,6 +57,7 @@ use for_each::ForEachFuture; use min_by::MinByFuture; use next::NextFuture; use nth::NthFuture; +use partial_cmp::PartialCmpFuture; use try_for_each::TryForEeachFuture; pub use chain::Chain; @@ -1187,6 +1189,49 @@ extension_trait! { { Merge::new(self, other) } + + #[doc = r#" + Lexicographically compares the elements of this `Stream` with those + of another. + + # Examples + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + + use std::cmp::Ordering; + + let result_equal = vec![1.].into_iter().collect::>() + .partial_cmp(vec![1.].into_iter().collect::>()).await; + let result_less = vec![1.].into_iter().collect::>() + .partial_cmp(vec![1., 2.].into_iter().collect::>()).await; + let result_greater = vec![1., 2.].into_iter().collect::>() + .partial_cmp(vec![1.].into_iter().collect::>()).await; + let result_none = vec![std::f64::NAN].into_iter().collect::>() + .partial_cmp(vec![1.].into_iter().collect::>()).await; + + assert_eq!(result_equal, Some(Ordering::Equal)); + assert_eq!(result_less, Some(Ordering::Less)); + assert_eq!(result_greater, Some(Ordering::Greater)); + assert_eq!(result_none, None); + + # + # }) } + ``` + "#] + fn partial_cmp( + self, + other: S + ) -> impl Future> + '_ [PartialCmpFuture] + where + Self: Sized + Stream, + S: Stream, + Self::Item: PartialOrd, + { + PartialCmpFuture::new(self, other) + } } impl Stream for Box { diff --git a/src/stream/stream/partial_cmp.rs b/src/stream/stream/partial_cmp.rs new file mode 100644 index 000000000..f7de5cd12 --- /dev/null +++ b/src/stream/stream/partial_cmp.rs @@ -0,0 +1,92 @@ +use std::cmp::Ordering; +use std::pin::Pin; + +use super::fuse::Fuse; +use crate::future::Future; +use crate::prelude::*; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +// Lexicographically compares the elements of this `Stream` with those +// of another. +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct PartialCmpFuture { + l: Fuse, + r: Fuse, + l_cache: Option, + r_cache: Option, +} + +impl PartialCmpFuture { + pin_utils::unsafe_pinned!(l: Fuse); + pin_utils::unsafe_pinned!(r: Fuse); + pin_utils::unsafe_unpinned!(l_cache: Option); + pin_utils::unsafe_unpinned!(r_cache: Option); + + pub(super) fn new(l: L, r: R) -> Self { + PartialCmpFuture { + l: l.fuse(), + r: r.fuse(), + l_cache: None, + r_cache: None, + } + } +} + +impl Future for PartialCmpFuture +where + L: Stream + Sized, + R: Stream + Sized, + L::Item: PartialOrd, +{ + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + // Short circuit logic + // Stream that completes earliest can be considered Less, etc + let l_complete = self.l.done && self.as_mut().l_cache.is_none(); + let r_complete = self.r.done && self.as_mut().r_cache.is_none(); + + if l_complete && r_complete { + return Poll::Ready(Some(Ordering::Equal)); + } else if l_complete { + return Poll::Ready(Some(Ordering::Less)); + } else if r_complete { + return Poll::Ready(Some(Ordering::Greater)); + } + + // Get next value if possible and necesary + if !self.l.done && self.as_mut().l_cache.is_none() { + let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx)); + if let Some(item) = l_next { + *self.as_mut().l_cache() = Some(item); + } + } + + if !self.r.done && self.as_mut().r_cache.is_none() { + let r_next = futures_core::ready!(self.as_mut().r().poll_next(cx)); + if let Some(item) = r_next { + *self.as_mut().r_cache() = Some(item); + } + } + + // Compare if both values are available. + if self.as_mut().l_cache.is_some() && self.as_mut().r_cache.is_some() { + let l_value = self.as_mut().l_cache().take().unwrap(); + let r_value = self.as_mut().r_cache().take().unwrap(); + let result = l_value.partial_cmp(&r_value); + + if let Some(Ordering::Equal) = result { + // Reset cache to prepare for next comparison + *self.as_mut().l_cache() = None; + *self.as_mut().r_cache() = None; + } else { + // Return non equal value + return Poll::Ready(result); + } + } + } + } +} \ No newline at end of file From f145485dc33efcab3f926b8bea1b25c4c887de8f Mon Sep 17 00:00:00 2001 From: assemblaj Date: Wed, 2 Oct 2019 13:14:10 -0400 Subject: [PATCH 02/14] Update src/stream/stream/partial_cmp.rs Hides docs as standard in our codebase for all Future types Co-Authored-By: Fedor Sakharov --- src/stream/stream/partial_cmp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/partial_cmp.rs b/src/stream/stream/partial_cmp.rs index f7de5cd12..fac2705dc 100644 --- a/src/stream/stream/partial_cmp.rs +++ b/src/stream/stream/partial_cmp.rs @@ -89,4 +89,4 @@ where } } } -} \ No newline at end of file +} From 8e009d0445bb224ea3c52c80ff85347e8aedb2b8 Mon Sep 17 00:00:00 2001 From: arheard Date: Wed, 2 Oct 2019 17:54:21 -0400 Subject: [PATCH 03/14] adds tests that compare streams of same length --- src/stream/stream/mod.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 858faafc2..48f12b38c 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1205,16 +1205,22 @@ extension_trait! { let result_equal = vec![1.].into_iter().collect::>() .partial_cmp(vec![1.].into_iter().collect::>()).await; - let result_less = vec![1.].into_iter().collect::>() + let result_less_count = vec![1.].into_iter().collect::>() .partial_cmp(vec![1., 2.].into_iter().collect::>()).await; - let result_greater = vec![1., 2.].into_iter().collect::>() - .partial_cmp(vec![1.].into_iter().collect::>()).await; + let result_greater_count = vec![1., 2.].into_iter().collect::>() + .partial_cmp(vec![1.].into_iter().collect::>()).await; + let result_less_vals = vec![1., 2., 3.].into_iter().collect::>() + .partial_cmp(vec![1., 2., 4.].into_iter().collect::>()).await; + let result_greater_vals = vec![1., 2., 4.].into_iter().collect::>() + .partial_cmp(vec![1., 2., 3.].into_iter().collect::>()).await; let result_none = vec![std::f64::NAN].into_iter().collect::>() .partial_cmp(vec![1.].into_iter().collect::>()).await; assert_eq!(result_equal, Some(Ordering::Equal)); - assert_eq!(result_less, Some(Ordering::Less)); - assert_eq!(result_greater, Some(Ordering::Greater)); + assert_eq!(result_less_count, Some(Ordering::Less)); + assert_eq!(result_greater_count, Some(Ordering::Greater)); + assert_eq!(result_less_vals, Some(Ordering::Less)); + assert_eq!(result_greater_vals, Some(Ordering::Greater)); assert_eq!(result_none, None); # From 243c52a5c590cf91e27373ddef56b04189d1acdb Mon Sep 17 00:00:00 2001 From: arheard Date: Wed, 2 Oct 2019 17:57:01 -0400 Subject: [PATCH 04/14] Replaces waker function with loop, reomves Unpin trait bounds --- src/stream/stream/partial_cmp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/partial_cmp.rs b/src/stream/stream/partial_cmp.rs index fac2705dc..f7de5cd12 100644 --- a/src/stream/stream/partial_cmp.rs +++ b/src/stream/stream/partial_cmp.rs @@ -89,4 +89,4 @@ where } } } -} +} \ No newline at end of file From 838f752ebea610af6e8168443a2b85147b61eabe Mon Sep 17 00:00:00 2001 From: arheard Date: Wed, 9 Oct 2019 16:04:34 -0400 Subject: [PATCH 05/14] cleans up examples --- src/stream/stream/mod.rs | 31 +++++++++++-------------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 48f12b38c..cca298565 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1200,28 +1200,19 @@ extension_trait! { # use async_std::prelude::*; use std::collections::VecDeque; - + use std::cmp::Ordering; - let result_equal = vec![1.].into_iter().collect::>() - .partial_cmp(vec![1.].into_iter().collect::>()).await; - let result_less_count = vec![1.].into_iter().collect::>() - .partial_cmp(vec![1., 2.].into_iter().collect::>()).await; - let result_greater_count = vec![1., 2.].into_iter().collect::>() - .partial_cmp(vec![1.].into_iter().collect::>()).await; - let result_less_vals = vec![1., 2., 3.].into_iter().collect::>() - .partial_cmp(vec![1., 2., 4.].into_iter().collect::>()).await; - let result_greater_vals = vec![1., 2., 4.].into_iter().collect::>() - .partial_cmp(vec![1., 2., 3.].into_iter().collect::>()).await; - let result_none = vec![std::f64::NAN].into_iter().collect::>() - .partial_cmp(vec![1.].into_iter().collect::>()).await; - - assert_eq!(result_equal, Some(Ordering::Equal)); - assert_eq!(result_less_count, Some(Ordering::Less)); - assert_eq!(result_greater_count, Some(Ordering::Greater)); - assert_eq!(result_less_vals, Some(Ordering::Less)); - assert_eq!(result_greater_vals, Some(Ordering::Greater)); - assert_eq!(result_none, None); + let s1 = VecDeque::from(vec![1]); + let s2 = VecDeque::from(vec![1, 2]); + let s3 = VecDeque::from(vec![1, 2, 3]); + let s4 = VecDeque::from(vec![1, 2, 4]); + + assert_eq!(s1.clone().partial_cmp(s1.clone()).await, Some(Ordering::Equal)); + assert_eq!(s1.clone().partial_cmp(s2.clone()).await, Some(Ordering::Less)); + assert_eq!(s2.clone().partial_cmp(s1.clone()).await, Some(Ordering::Greater)); + assert_eq!(s3.clone().partial_cmp(s4.clone()).await, Some(Ordering::Less)); + assert_eq!(s4.clone().partial_cmp(s3.clone()).await, Some(Ordering::Greater)); # # }) } From bfea84aa3f5ca00e39369aca5a726a81216a8819 Mon Sep 17 00:00:00 2001 From: arheard Date: Fri, 11 Oct 2019 17:41:10 -0400 Subject: [PATCH 06/14] Adds latest version of relevant files --- src/stream/stream/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index cca298565..928753677 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1221,11 +1221,11 @@ extension_trait! { fn partial_cmp( self, other: S - ) -> impl Future> + '_ [PartialCmpFuture] + ) -> impl Future> [PartialCmpFuture] where Self: Sized + Stream, - S: Stream, - Self::Item: PartialOrd, + S: Stream, + ::Item: PartialOrd, { PartialCmpFuture::new(self, other) } From 5f6742028957caac9bee3c4fcb5cb38d429de6fc Mon Sep 17 00:00:00 2001 From: arheard Date: Fri, 11 Oct 2019 17:49:41 -0400 Subject: [PATCH 07/14] Revert "Adds latest version of relevant files" This reverts commit aaf263881238db026d2ba5145d08259ccee6aa49. --- src/stream/stream/mod.rs | 123 +------------------------------ src/stream/stream/partial_cmp.rs | 36 ++++----- 2 files changed, 21 insertions(+), 138 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 928753677..573e1cc9e 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -30,10 +30,8 @@ mod filter_map; mod find; mod find_map; mod fold; -mod for_each; mod fuse; mod inspect; -mod map; mod min_by; mod next; mod nth; @@ -43,7 +41,6 @@ mod skip; mod skip_while; mod step_by; mod take; -mod try_for_each; mod zip; use all::AllFuture; @@ -53,18 +50,15 @@ use filter_map::FilterMap; use find::FindFuture; use find_map::FindMapFuture; use fold::FoldFuture; -use for_each::ForEachFuture; use min_by::MinByFuture; use next::NextFuture; use nth::NthFuture; use partial_cmp::PartialCmpFuture; -use try_for_each::TryForEeachFuture; pub use chain::Chain; pub use filter::Filter; pub use fuse::Fuse; pub use inspect::Inspect; -pub use map::Map; pub use scan::Scan; pub use skip::Skip; pub use skip_while::SkipWhile; @@ -346,37 +340,6 @@ extension_trait! { Enumerate::new(self) } - #[doc = r#" - Takes a closure and creates a stream that calls that closure on every element of this stream. - - # Examples - - ``` - # fn main() { async_std::task::block_on(async { - # - use async_std::prelude::*; - use std::collections::VecDeque; - - let s: VecDeque<_> = vec![1, 2, 3].into_iter().collect(); - let mut s = s.map(|x| 2 * x); - - assert_eq!(s.next().await, Some(2)); - assert_eq!(s.next().await, Some(4)); - assert_eq!(s.next().await, Some(6)); - assert_eq!(s.next().await, None); - - # - # }) } - ``` - "#] - fn map(self, f: F) -> Map - where - Self: Sized, - F: FnMut(Self::Item) -> B, - { - Map::new(self, f) - } - #[doc = r#" A combinator that does something with each element in the stream, passing the value on. @@ -793,41 +756,6 @@ extension_trait! { FoldFuture::new(self, init, f) } - #[doc = r#" - Call a closure on each element of the stream. - - # Examples - - ``` - # fn main() { async_std::task::block_on(async { - # - use async_std::prelude::*; - use std::collections::VecDeque; - use std::sync::mpsc::channel; - - let (tx, rx) = channel(); - - let s: VecDeque = vec![1, 2, 3].into_iter().collect(); - let sum = s.for_each(move |x| tx.clone().send(x).unwrap()).await; - - let v: Vec<_> = rx.iter().collect(); - - assert_eq!(v, vec![1, 2, 3]); - # - # }) } - ``` - "#] - fn for_each( - self, - f: F, - ) -> impl Future [ForEachFuture] - where - Self: Sized, - F: FnMut(Self::Item), - { - ForEachFuture::new(self, f) - } - #[doc = r#" Tests if any element of the stream matches a predicate. @@ -999,51 +927,6 @@ extension_trait! { Skip::new(self, n) } - #[doc = r#" - Applies a falliable function to each element in a stream, stopping at first error and returning it. - - # Examples - - ``` - # fn main() { async_std::task::block_on(async { - # - use std::collections::VecDeque; - use std::sync::mpsc::channel; - use async_std::prelude::*; - - let (tx, rx) = channel(); - - let s: VecDeque = vec![1, 2, 3].into_iter().collect(); - let s = s.try_for_each(|v| { - if v % 2 == 1 { - tx.clone().send(v).unwrap(); - Ok(()) - } else { - Err("even") - } - }); - - let res = s.await; - drop(tx); - let values: Vec<_> = rx.iter().collect(); - - assert_eq!(values, vec![1]); - assert_eq!(res, Err("even")); - # - # }) } - ``` - "#] - fn try_for_each( - self, - f: F, - ) -> impl Future [TryForEeachFuture] - where - Self: Sized, - F: FnMut(Self::Item) -> Result<(), E>, - { - TryForEeachFuture::new(self, f) - } - #[doc = r#" 'Zips up' two streams into a single stream of pairs. @@ -1221,11 +1104,11 @@ extension_trait! { fn partial_cmp( self, other: S - ) -> impl Future> [PartialCmpFuture] + ) -> impl Future> + '_ [PartialCmpFuture] where Self: Sized + Stream, - S: Stream, - ::Item: PartialOrd, + S: Stream, + Self::Item: PartialOrd, { PartialCmpFuture::new(self, other) } diff --git a/src/stream/stream/partial_cmp.rs b/src/stream/stream/partial_cmp.rs index f7de5cd12..5fc4b4f54 100644 --- a/src/stream/stream/partial_cmp.rs +++ b/src/stream/stream/partial_cmp.rs @@ -2,8 +2,8 @@ use std::cmp::Ordering; use std::pin::Pin; use super::fuse::Fuse; -use crate::future::Future; use crate::prelude::*; +use crate::future::Future; use crate::stream::Stream; use crate::task::{Context, Poll}; @@ -15,7 +15,7 @@ pub struct PartialCmpFuture { l: Fuse, r: Fuse, l_cache: Option, - r_cache: Option, + r_cache: Option, } impl PartialCmpFuture { @@ -28,36 +28,36 @@ impl PartialCmpFuture { PartialCmpFuture { l: l.fuse(), r: r.fuse(), - l_cache: None, - r_cache: None, + l_cache: None, + r_cache: None, } } } -impl Future for PartialCmpFuture -where +impl Future for PartialCmpFuture +where L: Stream + Sized, R: Stream + Sized, - L::Item: PartialOrd, + L::Item: PartialOrd { type Output = Option; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - // Short circuit logic + // Short circuit logic // Stream that completes earliest can be considered Less, etc let l_complete = self.l.done && self.as_mut().l_cache.is_none(); let r_complete = self.r.done && self.as_mut().r_cache.is_none(); if l_complete && r_complete { - return Poll::Ready(Some(Ordering::Equal)); + return Poll::Ready(Some(Ordering::Equal)) } else if l_complete { - return Poll::Ready(Some(Ordering::Less)); + return Poll::Ready(Some(Ordering::Less)) } else if r_complete { - return Poll::Ready(Some(Ordering::Greater)); + return Poll::Ready(Some(Ordering::Greater)) } - // Get next value if possible and necesary + // Get next value if possible and necesary if !self.l.done && self.as_mut().l_cache.is_none() { let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx)); if let Some(item) = l_next { @@ -75,17 +75,17 @@ where // Compare if both values are available. if self.as_mut().l_cache.is_some() && self.as_mut().r_cache.is_some() { let l_value = self.as_mut().l_cache().take().unwrap(); - let r_value = self.as_mut().r_cache().take().unwrap(); + let r_value = self.as_mut().r_cache().take().unwrap(); let result = l_value.partial_cmp(&r_value); if let Some(Ordering::Equal) = result { - // Reset cache to prepare for next comparison - *self.as_mut().l_cache() = None; + // Reset cache to prepare for next comparison + *self.as_mut().l_cache() = None; *self.as_mut().r_cache() = None; } else { - // Return non equal value + // Return non equal value return Poll::Ready(result); - } + } } } } From 5317d4fd563ebbb3e4725b60cfc65975aa20e49e Mon Sep 17 00:00:00 2001 From: arheard Date: Fri, 11 Oct 2019 18:42:56 -0400 Subject: [PATCH 08/14] removes new methods unrelated to branch --- src/stream/stream/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 573e1cc9e..7c5678133 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1104,11 +1104,11 @@ extension_trait! { fn partial_cmp( self, other: S - ) -> impl Future> + '_ [PartialCmpFuture] + ) -> impl Future> [PartialCmpFuture] where Self: Sized + Stream, - S: Stream, - Self::Item: PartialOrd, + S: Stream, + ::Item: PartialOrd, { PartialCmpFuture::new(self, other) } From 1c0ee5f0857b5742efaab216bb58a105cb302075 Mon Sep 17 00:00:00 2001 From: arheard Date: Mon, 30 Sep 2019 22:19:44 -0400 Subject: [PATCH 09/14] Adds partial_cmp.rs file and partial_cmp signature to mod.rs --- src/stream/stream/mod.rs | 1 + src/stream/stream/partial_cmp.rs | 36 ++++++++++++++++---------------- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 7c5678133..9b888c641 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -54,6 +54,7 @@ use min_by::MinByFuture; use next::NextFuture; use nth::NthFuture; use partial_cmp::PartialCmpFuture; +use try_for_each::TryForEeachFuture; pub use chain::Chain; pub use filter::Filter; diff --git a/src/stream/stream/partial_cmp.rs b/src/stream/stream/partial_cmp.rs index 5fc4b4f54..f7de5cd12 100644 --- a/src/stream/stream/partial_cmp.rs +++ b/src/stream/stream/partial_cmp.rs @@ -2,8 +2,8 @@ use std::cmp::Ordering; use std::pin::Pin; use super::fuse::Fuse; -use crate::prelude::*; use crate::future::Future; +use crate::prelude::*; use crate::stream::Stream; use crate::task::{Context, Poll}; @@ -15,7 +15,7 @@ pub struct PartialCmpFuture { l: Fuse, r: Fuse, l_cache: Option, - r_cache: Option, + r_cache: Option, } impl PartialCmpFuture { @@ -28,36 +28,36 @@ impl PartialCmpFuture { PartialCmpFuture { l: l.fuse(), r: r.fuse(), - l_cache: None, - r_cache: None, + l_cache: None, + r_cache: None, } } } -impl Future for PartialCmpFuture -where +impl Future for PartialCmpFuture +where L: Stream + Sized, R: Stream + Sized, - L::Item: PartialOrd + L::Item: PartialOrd, { type Output = Option; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - // Short circuit logic + // Short circuit logic // Stream that completes earliest can be considered Less, etc let l_complete = self.l.done && self.as_mut().l_cache.is_none(); let r_complete = self.r.done && self.as_mut().r_cache.is_none(); if l_complete && r_complete { - return Poll::Ready(Some(Ordering::Equal)) + return Poll::Ready(Some(Ordering::Equal)); } else if l_complete { - return Poll::Ready(Some(Ordering::Less)) + return Poll::Ready(Some(Ordering::Less)); } else if r_complete { - return Poll::Ready(Some(Ordering::Greater)) + return Poll::Ready(Some(Ordering::Greater)); } - // Get next value if possible and necesary + // Get next value if possible and necesary if !self.l.done && self.as_mut().l_cache.is_none() { let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx)); if let Some(item) = l_next { @@ -75,17 +75,17 @@ where // Compare if both values are available. if self.as_mut().l_cache.is_some() && self.as_mut().r_cache.is_some() { let l_value = self.as_mut().l_cache().take().unwrap(); - let r_value = self.as_mut().r_cache().take().unwrap(); + let r_value = self.as_mut().r_cache().take().unwrap(); let result = l_value.partial_cmp(&r_value); if let Some(Ordering::Equal) = result { - // Reset cache to prepare for next comparison - *self.as_mut().l_cache() = None; + // Reset cache to prepare for next comparison + *self.as_mut().l_cache() = None; *self.as_mut().r_cache() = None; } else { - // Return non equal value + // Return non equal value return Poll::Ready(result); - } + } } } } From 815a36eab9048810e15d6b863a33284655cd0bce Mon Sep 17 00:00:00 2001 From: assemblaj Date: Wed, 2 Oct 2019 13:14:10 -0400 Subject: [PATCH 10/14] Update src/stream/stream/partial_cmp.rs Hides docs as standard in our codebase for all Future types Co-Authored-By: Fedor Sakharov --- src/stream/stream/partial_cmp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/partial_cmp.rs b/src/stream/stream/partial_cmp.rs index f7de5cd12..fac2705dc 100644 --- a/src/stream/stream/partial_cmp.rs +++ b/src/stream/stream/partial_cmp.rs @@ -89,4 +89,4 @@ where } } } -} \ No newline at end of file +} From 4370787ac2b5afecc954b638feb5824782e53f80 Mon Sep 17 00:00:00 2001 From: arheard Date: Wed, 2 Oct 2019 17:57:01 -0400 Subject: [PATCH 11/14] Replaces waker function with loop, reomves Unpin trait bounds --- src/stream/stream/partial_cmp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/partial_cmp.rs b/src/stream/stream/partial_cmp.rs index fac2705dc..f7de5cd12 100644 --- a/src/stream/stream/partial_cmp.rs +++ b/src/stream/stream/partial_cmp.rs @@ -89,4 +89,4 @@ where } } } -} +} \ No newline at end of file From ded41379cc3797f01f6565ee9a4b69e57c4bca99 Mon Sep 17 00:00:00 2001 From: arheard Date: Fri, 11 Oct 2019 17:49:41 -0400 Subject: [PATCH 12/14] Revert "Adds latest version of relevant files" This reverts commit aaf263881238db026d2ba5145d08259ccee6aa49. --- src/stream/stream/mod.rs | 7 +++---- src/stream/stream/partial_cmp.rs | 36 ++++++++++++++++---------------- 2 files changed, 21 insertions(+), 22 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 9b888c641..573e1cc9e 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -54,7 +54,6 @@ use min_by::MinByFuture; use next::NextFuture; use nth::NthFuture; use partial_cmp::PartialCmpFuture; -use try_for_each::TryForEeachFuture; pub use chain::Chain; pub use filter::Filter; @@ -1105,11 +1104,11 @@ extension_trait! { fn partial_cmp( self, other: S - ) -> impl Future> [PartialCmpFuture] + ) -> impl Future> + '_ [PartialCmpFuture] where Self: Sized + Stream, - S: Stream, - ::Item: PartialOrd, + S: Stream, + Self::Item: PartialOrd, { PartialCmpFuture::new(self, other) } diff --git a/src/stream/stream/partial_cmp.rs b/src/stream/stream/partial_cmp.rs index f7de5cd12..5fc4b4f54 100644 --- a/src/stream/stream/partial_cmp.rs +++ b/src/stream/stream/partial_cmp.rs @@ -2,8 +2,8 @@ use std::cmp::Ordering; use std::pin::Pin; use super::fuse::Fuse; -use crate::future::Future; use crate::prelude::*; +use crate::future::Future; use crate::stream::Stream; use crate::task::{Context, Poll}; @@ -15,7 +15,7 @@ pub struct PartialCmpFuture { l: Fuse, r: Fuse, l_cache: Option, - r_cache: Option, + r_cache: Option, } impl PartialCmpFuture { @@ -28,36 +28,36 @@ impl PartialCmpFuture { PartialCmpFuture { l: l.fuse(), r: r.fuse(), - l_cache: None, - r_cache: None, + l_cache: None, + r_cache: None, } } } -impl Future for PartialCmpFuture -where +impl Future for PartialCmpFuture +where L: Stream + Sized, R: Stream + Sized, - L::Item: PartialOrd, + L::Item: PartialOrd { type Output = Option; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - // Short circuit logic + // Short circuit logic // Stream that completes earliest can be considered Less, etc let l_complete = self.l.done && self.as_mut().l_cache.is_none(); let r_complete = self.r.done && self.as_mut().r_cache.is_none(); if l_complete && r_complete { - return Poll::Ready(Some(Ordering::Equal)); + return Poll::Ready(Some(Ordering::Equal)) } else if l_complete { - return Poll::Ready(Some(Ordering::Less)); + return Poll::Ready(Some(Ordering::Less)) } else if r_complete { - return Poll::Ready(Some(Ordering::Greater)); + return Poll::Ready(Some(Ordering::Greater)) } - // Get next value if possible and necesary + // Get next value if possible and necesary if !self.l.done && self.as_mut().l_cache.is_none() { let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx)); if let Some(item) = l_next { @@ -75,17 +75,17 @@ where // Compare if both values are available. if self.as_mut().l_cache.is_some() && self.as_mut().r_cache.is_some() { let l_value = self.as_mut().l_cache().take().unwrap(); - let r_value = self.as_mut().r_cache().take().unwrap(); + let r_value = self.as_mut().r_cache().take().unwrap(); let result = l_value.partial_cmp(&r_value); if let Some(Ordering::Equal) = result { - // Reset cache to prepare for next comparison - *self.as_mut().l_cache() = None; + // Reset cache to prepare for next comparison + *self.as_mut().l_cache() = None; *self.as_mut().r_cache() = None; } else { - // Return non equal value + // Return non equal value return Poll::Ready(result); - } + } } } } From 1fd82050edd781fb3294b44a92105654aa62d94d Mon Sep 17 00:00:00 2001 From: arheard Date: Fri, 11 Oct 2019 18:42:56 -0400 Subject: [PATCH 13/14] removes new methods unrelated to branch --- src/stream/stream/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 573e1cc9e..7c5678133 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1104,11 +1104,11 @@ extension_trait! { fn partial_cmp( self, other: S - ) -> impl Future> + '_ [PartialCmpFuture] + ) -> impl Future> [PartialCmpFuture] where Self: Sized + Stream, - S: Stream, - Self::Item: PartialOrd, + S: Stream, + ::Item: PartialOrd, { PartialCmpFuture::new(self, other) } From 17a3afb7cd834c338012f965a9a9dd7d2741a4f9 Mon Sep 17 00:00:00 2001 From: arheard Date: Fri, 11 Oct 2019 20:15:58 -0400 Subject: [PATCH 14/14] applies cargo fmt --- src/stream/stream/partial_cmp.rs | 36 ++++++++++++++++---------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/stream/stream/partial_cmp.rs b/src/stream/stream/partial_cmp.rs index 5fc4b4f54..f7de5cd12 100644 --- a/src/stream/stream/partial_cmp.rs +++ b/src/stream/stream/partial_cmp.rs @@ -2,8 +2,8 @@ use std::cmp::Ordering; use std::pin::Pin; use super::fuse::Fuse; -use crate::prelude::*; use crate::future::Future; +use crate::prelude::*; use crate::stream::Stream; use crate::task::{Context, Poll}; @@ -15,7 +15,7 @@ pub struct PartialCmpFuture { l: Fuse, r: Fuse, l_cache: Option, - r_cache: Option, + r_cache: Option, } impl PartialCmpFuture { @@ -28,36 +28,36 @@ impl PartialCmpFuture { PartialCmpFuture { l: l.fuse(), r: r.fuse(), - l_cache: None, - r_cache: None, + l_cache: None, + r_cache: None, } } } -impl Future for PartialCmpFuture -where +impl Future for PartialCmpFuture +where L: Stream + Sized, R: Stream + Sized, - L::Item: PartialOrd + L::Item: PartialOrd, { type Output = Option; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - // Short circuit logic + // Short circuit logic // Stream that completes earliest can be considered Less, etc let l_complete = self.l.done && self.as_mut().l_cache.is_none(); let r_complete = self.r.done && self.as_mut().r_cache.is_none(); if l_complete && r_complete { - return Poll::Ready(Some(Ordering::Equal)) + return Poll::Ready(Some(Ordering::Equal)); } else if l_complete { - return Poll::Ready(Some(Ordering::Less)) + return Poll::Ready(Some(Ordering::Less)); } else if r_complete { - return Poll::Ready(Some(Ordering::Greater)) + return Poll::Ready(Some(Ordering::Greater)); } - // Get next value if possible and necesary + // Get next value if possible and necesary if !self.l.done && self.as_mut().l_cache.is_none() { let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx)); if let Some(item) = l_next { @@ -75,17 +75,17 @@ where // Compare if both values are available. if self.as_mut().l_cache.is_some() && self.as_mut().r_cache.is_some() { let l_value = self.as_mut().l_cache().take().unwrap(); - let r_value = self.as_mut().r_cache().take().unwrap(); + let r_value = self.as_mut().r_cache().take().unwrap(); let result = l_value.partial_cmp(&r_value); if let Some(Ordering::Equal) = result { - // Reset cache to prepare for next comparison - *self.as_mut().l_cache() = None; + // Reset cache to prepare for next comparison + *self.as_mut().l_cache() = None; *self.as_mut().r_cache() = None; } else { - // Return non equal value + // Return non equal value return Poll::Ready(result); - } + } } } }