Skip to content

Commit b7a3cfa

Browse files
committed
implement Stream for Responses
Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
1 parent 44db213 commit b7a3cfa

File tree

1 file changed

+13
-1
lines changed

1 file changed

+13
-1
lines changed

tokio-postgres/src/client.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@ use crate::{
2121
use bytes::{Buf, BytesMut};
2222
use fallible_iterator::FallibleIterator;
2323
use futures_channel::mpsc;
24-
use futures_util::{future, pin_mut, ready, StreamExt, TryStreamExt};
24+
use futures_util::{future, pin_mut, ready, Stream, StreamExt, TryStreamExt};
2525
use parking_lot::Mutex;
2626
use postgres_protocol::message::{backend::Message, frontend};
2727
use postgres_types::BorrowToSql;
2828
use std::collections::HashMap;
2929
use std::fmt;
30+
use std::pin::Pin;
3031
use std::sync::Arc;
3132
use std::task::{Context, Poll};
3233
#[cfg(feature = "runtime")]
@@ -59,6 +60,17 @@ impl Responses {
5960
}
6061
}
6162

63+
impl Stream for Responses {
64+
type Item = Result<Message, Error>;
65+
66+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67+
match ready!((*self).poll_next(cx)) {
68+
Err(err) if err.is_closed() => Poll::Ready(None),
69+
msg => Poll::Ready(Some(msg)),
70+
}
71+
}
72+
}
73+
6274
/// A cache of type info and prepared statements for fetching type info
6375
/// (corresponding to the queries in the [prepare](prepare) module).
6476
#[derive(Default)]

0 commit comments

Comments
 (0)