Skip to content

Commit 6252cc8

Browse files
committed
Merge branch 'master' into feat/vector-set
2 parents 51a44fb + c149644 commit 6252cc8

File tree

8 files changed

+91
-35
lines changed

8 files changed

+91
-35
lines changed

error.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@ import (
1515
// ErrClosed performs any operation on the closed client will return this error.
1616
var ErrClosed = pool.ErrClosed
1717

18+
// ErrPoolExhausted is returned from a pool connection method
19+
// when the maximum number of database connections in the pool has been reached.
20+
var ErrPoolExhausted = pool.ErrPoolExhausted
21+
22+
// ErrPoolTimeout timed out waiting to get a connection from the connection pool.
23+
var ErrPoolTimeout = pool.ErrPoolTimeout
24+
1825
// HasErrorPrefix checks if the err is a Redis error and the message contains a prefix.
1926
func HasErrorPrefix(err error, prefix string) bool {
2027
var rErr Error

export_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ import (
1111
"github.com/redis/go-redis/v9/internal/pool"
1212
)
1313

14-
var ErrPoolTimeout = pool.ErrPoolTimeout
15-
1614
func (c *baseClient) Pool() pool.Pooler {
1715
return c.connPool
1816
}

internal/pool/pool_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,4 +387,33 @@ var _ = Describe("race", func() {
387387
Expect(stats.WaitCount).To(Equal(uint32(1)))
388388
Expect(stats.WaitDurationNs).To(BeNumerically("~", time.Second.Nanoseconds(), 100*time.Millisecond.Nanoseconds()))
389389
})
390+
391+
It("timeout", func() {
392+
testPoolTimeout := 1 * time.Second
393+
opt := &pool.Options{
394+
Dialer: func(ctx context.Context) (net.Conn, error) {
395+
// Artificial delay to force pool timeout
396+
time.Sleep(3 * testPoolTimeout)
397+
398+
return &net.TCPConn{}, nil
399+
},
400+
PoolSize: 1,
401+
PoolTimeout: testPoolTimeout,
402+
}
403+
p := pool.NewConnPool(opt)
404+
405+
stats := p.Stats()
406+
Expect(stats.Timeouts).To(Equal(uint32(0)))
407+
408+
conn, err := p.Get(ctx)
409+
Expect(err).NotTo(HaveOccurred())
410+
_, err = p.Get(ctx)
411+
Expect(err).To(MatchError(pool.ErrPoolTimeout))
412+
p.Put(ctx, conn)
413+
conn, err = p.Get(ctx)
414+
Expect(err).NotTo(HaveOccurred())
415+
416+
stats = p.Stats()
417+
Expect(stats.Timeouts).To(Equal(uint32(1)))
418+
})
390419
})

internal/util.go

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -49,22 +49,7 @@ func isLower(s string) bool {
4949
}
5050

5151
func ReplaceSpaces(s string) string {
52-
// Pre-allocate a builder with the same length as s to minimize allocations.
53-
// This is a basic optimization; adjust the initial size based on your use case.
54-
var builder strings.Builder
55-
builder.Grow(len(s))
56-
57-
for _, char := range s {
58-
if char == ' ' {
59-
// Replace space with a hyphen.
60-
builder.WriteRune('-')
61-
} else {
62-
// Copy the character as-is.
63-
builder.WriteRune(char)
64-
}
65-
}
66-
67-
return builder.String()
52+
return strings.ReplaceAll(s, " ", "-")
6853
}
6954

7055
func GetAddr(addr string) string {

internal/util_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package internal
22

33
import (
4+
"runtime"
45
"strings"
56
"testing"
67

@@ -72,3 +73,36 @@ func TestGetAddr(t *testing.T) {
7273
Expect(GetAddr("127")).To(Equal(""))
7374
})
7475
}
76+
77+
func BenchmarkReplaceSpaces(b *testing.B) {
78+
version := runtime.Version()
79+
for i := 0; i < b.N; i++ {
80+
_ = ReplaceSpaces(version)
81+
}
82+
}
83+
84+
func ReplaceSpacesUseBuilder(s string) string {
85+
// Pre-allocate a builder with the same length as s to minimize allocations.
86+
// This is a basic optimization; adjust the initial size based on your use case.
87+
var builder strings.Builder
88+
builder.Grow(len(s))
89+
90+
for _, char := range s {
91+
if char == ' ' {
92+
// Replace space with a hyphen.
93+
builder.WriteRune('-')
94+
} else {
95+
// Copy the character as-is.
96+
builder.WriteRune(char)
97+
}
98+
}
99+
100+
return builder.String()
101+
}
102+
103+
func BenchmarkReplaceSpacesUseBuilder(b *testing.B) {
104+
version := runtime.Version()
105+
for i := 0; i < b.N; i++ {
106+
_ = ReplaceSpacesUseBuilder(version)
107+
}
108+
}

internal_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -364,14 +364,14 @@ var _ = Describe("ClusterClient", func() {
364364
It("select slot from args for GETKEYSINSLOT command", func() {
365365
cmd := NewStringSliceCmd(ctx, "cluster", "getkeysinslot", 100, 200)
366366

367-
slot := client.cmdSlot(context.Background(), cmd)
367+
slot := client.cmdSlot(cmd)
368368
Expect(slot).To(Equal(100))
369369
})
370370

371371
It("select slot from args for COUNTKEYSINSLOT command", func() {
372372
cmd := NewStringSliceCmd(ctx, "cluster", "countkeysinslot", 100)
373373

374-
slot := client.cmdSlot(context.Background(), cmd)
374+
slot := client.cmdSlot(cmd)
375375
Expect(slot).To(Equal(100))
376376
})
377377
})

osscluster.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -981,7 +981,7 @@ func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
981981
}
982982

983983
func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
984-
slot := c.cmdSlot(ctx, cmd)
984+
slot := c.cmdSlot(cmd)
985985
var node *clusterNode
986986
var moved bool
987987
var ask bool
@@ -1329,7 +1329,7 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd
13291329

13301330
if c.opt.ReadOnly && c.cmdsAreReadOnly(ctx, cmds) {
13311331
for _, cmd := range cmds {
1332-
slot := c.cmdSlot(ctx, cmd)
1332+
slot := c.cmdSlot(cmd)
13331333
node, err := c.slotReadOnlyNode(state, slot)
13341334
if err != nil {
13351335
return err
@@ -1340,7 +1340,7 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd
13401340
}
13411341

13421342
for _, cmd := range cmds {
1343-
slot := c.cmdSlot(ctx, cmd)
1343+
slot := c.cmdSlot(cmd)
13441344
node, err := state.slotMasterNode(slot)
13451345
if err != nil {
13461346
return err
@@ -1540,7 +1540,7 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err
15401540
func (c *ClusterClient) mapCmdsBySlot(ctx context.Context, cmds []Cmder) map[int][]Cmder {
15411541
cmdsMap := make(map[int][]Cmder)
15421542
for _, cmd := range cmds {
1543-
slot := c.cmdSlot(ctx, cmd)
1543+
slot := c.cmdSlot(cmd)
15441544
cmdsMap[slot] = append(cmdsMap[slot], cmd)
15451545
}
15461546
return cmdsMap
@@ -1569,7 +1569,7 @@ func (c *ClusterClient) processTxPipelineNode(
15691569
}
15701570

15711571
func (c *ClusterClient) processTxPipelineNodeConn(
1572-
ctx context.Context, node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
1572+
ctx context.Context, _ *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
15731573
) error {
15741574
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
15751575
return writeCmds(wr, cmds)
@@ -1858,7 +1858,7 @@ func (c *ClusterClient) cmdInfo(ctx context.Context, name string) *CommandInfo {
18581858
return info
18591859
}
18601860

1861-
func (c *ClusterClient) cmdSlot(ctx context.Context, cmd Cmder) int {
1861+
func (c *ClusterClient) cmdSlot(cmd Cmder) int {
18621862
args := cmd.Args()
18631863
if args[0] == "cluster" && (args[1] == "getkeysinslot" || args[1] == "countkeysinslot") {
18641864
return args[2].(int)

ring.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -349,17 +349,16 @@ func (c *ringSharding) newRingShards(
349349
return
350350
}
351351

352+
// Warning: External exposure of `c.shards.list` may cause data races.
353+
// So keep internal or implement deep copy if exposed.
352354
func (c *ringSharding) List() []*ringShard {
353-
var list []*ringShard
354-
355355
c.mu.RLock()
356-
if !c.closed {
357-
list = make([]*ringShard, len(c.shards.list))
358-
copy(list, c.shards.list)
359-
}
360-
c.mu.RUnlock()
356+
defer c.mu.RUnlock()
361357

362-
return list
358+
if c.closed {
359+
return nil
360+
}
361+
return c.shards.list
363362
}
364363

365364
func (c *ringSharding) Hash(key string) string {
@@ -423,6 +422,7 @@ func (c *ringSharding) Heartbeat(ctx context.Context, frequency time.Duration) {
423422
case <-ticker.C:
424423
var rebalance bool
425424

425+
// note: `c.List()` return a shadow copy of `[]*ringShard`.
426426
for _, shard := range c.List() {
427427
err := shard.Client.Ping(ctx).Err()
428428
isUp := err == nil || err == pool.ErrPoolTimeout
@@ -582,6 +582,7 @@ func (c *Ring) retryBackoff(attempt int) time.Duration {
582582

583583
// PoolStats returns accumulated connection pool stats.
584584
func (c *Ring) PoolStats() *PoolStats {
585+
// note: `c.List()` return a shadow copy of `[]*ringShard`.
585586
shards := c.sharding.List()
586587
var acc PoolStats
587588
for _, shard := range shards {
@@ -651,6 +652,7 @@ func (c *Ring) ForEachShard(
651652
ctx context.Context,
652653
fn func(ctx context.Context, client *Client) error,
653654
) error {
655+
// note: `c.List()` return a shadow copy of `[]*ringShard`.
654656
shards := c.sharding.List()
655657
var wg sync.WaitGroup
656658
errCh := make(chan error, 1)
@@ -682,6 +684,7 @@ func (c *Ring) ForEachShard(
682684
}
683685

684686
func (c *Ring) cmdsInfo(ctx context.Context) (map[string]*CommandInfo, error) {
687+
// note: `c.List()` return a shadow copy of `[]*ringShard`.
685688
shards := c.sharding.List()
686689
var firstErr error
687690
for _, shard := range shards {
@@ -810,7 +813,7 @@ func (c *Ring) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) er
810813

811814
for _, key := range keys {
812815
if key != "" {
813-
shard, err := c.sharding.GetByKey(hashtag.Key(key))
816+
shard, err := c.sharding.GetByKey(key)
814817
if err != nil {
815818
return err
816819
}

0 commit comments

Comments
 (0)