1
1
import asyncio
2
+ import contextlib
2
3
3
4
import pytest
4
5
@@ -17,7 +18,7 @@ def redis_addr(request):
17
18
18
19
19
20
class DelayProxy :
20
- def __init__ (self , addr , redis_addr , delay : float ):
21
+ def __init__ (self , addr , redis_addr , delay : float = 0.0 ):
21
22
self .addr = addr
22
23
self .redis_addr = redis_addr
23
24
self .delay = delay
@@ -31,6 +32,19 @@ async def start(self):
31
32
self .server = await asyncio .start_server (self .handle , * self .addr )
32
33
self .ROUTINE = asyncio .create_task (self .server .serve_forever ())
33
34
35
+ @contextlib .contextmanager
36
+ def set_delay (self , delay : float = 0.0 ):
37
+ """
38
+ Allow to override the delay for parts of tests which aren't time dependent,
39
+ to speed up execution.
40
+ """
41
+ old = self .delay
42
+ self .delay = delay
43
+ try :
44
+ yield
45
+ finally :
46
+ self .delay = old
47
+
34
48
async def handle (self , reader , writer ):
35
49
# establish connection to redis
36
50
redis_reader , redis_writer = await asyncio .open_connection (* self .redis_addr )
@@ -70,7 +84,7 @@ async def test_standalone(delay, redis_addr):
70
84
71
85
# create a tcp socket proxy that relays data to Redis and back,
72
86
# inserting 0.1 seconds of delay
73
- dp = DelayProxy (addr = ("127.0.0.1" , 5380 ), redis_addr = redis_addr , delay = delay * 2 )
87
+ dp = DelayProxy (addr = ("127.0.0.1" , 5380 ), redis_addr = redis_addr )
74
88
await dp .start ()
75
89
76
90
for b in [True , False ]:
@@ -80,8 +94,14 @@ async def test_standalone(delay, redis_addr):
80
94
await r .set ("foo" , "foo" )
81
95
await r .set ("bar" , "bar" )
82
96
97
+ async def op (r ):
98
+ with dp .set_delay (delay * 2 ):
99
+ return await r .get (
100
+ "foo"
101
+ ) # <-- this is the operation we want to cancel
102
+
83
103
dp .send_event .clear ()
84
- t = asyncio .create_task (r . get ( "foo" ))
104
+ t = asyncio .create_task (op ( r ))
85
105
# Wait until the task has sent, and then some, to make sure it has
86
106
# settled on the read.
87
107
await dp .send_event .wait ()
@@ -116,8 +136,14 @@ async def test_standalone_pipeline(delay, redis_addr):
116
136
pipe2 .ping ()
117
137
pipe2 .get ("foo" )
118
138
139
+ async def op (pipe ):
140
+ with dp .set_delay (delay * 2 ):
141
+ return await pipe .get (
142
+ "foo"
143
+ ).execute () # <-- this is the operation we want to cancel
144
+
119
145
dp .send_event .clear ()
120
- t = asyncio .create_task (pipe . get ( "foo" ). execute ( ))
146
+ t = asyncio .create_task (op ( pipe ))
121
147
# wait until task has settled on the read
122
148
await dp .send_event .wait ()
123
149
await asyncio .sleep (0.01 )
@@ -149,16 +175,21 @@ async def test_standalone_pipeline(delay, redis_addr):
149
175
async def test_cluster (request , redis_addr ):
150
176
151
177
redis_addr = redis_addr [0 ], 6372 # use the cluster port
152
- dp = DelayProxy (addr = ("127.0.0.1" , 5381 ), redis_addr = redis_addr , delay = 0.1 )
178
+ delay = 0.1
179
+ dp = DelayProxy (addr = ("127.0.0.1" , 5381 ), redis_addr = redis_addr )
153
180
await dp .start ()
154
181
155
182
r = RedisCluster .from_url ("redis://127.0.0.1:5381" )
156
183
await r .initialize ()
157
184
await r .set ("foo" , "foo" )
158
185
await r .set ("bar" , "bar" )
159
186
187
+ async def op (r ):
188
+ with dp .set_delay (delay ):
189
+ return await r .get ("foo" )
190
+
160
191
dp .send_event .clear ()
161
- t = asyncio .create_task (r . get ( "foo" ))
192
+ t = asyncio .create_task (op ( r ))
162
193
await dp .send_event .wait ()
163
194
await asyncio .sleep (0.01 )
164
195
t .cancel ()
0 commit comments