Skip to content

Commit e7ecfce

Browse files
committed
implement Stream for Responses
Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
1 parent fbc9944 commit e7ecfce

File tree

1 file changed

+13
-1
lines changed

1 file changed

+13
-1
lines changed

tokio-postgres/src/client.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::{
1919
use bytes::{Buf, BytesMut};
2020
use fallible_iterator::FallibleIterator;
2121
use futures_channel::mpsc;
22-
use futures_util::{future, pin_mut, ready, StreamExt, TryStreamExt};
22+
use futures_util::{future, pin_mut, ready, Stream, StreamExt, TryStreamExt};
2323
use parking_lot::Mutex;
2424
use postgres_protocol::message::backend::Message;
2525
use postgres_types::BorrowToSql;
@@ -29,6 +29,7 @@ use std::fmt;
2929
use std::net::IpAddr;
3030
#[cfg(feature = "runtime")]
3131
use std::path::PathBuf;
32+
use std::pin::Pin;
3233
use std::sync::Arc;
3334
use std::task::{Context, Poll};
3435
#[cfg(feature = "runtime")]
@@ -61,6 +62,17 @@ impl Responses {
6162
}
6263
}
6364

65+
impl Stream for Responses {
66+
type Item = Result<Message, Error>;
67+
68+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
69+
match ready!((*self).poll_next(cx)) {
70+
Err(err) if err.is_closed() => Poll::Ready(None),
71+
msg => Poll::Ready(Some(msg)),
72+
}
73+
}
74+
}
75+
6476
/// A cache of type info and prepared statements for fetching type info
6577
/// (corresponding to the queries in the [prepare](prepare) module).
6678
#[derive(Default)]

0 commit comments

Comments
 (0)