Skip to content

Commit 11268a8

Browse files
committed
add stream-partition
1 parent c4ba11f commit 11268a8

File tree

2 files changed

+95
-0
lines changed

2 files changed

+95
-0
lines changed

src/stream/stream/mod.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ mod ne;
5454
mod next;
5555
mod nth;
5656
mod partial_cmp;
57+
mod partition;
5758
mod position;
5859
mod scan;
5960
mod skip;
@@ -91,6 +92,7 @@ use ne::NeFuture;
9192
use next::NextFuture;
9293
use nth::NthFuture;
9394
use partial_cmp::PartialCmpFuture;
95+
use partition::PartitionFuture;
9496
use position::PositionFuture;
9597
use try_fold::TryFoldFuture;
9698
use try_for_each::TryForEachFuture;
@@ -1308,6 +1310,42 @@ extension_trait! {
13081310
FoldFuture::new(self, init, f)
13091311
}
13101312

1313+
#[doc = r#"
1314+
A combinator that applies a function to every element in a stream
1315+
creating two collections from it.
1316+
1317+
# Examples
1318+
1319+
Basic usage:
1320+
1321+
```
1322+
# fn main() { async_std::task::block_on(async {
1323+
#
1324+
use async_std::prelude::*;
1325+
use async_std::stream;
1326+
1327+
let (even, odd): (Vec<i32>, Vec<i32>) = stream::from_iter(vec![1, 2, 3])
1328+
.partition(|&n| n % 2 == 0).await;
1329+
1330+
assert_eq!(even, vec![2]);
1331+
assert_eq!(odd, vec![1, 3]);
1332+
1333+
#
1334+
# }) }
1335+
```
1336+
"#]
1337+
fn partition<B, F>(
1338+
self,
1339+
f: F,
1340+
) -> impl Future<Output = (B, B)> [PartitionFuture<Self, F, B>]
1341+
where
1342+
Self: Sized,
1343+
F: FnMut(&Self::Item) -> bool,
1344+
B: Default,
1345+
{
1346+
PartitionFuture::new(self, f)
1347+
}
1348+
13111349
#[doc = r#"
13121350
Call a closure on each element of the stream.
13131351

src/stream/stream/partition.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
use std::future::Future;
2+
use std::pin::Pin;
3+
use std::default::Default;
4+
use pin_project_lite::pin_project;
5+
6+
use crate::stream::Stream;
7+
use crate::task::{Context, Poll};
8+
9+
pin_project! {
10+
#[derive(Debug)]
11+
pub struct PartitionFuture<S, F, B> {
12+
#[pin]
13+
stream: S,
14+
f: F,
15+
res: Option<(B, B)>,
16+
}
17+
}
18+
19+
impl<S, F, B: Default> PartitionFuture<S, F, B> {
20+
pub(super) fn new(stream: S, f: F) -> Self {
21+
Self {
22+
stream,
23+
f,
24+
res: Some((B::default(), B::default())),
25+
}
26+
}
27+
}
28+
29+
impl<S, F, B> Future for PartitionFuture<S, F, B>
30+
where
31+
S: Stream + Sized,
32+
F: FnMut(&S::Item) -> bool,
33+
B: Default + Extend<S::Item>,
34+
{
35+
type Output = (B, B);
36+
37+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
38+
let mut this = self.project();
39+
40+
loop {
41+
let next = futures_core::ready!(this.stream.as_mut().poll_next(cx));
42+
43+
match next {
44+
Some(v) => {
45+
let mut res = this.res.take().unwrap();
46+
match (this.f)(&v) {
47+
true => res.0.extend(Some(v)),
48+
false => res.1.extend(Some(v)),
49+
};
50+
51+
*this.res = Some(res);
52+
}
53+
None => return Poll::Ready(this.res.take().unwrap()),
54+
}
55+
}
56+
}
57+
}

0 commit comments

Comments
 (0)