@@ -17,12 +17,13 @@ use crate::{
17
17
use bytes:: { Buf , BytesMut } ;
18
18
use fallible_iterator:: FallibleIterator ;
19
19
use futures:: channel:: mpsc;
20
- use futures:: { future, pin_mut, ready, StreamExt , TryStreamExt } ;
20
+ use futures:: { future, pin_mut, ready, Stream , StreamExt , TryStreamExt } ;
21
21
use parking_lot:: Mutex ;
22
22
use postgres_protocol:: message:: backend:: Message ;
23
23
use postgres_types:: BorrowToSql ;
24
24
use std:: collections:: HashMap ;
25
25
use std:: fmt;
26
+ use std:: pin:: Pin ;
26
27
use std:: sync:: Arc ;
27
28
use std:: task:: { Context , Poll } ;
28
29
use std:: time:: Duration ;
@@ -54,6 +55,17 @@ impl Responses {
54
55
}
55
56
}
56
57
58
+ impl Stream for Responses {
59
+ type Item = Result < Message , Error > ;
60
+
61
+ fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
62
+ match ready ! ( ( * self ) . poll_next( cx) ) {
63
+ Err ( err) if err. is_closed ( ) => Poll :: Ready ( None ) ,
64
+ msg => Poll :: Ready ( Some ( msg) ) ,
65
+ }
66
+ }
67
+ }
68
+
57
69
/// A cache of type info and prepared statements for fetching type info
58
70
/// (corresponding to the queries in the [prepare](prepare) module).
59
71
#[ derive( Default ) ]
0 commit comments