Skip to content

Async combinators #262

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 29 additions & 17 deletions src/stream/stream/all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,51 @@ use crate::task::{Context, Poll};

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct AllFuture<'a, S, F, T> {
pub struct AllFuture<'a, S, F, Fut, T> {
pub(crate) stream: &'a mut S,
pub(crate) f: F,
pub(crate) result: bool,
pub(crate) future: Option<Fut>,
pub(crate) _marker: PhantomData<T>,
}

impl<S: Unpin, F, T> Unpin for AllFuture<'_, S, F, T> {}
impl<'a, S, F, Fut, T> AllFuture<'a, S, F, Fut, T> {
pin_utils::unsafe_pinned!(stream: &'a mut S);
pin_utils::unsafe_unpinned!(f: F);
pin_utils::unsafe_pinned!(future: Option<Fut>);
}

impl<S, F> Future for AllFuture<'_, S, F, S::Item>
impl<S, F, Fut> Future for AllFuture<'_, S, F, Fut, S::Item>
where
S: Stream + Unpin + Sized,
F: FnMut(S::Item) -> bool,
F: FnMut(S::Item) -> Fut,
Fut: Future<Output = bool>,
{
type Output = bool;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx));

match next {
Some(v) => {
let result = (&mut self.f)(v);
self.result = result;
loop {
match self.future.is_some() {
false => {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
match next {
Some(v) => {
let fut = (self.as_mut().f())(v);
self.as_mut().future().set(Some(fut));
}
None => return Poll::Ready(self.result),
}
}
true => {
let res =
futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx));

if result {
// don't forget to wake this task again to pull the next item from stream
cx.waker().wake_by_ref();
Poll::Pending
} else {
Poll::Ready(false)
self.as_mut().future().set(None);
if !res {
return Poll::Ready(false);
}
}
}
None => Poll::Ready(self.result),
}
}
}
46 changes: 29 additions & 17 deletions src/stream/stream/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,51 @@ use crate::task::{Context, Poll};

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct AnyFuture<'a, S, F, T> {
pub struct AnyFuture<'a, S, F, Fut, T> {
pub(crate) stream: &'a mut S,
pub(crate) f: F,
pub(crate) result: bool,
pub(crate) future: Option<Fut>,
pub(crate) _marker: PhantomData<T>,
}

impl<S: Unpin, F, T> Unpin for AnyFuture<'_, S, F, T> {}
impl<'a, S, F, Fut, T> AnyFuture<'a, S, F, Fut, T> {
pin_utils::unsafe_unpinned!(f: F);
pin_utils::unsafe_pinned!(future: Option<Fut>);
pin_utils::unsafe_pinned!(stream: &'a mut S);
}

impl<S, F> Future for AnyFuture<'_, S, F, S::Item>
impl<S, F, Fut> Future for AnyFuture<'_, S, F, Fut, S::Item>
where
S: Stream + Unpin + Sized,
F: FnMut(S::Item) -> bool,
F: FnMut(S::Item) -> Fut,
Fut: Future<Output = bool>,
{
type Output = bool;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx));

match next {
Some(v) => {
let result = (&mut self.f)(v);
self.result = result;
loop {
match self.future.is_some() {
false => {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
match next {
Some(v) => {
let fut = (self.as_mut().f())(v);
self.as_mut().future().set(Some(fut));
}
None => return Poll::Ready(self.result),
}
}
true => {
let res =
futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx));

if result {
Poll::Ready(true)
} else {
// don't forget to wake this task again to pull the next item from stream
cx.waker().wake_by_ref();
Poll::Pending
self.as_mut().future().set(None);
if res {
return Poll::Ready(true);
}
}
}
None => Poll::Ready(self.result),
}
}
}
51 changes: 38 additions & 13 deletions src/stream/stream/filter_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,74 @@ use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::future::Future;
use crate::stream::Stream;

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct FilterMap<S, F, T, B> {
pub struct FilterMap<S, F, Fut, T, B> {
stream: S,
f: F,
future: Option<Fut>,
__from: PhantomData<T>,
__to: PhantomData<B>,
}

impl<S, F, T, B> FilterMap<S, F, T, B> {
impl<S, F, Fut, T, B> Unpin for FilterMap<S, F, Fut, T, B>
where
S: Unpin,
Fut: Unpin,
{
}

impl<S, F, Fut, T, B> FilterMap<S, F, Fut, T, B> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_pinned!(future: Option<Fut>);
pin_utils::unsafe_unpinned!(f: F);

pub(crate) fn new(stream: S, f: F) -> Self {
FilterMap {
stream,
f,
future: None,
__from: PhantomData,
__to: PhantomData,
}
}
}

impl<S, F, B> Stream for FilterMap<S, F, S::Item, B>
impl<S, F, Fut, B> Stream for FilterMap<S, F, Fut, S::Item, B>
where
S: Stream,
F: FnMut(S::Item) -> Option<B>,
F: FnMut(S::Item) -> Fut,
Fut: Future<Output = Option<B>>,
{
type Item = B;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
match next {
Some(v) => match (self.as_mut().f())(v) {
Some(b) => Poll::Ready(Some(b)),
None => {
cx.waker().wake_by_ref();
Poll::Pending
loop {
match self.future.is_some() {
false => {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
match next {
Some(v) => {
let fut = self.as_mut().f()(v);
self.as_mut().future().set(Some(fut));
}
None => return Poll::Ready(None),
}
}
true => {
let res =
futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx));

self.as_mut().future().set(None);

if let Some(b) = res {
return Poll::Ready(Some(b));
}
}
},
None => Poll::Ready(None),
}
}
}
}
50 changes: 35 additions & 15 deletions src/stream/stream/find_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,45 +7,65 @@ use crate::stream::Stream;

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct FindMapFuture<'a, S, F, T, B> {
pub struct FindMapFuture<'a, S, F, Fut, T, B> {
stream: &'a mut S,
f: F,
future: Option<Fut>,
__b: PhantomData<B>,
__t: PhantomData<T>,
}

impl<'a, S, B, F, T> FindMapFuture<'a, S, F, T, B> {
impl<'a, S, F, Fut, T, B> FindMapFuture<'a, S, F, Fut, T, B> {
pin_utils::unsafe_pinned!(stream: &'a mut S);
pin_utils::unsafe_unpinned!(f: F);
pin_utils::unsafe_pinned!(future: Option<Fut>);

pub(super) fn new(stream: &'a mut S, f: F) -> Self {
FindMapFuture {
stream,
f,
future: None,
__b: PhantomData,
__t: PhantomData,
}
}
}

impl<S: Unpin, F, T, B> Unpin for FindMapFuture<'_, S, F, T, B> {}
impl<S: Unpin, F, Fut: Unpin, T, B> Unpin for FindMapFuture<'_, S, F, Fut, T, B> {}

impl<'a, S, B, F> Future for FindMapFuture<'a, S, F, S::Item, B>
impl<'a, S, B, F, Fut> Future for FindMapFuture<'a, S, F, Fut, S::Item, B>
where
S: Stream + Unpin + Sized,
F: FnMut(S::Item) -> Option<B>,
F: FnMut(S::Item) -> Fut,
Fut: Future<Output = Option<B>>,
{
type Output = Option<B>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let item = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx));

match item {
Some(v) => match (&mut self.f)(v) {
Some(v) => Poll::Ready(Some(v)),
None => {
cx.waker().wake_by_ref();
Poll::Pending
loop {
match self.future.is_some() {
false => {
let item = futures_core::ready!(self.as_mut().stream().poll_next(cx));

match item {
Some(v) => {
let fut = (self.as_mut().f())(v);
self.as_mut().future().set(Some(fut));
}
None => return Poll::Ready(None),
}
}
true => {
let res =
futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx));
self.as_mut().future().set(None);

match res {
Some(v) => return Poll::Ready(Some(v)),
None => (),
}
}
},
None => Poll::Ready(None),
}
}
}
}
36 changes: 25 additions & 11 deletions src/stream/stream/fold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,46 +7,60 @@ use crate::task::{Context, Poll};

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct FoldFuture<S, F, T, B> {
pub struct FoldFuture<S, F, Fut, T, B> {
stream: S,
f: F,
future: Option<Fut>,
acc: Option<B>,
__t: PhantomData<T>,
}

impl<S, F, T, B> FoldFuture<S, F, T, B> {
impl<S, F, Fut, T, B> FoldFuture<S, F, Fut, T, B> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(f: F);
pin_utils::unsafe_unpinned!(acc: Option<B>);
pin_utils::unsafe_pinned!(future: Option<Fut>);

pub(super) fn new(stream: S, init: B, f: F) -> Self {
FoldFuture {
stream,
f,
future: None,
acc: Some(init),
__t: PhantomData,
}
}
}

impl<S, F, B> Future for FoldFuture<S, F, S::Item, B>
impl<S, F, Fut, B> Future for FoldFuture<S, F, Fut, S::Item, B>
where
S: Stream + Sized,
F: FnMut(B, S::Item) -> B,
F: FnMut(B, S::Item) -> Fut,
Fut: Future<Output = B>,
{
type Output = B;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
match self.future.is_some() {
false => {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));

match next {
Some(v) => {
let old = self.as_mut().acc().take().unwrap();
let new = (self.as_mut().f())(old, v);
*self.as_mut().acc() = Some(new);
match next {
Some(v) => {
let old = self.as_mut().acc().take().unwrap();
let fut = (self.as_mut().f())(old, v);
self.as_mut().future().set(Some(fut));
}
None => return Poll::Ready(self.as_mut().acc().take().unwrap()),
}
}
true => {
let res =
futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx));
self.as_mut().future().set(None);
*self.as_mut().acc() = Some(res);
}
None => return Poll::Ready(self.as_mut().acc().take().unwrap()),
}
}
}
Expand Down
Loading