Skip to content

Commit 1175a37

Browse files
authored
Merge pull request #367 from k-nasa/add_stream_flatten
Add Stream::flatten and Stream::flat_map
2 parents 206bedf + 1554b04 commit 1175a37

File tree

3 files changed

+196
-2
lines changed

3 files changed

+196
-2
lines changed

src/stream/stream/flat_map.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
use pin_project_lite::pin_project;
2+
use std::pin::Pin;
3+
4+
use crate::prelude::*;
5+
use crate::stream::stream::map::Map;
6+
use crate::stream::{IntoStream, Stream};
7+
use crate::task::{Context, Poll};
8+
9+
pin_project! {
10+
/// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its
11+
/// documentation for more.
12+
///
13+
/// [`flat_map`]: trait.Stream.html#method.flat_map
14+
/// [`Stream`]: trait.Stream.html
15+
#[allow(missing_debug_implementations)]
16+
pub struct FlatMap<S, U, T, F> {
17+
#[pin]
18+
stream: Map<S, F, T, U>,
19+
#[pin]
20+
inner_stream: Option<U>,
21+
}
22+
}
23+
24+
impl<S, U, F> FlatMap<S, U, S::Item, F>
25+
where
26+
S: Stream,
27+
U: IntoStream,
28+
F: FnMut(S::Item) -> U,
29+
{
30+
pub(super) fn new(stream: S, f: F) -> FlatMap<S, U, S::Item, F> {
31+
FlatMap {
32+
stream: stream.map(f),
33+
inner_stream: None,
34+
}
35+
}
36+
}
37+
38+
impl<S, U, F> Stream for FlatMap<S, U, S::Item, F>
39+
where
40+
S: Stream,
41+
S::Item: IntoStream<IntoStream = U, Item = U::Item>,
42+
U: Stream,
43+
F: FnMut(S::Item) -> U,
44+
{
45+
type Item = U::Item;
46+
47+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
48+
let mut this = self.project();
49+
loop {
50+
if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
51+
if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) {
52+
return Poll::Ready(item);
53+
}
54+
}
55+
56+
match futures_core::ready!(this.stream.as_mut().poll_next(cx)) {
57+
None => return Poll::Ready(None),
58+
Some(inner) => this.inner_stream.set(Some(inner.into_stream())),
59+
}
60+
}
61+
}
62+
}

src/stream/stream/flatten.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
use pin_project_lite::pin_project;
2+
use std::pin::Pin;
3+
4+
use crate::stream::{IntoStream, Stream};
5+
use crate::task::{Context, Poll};
6+
7+
pin_project! {
8+
/// This `struct` is created by the [`flatten`] method on [`Stream`]. See its
9+
/// documentation for more.
10+
///
11+
/// [`flatten`]: trait.Stream.html#method.flatten
12+
/// [`Stream`]: trait.Stream.html
13+
#[allow(missing_debug_implementations)]
14+
pub struct Flatten<S, U> {
15+
#[pin]
16+
stream: S,
17+
#[pin]
18+
inner_stream: Option<U>,
19+
}
20+
}
21+
22+
impl<S> Flatten<S, S::Item>
23+
where
24+
S: Stream,
25+
S::Item: IntoStream,
26+
{
27+
pub(super) fn new(stream: S) -> Flatten<S, S::Item> {
28+
Flatten {
29+
stream,
30+
inner_stream: None,
31+
}
32+
}
33+
}
34+
35+
impl<S, U> Stream for Flatten<S, <S::Item as IntoStream>::IntoStream>
36+
where
37+
S: Stream,
38+
S::Item: IntoStream<IntoStream = U, Item = U::Item>,
39+
U: Stream,
40+
{
41+
type Item = U::Item;
42+
43+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
44+
let mut this = self.project();
45+
loop {
46+
if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
47+
if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) {
48+
return Poll::Ready(item);
49+
}
50+
}
51+
52+
match futures_core::ready!(this.stream.as_mut().poll_next(cx)) {
53+
None => return Poll::Ready(None),
54+
Some(inner) => this.inner_stream.set(Some(inner.into_stream())),
55+
}
56+
}
57+
}
58+
}

src/stream/stream/mod.rs

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,17 @@ cfg_unstable! {
9696
use std::time::Duration;
9797

9898
use crate::future::Future;
99-
use crate::stream::FromStream;
100-
use crate::stream::{Product, Sum};
99+
use crate::stream::into_stream::IntoStream;
100+
use crate::stream::{FromStream, Product, Sum};
101101

102102
pub use merge::Merge;
103+
pub use flatten::Flatten;
104+
pub use flat_map::FlatMap;
103105
pub use timeout::{TimeoutError, Timeout};
104106

105107
mod merge;
108+
mod flatten;
109+
mod flat_map;
106110
mod timeout;
107111
}
108112

@@ -563,6 +567,76 @@ extension_trait! {
563567
Filter::new(self, predicate)
564568
}
565569

570+
#[doc= r#"
571+
Creates an stream that works like map, but flattens nested structure.
572+
573+
# Examples
574+
575+
Basic usage:
576+
577+
```
578+
# async_std::task::block_on(async {
579+
580+
use std::collections::VecDeque;
581+
use async_std::prelude::*;
582+
use async_std::stream::IntoStream;
583+
584+
let inner1: VecDeque<u8> = vec![1,2,3].into_iter().collect();
585+
let inner2: VecDeque<u8> = vec![4,5,6].into_iter().collect();
586+
587+
let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect();
588+
589+
let v :Vec<_> = s.flat_map(|s| s.into_stream()).collect().await;
590+
591+
assert_eq!(v, vec![1,2,3,4,5,6]);
592+
593+
# });
594+
```
595+
"#]
596+
#[cfg(feature = "unstable")]
597+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
598+
fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, Self::Item, F>
599+
where
600+
Self: Sized,
601+
U: IntoStream,
602+
F: FnMut(Self::Item) -> U,
603+
{
604+
FlatMap::new(self, f)
605+
}
606+
607+
#[doc = r#"
608+
Creates an stream that flattens nested structure.
609+
610+
# Examples
611+
612+
Basic usage:
613+
614+
```
615+
# async_std::task::block_on(async {
616+
617+
use std::collections::VecDeque;
618+
use async_std::prelude::*;
619+
620+
let inner1: VecDeque<u8> = vec![1,2,3].into_iter().collect();
621+
let inner2: VecDeque<u8> = vec![4,5,6].into_iter().collect();
622+
let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect();
623+
624+
let v: Vec<_> = s.flatten().collect().await;
625+
626+
assert_eq!(v, vec![1,2,3,4,5,6]);
627+
628+
# });
629+
"#]
630+
#[cfg(feature = "unstable")]
631+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
632+
fn flatten(self) -> Flatten<Self, Self::Item>
633+
where
634+
Self: Sized,
635+
Self::Item: IntoStream,
636+
{
637+
Flatten::new(self)
638+
}
639+
566640
#[doc = r#"
567641
Both filters and maps a stream.
568642

0 commit comments

Comments
 (0)