Skip to content

Commit fe45ba5

Browse files
author
Shady Khalifa
committed
update docs and examples
1 parent 3b80165 commit fe45ba5

File tree

1 file changed

+26
-23
lines changed

1 file changed

+26
-23
lines changed

src/stream/stream.rs

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -136,34 +136,31 @@ pub trait Stream {
136136
/// use async_std::prelude::*;
137137
/// use async_std::stream;
138138
///
139-
/// let mut s = stream::repeat(9).take(3);
139+
/// let mut s = stream::repeat::<u32>(42).take(3);
140+
/// assert!(s.all(|x| x == 42).await);
140141
///
141-
/// while let Some(v) = s.next().await {
142-
/// assert_eq!(v, 9);
143-
/// }
144142
/// #
145143
/// # }) }
146144
/// ```
147-
/// ```
148-
/// let a = [1, 2, 3];
149145
///
150-
/// assert!(a.iter().all(|&x| x > 0));
146+
/// Empty stream:
151147
///
152-
/// assert!(!a.iter().all(|&x| x > 2));
153148
/// ```
149+
/// # fn main() { async_std::task::block_on(async {
150+
/// #
151+
/// use async_std::prelude::*;
152+
/// use async_std::stream;
154153
///
155-
/// Stopping at the first `false`:
154+
/// let mut s = stream::empty::<u32>();
155+
/// assert!(s.all(|_| false).await);
156156
///
157+
/// #
158+
/// # }) }
157159
/// ```
158-
/// let a = [1, 2, 3];
159-
///
160-
/// let mut iter = a.iter();
161-
///
162-
/// assert!(!iter.all(|&x| x != 2));
160+
/// Stopping at the first `false`:
163161
///
164-
/// // we can still use `iter`, as there are more elements.
165-
/// assert_eq!(iter.next(), Some(&3));
166-
/// ```
162+
/// TODO: add example here
163+
/// TODO: add more examples
167164
#[inline]
168165
fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F, Self::Item>
169166
where
@@ -235,9 +232,9 @@ impl<S: futures::Stream> futures::Stream for Take<S> {
235232
}
236233
}
237234

235+
#[derive(Debug)]
238236
pub struct AllFuture<'a, S, F, T>
239237
where
240-
S: ?Sized,
241238
F: FnMut(T) -> bool,
242239
{
243240
stream: &'a mut S,
@@ -248,7 +245,6 @@ where
248245

249246
impl<'a, S, F, T> AllFuture<'a, S, F, T>
250247
where
251-
S: ?Sized,
252248
F: FnMut(T) -> bool,
253249
{
254250
pin_utils::unsafe_pinned!(stream: &'a mut S);
@@ -258,8 +254,9 @@ where
258254

259255
impl<S, F> Future for AllFuture<'_, S, F, S::Item>
260256
where
261-
S: futures::Stream + Unpin + ?Sized,
257+
S: futures::Stream + Unpin + Sized,
262258
F: FnMut(S::Item) -> bool,
259+
S::Item: std::fmt::Debug,
263260
{
264261
type Output = bool;
265262

@@ -268,9 +265,15 @@ where
268265
let next = futures::ready!(self.as_mut().stream().poll_next(cx));
269266
match next {
270267
Some(v) => {
271-
// me: *screams*
272-
*self.as_mut().result() = (self.as_mut().f())(v);
273-
Poll::Pending
268+
let result = (self.as_mut().f())(v);
269+
*self.as_mut().result() = result;
270+
if result {
271+
// don't forget to wake this task again to pull the next item from stream
272+
cx.waker().wake_by_ref();
273+
Poll::Pending
274+
} else {
275+
Poll::Ready(false)
276+
}
274277
}
275278
None => Poll::Ready(self.result),
276279
}

0 commit comments

Comments
 (0)