Skip to content

Commit ff7b5d3

Browse files
bors[bot]montekki
andauthored
Merge #233
233: adds stream::chain combinator r=stjepang a=montekki Adds a `Chain` combinator and Introduces some changes to `Fuse` so it's usable in this case, but those need a closer look. --- Ref: #129 Stdlib: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.chain Co-authored-by: Fedor Sakharov <fedor.sakharov@gmail.com>
2 parents 697a720 + 2a2a473 commit ff7b5d3

File tree

3 files changed

+90
-3
lines changed

3 files changed

+90
-3
lines changed

src/stream/stream/chain.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
use std::pin::Pin;
2+
3+
use super::fuse::Fuse;
4+
use crate::stream::Stream;
5+
use crate::task::{Context, Poll};
6+
7+
/// Chains two streams one after another.
8+
#[derive(Debug)]
9+
pub struct Chain<S, U> {
10+
first: Fuse<S>,
11+
second: Fuse<U>,
12+
}
13+
14+
impl<S: Stream, U: Stream> Chain<S, U> {
15+
pin_utils::unsafe_pinned!(first: Fuse<S>);
16+
pin_utils::unsafe_pinned!(second: Fuse<U>);
17+
18+
pub(super) fn new(first: S, second: U) -> Self {
19+
Chain {
20+
first: first.fuse(),
21+
second: second.fuse(),
22+
}
23+
}
24+
}
25+
26+
impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> {
27+
type Item = S::Item;
28+
29+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
30+
if !self.first.done {
31+
let next = futures_core::ready!(self.as_mut().first().poll_next(cx));
32+
if let Some(next) = next {
33+
return Poll::Ready(Some(next));
34+
}
35+
}
36+
37+
if !self.second.done {
38+
let next = futures_core::ready!(self.as_mut().second().poll_next(cx));
39+
if let Some(next) = next {
40+
return Poll::Ready(Some(next));
41+
}
42+
}
43+
44+
if self.first.done && self.second.done {
45+
return Poll::Ready(None);
46+
}
47+
48+
Poll::Pending
49+
}
50+
}

src/stream/stream/fuse.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use std::pin::Pin;
2-
use std::task::{Context, Poll};
2+
3+
use crate::stream::Stream;
4+
use crate::task::{Context, Poll};
35

46
/// A `Stream` that is permanently closed once a single call to `poll` results in
57
/// `Poll::Ready(None)`, returning `Poll::Ready(None)` for all future calls to `poll`.
@@ -11,12 +13,12 @@ pub struct Fuse<S> {
1113

1214
impl<S: Unpin> Unpin for Fuse<S> {}
1315

14-
impl<S: futures_core::Stream> Fuse<S> {
16+
impl<S: Stream> Fuse<S> {
1517
pin_utils::unsafe_pinned!(stream: S);
1618
pin_utils::unsafe_unpinned!(done: bool);
1719
}
1820

19-
impl<S: futures_core::Stream> futures_core::Stream for Fuse<S> {
21+
impl<S: Stream> Stream for Fuse<S> {
2022
type Item = S::Item;
2123

2224
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {

src/stream/stream/mod.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
2424
mod all;
2525
mod any;
26+
mod chain;
2627
mod enumerate;
2728
mod filter;
2829
mod filter_map;
@@ -41,6 +42,7 @@ mod step_by;
4142
mod take;
4243
mod zip;
4344

45+
pub use chain::Chain;
4446
pub use filter::Filter;
4547
pub use fuse::Fuse;
4648
pub use inspect::Inspect;
@@ -268,6 +270,39 @@ pub trait Stream {
268270
StepBy::new(self, step)
269271
}
270272

273+
/// Takes two streams and creates a new stream over both in sequence.
274+
///
275+
/// # Examples
276+
///
277+
/// Basic usage:
278+
///
279+
/// ```
280+
/// # fn main() { async_std::task::block_on(async {
281+
/// #
282+
/// use async_std::prelude::*;
283+
/// use std::collections::VecDeque;
284+
///
285+
/// let first: VecDeque<_> = vec![0u8, 1].into_iter().collect();
286+
/// let second: VecDeque<_> = vec![2, 3].into_iter().collect();
287+
/// let mut c = first.chain(second);
288+
///
289+
/// assert_eq!(c.next().await, Some(0));
290+
/// assert_eq!(c.next().await, Some(1));
291+
/// assert_eq!(c.next().await, Some(2));
292+
/// assert_eq!(c.next().await, Some(3));
293+
/// assert_eq!(c.next().await, None);
294+
///
295+
/// #
296+
/// # }) }
297+
/// ```
298+
fn chain<U>(self, other: U) -> Chain<Self, U>
299+
where
300+
Self: Sized,
301+
U: Stream<Item = Self::Item> + Sized,
302+
{
303+
Chain::new(self, other)
304+
}
305+
271306
/// Creates a stream that gives the current element's count as well as the next value.
272307
///
273308
/// # Overflow behaviour.

0 commit comments

Comments
 (0)