1
1
import asyncio
2
- import sys
3
2
4
3
import pytest
5
4
@@ -17,23 +16,12 @@ def redis_addr(request):
17
16
return host , int (port )
18
17
19
18
20
- async def pipe (
21
- reader : asyncio .StreamReader , writer : asyncio .StreamWriter , delay : float , name = ""
22
- ):
23
- while True :
24
- data = await reader .read (1000 )
25
- if not data :
26
- break
27
- await asyncio .sleep (delay )
28
- writer .write (data )
29
- await writer .drain ()
30
-
31
-
32
19
class DelayProxy :
33
20
def __init__ (self , addr , redis_addr , delay : float ):
34
21
self .addr = addr
35
22
self .redis_addr = redis_addr
36
23
self .delay = delay
24
+ self .send_event = asyncio .Event ()
37
25
38
26
async def start (self ):
39
27
# test that we can connect to redis
@@ -46,10 +34,10 @@ async def start(self):
46
34
async def handle (self , reader , writer ):
47
35
# establish connection to redis
48
36
redis_reader , redis_writer = await asyncio .open_connection (* self .redis_addr )
49
- pipe1 = asyncio .create_task (pipe (reader , redis_writer , self .delay , "to redis:" ))
50
- pipe2 = asyncio .create_task (
51
- pipe (redis_reader , writer , self .delay , "from redis:" )
37
+ pipe1 = asyncio .create_task (
38
+ self .pipe (reader , redis_writer , "to redis:" , self .send_event )
52
39
)
40
+ pipe2 = asyncio .create_task (self .pipe (redis_reader , writer , "from redis:" ))
53
41
await asyncio .gather (pipe1 , pipe2 )
54
42
55
43
async def stop (self ):
@@ -58,6 +46,23 @@ async def stop(self):
58
46
loop = self .server .get_loop ()
59
47
await loop .shutdown_asyncgens ()
60
48
49
+ async def pipe (
50
+ self ,
51
+ reader : asyncio .StreamReader ,
52
+ writer : asyncio .StreamWriter ,
53
+ name = "" ,
54
+ event : asyncio .Event = None ,
55
+ ):
56
+ while True :
57
+ data = await reader .read (1000 )
58
+ if not data :
59
+ break
60
+ if event :
61
+ event .set ()
62
+ await asyncio .sleep (self .delay )
63
+ writer .write (data )
64
+ await writer .drain ()
65
+
61
66
62
67
@pytest .mark .onlynoncluster
63
68
@pytest .mark .parametrize ("delay" , argvalues = [0.05 , 0.5 , 1 , 2 ])
@@ -75,17 +80,18 @@ async def test_standalone(delay, redis_addr):
75
80
await r .set ("foo" , "foo" )
76
81
await r .set ("bar" , "bar" )
77
82
83
+ dp .send_event .clear ()
78
84
t = asyncio .create_task (r .get ("foo" ))
79
- await asyncio .sleep (delay )
85
+ # Wait until the task has sent, and then some, to make sure it has
86
+ # settled on the read.
87
+ await dp .send_event .wait ()
88
+ await asyncio .sleep (0.01 ) # a little extra time for prudence
80
89
t .cancel ()
81
- try :
90
+ with pytest . raises ( asyncio . CancelledError ) :
82
91
await t
83
- sys .stderr .write ("try again, we did not cancel the task in time\n " )
84
- except asyncio .CancelledError :
85
- sys .stderr .write (
86
- "canceled task, connection is left open with unread response\n "
87
- )
88
92
93
+ # make sure that our previous request, cancelled while waiting for
94
+ # a repsponse, didn't leave the connection open andin a bad state
89
95
assert await r .get ("bar" ) == b"bar"
90
96
assert await r .ping ()
91
97
assert await r .get ("foo" ) == b"foo"
@@ -110,10 +116,17 @@ async def test_standalone_pipeline(delay, redis_addr):
110
116
pipe2 .ping ()
111
117
pipe2 .get ("foo" )
112
118
119
+ dp .send_event .clear ()
113
120
t = asyncio .create_task (pipe .get ("foo" ).execute ())
114
- await asyncio .sleep (delay )
121
+ # wait until task has settled on the read
122
+ await dp .send_event .wait ()
123
+ await asyncio .sleep (0.01 )
115
124
t .cancel ()
125
+ with pytest .raises (asyncio .CancelledError ):
126
+ await t
116
127
128
+ # we have now cancelled the pieline in the middle of a request, make sure
129
+ # that the connection is still usable
117
130
pipe .get ("bar" )
118
131
pipe .ping ()
119
132
pipe .get ("foo" )
@@ -144,13 +157,13 @@ async def test_cluster(request, redis_addr):
144
157
await r .set ("foo" , "foo" )
145
158
await r .set ("bar" , "bar" )
146
159
160
+ dp .send_event .clear ()
147
161
t = asyncio .create_task (r .get ("foo" ))
148
- await asyncio .sleep (0.050 )
162
+ await dp .send_event .wait ()
163
+ await asyncio .sleep (0.01 )
149
164
t .cancel ()
150
- try :
165
+ with pytest . raises ( asyncio . CancelledError ) :
151
166
await t
152
- except asyncio .CancelledError :
153
- pytest .fail ("connection is left open with unread response" )
154
167
155
168
assert await r .get ("bar" ) == b"bar"
156
169
assert await r .ping ()
0 commit comments