@@ -38,25 +38,33 @@ def set_delay(self, delay: float = 0.0):
38
38
Allow to override the delay for parts of tests which aren't time dependent,
39
39
to speed up execution.
40
40
"""
41
- old = self .delay
41
+ old_delay = self .delay
42
42
self .delay = delay
43
43
try :
44
44
yield
45
45
finally :
46
- self .delay = old
46
+ self .delay = old_delay
47
47
48
48
async def handle (self , reader , writer ):
49
49
# establish connection to redis
50
50
redis_reader , redis_writer = await asyncio .open_connection (* self .redis_addr )
51
- pipe1 = asyncio .create_task (
52
- self .pipe (reader , redis_writer , "to redis:" , self .send_event )
53
- )
54
- pipe2 = asyncio .create_task (self .pipe (redis_reader , writer , "from redis:" ))
55
- await asyncio .gather (pipe1 , pipe2 )
51
+ try :
52
+ pipe1 = asyncio .create_task (
53
+ self .pipe (reader , redis_writer , "to redis:" , self .send_event )
54
+ )
55
+ pipe2 = asyncio .create_task (self .pipe (redis_reader , writer , "from redis:" ))
56
+ await asyncio .gather (pipe1 , pipe2 )
57
+ finally :
58
+ redis_writer .close ()
59
+ redis_reader .close ()
56
60
57
61
async def stop (self ):
58
62
# clean up enough so that we can reuse the looper
59
63
self .ROUTINE .cancel ()
64
+ try :
65
+ await self .ROUTINE
66
+ except asyncio .CancelledError :
67
+ pass
60
68
loop = self .server .get_loop ()
61
69
await loop .shutdown_asyncgens ()
62
70
@@ -179,25 +187,25 @@ async def test_cluster(request, redis_addr):
179
187
dp = DelayProxy (addr = ("127.0.0.1" , 5381 ), redis_addr = redis_addr )
180
188
await dp .start ()
181
189
182
- r = RedisCluster .from_url ("redis://127.0.0.1:5381" )
183
- await r .initialize ()
184
- await r .set ("foo" , "foo" )
185
- await r .set ("bar" , "bar" )
186
-
187
- async def op (r ):
188
- with dp .set_delay (delay ):
189
- return await r .get ("foo" )
190
-
191
- dp .send_event .clear ()
192
- t = asyncio .create_task (op (r ))
193
- await dp .send_event .wait ()
194
- await asyncio .sleep (0.01 )
195
- t .cancel ()
196
- with pytest .raises (asyncio .CancelledError ):
197
- await t
198
-
199
- assert await r .get ("bar" ) == b"bar"
200
- assert await r .ping ()
201
- assert await r .get ("foo" ) == b"foo"
190
+ with contextlib . closing ( RedisCluster .from_url ("redis://127.0.0.1:5381" )) as r :
191
+ await r .initialize ()
192
+ await r .set ("foo" , "foo" )
193
+ await r .set ("bar" , "bar" )
194
+
195
+ async def op (r ):
196
+ with dp .set_delay (delay ):
197
+ return await r .get ("foo" )
198
+
199
+ dp .send_event .clear ()
200
+ t = asyncio .create_task (op (r ))
201
+ await dp .send_event .wait ()
202
+ await asyncio .sleep (0.01 )
203
+ t .cancel ()
204
+ with pytest .raises (asyncio .CancelledError ):
205
+ await t
206
+
207
+ assert await r .get ("bar" ) == b"bar"
208
+ assert await r .ping ()
209
+ assert await r .get ("foo" ) == b"foo"
202
210
203
211
await dp .stop ()
0 commit comments