Skip to content

Commit 0d94a7b

Browse files
committed
Fix race in PubSub
1 parent fbc8000 commit 0d94a7b

File tree

4 files changed

+25
-51
lines changed

4 files changed

+25
-51
lines changed

internal/pool/pool_test.go

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -238,30 +238,4 @@ var _ = Describe("race", func() {
238238
}
239239
})
240240
})
241-
242-
It("does not happen on Get and PopFree", func() {
243-
connPool = pool.NewConnPool(
244-
&pool.Options{
245-
Dialer: dummyDialer,
246-
PoolSize: 10,
247-
PoolTimeout: time.Minute,
248-
IdleTimeout: time.Second,
249-
IdleCheckFrequency: time.Millisecond,
250-
})
251-
252-
perform(C, func(id int) {
253-
for i := 0; i < N; i++ {
254-
cn, _, err := connPool.Get()
255-
Expect(err).NotTo(HaveOccurred())
256-
if err == nil {
257-
Expect(connPool.Put(cn)).NotTo(HaveOccurred())
258-
}
259-
260-
cn = connPool.PopFree()
261-
if cn != nil {
262-
Expect(connPool.Put(cn)).NotTo(HaveOccurred())
263-
}
264-
}
265-
})
266-
})
267241
})

main_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package redis_test
33
import (
44
"errors"
55
"fmt"
6-
"log"
76
"net"
87
"os"
98
"os/exec"
@@ -52,7 +51,7 @@ var cluster = &clusterScenario{
5251
}
5352

5453
func init() {
55-
redis.SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
54+
//redis.SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
5655
}
5756

5857
var _ = BeforeSuite(func() {

pubsub.go

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -28,37 +28,41 @@ type PubSub struct {
2828
cmd *Cmd
2929
}
3030

31-
func (c *PubSub) conn() (*pool.Conn, bool, error) {
31+
func (c *PubSub) conn() (*pool.Conn, error) {
3232
c.mu.Lock()
33-
defer c.mu.Unlock()
33+
cn, err := c._conn()
34+
c.mu.Unlock()
35+
return cn, err
36+
}
3437

38+
func (c *PubSub) _conn() (*pool.Conn, error) {
3539
if c.closed {
36-
return nil, false, pool.ErrClosed
40+
return nil, pool.ErrClosed
3741
}
3842

3943
if c.cn != nil {
40-
return c.cn, false, nil
44+
return c.cn, nil
4145
}
4246

4347
cn, err := c.base.connPool.NewConn()
4448
if err != nil {
45-
return nil, false, err
49+
return nil, err
4650
}
4751

4852
if !cn.Inited {
4953
if err := c.base.initConn(cn); err != nil {
5054
_ = c.base.connPool.CloseConn(cn)
51-
return nil, false, err
55+
return nil, err
5256
}
5357
}
5458

5559
if err := c.resubscribe(cn); err != nil {
5660
_ = c.base.connPool.CloseConn(cn)
57-
return nil, false, err
61+
return nil, err
5862
}
5963

6064
c.cn = cn
61-
return cn, true, nil
65+
return cn, nil
6266
}
6367

6468
func (c *PubSub) resubscribe(cn *pool.Conn) error {
@@ -125,48 +129,48 @@ func (c *PubSub) Close() error {
125129
// empty subscription if there are no channels.
126130
func (c *PubSub) Subscribe(channels ...string) error {
127131
c.mu.Lock()
132+
err := c.subscribe("subscribe", channels...)
128133
c.channels = appendIfNotExists(c.channels, channels...)
129134
c.mu.Unlock()
130-
return c.subscribe("subscribe", channels...)
135+
return err
131136
}
132137

133138
// Subscribes the client to the given patterns. It returns
134139
// empty subscription if there are no patterns.
135140
func (c *PubSub) PSubscribe(patterns ...string) error {
136141
c.mu.Lock()
142+
err := c.subscribe("psubscribe", patterns...)
137143
c.patterns = appendIfNotExists(c.patterns, patterns...)
138144
c.mu.Unlock()
139-
return c.subscribe("psubscribe", patterns...)
145+
return err
140146
}
141147

142148
// Unsubscribes the client from the given channels, or from all of
143149
// them if none is given.
144150
func (c *PubSub) Unsubscribe(channels ...string) error {
145151
c.mu.Lock()
152+
err := c.subscribe("unsubscribe", channels...)
146153
c.channels = remove(c.channels, channels...)
147154
c.mu.Unlock()
148-
return c.subscribe("unsubscribe", channels...)
155+
return err
149156
}
150157

151158
// Unsubscribes the client from the given patterns, or from all of
152159
// them if none is given.
153160
func (c *PubSub) PUnsubscribe(patterns ...string) error {
154161
c.mu.Lock()
162+
err := c.subscribe("punsubscribe", patterns...)
155163
c.patterns = remove(c.patterns, patterns...)
156164
c.mu.Unlock()
157-
return c.subscribe("punsubscribe", patterns...)
165+
return err
158166
}
159167

160168
func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
161-
cn, isNew, err := c.conn()
169+
cn, err := c._conn()
162170
if err != nil {
163171
return err
164172
}
165173

166-
if isNew {
167-
return nil
168-
}
169-
170174
err = c._subscribe(cn, redisCmd, channels...)
171175
c.putConn(cn, err)
172176
return err
@@ -179,7 +183,7 @@ func (c *PubSub) Ping(payload ...string) error {
179183
}
180184
cmd := NewCmd(args...)
181185

182-
cn, _, err := c.conn()
186+
cn, err := c.conn()
183187
if err != nil {
184188
return err
185189
}
@@ -272,7 +276,7 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
272276
c.cmd = NewCmd()
273277
}
274278

275-
cn, _, err := c.conn()
279+
cn, err := c.conn()
276280
if err != nil {
277281
return nil, err
278282
}

redis.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -387,10 +387,7 @@ func (c *Client) pubSub() *PubSub {
387387
func (c *Client) Subscribe(channels ...string) *PubSub {
388388
pubsub := c.pubSub()
389389
if len(channels) > 0 {
390-
err := pubsub.Subscribe(channels...)
391-
if err != nil {
392-
panic(err)
393-
}
390+
_ = pubsub.Subscribe(channels...)
394391
}
395392
return pubsub
396393
}

0 commit comments

Comments
 (0)