Skip to content

Commit 335fa5f

Browse files
committed
Replaces waker function with loop, reomves Unpin trait bounds
1 parent e66e312 commit 335fa5f

File tree

1 file changed

+42
-44
lines changed

1 file changed

+42
-44
lines changed

src/stream/stream/partial_cmp.rs

Lines changed: 42 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ pub struct PartialCmpFuture<L: Stream, R: Stream> {
1818
r_cache: Option<R::Item>,
1919
}
2020

21-
impl<L: Stream + Unpin, R: Stream + Unpin> Unpin for PartialCmpFuture<L, R> {}
22-
2321
impl<L: Stream, R: Stream> PartialCmpFuture<L, R> {
2422
pin_utils::unsafe_pinned!(l: Fuse<L>);
2523
pin_utils::unsafe_pinned!(r: Fuse<R>);
24+
pin_utils::unsafe_unpinned!(l_cache: Option<L::Item>);
25+
pin_utils::unsafe_unpinned!(r_cache: Option<R::Item>);
2626

2727
pub(super) fn new(l: L, r: R) -> Self {
2828
PartialCmpFuture {
@@ -36,59 +36,57 @@ impl<L: Stream, R: Stream> PartialCmpFuture<L, R> {
3636

3737
impl<L: Stream, R: Stream> Future for PartialCmpFuture<L, R>
3838
where
39-
L: Stream + Unpin + Sized,
40-
R: Stream + Unpin + Sized,
39+
L: Stream + Sized,
40+
R: Stream + Sized,
4141
L::Item: PartialOrd<R::Item>
4242
{
4343
type Output = Option<Ordering>;
4444

4545
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
46-
// Short circuit logic
47-
// Stream that completes earliest can be considered Less, etc
48-
let l_complete = self.l.done && self.as_mut().l_cache.is_none();
49-
let r_complete = self.r.done && self.as_mut().r_cache.is_none();
46+
loop {
47+
// Short circuit logic
48+
// Stream that completes earliest can be considered Less, etc
49+
let l_complete = self.l.done && self.as_mut().l_cache.is_none();
50+
let r_complete = self.r.done && self.as_mut().r_cache.is_none();
5051

51-
if l_complete && r_complete {
52-
return Poll::Ready(Some(Ordering::Equal))
53-
} else if l_complete {
54-
return Poll::Ready(Some(Ordering::Less))
55-
} else if r_complete {
56-
return Poll::Ready(Some(Ordering::Greater))
57-
}
52+
if l_complete && r_complete {
53+
return Poll::Ready(Some(Ordering::Equal))
54+
} else if l_complete {
55+
return Poll::Ready(Some(Ordering::Less))
56+
} else if r_complete {
57+
return Poll::Ready(Some(Ordering::Greater))
58+
}
5859

59-
// Get next value if possible and necesary
60-
if !self.l.done && self.as_mut().l_cache.is_none() {
61-
let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx));
62-
if let Some(item) = l_next {
63-
self.as_mut().l_cache = Some(item);
60+
// Get next value if possible and necesary
61+
if !self.l.done && self.as_mut().l_cache.is_none() {
62+
let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx));
63+
if let Some(item) = l_next {
64+
*self.as_mut().l_cache() = Some(item);
65+
}
6466
}
65-
}
6667

67-
if !self.r.done && self.as_mut().r_cache.is_none() {
68-
let r_next = futures_core::ready!(self.as_mut().r().poll_next(cx));
69-
if let Some(item) = r_next {
70-
self.as_mut().r_cache = Some(item);
68+
if !self.r.done && self.as_mut().r_cache.is_none() {
69+
let r_next = futures_core::ready!(self.as_mut().r().poll_next(cx));
70+
if let Some(item) = r_next {
71+
*self.as_mut().r_cache() = Some(item);
72+
}
7173
}
72-
}
7374

74-
// Compare if both values are available.
75-
if self.as_mut().l_cache.is_some() && self.as_mut().r_cache.is_some() {
76-
let l_value = self.as_mut().l_cache.take().unwrap();
77-
let r_value = self.as_mut().r_cache.take().unwrap();
78-
let result = l_value.partial_cmp(&r_value);
75+
// Compare if both values are available.
76+
if self.as_mut().l_cache.is_some() && self.as_mut().r_cache.is_some() {
77+
let l_value = self.as_mut().l_cache().take().unwrap();
78+
let r_value = self.as_mut().r_cache().take().unwrap();
79+
let result = l_value.partial_cmp(&r_value);
7980

80-
if let Some(Ordering::Equal) = result {
81-
// Reset cache to prepare for next comparison
82-
self.as_mut().l_cache = None;
83-
self.as_mut().r_cache = None;
84-
} else {
85-
// Return non equal value
86-
return Poll::Ready(result);
87-
}
81+
if let Some(Ordering::Equal) = result {
82+
// Reset cache to prepare for next comparison
83+
*self.as_mut().l_cache() = None;
84+
*self.as_mut().r_cache() = None;
85+
} else {
86+
// Return non equal value
87+
return Poll::Ready(result);
88+
}
89+
}
8890
}
89-
90-
// wakes task to pull the next item from stream
91-
cx.waker().wake_by_ref();
92-
Poll::Pending
9391
}
94-
}
92+
}

0 commit comments

Comments
 (0)