Skip to content

Commit 5b720ab

Browse files
committed
adds stream::fold combinator
1 parent 6f9ec66 commit 5b720ab

File tree

2 files changed

+91
-0
lines changed

2 files changed

+91
-0
lines changed

src/stream/stream/fold.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
use std::marker::PhantomData;
2+
use std::pin::Pin;
3+
4+
use crate::future::Future;
5+
use crate::task::{Context, Poll};
6+
7+
#[doc(hidden)]
8+
#[allow(missing_debug_implementations)]
9+
pub struct FoldFuture<S, F, T, B> {
10+
stream: S,
11+
f: F,
12+
acc: Option<B>,
13+
__t: PhantomData<T>,
14+
}
15+
16+
impl<S, F, T, B> FoldFuture<S, F, T, B> {
17+
pin_utils::unsafe_pinned!(stream: S);
18+
pin_utils::unsafe_unpinned!(f: F);
19+
pin_utils::unsafe_unpinned!(acc: Option<B>);
20+
21+
pub(super) fn new(stream: S, init: B, f: F) -> Self {
22+
FoldFuture {
23+
stream,
24+
f,
25+
acc: Some(init),
26+
__t: PhantomData,
27+
}
28+
}
29+
}
30+
31+
impl<S, F, B> Future for FoldFuture<S, F, S::Item, B>
32+
where
33+
S: futures_core::stream::Stream + Unpin + Sized,
34+
F: FnMut(B, S::Item) -> B,
35+
{
36+
type Output = B;
37+
38+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
39+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
40+
41+
match next {
42+
Some(v) => {
43+
cx.waker().wake_by_ref();
44+
let old = self
45+
.as_mut()
46+
.acc()
47+
.take()
48+
.expect("FoldFuture should never contain None");
49+
let new = (self.as_mut().f())(old, v);
50+
*self.as_mut().acc() = Some(new);
51+
Poll::Pending
52+
}
53+
None => Poll::Ready(
54+
self.as_mut()
55+
.acc()
56+
.take()
57+
.expect("FoldFuture should never contain None"),
58+
),
59+
}
60+
}
61+
}

src/stream/stream/mod.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ mod all;
2525
mod any;
2626
mod filter_map;
2727
mod find_map;
28+
mod fold;
2829
mod min_by;
2930
mod next;
3031
mod nth;
@@ -36,6 +37,7 @@ use all::AllFuture;
3637
use any::AnyFuture;
3738
use filter_map::FilterMap;
3839
use find_map::FindMapFuture;
40+
use fold::FoldFuture;
3941
use min_by::MinByFuture;
4042
use next::NextFuture;
4143
use nth::NthFuture;
@@ -344,6 +346,34 @@ pub trait Stream {
344346
FindMapFuture::new(self, f)
345347
}
346348

349+
/// A combinator that applies a function to every element in a stream
350+
/// producing a single, final value.
351+
///
352+
/// # Examples
353+
///
354+
/// Basic usage:
355+
///
356+
/// ```
357+
/// # fn main() { async_std::task::block_on(async {
358+
/// #
359+
/// use async_std::prelude::*;
360+
/// use std::collections::VecDeque;
361+
///
362+
/// let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
363+
/// let sum = s.fold(0, |acc, x| acc + x).await;
364+
///
365+
/// assert_eq!(sum, 6);
366+
/// #
367+
/// # }) }
368+
/// ```
369+
fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, F, Self::Item, B>
370+
where
371+
Self: Sized,
372+
F: FnMut(B, Self::Item) -> B,
373+
{
374+
FoldFuture::new(self, init, f)
375+
}
376+
347377
/// Tests if any element of the stream matches a predicate.
348378
///
349379
/// `any()` takes a closure that returns `true` or `false`. It applies

0 commit comments

Comments
 (0)