Skip to content

Commit b52814f

Browse files
authored
Merge pull request #588 from go-redis/fix/sentinel-reset-pool
Resent client pool when sentinel switches master
2 parents 75ceb98 + 0d94a7b commit b52814f

File tree

6 files changed

+152
-185
lines changed

6 files changed

+152
-185
lines changed

internal/pool/pool.go

Lines changed: 26 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -140,47 +140,6 @@ func (p *ConnPool) lastDialError() error {
140140
return p._lastDialError.Load().(error)
141141
}
142142

143-
func (p *ConnPool) PopFree() *Conn {
144-
select {
145-
case p.queue <- struct{}{}:
146-
default:
147-
timer := timers.Get().(*time.Timer)
148-
timer.Reset(p.opt.PoolTimeout)
149-
150-
select {
151-
case p.queue <- struct{}{}:
152-
if !timer.Stop() {
153-
<-timer.C
154-
}
155-
timers.Put(timer)
156-
case <-timer.C:
157-
timers.Put(timer)
158-
atomic.AddUint32(&p.stats.Timeouts, 1)
159-
return nil
160-
}
161-
}
162-
163-
p.freeConnsMu.Lock()
164-
cn := p.popFree()
165-
p.freeConnsMu.Unlock()
166-
167-
if cn == nil {
168-
<-p.queue
169-
}
170-
return cn
171-
}
172-
173-
func (p *ConnPool) popFree() *Conn {
174-
if len(p.freeConns) == 0 {
175-
return nil
176-
}
177-
178-
idx := len(p.freeConns) - 1
179-
cn := p.freeConns[idx]
180-
p.freeConns = p.freeConns[:idx]
181-
return cn
182-
}
183-
184143
// Get returns existed connection from the pool or creates a new one.
185144
func (p *ConnPool) Get() (*Conn, bool, error) {
186145
if p.closed() {
@@ -235,6 +194,17 @@ func (p *ConnPool) Get() (*Conn, bool, error) {
235194
return newcn, true, nil
236195
}
237196

197+
func (p *ConnPool) popFree() *Conn {
198+
if len(p.freeConns) == 0 {
199+
return nil
200+
}
201+
202+
idx := len(p.freeConns) - 1
203+
cn := p.freeConns[idx]
204+
p.freeConns = p.freeConns[:idx]
205+
return cn
206+
}
207+
238208
func (p *ConnPool) Put(cn *Conn) error {
239209
if data := cn.Rd.PeekBuffered(); data != nil {
240210
internal.Logf("connection has unread data: %q", data)
@@ -303,17 +273,28 @@ func (p *ConnPool) closed() bool {
303273
return atomic.LoadUint32(&p._closed) == 1
304274
}
305275

276+
func (p *ConnPool) Filter(fn func(*Conn) bool) error {
277+
var firstErr error
278+
p.connsMu.Lock()
279+
for _, cn := range p.conns {
280+
if fn(cn) {
281+
if err := p.closeConn(cn); err != nil && firstErr == nil {
282+
firstErr = err
283+
}
284+
}
285+
}
286+
p.connsMu.Unlock()
287+
return firstErr
288+
}
289+
306290
func (p *ConnPool) Close() error {
307291
if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) {
308292
return ErrClosed
309293
}
310294

311-
p.connsMu.Lock()
312295
var firstErr error
296+
p.connsMu.Lock()
313297
for _, cn := range p.conns {
314-
if cn == nil {
315-
continue
316-
}
317298
if err := p.closeConn(cn); err != nil && firstErr == nil {
318299
firstErr = err
319300
}

internal/pool/pool_test.go

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -238,30 +238,4 @@ var _ = Describe("race", func() {
238238
}
239239
})
240240
})
241-
242-
It("does not happen on Get and PopFree", func() {
243-
connPool = pool.NewConnPool(
244-
&pool.Options{
245-
Dialer: dummyDialer,
246-
PoolSize: 10,
247-
PoolTimeout: time.Minute,
248-
IdleTimeout: time.Second,
249-
IdleCheckFrequency: time.Millisecond,
250-
})
251-
252-
perform(C, func(id int) {
253-
for i := 0; i < N; i++ {
254-
cn, _, err := connPool.Get()
255-
Expect(err).NotTo(HaveOccurred())
256-
if err == nil {
257-
Expect(connPool.Put(cn)).NotTo(HaveOccurred())
258-
}
259-
260-
cn = connPool.PopFree()
261-
if cn != nil {
262-
Expect(connPool.Put(cn)).NotTo(HaveOccurred())
263-
}
264-
}
265-
})
266-
})
267241
})

main_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ var cluster = &clusterScenario{
5050
clients: make(map[string]*redis.Client, 6),
5151
}
5252

53+
func init() {
54+
//redis.SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
55+
}
56+
5357
var _ = BeforeSuite(func() {
5458
var err error
5559

pubsub.go

Lines changed: 56 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -19,54 +19,53 @@ import (
1919
type PubSub struct {
2020
base baseClient
2121

22-
mu sync.Mutex
23-
cn *pool.Conn
24-
closed bool
25-
26-
subMu sync.Mutex
22+
mu sync.Mutex
23+
cn *pool.Conn
2724
channels []string
2825
patterns []string
26+
closed bool
2927

3028
cmd *Cmd
3129
}
3230

33-
func (c *PubSub) conn() (*pool.Conn, bool, error) {
31+
func (c *PubSub) conn() (*pool.Conn, error) {
3432
c.mu.Lock()
35-
defer c.mu.Unlock()
33+
cn, err := c._conn()
34+
c.mu.Unlock()
35+
return cn, err
36+
}
3637

38+
func (c *PubSub) _conn() (*pool.Conn, error) {
3739
if c.closed {
38-
return nil, false, pool.ErrClosed
40+
return nil, pool.ErrClosed
3941
}
4042

4143
if c.cn != nil {
42-
return c.cn, false, nil
44+
return c.cn, nil
4345
}
4446

4547
cn, err := c.base.connPool.NewConn()
4648
if err != nil {
47-
return nil, false, err
49+
return nil, err
4850
}
4951

5052
if !cn.Inited {
5153
if err := c.base.initConn(cn); err != nil {
5254
_ = c.base.connPool.CloseConn(cn)
53-
return nil, false, err
55+
return nil, err
5456
}
5557
}
5658

5759
if err := c.resubscribe(cn); err != nil {
5860
_ = c.base.connPool.CloseConn(cn)
59-
return nil, false, err
61+
return nil, err
6062
}
6163

6264
c.cn = cn
63-
return cn, true, nil
65+
return cn, nil
6466
}
6567

6668
func (c *PubSub) resubscribe(cn *pool.Conn) error {
67-
c.subMu.Lock()
68-
defer c.subMu.Unlock()
69-
7069
var firstErr error
7170
if len(c.channels) > 0 {
7271
if err := c._subscribe(cn, "subscribe", c.channels...); err != nil && firstErr == nil {
@@ -81,6 +80,18 @@ func (c *PubSub) resubscribe(cn *pool.Conn) error {
8180
return firstErr
8281
}
8382

83+
func (c *PubSub) _subscribe(cn *pool.Conn, redisCmd string, channels ...string) error {
84+
args := make([]interface{}, 1+len(channels))
85+
args[0] = redisCmd
86+
for i, channel := range channels {
87+
args[1+i] = channel
88+
}
89+
cmd := NewSliceCmd(args...)
90+
91+
cn.SetWriteTimeout(c.base.opt.WriteTimeout)
92+
return writeCmd(cn, cmd)
93+
}
94+
8495
func (c *PubSub) putConn(cn *pool.Conn, err error) {
8596
if !internal.IsBadConn(err, true) {
8697
return
@@ -114,67 +125,55 @@ func (c *PubSub) Close() error {
114125
return nil
115126
}
116127

117-
func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
118-
cn, isNew, err := c.conn()
119-
if err != nil {
120-
return err
121-
}
122-
123-
if isNew {
124-
return nil
125-
}
126-
127-
err = c._subscribe(cn, redisCmd, channels...)
128-
c.putConn(cn, err)
129-
return err
130-
}
131-
132-
func (c *PubSub) _subscribe(cn *pool.Conn, redisCmd string, channels ...string) error {
133-
args := make([]interface{}, 1+len(channels))
134-
args[0] = redisCmd
135-
for i, channel := range channels {
136-
args[1+i] = channel
137-
}
138-
cmd := NewSliceCmd(args...)
139-
140-
cn.SetWriteTimeout(c.base.opt.WriteTimeout)
141-
return writeCmd(cn, cmd)
142-
}
143-
144128
// Subscribes the client to the specified channels. It returns
145129
// empty subscription if there are no channels.
146130
func (c *PubSub) Subscribe(channels ...string) error {
147-
c.subMu.Lock()
131+
c.mu.Lock()
132+
err := c.subscribe("subscribe", channels...)
148133
c.channels = appendIfNotExists(c.channels, channels...)
149-
c.subMu.Unlock()
150-
return c.subscribe("subscribe", channels...)
134+
c.mu.Unlock()
135+
return err
151136
}
152137

153138
// Subscribes the client to the given patterns. It returns
154139
// empty subscription if there are no patterns.
155140
func (c *PubSub) PSubscribe(patterns ...string) error {
156-
c.subMu.Lock()
141+
c.mu.Lock()
142+
err := c.subscribe("psubscribe", patterns...)
157143
c.patterns = appendIfNotExists(c.patterns, patterns...)
158-
c.subMu.Unlock()
159-
return c.subscribe("psubscribe", patterns...)
144+
c.mu.Unlock()
145+
return err
160146
}
161147

162148
// Unsubscribes the client from the given channels, or from all of
163149
// them if none is given.
164150
func (c *PubSub) Unsubscribe(channels ...string) error {
165-
c.subMu.Lock()
151+
c.mu.Lock()
152+
err := c.subscribe("unsubscribe", channels...)
166153
c.channels = remove(c.channels, channels...)
167-
c.subMu.Unlock()
168-
return c.subscribe("unsubscribe", channels...)
154+
c.mu.Unlock()
155+
return err
169156
}
170157

171158
// Unsubscribes the client from the given patterns, or from all of
172159
// them if none is given.
173160
func (c *PubSub) PUnsubscribe(patterns ...string) error {
174-
c.subMu.Lock()
161+
c.mu.Lock()
162+
err := c.subscribe("punsubscribe", patterns...)
175163
c.patterns = remove(c.patterns, patterns...)
176-
c.subMu.Unlock()
177-
return c.subscribe("punsubscribe", patterns...)
164+
c.mu.Unlock()
165+
return err
166+
}
167+
168+
func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
169+
cn, err := c._conn()
170+
if err != nil {
171+
return err
172+
}
173+
174+
err = c._subscribe(cn, redisCmd, channels...)
175+
c.putConn(cn, err)
176+
return err
178177
}
179178

180179
func (c *PubSub) Ping(payload ...string) error {
@@ -184,7 +183,7 @@ func (c *PubSub) Ping(payload ...string) error {
184183
}
185184
cmd := NewCmd(args...)
186185

187-
cn, _, err := c.conn()
186+
cn, err := c.conn()
188187
if err != nil {
189188
return err
190189
}
@@ -277,7 +276,7 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
277276
c.cmd = NewCmd()
278277
}
279278

280-
cn, _, err := c.conn()
279+
cn, err := c.conn()
281280
if err != nil {
282281
return nil, err
283282
}

0 commit comments

Comments
 (0)