@@ -19,7 +19,7 @@ use crate::{
19
19
use bytes:: { Buf , BytesMut } ;
20
20
use fallible_iterator:: FallibleIterator ;
21
21
use futures_channel:: mpsc;
22
- use futures_util:: { future, pin_mut, ready, StreamExt , TryStreamExt } ;
22
+ use futures_util:: { future, pin_mut, ready, Stream , StreamExt , TryStreamExt } ;
23
23
use parking_lot:: Mutex ;
24
24
use postgres_protocol:: message:: backend:: Message ;
25
25
use postgres_types:: BorrowToSql ;
@@ -29,6 +29,7 @@ use std::fmt;
29
29
use std:: net:: IpAddr ;
30
30
#[ cfg( feature = "runtime" ) ]
31
31
use std:: path:: PathBuf ;
32
+ use std:: pin:: Pin ;
32
33
use std:: sync:: Arc ;
33
34
use std:: task:: { Context , Poll } ;
34
35
#[ cfg( feature = "runtime" ) ]
@@ -61,6 +62,17 @@ impl Responses {
61
62
}
62
63
}
63
64
65
+ impl Stream for Responses {
66
+ type Item = Result < Message , Error > ;
67
+
68
+ fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
69
+ match ready ! ( ( * self ) . poll_next( cx) ) {
70
+ Err ( err) if err. is_closed ( ) => Poll :: Ready ( None ) ,
71
+ msg => Poll :: Ready ( Some ( msg) ) ,
72
+ }
73
+ }
74
+ }
75
+
64
76
/// A cache of type info and prepared statements for fetching type info
65
77
/// (corresponding to the queries in the [prepare](prepare) module).
66
78
#[ derive( Default ) ]
0 commit comments