Skip to content

Add Stream::sum() and Stream::product() implementations #377

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/option/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,8 @@ mod from_stream;

#[doc(inline)]
pub use std::option::Option;

cfg_unstable! {
mod product;
mod sum;
}
66 changes: 66 additions & 0 deletions src/option/product.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use std::pin::Pin;

use crate::prelude::*;
use crate::stream::{Stream, Product};

impl<T, U> Product<Option<U>> for Option<T>
where
T: Product<U>,
{
#[doc = r#"
Takes each element in the `Stream`: if it is a `None`, no further
elements are taken, and the `None` is returned. Should no `None` occur,
the product of all elements is returned.

# Examples

This multiplies every integer in a vector, rejecting the product if a negative element is
encountered:

```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;

let v: VecDeque<_> = vec![1, 2, 4].into_iter().collect();
let prod: Option<i32> = v.map(|x|
if x < 0 {
None
} else {
Some(x)
}).product().await;
assert_eq!(prod, Some(8));
#
# }) }
```
"#]
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>>
where S: Stream<Item = Option<U>> + 'a
{
Box::pin(async move {
pin_utils::pin_mut!(stream);

// Using `scan` here because it is able to stop the stream early
// if a failure occurs
let mut found_none = false;
let out = <T as Product<U>>::product(stream
.scan((), |_, elem| {
match elem {
Some(elem) => Some(elem),
None => {
found_none = true;
// Stop processing the stream on error
None
}
}
})).await;

if found_none {
None
} else {
Some(out)
}
})
}
}
63 changes: 63 additions & 0 deletions src/option/sum.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use std::pin::Pin;

use crate::prelude::*;
use crate::stream::{Stream, Sum};

impl<T, U> Sum<Option<U>> for Option<T>
where
T: Sum<U>,
{
#[doc = r#"
Takes each element in the `Iterator`: if it is a `None`, no further
elements are taken, and the `None` is returned. Should no `None` occur,
the sum of all elements is returned.

# Examples

This sums up the position of the character 'a' in a vector of strings,
if a word did not have the character 'a' the operation returns `None`:

```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;

let words: VecDeque<_> = vec!["have", "a", "great", "day"]
.into_iter()
.collect();
let total: Option<usize> = words.map(|w| w.find('a')).sum().await;
assert_eq!(total, Some(5));
#
# }) }
```
"#]
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>>
where S: Stream<Item = Option<U>> + 'a
{
Box::pin(async move {
pin_utils::pin_mut!(stream);

// Using `scan` here because it is able to stop the stream early
// if a failure occurs
let mut found_none = false;
let out = <T as Sum<U>>::sum(stream
.scan((), |_, elem| {
match elem {
Some(elem) => Some(elem),
None => {
found_none = true;
// Stop processing the stream on error
None
}
}
})).await;

if found_none {
None
} else {
Some(out)
}
})
}
}
5 changes: 5 additions & 0 deletions src/result/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,8 @@ mod from_stream;

#[doc(inline)]
pub use std::result::Result;

cfg_unstable! {
mod product;
mod sum;
}
64 changes: 64 additions & 0 deletions src/result/product.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use std::pin::Pin;

use crate::prelude::*;
use crate::stream::{Stream, Product};

impl<T, U, E> Product<Result<U, E>> for Result<T, E>
where
T: Product<U>,
{
#[doc = r#"
Takes each element in the `Stream`: if it is an `Err`, no further
elements are taken, and the `Err` is returned. Should no `Err` occur,
the product of all elements is returned.

# Examples

This multiplies every integer in a vector, rejecting the product if a negative element is
encountered:

```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;

let v: VecDeque<_> = vec![1, 2, 4].into_iter().collect();
let res: Result<i32, &'static str> = v.map(|x|
if x < 0 {
Err("Negative element found")
} else {
Ok(x)
}).product().await;
assert_eq!(res, Ok(8));
#
# }) }
```
"#]
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>
where S: Stream<Item = Result<U, E>> + 'a
{
Box::pin(async move {
pin_utils::pin_mut!(stream);

// Using `scan` here because it is able to stop the stream early
// if a failure occurs
let mut found_error = None;
let out = <T as Product<U>>::product(stream
.scan((), |_, elem| {
match elem {
Ok(elem) => Some(elem),
Err(err) => {
found_error = Some(err);
// Stop processing the stream on error
None
}
}
})).await;
match found_error {
Some(err) => Err(err),
None => Ok(out)
}
})
}
}
64 changes: 64 additions & 0 deletions src/result/sum.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use std::pin::Pin;

use crate::prelude::*;
use crate::stream::{Stream, Sum};

impl<T, U, E> Sum<Result<U, E>> for Result<T, E>
where
T: Sum<U>,
{
#[doc = r#"
Takes each element in the `Stream`: if it is an `Err`, no further
elements are taken, and the `Err` is returned. Should no `Err` occur,
the sum of all elements is returned.

# Examples

This sums up every integer in a vector, rejecting the sum if a negative
element is encountered:

```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;

let v: VecDeque<_> = vec![1, 2].into_iter().collect();
let res: Result<i32, &'static str> = v.map(|x|
if x < 0 {
Err("Negative element found")
} else {
Ok(x)
}).sum().await;
assert_eq!(res, Ok(3));
#
# }) }
```
"#]
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>
where S: Stream<Item = Result<U, E>> + 'a
{
Box::pin(async move {
pin_utils::pin_mut!(stream);

// Using `scan` here because it is able to stop the stream early
// if a failure occurs
let mut found_error = None;
let out = <T as Sum<U>>::sum(stream
.scan((), |_, elem| {
match elem {
Ok(elem) => Some(elem),
Err(err) => {
found_error = Some(err);
// Stop processing the stream on error
None
}
}
})).await;
match found_error {
Some(err) => Err(err),
None => Ok(out)
}
})
}
}
64 changes: 60 additions & 4 deletions src/stream/product.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::pin::Pin;

use crate::future::Future;
use crate::stream::Stream;

/// Trait to represent types that can be created by productming up a stream.
/// Trait to represent types that can be created by multiplying the elements of a stream.
///
/// This trait is used to implement the [`product`] method on streams. Types which
/// implement the trait can be generated by the [`product`] method. Like
Expand All @@ -16,8 +18,62 @@ use crate::stream::Stream;
pub trait Product<A = Self>: Sized {
/// Method which takes a stream and generates `Self` from the elements by
/// multiplying the items.
fn product<S, F>(stream: S) -> F
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'a>>
where
S: Stream<Item = A>,
F: Future<Output = Self>;
S: Stream<Item = A> + 'a;
}

use core::ops::Mul;
use core::num::Wrapping;
use crate::stream::stream::StreamExt;

macro_rules! integer_product {
(@impls $one: expr, $($a:ty)*) => ($(
impl Product for $a {
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self>+ 'a>>
where
S: Stream<Item = $a> + 'a,
{
Box::pin(async move { stream.fold($one, Mul::mul).await } )
}
}
impl<'a> Product<&'a $a> for $a {
fn product<'b, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'b>>
where
S: Stream<Item = &'a $a> + 'b,
{
Box::pin(async move { stream.fold($one, Mul::mul).await } )
}
}
)*);
($($a:ty)*) => (
integer_product!(@impls 1, $($a)*);
integer_product!(@impls Wrapping(1), $(Wrapping<$a>)*);
);
}

macro_rules! float_product {
($($a:ty)*) => ($(
impl Product for $a {
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self>+ 'a>>
where S: Stream<Item = $a> + 'a,
{
Box::pin(async move { stream.fold(1.0, |a, b| a * b).await } )
}
}
impl<'a> Product<&'a $a> for $a {
fn product<'b, S>(stream: S) -> Pin<Box<dyn Future<Output = Self>+ 'b>>
where S: Stream<Item = &'a $a> + 'b,
{
Box::pin(async move { stream.fold(1.0, |a, b| a * b).await } )
}
}
)*);
($($a:ty)*) => (
float_product!($($a)*);
float_product!($(Wrapping<$a>)*);
);
}

integer_product!{ i8 i16 i32 i64 i128 isize u8 u16 u32 u64 u128 usize }
float_product!{ f32 f64 }
Loading