@@ -185,37 +185,67 @@ async def op(pipe):
185
185
@pytest .mark .onlycluster
186
186
async def test_cluster (request , redis_addr ):
187
187
188
- # TODO: This test actually doesn't work. Once the RedisCluster initializes,
189
- # it will re-connect to the nodes as advertised by the cluster, bypassing
190
- # the single DelayProxy we set up.
191
- # to work around this, we really would nedd a port-remapper for the RedisCluster
192
-
193
- redis_addr = redis_addr [0 ], 6372 # use the cluster port
194
188
delay = 0.1
195
- dp = DelayProxy (addr = ("127.0.0.1" , 5381 ), redis_addr = redis_addr )
196
- await dp .start ()
189
+ cluster_port = 6372
190
+ remap_base = 7372
191
+ n_nodes = 6
192
+
193
+ def remap (host , port ):
194
+ return host , remap_base + port - cluster_port
195
+
196
+ proxies = []
197
+ for i in range (n_nodes ):
198
+ port = cluster_port + i
199
+ remapped = remap_base + i
200
+ forward_addr = redis_addr [0 ], port
201
+ proxy = DelayProxy (addr = ("127.0.0.1" , remapped ), redis_addr = forward_addr )
202
+ proxies .append (proxy )
203
+
204
+ # start proxies
205
+ await asyncio .gather (* [p .start () for p in proxies ])
206
+
207
+ def all_clear ():
208
+ for p in proxies :
209
+ p .send_event .clear ()
210
+
211
+ async def wait_for_send ():
212
+ asyncio .wait (
213
+ [p .send_event .wait () for p in proxies ], return_when = asyncio .FIRST_COMPLETED
214
+ )
215
+
216
+ @contextlib .contextmanager
217
+ def set_delay (delay : float ):
218
+ with contextlib .ExitStack () as stack :
219
+ for p in proxies :
220
+ stack .enter_context (p .set_delay (delay ))
221
+ yield
197
222
198
- with contextlib .closing (RedisCluster .from_url ("redis://127.0.0.1:5381" )) as r :
223
+ with contextlib .closing (
224
+ RedisCluster .from_url (f"redis://127.0.0.1:{ remap_base } " , host_port_remap = remap )
225
+ ) as r :
199
226
await r .initialize ()
200
227
await r .set ("foo" , "foo" )
201
228
await r .set ("bar" , "bar" )
202
229
203
230
async def op (r ):
204
- with dp . set_delay (delay ):
231
+ with set_delay (delay ):
205
232
return await r .get ("foo" )
206
233
207
- dp . send_event . clear ()
234
+ all_clear ()
208
235
t = asyncio .create_task (op (r ))
209
- # await dp.send_event.wait() # won"t work, because DelayProxy is by-passed
236
+ # Wait for whichever DelayProxy gets the request first
237
+ await wait_for_send ()
210
238
await asyncio .sleep (0.01 )
211
239
t .cancel ()
212
- try :
240
+ with pytest . raises ( asyncio . CancelledError ) :
213
241
await t
214
- except asyncio .CancelledError :
215
- pass
216
242
217
- assert await r .get ("bar" ) == b"bar"
218
- assert await r .ping ()
219
- assert await r .get ("foo" ) == b"foo"
243
+ # try a number of requests to excercise all the connections
244
+ async def doit ():
245
+ assert await r .get ("bar" ) == b"bar"
246
+ assert await r .ping ()
247
+ assert await r .get ("foo" ) == b"foo"
220
248
221
- await dp .stop ()
249
+ await asyncio .gather (* [doit () for _ in range (10 )])
250
+
251
+ await asyncio .gather (* (p .stop () for p in proxies ))
0 commit comments