Skip to content

Commit 91aa069

Browse files
committed
Adds cmp
1 parent 6ab154b commit 91aa069

File tree

2 files changed

+137
-0
lines changed

2 files changed

+137
-0
lines changed

src/stream/stream/cmp.rs

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
use std::cmp::Ordering;
2+
use std::pin::Pin;
3+
4+
use super::fuse::Fuse;
5+
use crate::prelude::*;
6+
use crate::future::Future;
7+
use crate::stream::Stream;
8+
use crate::task::{Context, Poll};
9+
10+
// Lexicographically compares the elements of this `Stream` with those
11+
// of another using `Ord`.
12+
#[doc(hidden)]
13+
#[allow(missing_debug_implementations)]
14+
pub struct CmpFuture<L: Stream, R: Stream> {
15+
l: Fuse<L>,
16+
r: Fuse<R>,
17+
l_cache: Option<L::Item>,
18+
r_cache: Option<R::Item>,
19+
}
20+
21+
impl<L: Stream, R: Stream> CmpFuture<L, R> {
22+
pin_utils::unsafe_pinned!(l: Fuse<L>);
23+
pin_utils::unsafe_pinned!(r: Fuse<R>);
24+
pin_utils::unsafe_unpinned!(l_cache: Option<L::Item>);
25+
pin_utils::unsafe_unpinned!(r_cache: Option<R::Item>);
26+
27+
pub(super) fn new(l: L, r: R) -> Self {
28+
CmpFuture {
29+
l: l.fuse(),
30+
r: r.fuse(),
31+
l_cache: None,
32+
r_cache: None,
33+
}
34+
}
35+
}
36+
37+
impl<L: Stream, R: Stream> Future for CmpFuture<L, R>
38+
where
39+
L: Stream + Sized,
40+
R: Stream<Item = L::Item> + Sized,
41+
L::Item: Ord,
42+
{
43+
type Output = Ordering;
44+
45+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
46+
loop {
47+
// Stream that completes earliest can be considered Less, etc
48+
let l_complete = self.l.done && self.as_mut().l_cache.is_none();
49+
let r_complete = self.r.done && self.as_mut().r_cache.is_none();
50+
51+
if l_complete && r_complete {
52+
return Poll::Ready(Ordering::Equal)
53+
} else if l_complete {
54+
return Poll::Ready(Ordering::Less)
55+
} else if r_complete {
56+
return Poll::Ready(Ordering::Greater)
57+
}
58+
59+
// Get next value if possible and necesary
60+
if !self.l.done && self.as_mut().l_cache.is_none() {
61+
let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx));
62+
if let Some(item) = l_next {
63+
*self.as_mut().l_cache() = Some(item);
64+
}
65+
}
66+
67+
if !self.r.done && self.as_mut().r_cache.is_none() {
68+
let r_next = futures_core::ready!(self.as_mut().r().poll_next(cx));
69+
if let Some(item) = r_next {
70+
*self.as_mut().r_cache() = Some(item);
71+
}
72+
}
73+
74+
// Compare if both values are available.
75+
if self.as_mut().l_cache.is_some() && self.as_mut().r_cache.is_some() {
76+
let l_value = self.as_mut().l_cache().take().unwrap();
77+
let r_value = self.as_mut().r_cache().take().unwrap();
78+
let result = l_value.cmp(&r_value);
79+
80+
if let Ordering::Equal = result {
81+
// Reset cache to prepare for next comparison
82+
*self.as_mut().l_cache() = None;
83+
*self.as_mut().r_cache() = None;
84+
} else {
85+
// Return non equal value
86+
return Poll::Ready(result);
87+
}
88+
}
89+
}
90+
}
91+
}

src/stream/stream/mod.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
mod all;
2525
mod any;
2626
mod chain;
27+
mod cmp;
2728
mod enumerate;
2829
mod filter;
2930
mod filter_map;
@@ -47,6 +48,7 @@ mod zip;
4748

4849
use all::AllFuture;
4950
use any::AnyFuture;
51+
use cmp::CmpFuture;
5052
use enumerate::Enumerate;
5153
use filter_map::FilterMap;
5254
use find::FindFuture;
@@ -1147,6 +1149,50 @@ extension_trait! {
11471149
{
11481150
FromStream::from_stream(self)
11491151
}
1152+
1153+
#[doc = r#"
1154+
Lexicographically compares the elements of this `Stream` with those
1155+
of another using 'Ord'.
1156+
1157+
# Examples
1158+
```
1159+
# fn main() { async_std::task::block_on(async {
1160+
#
1161+
use async_std::prelude::*;
1162+
use std::collections::VecDeque;
1163+
1164+
use std::cmp::Ordering;
1165+
let result_equal = vec![1].into_iter().collect::<VecDeque<i64>>()
1166+
.cmp(vec![1].into_iter().collect::<VecDeque<i64>>()).await;
1167+
let result_less_count = vec![1].into_iter().collect::<VecDeque<i64>>()
1168+
.cmp(vec![1, 2].into_iter().collect::<VecDeque<i64>>()).await;
1169+
let result_greater_count = vec![1, 2].into_iter().collect::<VecDeque<i64>>()
1170+
.cmp(vec![1].into_iter().collect::<VecDeque<i64>>()).await;
1171+
let result_less_vals = vec![1, 2, 3].into_iter().collect::<VecDeque<i64>>()
1172+
.cmp(vec![1, 2, 4].into_iter().collect::<VecDeque<i64>>()).await;
1173+
let result_greater_vals = vec![1, 2, 4].into_iter().collect::<VecDeque<i64>>()
1174+
.cmp(vec![1, 2, 3].into_iter().collect::<VecDeque<i64>>()).await;
1175+
assert_eq!(result_equal, Ordering::Equal);
1176+
assert_eq!(result_less_count, Ordering::Less);
1177+
assert_eq!(result_greater_count, Ordering::Greater);
1178+
assert_eq!(result_less_vals, Ordering::Less);
1179+
assert_eq!(result_greater_vals, Ordering::Greater);
1180+
#
1181+
# }) }
1182+
```
1183+
"#]
1184+
fn cmp<S>(
1185+
self,
1186+
other: S
1187+
) -> impl Future<Output = Ordering> + '_ [CmpFuture<Self, S>]
1188+
where
1189+
Self: Sized + Stream,
1190+
S: Stream<Item = Self::Item>,
1191+
Self::Item: Ord,
1192+
{
1193+
CmpFuture::new(self, other)
1194+
}
1195+
11501196
}
11511197

11521198
impl<S: Stream + Unpin + ?Sized> Stream for Box<S> {

0 commit comments

Comments
 (0)