Skip to content

Commit c2e63ad

Browse files
committed
Add MaxConnAge
1 parent 9e6dca7 commit c2e63ad

File tree

12 files changed

+162
-115
lines changed

12 files changed

+162
-115
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## Unreleased
4+
5+
- New option MinIdleConns.
6+
- New option MaxConnAge.
7+
- PoolStats.FreeConns is renamed to PoolStats.IdleConns.
8+
39
## v6.13
410

511
- Ring got new options called `HashReplicas` and `Hash`. It is recommended to set `HashReplicas = 1000` for better keys distribution between shards.

cluster.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ type ClusterOptions struct {
6565

6666
// PoolSize applies per cluster node and not for the whole cluster.
6767
PoolSize int
68+
MinIdleConns int
69+
MaxConnAge time.Duration
6870
PoolTimeout time.Duration
6971
IdleTimeout time.Duration
7072
IdleCheckFrequency time.Duration
@@ -130,10 +132,11 @@ func (opt *ClusterOptions) clientOptions() *Options {
130132
ReadTimeout: opt.ReadTimeout,
131133
WriteTimeout: opt.WriteTimeout,
132134

133-
PoolSize: opt.PoolSize,
134-
PoolTimeout: opt.PoolTimeout,
135-
IdleTimeout: opt.IdleTimeout,
136-
135+
PoolSize: opt.PoolSize,
136+
MinIdleConns: opt.MinIdleConns,
137+
MaxConnAge: opt.MaxConnAge,
138+
PoolTimeout: opt.PoolTimeout,
139+
IdleTimeout: opt.IdleTimeout,
137140
IdleCheckFrequency: disableIdleCheck,
138141

139142
TLSConfig: opt.TLSConfig,
@@ -1106,7 +1109,7 @@ func (c *ClusterClient) PoolStats() *PoolStats {
11061109
acc.Timeouts += s.Timeouts
11071110

11081111
acc.TotalConns += s.TotalConns
1109-
acc.FreeConns += s.FreeConns
1112+
acc.IdleConns += s.IdleConns
11101113
acc.StaleConns += s.StaleConns
11111114
}
11121115

@@ -1117,7 +1120,7 @@ func (c *ClusterClient) PoolStats() *PoolStats {
11171120
acc.Timeouts += s.Timeouts
11181121

11191122
acc.TotalConns += s.TotalConns
1120-
acc.FreeConns += s.FreeConns
1123+
acc.IdleConns += s.IdleConns
11211124
acc.StaleConns += s.StaleConns
11221125
}
11231126

cluster_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -557,13 +557,13 @@ var _ = Describe("ClusterClient", func() {
557557
It("removes idle connections", func() {
558558
stats := client.PoolStats()
559559
Expect(stats.TotalConns).NotTo(BeZero())
560-
Expect(stats.FreeConns).NotTo(BeZero())
560+
Expect(stats.IdleConns).NotTo(BeZero())
561561

562562
time.Sleep(2 * time.Second)
563563

564564
stats = client.PoolStats()
565565
Expect(stats.TotalConns).To(BeZero())
566-
Expect(stats.FreeConns).To(BeZero())
566+
Expect(stats.IdleConns).To(BeZero())
567567
})
568568

569569
It("returns an error when there are no attempts left", func() {

commands_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ var _ = Describe("Commands", func() {
4242
Expect(stats.Misses).To(Equal(uint32(1)))
4343
Expect(stats.Timeouts).To(Equal(uint32(0)))
4444
Expect(stats.TotalConns).To(Equal(uint32(1)))
45-
Expect(stats.FreeConns).To(Equal(uint32(1)))
45+
Expect(stats.IdleConns).To(Equal(uint32(1)))
4646
})
4747

4848
It("should Echo", func() {

internal/pool/conn.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ type Conn struct {
1818

1919
concurrentReadWrite bool
2020

21-
Inited bool
22-
pooled bool
23-
usedAt atomic.Value
21+
InitedAt time.Time
22+
pooled bool
23+
usedAt atomic.Value
2424
}
2525

2626
func NewConn(netConn net.Conn) *Conn {
@@ -47,10 +47,6 @@ func (cn *Conn) SetNetConn(netConn net.Conn) {
4747
cn.Rd.Reset(netConn)
4848
}
4949

50-
func (cn *Conn) IsStale(timeout time.Duration) bool {
51-
return timeout > 0 && time.Since(cn.UsedAt()) > timeout
52-
}
53-
5450
func (cn *Conn) SetReadTimeout(timeout time.Duration) {
5551
now := time.Now()
5652
cn.SetUsedAt(now)

internal/pool/pool.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ type Stats struct {
2828
Timeouts uint32 // number of times a wait timeout occurred
2929

3030
TotalConns uint32 // number of total connections in the pool
31-
FreeConns uint32 // deprecated - use IdleConns
3231
IdleConns uint32 // number of idle connections in the pool
3332
StaleConns uint32 // number of stale connections removed from the pool
3433
}
@@ -54,6 +53,7 @@ type Options struct {
5453

5554
PoolSize int
5655
MinIdleConns int
56+
MaxConnAge time.Duration
5757
PoolTimeout time.Duration
5858
IdleTimeout time.Duration
5959
IdleCheckFrequency time.Duration
@@ -223,8 +223,8 @@ func (p *ConnPool) Get() (*Conn, error) {
223223
break
224224
}
225225

226-
if cn.IsStale(p.opt.IdleTimeout) {
227-
p.CloseConn(cn)
226+
if p.isStaleConn(cn) {
227+
_ = p.CloseConn(cn)
228228
continue
229229
}
230230

@@ -343,12 +343,12 @@ func (p *ConnPool) closeConn(cn *Conn) error {
343343
// Len returns total number of connections.
344344
func (p *ConnPool) Len() int {
345345
p.connsMu.Lock()
346-
n := p.poolSize
346+
n := len(p.conns)
347347
p.connsMu.Unlock()
348348
return n
349349
}
350350

351-
// FreeLen returns number of idle connections.
351+
// IdleLen returns number of idle connections.
352352
func (p *ConnPool) IdleLen() int {
353353
p.connsMu.Lock()
354354
n := p.idleConnsLen
@@ -364,7 +364,6 @@ func (p *ConnPool) Stats() *Stats {
364364
Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
365365

366366
TotalConns: uint32(p.Len()),
367-
FreeConns: uint32(idleLen),
368367
IdleConns: uint32(idleLen),
369368
StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
370369
}
@@ -415,7 +414,7 @@ func (p *ConnPool) reapStaleConn() *Conn {
415414
}
416415

417416
cn := p.idleConns[0]
418-
if !cn.IsStale(p.opt.IdleTimeout) {
417+
if !p.isStaleConn(cn) {
419418
return nil
420419
}
421420

@@ -466,3 +465,19 @@ func (p *ConnPool) reaper(frequency time.Duration) {
466465
atomic.AddUint32(&p.stats.StaleConns, uint32(n))
467466
}
468467
}
468+
469+
func (p *ConnPool) isStaleConn(cn *Conn) bool {
470+
if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 {
471+
return false
472+
}
473+
474+
now := time.Now()
475+
if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout {
476+
return true
477+
}
478+
if p.opt.MaxConnAge > 0 && now.Sub(cn.InitedAt) >= p.opt.MaxConnAge {
479+
return true
480+
}
481+
482+
return false
483+
}

internal/pool/pool_test.go

Lines changed: 98 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -248,114 +248,128 @@ var _ = Describe("MinIdleConns", func() {
248248

249249
var _ = Describe("conns reaper", func() {
250250
const idleTimeout = time.Minute
251+
const maxAge = time.Hour
251252

252253
var connPool *pool.ConnPool
253-
var conns, idleConns, closedConns []*pool.Conn
254+
var conns, staleConns, closedConns []*pool.Conn
254255

255-
BeforeEach(func() {
256-
conns = nil
257-
closedConns = nil
256+
assert := func(typ string) {
257+
BeforeEach(func() {
258+
closedConns = nil
259+
connPool = pool.NewConnPool(&pool.Options{
260+
Dialer: dummyDialer,
261+
PoolSize: 10,
262+
IdleTimeout: idleTimeout,
263+
MaxConnAge: maxAge,
264+
PoolTimeout: time.Second,
265+
IdleCheckFrequency: time.Hour,
266+
OnClose: func(cn *pool.Conn) error {
267+
closedConns = append(closedConns, cn)
268+
return nil
269+
},
270+
})
258271

259-
connPool = pool.NewConnPool(&pool.Options{
260-
Dialer: dummyDialer,
261-
PoolSize: 10,
262-
PoolTimeout: time.Second,
263-
IdleTimeout: idleTimeout,
264-
IdleCheckFrequency: time.Hour,
265-
266-
OnClose: func(cn *pool.Conn) error {
267-
closedConns = append(closedConns, cn)
268-
return nil
269-
},
270-
})
272+
conns = nil
271273

272-
// add stale connections
273-
idleConns = nil
274-
for i := 0; i < 3; i++ {
275-
cn, err := connPool.Get()
276-
Expect(err).NotTo(HaveOccurred())
277-
cn.SetUsedAt(time.Now().Add(-2 * idleTimeout))
278-
conns = append(conns, cn)
279-
idleConns = append(idleConns, cn)
280-
}
274+
// add stale connections
275+
staleConns = nil
276+
for i := 0; i < 3; i++ {
277+
cn, err := connPool.Get()
278+
Expect(err).NotTo(HaveOccurred())
279+
switch typ {
280+
case "idle":
281+
cn.SetUsedAt(time.Now().Add(-2 * idleTimeout))
282+
case "aged":
283+
cn.InitedAt = time.Now().Add(-2 * maxAge)
284+
}
285+
conns = append(conns, cn)
286+
staleConns = append(staleConns, cn)
287+
}
281288

282-
// add fresh connections
283-
for i := 0; i < 3; i++ {
284-
cn, err := connPool.Get()
285-
Expect(err).NotTo(HaveOccurred())
286-
conns = append(conns, cn)
287-
}
289+
// add fresh connections
290+
for i := 0; i < 3; i++ {
291+
cn, err := connPool.Get()
292+
Expect(err).NotTo(HaveOccurred())
293+
conns = append(conns, cn)
294+
}
288295

289-
for _, cn := range conns {
290-
connPool.Put(cn)
291-
}
296+
for _, cn := range conns {
297+
if cn.InitedAt.IsZero() {
298+
cn.InitedAt = time.Now()
299+
}
300+
connPool.Put(cn)
301+
}
292302

293-
Expect(connPool.Len()).To(Equal(6))
294-
Expect(connPool.IdleLen()).To(Equal(6))
303+
Expect(connPool.Len()).To(Equal(6))
304+
Expect(connPool.IdleLen()).To(Equal(6))
295305

296-
n, err := connPool.ReapStaleConns()
297-
Expect(err).NotTo(HaveOccurred())
298-
Expect(n).To(Equal(3))
299-
})
306+
n, err := connPool.ReapStaleConns()
307+
Expect(err).NotTo(HaveOccurred())
308+
Expect(n).To(Equal(3))
309+
})
300310

301-
AfterEach(func() {
302-
_ = connPool.Close()
303-
Expect(connPool.Len()).To(Equal(0))
304-
Expect(connPool.IdleLen()).To(Equal(0))
305-
Expect(len(closedConns)).To(Equal(len(conns)))
306-
Expect(closedConns).To(ConsistOf(conns))
307-
})
311+
AfterEach(func() {
312+
_ = connPool.Close()
313+
Expect(connPool.Len()).To(Equal(0))
314+
Expect(connPool.IdleLen()).To(Equal(0))
315+
Expect(len(closedConns)).To(Equal(len(conns)))
316+
Expect(closedConns).To(ConsistOf(conns))
317+
})
308318

309-
It("reaps stale connections", func() {
310-
Expect(connPool.Len()).To(Equal(3))
311-
Expect(connPool.IdleLen()).To(Equal(3))
312-
})
319+
It("reaps stale connections", func() {
320+
Expect(connPool.Len()).To(Equal(3))
321+
Expect(connPool.IdleLen()).To(Equal(3))
322+
})
313323

314-
It("does not reap fresh connections", func() {
315-
n, err := connPool.ReapStaleConns()
316-
Expect(err).NotTo(HaveOccurred())
317-
Expect(n).To(Equal(0))
318-
})
324+
It("does not reap fresh connections", func() {
325+
n, err := connPool.ReapStaleConns()
326+
Expect(err).NotTo(HaveOccurred())
327+
Expect(n).To(Equal(0))
328+
})
319329

320-
It("stale connections are closed", func() {
321-
Expect(len(closedConns)).To(Equal(len(idleConns)))
322-
Expect(closedConns).To(ConsistOf(idleConns))
323-
})
330+
It("stale connections are closed", func() {
331+
Expect(len(closedConns)).To(Equal(len(staleConns)))
332+
Expect(closedConns).To(ConsistOf(staleConns))
333+
})
334+
335+
It("pool is functional", func() {
336+
for j := 0; j < 3; j++ {
337+
var freeCns []*pool.Conn
338+
for i := 0; i < 3; i++ {
339+
cn, err := connPool.Get()
340+
Expect(err).NotTo(HaveOccurred())
341+
Expect(cn).NotTo(BeNil())
342+
freeCns = append(freeCns, cn)
343+
}
344+
345+
Expect(connPool.Len()).To(Equal(3))
346+
Expect(connPool.IdleLen()).To(Equal(0))
324347

325-
It("pool is functional", func() {
326-
for j := 0; j < 3; j++ {
327-
var freeCns []*pool.Conn
328-
for i := 0; i < 3; i++ {
329348
cn, err := connPool.Get()
330349
Expect(err).NotTo(HaveOccurred())
331350
Expect(cn).NotTo(BeNil())
332-
freeCns = append(freeCns, cn)
333-
}
351+
conns = append(conns, cn)
334352

335-
Expect(connPool.Len()).To(Equal(3))
336-
Expect(connPool.IdleLen()).To(Equal(0))
353+
Expect(connPool.Len()).To(Equal(4))
354+
Expect(connPool.IdleLen()).To(Equal(0))
337355

338-
cn, err := connPool.Get()
339-
Expect(err).NotTo(HaveOccurred())
340-
Expect(cn).NotTo(BeNil())
341-
conns = append(conns, cn)
356+
connPool.Remove(cn)
342357

343-
Expect(connPool.Len()).To(Equal(4))
344-
Expect(connPool.IdleLen()).To(Equal(0))
358+
Expect(connPool.Len()).To(Equal(3))
359+
Expect(connPool.IdleLen()).To(Equal(0))
345360

346-
connPool.Remove(cn)
347-
348-
Expect(connPool.Len()).To(Equal(3))
349-
Expect(connPool.IdleLen()).To(Equal(0))
361+
for _, cn := range freeCns {
362+
connPool.Put(cn)
363+
}
350364

351-
for _, cn := range freeCns {
352-
connPool.Put(cn)
365+
Expect(connPool.Len()).To(Equal(3))
366+
Expect(connPool.IdleLen()).To(Equal(3))
353367
}
368+
})
369+
}
354370

355-
Expect(connPool.Len()).To(Equal(3))
356-
Expect(connPool.IdleLen()).To(Equal(3))
357-
}
358-
})
371+
assert("idle")
372+
assert("aged")
359373
})
360374

361375
var _ = Describe("race", func() {

0 commit comments

Comments
 (0)