Skip to content

Commit 8423d6d

Browse files
committed
implement Stream for Responses
Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
1 parent aefa11c commit 8423d6d

File tree

1 file changed

+13
-1
lines changed

1 file changed

+13
-1
lines changed

tokio-postgres/src/client.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@ use crate::{
1717
use bytes::{Buf, BytesMut};
1818
use fallible_iterator::FallibleIterator;
1919
use futures::channel::mpsc;
20-
use futures::{future, pin_mut, ready, StreamExt, TryStreamExt};
20+
use futures::{future, pin_mut, ready, Stream, StreamExt, TryStreamExt};
2121
use parking_lot::Mutex;
2222
use postgres_protocol::message::backend::Message;
2323
use postgres_types::BorrowToSql;
2424
use std::collections::HashMap;
2525
use std::fmt;
26+
use std::pin::Pin;
2627
use std::sync::Arc;
2728
use std::task::{Context, Poll};
2829
use std::time::Duration;
@@ -54,6 +55,17 @@ impl Responses {
5455
}
5556
}
5657

58+
impl Stream for Responses {
59+
type Item = Result<Message, Error>;
60+
61+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
62+
match ready!((*self).poll_next(cx)) {
63+
Err(err) if err.is_closed() => Poll::Ready(None),
64+
msg => Poll::Ready(Some(msg)),
65+
}
66+
}
67+
}
68+
5769
/// A cache of type info and prepared statements for fetching type info
5870
/// (corresponding to the queries in the [prepare](prepare) module).
5971
#[derive(Default)]

0 commit comments

Comments
 (0)