Skip to content

Commit 31cf932

Browse files
committed
wip: Add stream unzip
1 parent 3c6d41c commit 31cf932

File tree

2 files changed

+66
-0
lines changed

2 files changed

+66
-0
lines changed

src/stream/stream/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ cfg_unstable! {
130130
pub use flat_map::FlatMap;
131131
pub use timeout::{TimeoutError, Timeout};
132132
pub use throttle::Throttle;
133+
pub use unzip::UnzipFuture;
133134

134135
mod count;
135136
mod merge;
@@ -138,6 +139,7 @@ cfg_unstable! {
138139
mod partition;
139140
mod timeout;
140141
mod throttle;
142+
mod unzip;
141143
}
142144

143145
extension_trait! {
@@ -1717,6 +1719,17 @@ extension_trait! {
17171719
Zip::new(self, other)
17181720
}
17191721

1722+
#[cfg(feature = "unstable")]
1723+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1724+
fn unzip<A, B, FromA, FromB>(self) -> impl Future<Output = (FromA, FromB)> [UnzipFuture<Self, FromA, FromB>]
1725+
where
1726+
FromA: Default + Extend<A>,
1727+
FromB: Default + Extend<B>,
1728+
Self: Stream<Item = (A, B)> + Sized,
1729+
{
1730+
UnzipFuture::new(self)
1731+
}
1732+
17201733
#[doc = r#"
17211734
Transforms a stream into a collection.
17221735

src/stream/stream/unzip.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
use std::future::Future;
2+
use std::pin::Pin;
3+
4+
use pin_project_lite::pin_project;
5+
6+
use crate::stream::Stream;
7+
use crate::task::{Context, Poll};
8+
9+
pin_project! {
10+
#[cfg(all(feature = "default", feature = "unstable"))]
11+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
12+
pub struct UnzipFuture<S: Stream, FromA, FromB> {
13+
#[pin]
14+
stream: S,
15+
res: (FromA, FromB),
16+
}
17+
}
18+
19+
impl<S: Stream, FromA, FromB> UnzipFuture<S, FromA, FromB>
20+
where
21+
FromA: Default,
22+
FromB: Default,
23+
{
24+
pub(super) fn new(stream: S) -> Self {
25+
UnzipFuture {
26+
stream,
27+
res: (FromA::default(), FromB::default()),
28+
}
29+
}
30+
}
31+
32+
impl<S, A, B, FromA, FromB> Future for UnzipFuture<S, FromA, FromB>
33+
where
34+
S: Stream<Item = (A, B)>,
35+
FromA: Default + Extend<A> + Copy,
36+
FromB: Default + Extend<B> + Copy,
37+
{
38+
type Output = (FromA, FromB);
39+
40+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
41+
let mut this = self.project();
42+
let next = futures_core::ready!(this.stream.as_mut().poll_next(cx));
43+
44+
match next {
45+
Some((a, b)) => {
46+
this.res.0.extend(Some(a));
47+
this.res.1.extend(Some(b));
48+
Poll::Pending
49+
}
50+
None => Poll::Ready(*this.res),
51+
}
52+
}
53+
}

0 commit comments

Comments
 (0)