-
Notifications
You must be signed in to change notification settings - Fork 339
Add Stream::flatten and Stream::flat_map #367
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 15 commits
ec98b41
bb14164
2dee289
2187a2a
cd86208
8138afb
176359a
1c1e223
410d16e
3297a0f
271b6f4
00e7e58
001368d
0c5abee
b66ffa6
7ce721f
b7b5df1
6168952
bf3508f
8932cec
61b7a09
13a08b0
37f14b0
a42ae2f
6889762
040227f
ae7adf2
1554b04
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
use pin_project_lite::pin_project; | ||
use std::pin::Pin; | ||
|
||
use crate::prelude::*; | ||
use crate::stream::stream::map::Map; | ||
use crate::stream::{IntoStream, Stream}; | ||
use crate::task::{Context, Poll}; | ||
|
||
pin_project! { | ||
/// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its | ||
/// documentation for more. | ||
/// | ||
/// [`flat_map`]: trait.Stream.html#method.flat_map | ||
/// [`Stream`]: trait.Stream.html | ||
#[allow(missing_debug_implementations)] | ||
pub struct FlatMap<S, U, T, F> { | ||
#[pin] | ||
inner: FlattenCompat<Map<S, F, T, U>, U>, | ||
} | ||
} | ||
|
||
impl<S, U, F> FlatMap<S, U, S::Item, F> | ||
where | ||
S: Stream, | ||
U: IntoStream, | ||
F: FnMut(S::Item) -> U, | ||
{ | ||
pub fn new(stream: S, f: F) -> FlatMap<S, U, S::Item, F> { | ||
k-nasa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
FlatMap { | ||
inner: FlattenCompat::new(stream.map(f)), | ||
} | ||
} | ||
} | ||
|
||
impl<S, U, F> Stream for FlatMap<S, U, S::Item, F> | ||
where | ||
S: Stream<Item: IntoStream<IntoStream = U, Item = U::Item>> + std::marker::Unpin, | ||
U: Stream + std::marker::Unpin, | ||
k-nasa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
F: FnMut(S::Item) -> U, | ||
{ | ||
type Item = U::Item; | ||
|
||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
self.project().inner.poll_next(cx) | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of exporting The Thanks! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yh! |
||
|
||
pin_project! { | ||
/// This `struct` is created by the [`flatten`] method on [`Stream`]. See its | ||
/// documentation for more. | ||
/// | ||
/// [`flatten`]: trait.Stream.html#method.flatten | ||
/// [`Stream`]: trait.Stream.html | ||
#[allow(missing_debug_implementations)] | ||
pub struct Flatten<S, U> { | ||
#[pin] | ||
inner: FlattenCompat<S, U> | ||
} | ||
} | ||
|
||
impl<S: Stream<Item: IntoStream>> Flatten<S, S::Item> { | ||
k-nasa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
pub fn new(stream: S) -> Flatten<S, S::Item> { | ||
k-nasa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Flatten { | ||
inner: FlattenCompat::new(stream), | ||
} | ||
} | ||
} | ||
|
||
impl<S, U> Stream for Flatten<S, <S::Item as IntoStream>::IntoStream> | ||
where | ||
S: Stream<Item: IntoStream<IntoStream = U, Item = U::Item>> + std::marker::Unpin, | ||
U: Stream + std::marker::Unpin, | ||
k-nasa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
type Item = U::Item; | ||
|
||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
self.project().inner.poll_next(cx) | ||
} | ||
} | ||
|
||
pin_project! { | ||
/// Real logic of both `Flatten` and `FlatMap` which simply delegate to | ||
/// this type. | ||
#[derive(Clone, Debug)] | ||
struct FlattenCompat<S, U> { | ||
stream: S, | ||
frontiter: Option<U>, | ||
k-nasa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
impl<S, U> FlattenCompat<S, U> { | ||
/// Adapts an iterator by flattening it, for use in `flatten()` and `flat_map()`. | ||
pub fn new(stream: S) -> FlattenCompat<S, U> { | ||
k-nasa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
FlattenCompat { | ||
stream, | ||
frontiter: None, | ||
} | ||
} | ||
} | ||
|
||
impl<S, U> Stream for FlattenCompat<S, U> | ||
where | ||
S: Stream<Item: IntoStream<IntoStream = U, Item = U::Item>> + std::marker::Unpin, | ||
U: Stream + std::marker::Unpin, | ||
k-nasa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
type Item = U::Item; | ||
|
||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
let mut this = self.project(); | ||
loop { | ||
if let Some(inner) = this.frontiter { | ||
k-nasa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if let item @ Some(_) = futures_core::ready!(Pin::new(inner).poll_next(cx)) { | ||
k-nasa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return Poll::Ready(item); | ||
} | ||
} | ||
|
||
match futures_core::ready!(Pin::new(&mut this.stream).poll_next(cx)) { | ||
k-nasa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
None => return Poll::Ready(None), | ||
Some(inner) => *this.frontiter = Some(inner.into_stream()), | ||
k-nasa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::FlattenCompat; | ||
|
||
use crate::prelude::*; | ||
use crate::task; | ||
|
||
use std::collections::VecDeque; | ||
|
||
#[test] | ||
fn test_poll_next() -> std::io::Result<()> { | ||
let inner1: VecDeque<u8> = vec![1, 2, 3].into_iter().collect(); | ||
let inner2: VecDeque<u8> = vec![4, 5, 6].into_iter().collect(); | ||
|
||
let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect(); | ||
|
||
task::block_on(async move { | ||
let flat = FlattenCompat::new(s); | ||
let v: Vec<u8> = flat.collect().await; | ||
|
||
assert_eq!(v, vec![1, 2, 3, 4, 5, 6]); | ||
Ok(()) | ||
}) | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.