Skip to content

Commit 6470130

Browse files
bors[bot]montekki
andauthored
Merge #227
227: adds stream::inspect combinator r=stjepang a=montekki Ref: #129 Stdlib: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.inspect Co-authored-by: Fedor Sakharov <fedor.sakharov@gmail.com>
2 parents 2acc070 + bf7121d commit 6470130

File tree

2 files changed

+76
-0
lines changed

2 files changed

+76
-0
lines changed

src/stream/stream/inspect.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
use std::marker::PhantomData;
2+
use std::pin::Pin;
3+
4+
use crate::stream::Stream;
5+
use crate::task::{Context, Poll};
6+
7+
/// A stream that does something with each element of another stream.
8+
#[derive(Debug)]
9+
pub struct Inspect<S, F, T> {
10+
stream: S,
11+
f: F,
12+
__t: PhantomData<T>,
13+
}
14+
15+
impl<S, F, T> Inspect<S, F, T> {
16+
pin_utils::unsafe_pinned!(stream: S);
17+
pin_utils::unsafe_unpinned!(f: F);
18+
19+
pub(super) fn new(stream: S, f: F) -> Self {
20+
Inspect {
21+
stream,
22+
f,
23+
__t: PhantomData,
24+
}
25+
}
26+
}
27+
28+
impl<S, F> Stream for Inspect<S, F, S::Item>
29+
where
30+
S: Stream,
31+
F: FnMut(&S::Item),
32+
{
33+
type Item = S::Item;
34+
35+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
36+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
37+
38+
Poll::Ready(next.and_then(|x| {
39+
(self.as_mut().f())(&x);
40+
Some(x)
41+
}))
42+
}
43+
}

src/stream/stream/mod.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ mod find;
3030
mod find_map;
3131
mod fold;
3232
mod fuse;
33+
mod inspect;
3334
mod min_by;
3435
mod next;
3536
mod nth;
@@ -41,6 +42,7 @@ mod zip;
4142

4243
pub use filter::Filter;
4344
pub use fuse::Fuse;
45+
pub use inspect::Inspect;
4446
pub use scan::Scan;
4547
pub use skip::Skip;
4648
pub use skip_while::SkipWhile;
@@ -260,6 +262,37 @@ pub trait Stream {
260262
Enumerate::new(self)
261263
}
262264

265+
/// A combinator that does something with each element in the stream, passing the value on.
266+
///
267+
/// # Examples
268+
///
269+
/// Basic usage:
270+
///
271+
/// ```
272+
/// # fn main() { async_std::task::block_on(async {
273+
/// #
274+
/// use async_std::prelude::*;
275+
/// use std::collections::VecDeque;
276+
///
277+
/// let a: VecDeque<_> = vec![1u8, 2, 3, 4, 5].into_iter().collect();
278+
/// let sum = a
279+
/// .inspect(|x| println!("about to filter {}", x))
280+
/// .filter(|x| x % 2 == 0)
281+
/// .inspect(|x| println!("made it through filter: {}", x))
282+
/// .fold(0, |sum, i| sum + i).await;
283+
///
284+
/// assert_eq!(sum, 6);
285+
/// #
286+
/// # }) }
287+
/// ```
288+
fn inspect<F>(self, f: F) -> Inspect<Self, F, Self::Item>
289+
where
290+
Self: Sized,
291+
F: FnMut(&Self::Item),
292+
{
293+
Inspect::new(self, f)
294+
}
295+
263296
/// Transforms this `Stream` into a "fused" `Stream` such that after the first time `poll`
264297
/// returns `Poll::Ready(None)`, all future calls to `poll` will also return
265298
/// `Poll::Ready(None)`.

0 commit comments

Comments
 (0)