diff --git a/CHANGELOG.md b/CHANGELOG.md
index 58b390aed..81c4f9341 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,11 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
 
 ### Added
 
+- Connection.CloseGraceful(), which, unlike Connection.Close(), waits for
+  all requests to complete (#257)
+- ConnectionPool.CloseGraceful(), which, unlike ConnectionPool.Close(),
+  waits for all requests to complete (#257)
+
 ### Changed
 
 ### Fixed
 
diff --git a/connection.go b/connection.go
index 65d21365c..055941e78 100644
--- a/connection.go
+++ b/connection.go
@@ -37,10 +37,10 @@ const (
 	Disconnected
 	// ReconnectFailed signals that attempt to reconnect has failed.
 	ReconnectFailed
-	// Either reconnect attempts exhausted, or explicit Close is called.
-	Closed
 	// Shutdown signals that shutdown callback is processing.
 	Shutdown
+	// Either reconnect attempts are exhausted, or an explicit Close was called.
+	Closed
 
 	// LogReconnectFailed is logged when reconnect attempt failed.
 	LogReconnectFailed ConnLogKind = iota + 1
@@ -109,6 +109,11 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
 // - In "Disconnected" state it rejects queries with ClientError{Code:
 //   ErrConnectionNotReady}
 //
+// - In "Shutdown" state it rejects queries with ClientError{Code:
+//   ErrConnectionShutdown}. The state indicates that a graceful shutdown
+//   is in progress. The connection waits for all active requests to
+//   complete.
+//
 // - In "Closed" state it rejects queries with ClientError{Code:
 //   ErrConnectionClosed}. Connection could become "Closed" when
 //   Connection.Close() method called, or when Tarantool disconnected and
@@ -457,6 +462,13 @@ func (conn *Connection) Close() error {
 	return conn.closeConnection(err, true)
 }
 
+// CloseGraceful closes Connection gracefully. It waits for all requests to
+// complete. After this method is called, there is no way to reopen this
+// Connection.
+func (conn *Connection) CloseGraceful() error {
+	return conn.shutdown(true)
+}
+
 // Addr returns a configured address of Tarantool socket.
 func (conn *Connection) Addr() string {
 	return conn.addr
@@ -1527,17 +1539,27 @@ func shutdownEventCallback(event WatchEvent) {
 	// step 2.
 	val, ok := event.Value.(bool)
 	if ok && val {
-		go event.Conn.shutdown()
+		go event.Conn.shutdown(false)
 	}
 }
 
-func (conn *Connection) shutdown() {
+func (conn *Connection) shutdown(forever bool) error {
 	// Forbid state changes.
 	conn.mutex.Lock()
 	defer conn.mutex.Unlock()
 
 	if !atomic.CompareAndSwapUint32(&conn.state, connConnected, connShutdown) {
-		return
+		if forever {
+			err := ClientError{ErrConnectionClosed, "connection closed by client"}
+			return conn.closeConnection(err, true)
+		}
+		return nil
+	}
+
+	if forever {
+		// We don't want to reconnect any more.
+		conn.opts.Reconnect = 0
+		conn.opts.MaxReconnects = 0
 	}
 
 	conn.cond.Broadcast()
@@ -1546,7 +1568,7 @@
 	c := conn.c
 	for {
 		if (atomic.LoadUint32(&conn.state) != connShutdown) || (c != conn.c) {
-			return
+			return nil
 		}
 		if atomic.LoadInt64(&conn.requestCnt) == 0 {
 			break
 		}
@@ -1558,14 +1580,19 @@
 		conn.cond.Wait()
 	}
 
-	// Start to reconnect based on common rules, same as in net.box.
-	// Reconnect also closes the connection: server waits until all
-	// subscribed connections are terminated.
-	// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
-	// step 3.
-	conn.reconnectImpl(
-		ClientError{
+	if forever {
+		err := ClientError{ErrConnectionClosed, "connection closed by client"}
+		return conn.closeConnection(err, true)
+	} else {
+		// Start to reconnect based on common rules, same as in net.box.
+		// Reconnect also closes the connection: server waits until all
+		// subscribed connections are terminated.
+		// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
+		// step 3.
+		conn.reconnectImpl(ClientError{
 			ErrConnectionClosed,
 			"connection closed after server shutdown",
 		}, conn.c)
+		return nil
+	}
 }
diff --git a/connection_pool/connection_pool.go b/connection_pool/connection_pool.go
index 892fea483..f67ac7092 100644
--- a/connection_pool/connection_pool.go
+++ b/connection_pool/connection_pool.go
@@ -87,7 +87,7 @@ Main features:
 - Automatic master discovery by mode parameter.
 */
 type ConnectionPool struct {
-	addrs    []string
+	addrs    map[string]*endpointState
 	connOpts tarantool.Opts
 	opts     OptsPool
 
@@ -102,11 +102,16 @@ type ConnectionPool struct {
 
 var _ Pooler = (*ConnectionPool)(nil)
 
-type connState struct {
+type endpointState struct {
 	addr   string
 	notify chan tarantool.ConnEvent
 	conn   *tarantool.Connection
 	role   Role
+	// These are used to switch the connection state.
+	shutdown chan struct{}
+	close    chan struct{}
+	closed   chan struct{}
+	closeErr error
 }
 
 // ConnectWithOpts creates pool for instances with addresses addrs
@@ -125,7 +130,7 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (co
 	anyPool := NewEmptyRoundRobin(size)
 
 	connPool = &ConnectionPool{
-		addrs:    make([]string, 0, len(addrs)),
+		addrs:    make(map[string]*endpointState),
 		connOpts: connOpts.Clone(),
 		opts:     opts,
 		state:    unknownState,
@@ -135,28 +140,20 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (co
 		anyPool: anyPool,
 	}
 
-	m := make(map[string]bool)
 	for _, addr := range addrs {
-		if _, ok := m[addr]; !ok {
-			m[addr] = true
-			connPool.addrs = append(connPool.addrs, addr)
-		}
+		connPool.addrs[addr] = nil
 	}
 
-	states, somebodyAlive := connPool.fillPools()
+	somebodyAlive := connPool.fillPools()
	if !somebodyAlive {
		connPool.state.set(closedState)
-		connPool.closeImpl()
-		for _, s := range states {
-			close(s.notify)
-		}
		return nil, ErrNoConnection
	}
 
	connPool.state.set(connectedState)
-	for _, s := range states {
-		go connPool.checker(s)
+	for _, s := range connPool.addrs {
+		go connPool.controller(s)
	}
 
	return connPool, nil
@@ -208,44 +205,55 @@ func (connPool *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, err
	return conn.ConfiguredTimeout(), nil
 }
 
-func (connPool *ConnectionPool) closeImpl() []error {
-	errs := make([]error, 0, len(connPool.addrs))
-
-	for _, addr := range connPool.addrs {
-		if conn := connPool.anyPool.DeleteConnByAddr(addr); conn != nil {
-			if err := conn.Close(); err != nil {
-				errs = append(errs, err)
-			}
-
-			role := UnknownRole
-			if conn := connPool.rwPool.DeleteConnByAddr(addr); conn != nil {
-				role = MasterRole
-			} else if conn := connPool.roPool.DeleteConnByAddr(addr); conn != nil {
-				role = ReplicaRole
-			}
-			connPool.handlerDeactivated(conn, role)
+func (pool *ConnectionPool) waitClose() []error {
+	errs := make([]error, 0, len(pool.addrs))
+	for _, s := range pool.addrs {
+		<-s.closed
+		if s.closeErr != nil {
+			errs = append(errs, s.closeErr)
 		}
 	}
-
-	close(connPool.done)
 	return errs
 }
 
-// Close closes connections in pool.
-func (connPool *ConnectionPool) Close() []error {
-	if connPool.state.cas(connectedState, closedState) {
-		connPool.poolsMutex.Lock()
-		defer connPool.poolsMutex.Unlock()
+// Close closes connections in the ConnectionPool.
+func (pool *ConnectionPool) Close() []error {
+	if pool.state.cas(connectedState, closedState) ||
+		pool.state.cas(shutdownState, closedState) {
+		pool.poolsMutex.Lock()
+		for _, s := range pool.addrs {
+			close(s.close)
+		}
+		pool.poolsMutex.Unlock()
+	}
 
-		return connPool.closeImpl()
+	return pool.waitClose()
+}
+
+// CloseGraceful closes connections in the ConnectionPool gracefully. It waits
+// for all requests to complete.
+func (pool *ConnectionPool) CloseGraceful() []error {
+	if pool.state.cas(connectedState, shutdownState) {
+		pool.poolsMutex.Lock()
+		for _, s := range pool.addrs {
+			close(s.shutdown)
+		}
+		pool.poolsMutex.Unlock()
 	}
-	return nil
+
+	return pool.waitClose()
 }
 
 // GetAddrs gets addresses of connections in pool.
 func (connPool *ConnectionPool) GetAddrs() []string {
 	cpy := make([]string, len(connPool.addrs))
-	copy(cpy, connPool.addrs)
+
+	i := 0
+	for addr := range connPool.addrs {
+		cpy[i] = addr
+		i++
+	}
+
 	return cpy
 }
 
@@ -260,7 +268,7 @@ func (connPool *ConnectionPool) GetPoolInfo() map[string]*ConnectionInfo {
 		return info
 	}
 
-	for _, addr := range connPool.addrs {
+	for addr := range connPool.addrs {
 		conn, role := connPool.getConnectionFromPool(addr)
 		if conn != nil {
 			info[addr] = &ConnectionInfo{ConnectedNow: conn.ConnectedNow(), ConnRole: role}
@@ -886,22 +894,25 @@ func (connPool *ConnectionPool) handlerDeactivated(conn *tarantool.Connection,
 	}
 }
 
-func (connPool *ConnectionPool) fillPools() ([]connState, bool) {
-	states := make([]connState, len(connPool.addrs))
+func (connPool *ConnectionPool) fillPools() bool {
 	somebodyAlive := false
 
-	// It is called before checker() goroutines and before closeImpl() may be
-	// called so we don't expect concurrency issues here.
-	for i, addr := range connPool.addrs {
-		states[i] = connState{
-			addr:   addr,
-			notify: make(chan tarantool.ConnEvent, 10),
-			conn:   nil,
-			role:   UnknownRole,
+	// It is called before the controller() goroutines are started, so we
+	// don't expect concurrency issues here.
+	for addr := range connPool.addrs {
+		state := &endpointState{
+			addr:     addr,
+			notify:   make(chan tarantool.ConnEvent, 10),
+			conn:     nil,
+			role:     UnknownRole,
+			shutdown: make(chan struct{}),
+			close:    make(chan struct{}),
+			closed:   make(chan struct{}),
 		}
-		connOpts := connPool.connOpts
-		connOpts.Notify = states[i].notify
+		connPool.addrs[addr] = state
+
+		connOpts := connPool.connOpts
+		connOpts.Notify = state.notify
 		conn, err := tarantool.Connect(addr, connOpts)
 		if err != nil {
 			log.Printf("tarantool: connect to %s failed: %s\n", addr, err.Error())
@@ -920,8 +931,8 @@
 		}
 
 		if conn.ConnectedNow() {
-			states[i].conn = conn
-			states[i].role = role
+			state.conn = conn
+			state.role = role
 			somebodyAlive = true
 		} else {
 			connPool.deleteConnection(addr)
@@ -934,15 +945,15 @@
 		}
 	}
 
-	return states, somebodyAlive
+	return somebodyAlive
 }
 
-func (pool *ConnectionPool) updateConnection(s connState) connState {
+func (pool *ConnectionPool) updateConnection(s *endpointState) {
 	pool.poolsMutex.Lock()
 
 	if pool.state.get() != connectedState {
 		pool.poolsMutex.Unlock()
-		return s
+		return
 	}
 
 	if role, err := pool.getConnectionRole(s.conn); err == nil {
@@ -956,7 +967,7 @@
 		s.conn.Close()
 		s.conn = nil
 		s.role = UnknownRole
-		return s
+		return
 	}
 
 	pool.poolsMutex.Lock()
@@ -967,7 +978,7 @@
 		pool.handlerDeactivated(s.conn, role)
 		s.conn = nil
 		s.role = UnknownRole
-		return s
+		return
 	}
 
 	if pool.addConnection(s.addr, s.conn, role) != nil {
@@ -977,12 +988,12 @@
 			pool.handlerDeactivated(s.conn, role)
 			s.conn = nil
 			s.role = UnknownRole
-			return s
+			return
 		}
 		s.role = role
 	}
 	pool.poolsMutex.Unlock()
-	return s
+	return
 	} else {
 		pool.deleteConnection(s.addr)
 		pool.poolsMutex.Unlock()
@@ -991,16 +1002,16 @@
 		pool.handlerDeactivated(s.conn, s.role)
 		s.conn = nil
 		s.role = UnknownRole
-		return s
+		return
 	}
 }
 
-func (pool *ConnectionPool) tryConnect(s connState) connState {
+func (pool *ConnectionPool) tryConnect(s *endpointState) {
 	pool.poolsMutex.Lock()
 
 	if pool.state.get() != connectedState {
 		pool.poolsMutex.Unlock()
-		return s
+		return
 	}
 
 	s.conn = nil
@@ -1016,13 +1027,13 @@
 	if err != nil {
 		conn.Close()
 		log.Printf("tarantool: storing connection to %s failed: %s\n", s.addr, err)
-		return s
+		return
 	}
 
 	opened := pool.handlerDiscovered(conn, role)
 	if !opened {
 		conn.Close()
-		return s
+		return
 	}
 
 	pool.poolsMutex.Lock()
@@ -1030,29 +1041,28 @@
 		pool.poolsMutex.Unlock()
 		conn.Close()
 		pool.handlerDeactivated(conn, role)
-		return s
+		return
 	}
 
 	if pool.addConnection(s.addr, conn, role) != nil {
 		pool.poolsMutex.Unlock()
 		conn.Close()
 		pool.handlerDeactivated(conn, role)
-		return s
+		return
 	}
 	s.conn = conn
 	s.role = role
 	}
 	pool.poolsMutex.Unlock()
-	return s
 }
 
-func (pool *ConnectionPool) reconnect(s connState) connState {
+func (pool *ConnectionPool) reconnect(s *endpointState) {
 	pool.poolsMutex.Lock()
 
 	if pool.state.get() != connectedState {
 		pool.poolsMutex.Unlock()
-		return s
+		return
 	}
 
 	pool.deleteConnection(s.addr)
@@ -1062,41 +1072,100 @@
 	s.conn = nil
 	s.role = UnknownRole
 
-	return pool.tryConnect(s)
+	pool.tryConnect(s)
 }
 
-func (pool *ConnectionPool) checker(s connState) {
+func (pool *ConnectionPool) controller(s *endpointState) {
 	timer := time.NewTicker(pool.opts.CheckTimeout)
 	defer timer.Stop()
 
+	shutdown := false
 	for {
+		if shutdown {
+			// A graceful shutdown is in progress. We need to wait for it
+			// to finish or to force a close.
+			select {
+			case <-s.closed:
+			case <-s.close:
+			}
+		}
+
 		select {
-		case <-pool.done:
-			close(s.notify)
+		case <-s.closed:
 			return
-		case <-s.notify:
-			if s.conn != nil && s.conn.ClosedNow() {
+		default:
+		}
+
+		select {
+		// s.close has priority to avoid concurrency with s.shutdown.
+		case <-s.close:
+			if s.conn != nil {
 				pool.poolsMutex.Lock()
-				if pool.state.get() == connectedState {
-					pool.deleteConnection(s.addr)
-					pool.poolsMutex.Unlock()
+				pool.deleteConnection(s.addr)
+				pool.poolsMutex.Unlock()
+
+				if !shutdown {
+					s.closeErr = s.conn.Close()
 					pool.handlerDeactivated(s.conn, s.role)
-					s.conn = nil
-					s.role = UnknownRole
+					close(s.closed)
 				} else {
-					pool.poolsMutex.Unlock()
+					// Force close the connection.
+					s.conn.Close()
+					// And wait for it to finish.
+					<-s.closed
 				}
-			}
-		case <-timer.C:
-			// Reopen connection
-			// Relocate connection between subpools
-			// if ro/rw was updated
-			if s.conn == nil {
-				s = pool.tryConnect(s)
-			} else if !s.conn.ClosedNow() {
-				s = pool.updateConnection(s)
 			} else {
-				s = pool.reconnect(s)
+				close(s.closed)
+			}
+		default:
+			select {
+			case <-s.shutdown:
+				shutdown = true
+				if s.conn != nil {
+					pool.poolsMutex.Lock()
+					pool.deleteConnection(s.addr)
+					pool.poolsMutex.Unlock()
+
+					// We need to catch s.close in the current goroutine, so
+					// we need to start another one for the shutdown.
+					go func() {
+						s.closeErr = s.conn.CloseGraceful()
+						close(s.closed)
+					}()
+				} else {
+					close(s.closed)
+				}
+			default:
+				select {
+				case <-s.close:
+					// Will be processed at the upper level.
+				case <-s.shutdown:
+					// Will be processed at the upper level.
+				case <-s.notify:
+					if s.conn != nil && s.conn.ClosedNow() {
+						pool.poolsMutex.Lock()
+						if pool.state.get() == connectedState {
+							pool.deleteConnection(s.addr)
+							pool.poolsMutex.Unlock()
+							pool.handlerDeactivated(s.conn, s.role)
+							s.conn = nil
+							s.role = UnknownRole
+						} else {
+							pool.poolsMutex.Unlock()
+						}
+					}
+				case <-timer.C:
+					// Reopen the connection.
+					// Relocate the connection between subpools
+					// if the ro/rw role was updated.
+					if s.conn == nil {
+						pool.tryConnect(s)
+					} else if !s.conn.ClosedNow() {
+						pool.updateConnection(s)
+					} else {
+						pool.reconnect(s)
+					}
+				}
+			}
 		}
 	}
 }
diff --git a/connection_pool/connection_pool_test.go b/connection_pool/connection_pool_test.go
index e6738c2a1..763df3e64 100644
--- a/connection_pool/connection_pool_test.go
+++ b/connection_pool/connection_pool_test.go
@@ -281,6 +281,64 @@ func TestClose(t *testing.T) {
 	require.Nil(t, err)
 }
 
+func TestCloseGraceful(t *testing.T) {
+	server1 := servers[0]
+	server2 := servers[1]
+
+	connPool, err := connection_pool.Connect([]string{server1, server2}, connOpts)
+	require.Nilf(t, err, "failed to connect")
+	require.NotNilf(t, connPool, "conn is nil after Connect")
+
+	args := test_helpers.CheckStatusesArgs{
+		ConnPool:           connPool,
+		Mode:               connection_pool.ANY,
+		Servers:            []string{server1, server2},
+		ExpectedPoolStatus: true,
+		ExpectedStatuses: map[string]bool{
+			server1: true,
+			server2: true,
+		},
+	}
+
+	err = test_helpers.CheckPoolStatuses(args)
+	require.Nil(t, err)
+
+	eval := `local fiber = require('fiber')
+	local time = ...
+	fiber.sleep(time)
+`
+	evalSleep := 3 // In seconds.
+	req := tarantool.NewEvalRequest(eval).Args([]interface{}{evalSleep})
+	fut := connPool.Do(req, connection_pool.ANY)
+	go func() {
+		connPool.CloseGraceful()
+	}()
+
+	// Check that a request is rejected if a graceful shutdown is in progress.
+	time.Sleep((time.Duration(evalSleep) * time.Second) / 2)
+	_, err = connPool.Do(tarantool.NewPingRequest(), connection_pool.ANY).Get()
+	require.ErrorContains(t, err, "can't find healthy instance in pool")
+
+	// Check that the previous request was successful.
+	resp, err := fut.Get()
+	require.Nilf(t, err, "sleep request no error")
+	require.NotNilf(t, resp, "sleep response exists")
+
+	args = test_helpers.CheckStatusesArgs{
+		ConnPool:           connPool,
+		Mode:               connection_pool.ANY,
+		Servers:            []string{server1, server2},
+		ExpectedPoolStatus: false,
+		ExpectedStatuses: map[string]bool{
+			server1: false,
+			server2: false,
+		},
+	}
+
+	err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, defaultCountRetry, defaultTimeoutRetry)
+	require.Nil(t, err)
+}
+
 type testHandler struct {
 	discovered, deactivated uint32
 	errs                    []error
 
@@ -572,9 +630,8 @@ func TestGetPoolInfo(t *testing.T) {
 	srvs[0] = "x"
 	connPool.GetAddrs()[1] = "y"
 
-	for i, addr := range connPool.GetAddrs() {
-		require.Equal(t, expected[i], addr)
-	}
+
+	require.ElementsMatch(t, expected, connPool.GetAddrs())
 }
 
 func TestCall17(t *testing.T) {
diff --git a/connection_pool/example_test.go b/connection_pool/example_test.go
index cf59455ca..4ed422375 100644
--- a/connection_pool/example_test.go
+++ b/connection_pool/example_test.go
@@ -926,3 +926,43 @@ func ExampleConnectorAdapter() {
 	// Ping Data []
 	// Ping Error
 }
+
+// ExampleConnectionPool_CloseGraceful_force demonstrates how to force-close
+// a connection pool after a while when a graceful close is in progress.
+func ExampleConnectionPool_CloseGraceful_force() {
+	pool, err := examplePool(testRoles, connOpts)
+	if err != nil {
+		fmt.Println(err)
+		return
+	}
+
+	eval := `local fiber = require('fiber')
+	local time = ...
+	fiber.sleep(time)
+`
+	req := tarantool.NewEvalRequest(eval).Args([]interface{}{10})
+	fut := pool.Do(req, connection_pool.ANY)
+
+	done := make(chan struct{})
+	go func() {
+		pool.CloseGraceful()
+		fmt.Println("ConnectionPool.CloseGraceful() done!")
+		close(done)
+	}()
+
+	select {
+	case <-done:
+	case <-time.After(3 * time.Second):
+		fmt.Println("Force ConnectionPool.Close()!")
+		pool.Close()
+	}
+	<-done
+
+	fmt.Println("Result:")
+	fmt.Println(fut.Get())
+	// Output:
+	// Force ConnectionPool.Close()!
+	// ConnectionPool.CloseGraceful() done!
+	// Result:
+	// connection closed by client (0x4001)
+}
diff --git a/connection_pool/state.go b/connection_pool/state.go
index a9d20392e..20cd070af 100644
--- a/connection_pool/state.go
+++ b/connection_pool/state.go
@@ -10,6 +10,7 @@ type state uint32
 
 const (
 	unknownState state = iota
 	connectedState
+	shutdownState
 	closedState
 )
diff --git a/example_test.go b/example_test.go
index 7d16893e1..e608d0f42 100644
--- a/example_test.go
+++ b/example_test.go
@@ -1115,3 +1115,39 @@ func ExamplePingRequest_Context() {
 	// Ping Resp
 	// Ping Error context is done
 }
+
+// ExampleConnection_CloseGraceful_force demonstrates how to force-close
+// a connection after a while when a graceful close is in progress.
+func ExampleConnection_CloseGraceful_force() {
+	conn := example_connect(opts)
+
+	eval := `local fiber = require('fiber')
+	local time = ...
+	fiber.sleep(time)
+`
+	req := tarantool.NewEvalRequest(eval).Args([]interface{}{10})
+	fut := conn.Do(req)
+
+	done := make(chan struct{})
+	go func() {
+		conn.CloseGraceful()
+		fmt.Println("Connection.CloseGraceful() done!")
+		close(done)
+	}()
+
+	select {
+	case <-done:
+	case <-time.After(time.Second):
+		fmt.Println("Force Connection.Close()!")
+		conn.Close()
+	}
+	<-done
+
+	fmt.Println("Result:")
+	fmt.Println(fut.Get())
+	// Output:
+	// Force Connection.Close()!
+	// Connection.CloseGraceful() done!
+	// Result:
+	// connection closed by client (0x4001)
+}
diff --git a/shutdown_test.go b/shutdown_test.go
index fe268b41e..1b06284c0 100644
--- a/shutdown_test.go
+++ b/shutdown_test.go
@@ -12,6 +12,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	. "github.com/tarantool/go-tarantool"
 	"github.com/tarantool/go-tarantool/test_helpers"
@@ -143,6 +144,48 @@ func TestGracefulShutdown(t *testing.T) {
 	testGracefulShutdown(t, conn, &inst)
 }
 
+func TestCloseGraceful(t *testing.T) {
+	opts := Opts{
+		User:    shtdnClntOpts.User,
+		Pass:    shtdnClntOpts.Pass,
+		Timeout: shtdnClntOpts.Timeout,
+	}
+
+	inst, err := test_helpers.StartTarantool(shtdnSrvOpts)
+	require.Nil(t, err)
+	defer test_helpers.StopTarantoolWithCleanup(inst)
+
+	conn := test_helpers.ConnectWithValidation(t, shtdnServer, opts)
+	defer conn.Close()
+
+	// Send a request with sleep.
+	evalSleep := 3 // In seconds.
+	require.Lessf(t,
+		time.Duration(evalSleep)*time.Second,
+		shtdnClntOpts.Timeout,
+		"test request won't fail due to timeout")
+
+	req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg})
+	fut := conn.Do(req)
+
+	go func() {
+		// CloseGraceful closes the connection gracefully.
+		conn.CloseGraceful()
+		// Connection is closed.
+		assert.Equal(t, true, conn.ClosedNow())
+	}()
+
+	// Check that a request is rejected if a graceful shutdown is in progress.
+	time.Sleep((time.Duration(evalSleep) * time.Second) / 2)
+	_, err = conn.Do(NewPingRequest()).Get()
+	assert.ErrorContains(t, err, "server shutdown in progress")
+
+	// Check that the previous request was successful.
+	resp, err := fut.Get()
+	assert.Nilf(t, err, "sleep request no error")
+	assert.NotNilf(t, resp, "sleep response exists")
+}
+
 func TestGracefulShutdownWithReconnect(t *testing.T) {
 	test_helpers.SkipIfWatchersUnsupported(t)
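For reference, the non-forced path is simpler than the force-close examples above: CloseGraceful() just blocks until every in-flight request completes and then closes the connection, while requests sent after the shutdown has started are rejected. A minimal sketch of that usage, assuming a Tarantool instance at 127.0.0.1:3013 with test/test credentials (both placeholders, not part of this patch):

package main

import (
	"fmt"

	"github.com/tarantool/go-tarantool"
)

func main() {
	// The address and credentials below are placeholders.
	conn, err := tarantool.Connect("127.0.0.1:3013", tarantool.Opts{
		User: "test",
		Pass: "test",
	})
	if err != nil {
		fmt.Println(err)
		return
	}

	// Start a long-running request before closing.
	eval := `local fiber = require('fiber')
	local time = ...
	fiber.sleep(time)
`
	fut := conn.Do(tarantool.NewEvalRequest(eval).Args([]interface{}{3}))

	// CloseGraceful() blocks until all in-flight requests (the Eval above
	// included) complete, then closes the connection for good.
	if err := conn.CloseGraceful(); err != nil {
		fmt.Println(err)
	}

	// The request that was already in flight has still been served.
	resp, err := fut.Get()
	fmt.Println(resp, err)
}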