Skip to content

Commit 603b3c5

Browse files
committed
add: Add stream unzip
1 parent 31cf932 commit 603b3c5

File tree

2 files changed

+19
-13
lines changed

2 files changed

+19
-13
lines changed

src/stream/stream/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,13 +124,13 @@ cfg_unstable! {
124124

125125
use count::CountFuture;
126126
use partition::PartitionFuture;
127+
use unzip::UnzipFuture;
127128

128129
pub use merge::Merge;
129130
pub use flatten::Flatten;
130131
pub use flat_map::FlatMap;
131132
pub use timeout::{TimeoutError, Timeout};
132133
pub use throttle::Throttle;
133-
pub use unzip::UnzipFuture;
134134

135135
mod count;
136136
mod merge;

src/stream/stream/unzip.rs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,13 @@ use crate::stream::Stream;
77
use crate::task::{Context, Poll};
88

99
pin_project! {
10+
#[derive(Clone, Debug)]
1011
#[cfg(all(feature = "default", feature = "unstable"))]
1112
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
12-
pub struct UnzipFuture<S: Stream, FromA, FromB> {
13+
pub struct UnzipFuture<S, FromA, FromB> {
1314
#[pin]
1415
stream: S,
15-
res: (FromA, FromB),
16+
res: Option<(FromA, FromB)>,
1617
}
1718
}
1819

@@ -24,30 +25,35 @@ where
2425
pub(super) fn new(stream: S) -> Self {
2526
UnzipFuture {
2627
stream,
27-
res: (FromA::default(), FromB::default()),
28+
res: Some((FromA::default(), FromB::default())),
2829
}
2930
}
3031
}
3132

3233
impl<S, A, B, FromA, FromB> Future for UnzipFuture<S, FromA, FromB>
3334
where
3435
S: Stream<Item = (A, B)>,
35-
FromA: Default + Extend<A> + Copy,
36-
FromB: Default + Extend<B> + Copy,
36+
FromA: Default + Extend<A>,
37+
FromB: Default + Extend<B>,
3738
{
3839
type Output = (FromA, FromB);
3940

4041
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
4142
let mut this = self.project();
42-
let next = futures_core::ready!(this.stream.as_mut().poll_next(cx));
4343

44-
match next {
45-
Some((a, b)) => {
46-
this.res.0.extend(Some(a));
47-
this.res.1.extend(Some(b));
48-
Poll::Pending
44+
loop {
45+
let next = futures_core::ready!(this.stream.as_mut().poll_next(cx));
46+
47+
match next {
48+
Some((a, b)) => {
49+
let mut res = this.res.take().unwrap();
50+
res.0.extend(Some(a));
51+
res.1.extend(Some(b));
52+
53+
*this.res = Some(res);
54+
}
55+
None => return Poll::Ready(this.res.take().unwrap()),
4956
}
50-
None => Poll::Ready(*this.res),
5157
}
5258
}
5359
}

0 commit comments

Comments
 (0)