Skip to content

Commit 6889762

Browse files
committed
fix: Split FlattenCompat logic to Flatten and FlatMap
1 parent a42ae2f commit 6889762

File tree

3 files changed

+72
-106
lines changed

3 files changed

+72
-106
lines changed

src/stream/stream/flat_map.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
use pin_project_lite::pin_project;
2+
use std::pin::Pin;
3+
4+
use crate::prelude::*;
5+
use crate::stream::stream::map::Map;
6+
use crate::stream::{IntoStream, Stream};
7+
use crate::task::{Context, Poll};
8+
9+
pin_project! {
10+
/// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its
11+
/// documentation for more.
12+
///
13+
/// [`flat_map`]: trait.Stream.html#method.flat_map
14+
/// [`Stream`]: trait.Stream.html
15+
#[allow(missing_debug_implementations)]
16+
pub struct FlatMap<S, U, T, F> {
17+
#[pin]
18+
stream: Map<S, F, T, U>,
19+
#[pin]
20+
inner_stream: Option<U>,
21+
}
22+
}
23+
24+
impl<S, U, F> FlatMap<S, U, S::Item, F>
25+
where
26+
S: Stream,
27+
U: IntoStream,
28+
F: FnMut(S::Item) -> U,
29+
{
30+
pub(super) fn new(stream: S, f: F) -> FlatMap<S, U, S::Item, F> {
31+
FlatMap {
32+
stream: stream.map(f),
33+
inner_stream: None,
34+
}
35+
}
36+
}
37+
38+
impl<S, U, F> Stream for FlatMap<S, U, S::Item, F>
39+
where
40+
S: Stream,
41+
S::Item: IntoStream<IntoStream = U, Item = U::Item>,
42+
U: Stream,
43+
F: FnMut(S::Item) -> U,
44+
{
45+
type Item = U::Item;
46+
47+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
48+
let mut this = self.project();
49+
loop {
50+
if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
51+
if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) {
52+
return Poll::Ready(item);
53+
}
54+
}
55+
56+
match futures_core::ready!(this.stream.as_mut().poll_next(cx)) {
57+
None => return Poll::Ready(None),
58+
Some(inner) => this.inner_stream.set(Some(inner.into_stream())),
59+
}
60+
}
61+
}
62+
}

src/stream/stream/flatten.rs

Lines changed: 7 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -6,46 +6,6 @@ use crate::stream::stream::map::Map;
66
use crate::stream::{IntoStream, Stream};
77
use crate::task::{Context, Poll};
88

9-
pin_project! {
10-
/// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its
11-
/// documentation for more.
12-
///
13-
/// [`flat_map`]: trait.Stream.html#method.flat_map
14-
/// [`Stream`]: trait.Stream.html
15-
#[allow(missing_debug_implementations)]
16-
pub struct FlatMap<S, U, T, F> {
17-
#[pin]
18-
inner: FlattenCompat<Map<S, F, T, U>, U>,
19-
}
20-
}
21-
22-
impl<S, U, F> FlatMap<S, U, S::Item, F>
23-
where
24-
S: Stream,
25-
U: IntoStream,
26-
F: FnMut(S::Item) -> U,
27-
{
28-
pub(super) fn new(stream: S, f: F) -> FlatMap<S, U, S::Item, F> {
29-
FlatMap {
30-
inner: FlattenCompat::new(stream.map(f)),
31-
}
32-
}
33-
}
34-
35-
impl<S, U, F> Stream for FlatMap<S, U, S::Item, F>
36-
where
37-
S: Stream,
38-
S::Item: IntoStream<IntoStream = U, Item = U::Item>,
39-
U: Stream,
40-
F: FnMut(S::Item) -> U,
41-
{
42-
type Item = U::Item;
43-
44-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
45-
self.project().inner.poll_next(cx)
46-
}
47-
}
48-
499
pin_project! {
5010
/// This `struct` is created by the [`flatten`] method on [`Stream`]. See its
5111
/// documentation for more.
@@ -55,7 +15,9 @@ pin_project! {
5515
#[allow(missing_debug_implementations)]
5616
pub struct Flatten<S, U> {
5717
#[pin]
58-
inner: FlattenCompat<S, U>
18+
stream: S,
19+
#[pin]
20+
inner_stream: Option<U>,
5921
}
6022
}
6123

@@ -66,47 +28,13 @@ where
6628
{
6729
pub(super) fn new(stream: S) -> Flatten<S, S::Item> {
6830
Flatten {
69-
inner: FlattenCompat::new(stream),
70-
}
71-
}
72-
}
73-
74-
impl<S, U> Stream for Flatten<S, <S::Item as IntoStream>::IntoStream>
75-
where
76-
S: Stream,
77-
S::Item: IntoStream<IntoStream = U, Item = U::Item>,
78-
U: Stream,
79-
{
80-
type Item = U::Item;
81-
82-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
83-
self.project().inner.poll_next(cx)
84-
}
85-
}
86-
87-
pin_project! {
88-
/// Real logic of both `Flatten` and `FlatMap` which simply delegate to
89-
/// this type.
90-
#[derive(Clone, Debug)]
91-
struct FlattenCompat<S, U> {
92-
#[pin]
93-
stream: S,
94-
#[pin]
95-
frontiter: Option<U>,
96-
}
97-
}
98-
99-
impl<S, U> FlattenCompat<S, U> {
100-
/// Adapts an iterator by flattening it, for use in `flatten()` and `flat_map()`.
101-
fn new(stream: S) -> FlattenCompat<S, U> {
102-
FlattenCompat {
10331
stream,
104-
frontiter: None,
32+
inner_stream: None,
10533
}
10634
}
10735
}
10836

109-
impl<S, U> Stream for FlattenCompat<S, U>
37+
impl<S, U> Stream for Flatten<S, <S::Item as IntoStream>::IntoStream>
11038
where
11139
S: Stream,
11240
S::Item: IntoStream<IntoStream = U, Item = U::Item>,
@@ -117,42 +45,16 @@ where
11745
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
11846
let mut this = self.project();
11947
loop {
120-
if let Some(inner) = this.frontiter.as_mut().as_pin_mut() {
48+
if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
12149
if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) {
12250
return Poll::Ready(item);
12351
}
12452
}
12553

12654
match futures_core::ready!(this.stream.as_mut().poll_next(cx)) {
12755
None => return Poll::Ready(None),
128-
Some(inner) => this.frontiter.set(Some(inner.into_stream())),
56+
Some(inner) => this.inner_stream.set(Some(inner.into_stream())),
12957
}
13058
}
13159
}
13260
}
133-
134-
#[cfg(test)]
135-
mod tests {
136-
use super::FlattenCompat;
137-
138-
use crate::prelude::*;
139-
use crate::task;
140-
141-
use std::collections::VecDeque;
142-
143-
#[test]
144-
fn test_poll_next() -> std::io::Result<()> {
145-
let inner1: VecDeque<u8> = vec![1, 2, 3].into_iter().collect();
146-
let inner2: VecDeque<u8> = vec![4, 5, 6].into_iter().collect();
147-
148-
let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect();
149-
150-
task::block_on(async move {
151-
let flat = FlattenCompat::new(s);
152-
let v: Vec<u8> = flat.collect().await;
153-
154-
assert_eq!(v, vec![1, 2, 3, 4, 5, 6]);
155-
Ok(())
156-
})
157-
}
158-
}

src/stream/stream/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,12 @@ cfg_unstable! {
9999
use crate::stream::into_stream::IntoStream;
100100

101101
pub use merge::Merge;
102-
pub use flatten::{FlatMap, Flatten};
102+
pub use flatten::Flatten;
103+
pub use flat_map::FlatMap;
103104

104105
mod merge;
105106
mod flatten;
107+
mod flat_map;
106108
}
107109

108110
extension_trait! {

0 commit comments

Comments
 (0)