Skip to content

Commit ca4856a

Browse files
authored
Merge pull request #377 from ktomsic/sum-and-product-impls
Add `Stream::sum()` and `Stream::product()` implementations
2 parents e567515 + e26eb7a commit ca4856a

File tree

9 files changed

+476
-7
lines changed

9 files changed

+476
-7
lines changed

src/option/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,8 @@ mod from_stream;
77

88
#[doc(inline)]
99
pub use std::option::Option;
10+
11+
cfg_unstable! {
12+
mod product;
13+
mod sum;
14+
}

src/option/product.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use std::pin::Pin;
2+
3+
use crate::prelude::*;
4+
use crate::stream::{Stream, Product};
5+
6+
impl<T, U> Product<Option<U>> for Option<T>
7+
where
8+
T: Product<U>,
9+
{
10+
#[doc = r#"
11+
Takes each element in the `Stream`: if it is a `None`, no further
12+
elements are taken, and the `None` is returned. Should no `None` occur,
13+
the product of all elements is returned.
14+
15+
# Examples
16+
17+
This multiplies every integer in a vector, rejecting the product if a negative element is
18+
encountered:
19+
20+
```
21+
# fn main() { async_std::task::block_on(async {
22+
#
23+
use std::collections::VecDeque;
24+
use async_std::prelude::*;
25+
26+
let v: VecDeque<_> = vec![1, 2, 4].into_iter().collect();
27+
let prod: Option<i32> = v.map(|x|
28+
if x < 0 {
29+
None
30+
} else {
31+
Some(x)
32+
}).product().await;
33+
assert_eq!(prod, Some(8));
34+
#
35+
# }) }
36+
```
37+
"#]
38+
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>>
39+
where S: Stream<Item = Option<U>> + 'a
40+
{
41+
Box::pin(async move {
42+
pin_utils::pin_mut!(stream);
43+
44+
// Using `scan` here because it is able to stop the stream early
45+
// if a failure occurs
46+
let mut found_none = false;
47+
let out = <T as Product<U>>::product(stream
48+
.scan((), |_, elem| {
49+
match elem {
50+
Some(elem) => Some(elem),
51+
None => {
52+
found_none = true;
53+
// Stop processing the stream on error
54+
None
55+
}
56+
}
57+
})).await;
58+
59+
if found_none {
60+
None
61+
} else {
62+
Some(out)
63+
}
64+
})
65+
}
66+
}

src/option/sum.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
use std::pin::Pin;
2+
3+
use crate::prelude::*;
4+
use crate::stream::{Stream, Sum};
5+
6+
impl<T, U> Sum<Option<U>> for Option<T>
7+
where
8+
T: Sum<U>,
9+
{
10+
#[doc = r#"
11+
Takes each element in the `Iterator`: if it is a `None`, no further
12+
elements are taken, and the `None` is returned. Should no `None` occur,
13+
the sum of all elements is returned.
14+
15+
# Examples
16+
17+
This sums up the position of the character 'a' in a vector of strings,
18+
if a word did not have the character 'a' the operation returns `None`:
19+
20+
```
21+
# fn main() { async_std::task::block_on(async {
22+
#
23+
use std::collections::VecDeque;
24+
use async_std::prelude::*;
25+
26+
let words: VecDeque<_> = vec!["have", "a", "great", "day"]
27+
.into_iter()
28+
.collect();
29+
let total: Option<usize> = words.map(|w| w.find('a')).sum().await;
30+
assert_eq!(total, Some(5));
31+
#
32+
# }) }
33+
```
34+
"#]
35+
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>>
36+
where S: Stream<Item = Option<U>> + 'a
37+
{
38+
Box::pin(async move {
39+
pin_utils::pin_mut!(stream);
40+
41+
// Using `scan` here because it is able to stop the stream early
42+
// if a failure occurs
43+
let mut found_none = false;
44+
let out = <T as Sum<U>>::sum(stream
45+
.scan((), |_, elem| {
46+
match elem {
47+
Some(elem) => Some(elem),
48+
None => {
49+
found_none = true;
50+
// Stop processing the stream on error
51+
None
52+
}
53+
}
54+
})).await;
55+
56+
if found_none {
57+
None
58+
} else {
59+
Some(out)
60+
}
61+
})
62+
}
63+
}

src/result/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,8 @@ mod from_stream;
77

88
#[doc(inline)]
99
pub use std::result::Result;
10+
11+
cfg_unstable! {
12+
mod product;
13+
mod sum;
14+
}

src/result/product.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use std::pin::Pin;
2+
3+
use crate::prelude::*;
4+
use crate::stream::{Stream, Product};
5+
6+
impl<T, U, E> Product<Result<U, E>> for Result<T, E>
7+
where
8+
T: Product<U>,
9+
{
10+
#[doc = r#"
11+
Takes each element in the `Stream`: if it is an `Err`, no further
12+
elements are taken, and the `Err` is returned. Should no `Err` occur,
13+
the product of all elements is returned.
14+
15+
# Examples
16+
17+
This multiplies every integer in a vector, rejecting the product if a negative element is
18+
encountered:
19+
20+
```
21+
# fn main() { async_std::task::block_on(async {
22+
#
23+
use std::collections::VecDeque;
24+
use async_std::prelude::*;
25+
26+
let v: VecDeque<_> = vec![1, 2, 4].into_iter().collect();
27+
let res: Result<i32, &'static str> = v.map(|x|
28+
if x < 0 {
29+
Err("Negative element found")
30+
} else {
31+
Ok(x)
32+
}).product().await;
33+
assert_eq!(res, Ok(8));
34+
#
35+
# }) }
36+
```
37+
"#]
38+
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>
39+
where S: Stream<Item = Result<U, E>> + 'a
40+
{
41+
Box::pin(async move {
42+
pin_utils::pin_mut!(stream);
43+
44+
// Using `scan` here because it is able to stop the stream early
45+
// if a failure occurs
46+
let mut found_error = None;
47+
let out = <T as Product<U>>::product(stream
48+
.scan((), |_, elem| {
49+
match elem {
50+
Ok(elem) => Some(elem),
51+
Err(err) => {
52+
found_error = Some(err);
53+
// Stop processing the stream on error
54+
None
55+
}
56+
}
57+
})).await;
58+
match found_error {
59+
Some(err) => Err(err),
60+
None => Ok(out)
61+
}
62+
})
63+
}
64+
}

src/result/sum.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use std::pin::Pin;
2+
3+
use crate::prelude::*;
4+
use crate::stream::{Stream, Sum};
5+
6+
impl<T, U, E> Sum<Result<U, E>> for Result<T, E>
7+
where
8+
T: Sum<U>,
9+
{
10+
#[doc = r#"
11+
Takes each element in the `Stream`: if it is an `Err`, no further
12+
elements are taken, and the `Err` is returned. Should no `Err` occur,
13+
the sum of all elements is returned.
14+
15+
# Examples
16+
17+
This sums up every integer in a vector, rejecting the sum if a negative
18+
element is encountered:
19+
20+
```
21+
# fn main() { async_std::task::block_on(async {
22+
#
23+
use std::collections::VecDeque;
24+
use async_std::prelude::*;
25+
26+
let v: VecDeque<_> = vec![1, 2].into_iter().collect();
27+
let res: Result<i32, &'static str> = v.map(|x|
28+
if x < 0 {
29+
Err("Negative element found")
30+
} else {
31+
Ok(x)
32+
}).sum().await;
33+
assert_eq!(res, Ok(3));
34+
#
35+
# }) }
36+
```
37+
"#]
38+
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>
39+
where S: Stream<Item = Result<U, E>> + 'a
40+
{
41+
Box::pin(async move {
42+
pin_utils::pin_mut!(stream);
43+
44+
// Using `scan` here because it is able to stop the stream early
45+
// if a failure occurs
46+
let mut found_error = None;
47+
let out = <T as Sum<U>>::sum(stream
48+
.scan((), |_, elem| {
49+
match elem {
50+
Ok(elem) => Some(elem),
51+
Err(err) => {
52+
found_error = Some(err);
53+
// Stop processing the stream on error
54+
None
55+
}
56+
}
57+
})).await;
58+
match found_error {
59+
Some(err) => Err(err),
60+
None => Ok(out)
61+
}
62+
})
63+
}
64+
}

src/stream/product.rs

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
use std::pin::Pin;
2+
13
use crate::future::Future;
24
use crate::stream::Stream;
35

4-
/// Trait to represent types that can be created by productming up a stream.
6+
/// Trait to represent types that can be created by multiplying the elements of a stream.
57
///
68
/// This trait is used to implement the [`product`] method on streams. Types which
79
/// implement the trait can be generated by the [`product`] method. Like
@@ -16,8 +18,62 @@ use crate::stream::Stream;
1618
pub trait Product<A = Self>: Sized {
1719
/// Method which takes a stream and generates `Self` from the elements by
1820
/// multiplying the items.
19-
fn product<S, F>(stream: S) -> F
21+
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'a>>
2022
where
21-
S: Stream<Item = A>,
22-
F: Future<Output = Self>;
23+
S: Stream<Item = A> + 'a;
24+
}
25+
26+
use core::ops::Mul;
27+
use core::num::Wrapping;
28+
use crate::stream::stream::StreamExt;
29+
30+
macro_rules! integer_product {
31+
(@impls $one: expr, $($a:ty)*) => ($(
32+
impl Product for $a {
33+
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self>+ 'a>>
34+
where
35+
S: Stream<Item = $a> + 'a,
36+
{
37+
Box::pin(async move { stream.fold($one, Mul::mul).await } )
38+
}
39+
}
40+
impl<'a> Product<&'a $a> for $a {
41+
fn product<'b, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'b>>
42+
where
43+
S: Stream<Item = &'a $a> + 'b,
44+
{
45+
Box::pin(async move { stream.fold($one, Mul::mul).await } )
46+
}
47+
}
48+
)*);
49+
($($a:ty)*) => (
50+
integer_product!(@impls 1, $($a)*);
51+
integer_product!(@impls Wrapping(1), $(Wrapping<$a>)*);
52+
);
2353
}
54+
55+
macro_rules! float_product {
56+
($($a:ty)*) => ($(
57+
impl Product for $a {
58+
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self>+ 'a>>
59+
where S: Stream<Item = $a> + 'a,
60+
{
61+
Box::pin(async move { stream.fold(1.0, |a, b| a * b).await } )
62+
}
63+
}
64+
impl<'a> Product<&'a $a> for $a {
65+
fn product<'b, S>(stream: S) -> Pin<Box<dyn Future<Output = Self>+ 'b>>
66+
where S: Stream<Item = &'a $a> + 'b,
67+
{
68+
Box::pin(async move { stream.fold(1.0, |a, b| a * b).await } )
69+
}
70+
}
71+
)*);
72+
($($a:ty)*) => (
73+
float_product!($($a)*);
74+
float_product!($(Wrapping<$a>)*);
75+
);
76+
}
77+
78+
integer_product!{ i8 i16 i32 i64 i128 isize u8 u16 u32 u64 u128 usize }
79+
float_product!{ f32 f64 }

0 commit comments

Comments
 (0)