Skip to content

Commit b94bde3

Browse files
Fix/pubsub ping mutex (#1878)
* Fix PubSub.Ping to hold the lock * Fix PubSub.Ping to hold the lock * add write cmd data-race test Signed-off-by: monkey92t <golang@88.com> Co-authored-by: monkey92t <golang@88.com>
1 parent 917c476 commit b94bde3

File tree

2 files changed

+20
-2
lines changed

2 files changed

+20
-2
lines changed

pubsub.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,13 +252,16 @@ func (c *PubSub) Ping(ctx context.Context, payload ...string) error {
252252
}
253253
cmd := NewCmd(ctx, args...)
254254

255-
cn, err := c.connWithLock(ctx)
255+
c.mu.Lock()
256+
defer c.mu.Unlock()
257+
258+
cn, err := c.conn(ctx, nil)
256259
if err != nil {
257260
return err
258261
}
259262

260263
err = c.writeCmd(ctx, cn, cmd)
261-
c.releaseConnWithLock(ctx, cn, err, false)
264+
c.releaseConn(ctx, cn, err, false)
262265
return err
263266
}
264267

@@ -361,6 +364,8 @@ func (c *PubSub) ReceiveTimeout(ctx context.Context, timeout time.Duration) (int
361364
c.cmd = NewCmd(ctx)
362365
}
363366

367+
// Don't hold the lock to allow subscriptions and pings.
368+
364369
cn, err := c.connWithLock(ctx)
365370
if err != nil {
366371
return nil, err
@@ -371,6 +376,7 @@ func (c *PubSub) ReceiveTimeout(ctx context.Context, timeout time.Duration) (int
371376
})
372377

373378
c.releaseConnWithLock(ctx, cn, err, timeout > 0)
379+
374380
if err != nil {
375381
return nil, err
376382
}

race_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,18 @@ var _ = Describe("cluster races", func() {
373373
Expect(err).NotTo(HaveOccurred())
374374
Expect(val).To(Equal(int64(C * N)))
375375
})
376+
377+
It("write cmd data-race", func() {
378+
pubsub := client.Subscribe(ctx)
379+
defer pubsub.Close()
380+
381+
pubsub.Channel(redis.WithChannelHealthCheckInterval(time.Millisecond))
382+
for i := 0; i < 100; i++ {
383+
key := fmt.Sprintf("channel_%d", i)
384+
pubsub.Subscribe(ctx, key)
385+
pubsub.Unsubscribe(ctx, key)
386+
}
387+
})
376388
})
377389

378390
func bigVal() []byte {

0 commit comments

Comments
 (0)