@@ -4,22 +4,24 @@ use std::pin::Pin;
4
4
use std:: time:: Duration ;
5
5
6
6
use futures_timer:: Delay ;
7
+ use pin_project_lite:: pin_project;
7
8
8
9
use crate :: future:: Future ;
9
10
use crate :: stream:: Stream ;
10
11
use crate :: task:: { Context , Poll } ;
11
12
12
- /// A stream with timeout time set
13
- #[ derive( Debug ) ]
14
- pub struct Timeout < S : Stream > {
15
- stream : S ,
16
- delay : Delay ,
13
+ pin_project ! {
14
+ /// A stream with timeout time set
15
+ #[ derive( Debug ) ]
16
+ pub struct Timeout <S : Stream > {
17
+ #[ pin]
18
+ stream: S ,
19
+ #[ pin]
20
+ delay: Delay ,
21
+ }
17
22
}
18
23
19
24
impl < S : Stream > Timeout < S > {
20
- pin_utils:: unsafe_pinned!( stream: S ) ;
21
- pin_utils:: unsafe_pinned!( delay: Delay ) ;
22
-
23
25
pub fn new ( stream : S , dur : Duration ) -> Timeout < S > {
24
26
let delay = Delay :: new ( dur) ;
25
27
@@ -30,11 +32,13 @@ impl<S: Stream> Timeout<S> {
30
32
impl < S : Stream > Stream for Timeout < S > {
31
33
type Item = Result < S :: Item , TimeoutError > ;
32
34
33
- fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
34
- match self . as_mut ( ) . stream ( ) . poll_next ( cx) {
35
+ fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
36
+ let this = self . project ( ) ;
37
+
38
+ match this. stream . poll_next ( cx) {
35
39
Poll :: Ready ( Some ( v) ) => Poll :: Ready ( Some ( Ok ( v) ) ) ,
36
40
Poll :: Ready ( None ) => Poll :: Ready ( None ) ,
37
- Poll :: Pending => match self . delay ( ) . poll ( cx) {
41
+ Poll :: Pending => match this . delay . poll ( cx) {
38
42
Poll :: Ready ( _) => Poll :: Ready ( Some ( Err ( TimeoutError { _private : ( ) } ) ) ) ,
39
43
Poll :: Pending => Poll :: Pending ,
40
44
} ,
0 commit comments