Skip to content

Commit fae7507

Browse files
committed
api: block Connect() on failure if Reconnect > 0
This patch makes Connect() retry connection attempts if opts.Reconnect is greater than 0. The delay between connection attempts is opts.Reconnect. If opts.MaxReconnects > 0, the maximum number of attempts is equal to it; otherwise the number of attempts is unlimited. Connect() now also blocks until a connection is established, the provided context is cancelled, or the number of attempts is exhausted. Closes #436
1 parent 252c3b7 commit fae7507

File tree

3 files changed

+141
-14
lines changed

3 files changed

+141
-14
lines changed

CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,19 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
66
and this project adheres to [Semantic
77
Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
88

9+
## [Unreleased]
10+
11+
### Added
12+
13+
### Changed
14+
15+
- Connect() now retries the connection if a failure occurs and opts.Reconnect > 0.
16+
The number of attempts is equal to opts.MaxReconnects or unlimited if
17+
opts.MaxReconnects == 0. Connect() blocks until a connection is established,
18+
the context is cancelled, or the number of attempts is exhausted (#436).
19+
20+
### Fixed
21+
922
## [v2.3.0] - 2025-03-11
1023

1124
The release extends box.info responses and ConnectionPool.GetInfo return data.

connection.go

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,24 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
9292
case LogReconnectFailed:
9393
reconnects := v[0].(uint)
9494
err := v[1].(error)
95-
log.Printf("tarantool: reconnect (%d/%d) to %s failed: %s",
96-
reconnects, conn.opts.MaxReconnects, conn.Addr(), err)
95+
addr := conn.Addr()
96+
if addr == nil {
97+
log.Printf("tarantool: connect (%d/%d) failed: %s",
98+
reconnects, conn.opts.MaxReconnects, err)
99+
} else {
100+
log.Printf("tarantool: reconnect (%d/%d) to %s failed: %s",
101+
reconnects, conn.opts.MaxReconnects, addr, err)
102+
}
97103
case LogLastReconnectFailed:
98104
err := v[0].(error)
99-
log.Printf("tarantool: last reconnect to %s failed: %s, giving it up",
100-
conn.Addr(), err)
105+
addr := conn.Addr()
106+
if addr == nil {
107+
log.Printf("tarantool: last connect failed: %s, giving it up",
108+
err)
109+
} else {
110+
log.Printf("tarantool: last reconnect to %s failed: %s, giving it up",
111+
addr, err)
112+
}
101113
case LogUnexpectedResultId:
102114
header := v[0].(Header)
103115
log.Printf("tarantool: connection %s got unexpected request ID (%d) in response "+
@@ -362,8 +374,20 @@ func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, e
362374

363375
conn.cond = sync.NewCond(&conn.mutex)
364376

365-
if err = conn.createConnection(ctx); err != nil {
366-
return nil, err
377+
if conn.opts.Reconnect > 0 {
378+
// We don't need these mutex.Lock()/mutex.Unlock() here, but
379+
// runReconnects() expects mutex.Lock() to be set, so it's
380+
// easier to add them instead of reworking runReconnects().
381+
conn.mutex.Lock()
382+
err = conn.runReconnects(ctx)
383+
conn.mutex.Unlock()
384+
if err != nil {
385+
return nil, err
386+
}
387+
} else {
388+
if err = conn.connect(ctx); err != nil {
389+
return nil, err
390+
}
367391
}
368392

369393
go conn.pinger()
@@ -553,7 +577,7 @@ func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
553577
return
554578
}
555579

556-
func (conn *Connection) createConnection(ctx context.Context) error {
580+
func (conn *Connection) connect(ctx context.Context) error {
557581
var err error
558582
if conn.c == nil && conn.state == connDisconnected {
559583
if err = conn.dial(ctx); err == nil {
@@ -616,19 +640,22 @@ func (conn *Connection) getDialTimeout() time.Duration {
616640
return dialTimeout
617641
}
618642

619-
func (conn *Connection) runReconnects() error {
643+
func (conn *Connection) runReconnects(ctx context.Context) error {
620644
dialTimeout := conn.getDialTimeout()
621645
var reconnects uint
622646
var err error
623647

648+
t := time.NewTicker(conn.opts.Reconnect)
649+
defer t.Stop()
624650
for conn.opts.MaxReconnects == 0 || reconnects <= conn.opts.MaxReconnects {
625-
now := time.Now()
626-
627-
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
628-
err = conn.createConnection(ctx)
651+
localCtx, cancel := context.WithTimeout(ctx, dialTimeout)
652+
err = conn.connect(localCtx)
629653
cancel()
630654

631655
if err != nil {
656+
if ctx.Err() != nil {
657+
return err
658+
}
632659
if clientErr, ok := err.(ClientError); ok &&
633660
clientErr.Code == ErrConnectionClosed {
634661
return err
@@ -642,7 +669,12 @@ func (conn *Connection) runReconnects() error {
642669
reconnects++
643670
conn.mutex.Unlock()
644671

645-
time.Sleep(time.Until(now.Add(conn.opts.Reconnect)))
672+
select {
673+
case <-ctx.Done():
674+
// Since the context is cancelled, we don't need to do anything.
675+
// Conn.connect() will return the correct error.
676+
case <-t.C:
677+
}
646678

647679
conn.mutex.Lock()
648680
}
@@ -656,7 +688,7 @@ func (conn *Connection) reconnectImpl(neterr error, c Conn) {
656688
if conn.opts.Reconnect > 0 {
657689
if c == conn.c {
658690
conn.closeConnection(neterr, false)
659-
if err := conn.runReconnects(); err != nil {
691+
if err := conn.runReconnects(context.Background()); err != nil {
660692
conn.closeConnection(err, true)
661693
}
662694
}

tarantool_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3972,6 +3972,88 @@ func TestConnect_context_cancel(t *testing.T) {
39723972
}
39733973
}
39743974

3975+
func TestConnectIsBlocked(t *testing.T) {
3976+
const server = "127.0.0.1:3015"
3977+
3978+
testDialer := dialer
3979+
testDialer.Address = server
3980+
3981+
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
3982+
defer cancel()
3983+
errors := make(chan error)
3984+
go func() {
3985+
reconnectOpts := opts
3986+
reconnectOpts.Reconnect = 100 * time.Millisecond
3987+
reconnectOpts.MaxReconnects = 100
3988+
conn, err := Connect(ctx, testDialer, reconnectOpts)
3989+
if err != nil {
3990+
errors <- fmt.Errorf("Connection was not established: %v", err)
3991+
return
3992+
}
3993+
defer conn.Close()
3994+
close(errors)
3995+
}()
3996+
3997+
time.Sleep(time.Second)
3998+
inst, err := test_helpers.StartTarantool(test_helpers.StartOpts{
3999+
Dialer: testDialer,
4000+
InitScript: "config.lua",
4001+
Listen: server,
4002+
WaitStart: 100 * time.Millisecond,
4003+
ConnectRetry: 10,
4004+
RetryTimeout: 500 * time.Millisecond,
4005+
})
4006+
defer test_helpers.StopTarantoolWithCleanup(inst)
4007+
if err != nil {
4008+
t.Fatalf("Unable to start Tarantool: %s", err)
4009+
}
4010+
if err := <-errors; err != nil {
4011+
t.Fatal(err)
4012+
}
4013+
}
4014+
4015+
func TestConnectIsBlockedUntilContextExpires(t *testing.T) {
4016+
const server = "127.0.0.1:3015"
4017+
4018+
testDialer := dialer
4019+
testDialer.Address = server
4020+
4021+
ctx, cancel := test_helpers.GetConnectContext()
4022+
defer cancel()
4023+
reconnectOpts := opts
4024+
reconnectOpts.Reconnect = 100 * time.Millisecond
4025+
reconnectOpts.MaxReconnects = 100
4026+
_, err := Connect(ctx, testDialer, reconnectOpts)
4027+
if err == nil {
4028+
t.Fatal("Connection was unexpectedly established.")
4029+
}
4030+
exp := "failed to dial: dial tcp 127.0.0.1:3015: i/o timeout"
4031+
if err.Error() != exp {
4032+
t.Fatalf("Expected '%s', got '%v'", exp, err)
4033+
}
4034+
}
4035+
4036+
func TestConnectIsUnblockedAfterMaxAttempts(t *testing.T) {
4037+
const server = "127.0.0.1:3015"
4038+
4039+
testDialer := dialer
4040+
testDialer.Address = server
4041+
4042+
ctx, cancel := test_helpers.GetConnectContext()
4043+
defer cancel()
4044+
reconnectOpts := opts
4045+
reconnectOpts.Reconnect = 100 * time.Millisecond
4046+
reconnectOpts.MaxReconnects = 1
4047+
_, err := Connect(ctx, testDialer, reconnectOpts)
4048+
if err == nil {
4049+
t.Fatal("Connection was unexpectedly established.")
4050+
}
4051+
exp := "last reconnect failed"
4052+
if !strings.Contains(err.Error(), exp) {
4053+
t.Fatalf("Expected '%s', got '%v'", exp, err)
4054+
}
4055+
}
4056+
39754057
func buildSidecar(dir string) error {
39764058
goPath, err := exec.LookPath("go")
39774059
if err != nil {

0 commit comments

Comments
 (0)