diff --git a/CHANGELOG.md b/CHANGELOG.md
index 81c4f9341..b2655e52e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
   requests to complete (#257)
 - ConnectionPool.CloseGraceful() unlike ConnectionPool.Close() waits for all
   requests to complete (#257)
+- ConnectionPool.Add()/ConnectionPool.Remove() to add/remove endpoints
+  from a pool (#290)
 
 ### Changed
 
diff --git a/connection_pool/connection_pool.go b/connection_pool/connection_pool.go
index f67ac7092..f1b3951b2 100644
--- a/connection_pool/connection_pool.go
+++ b/connection_pool/connection_pool.go
@@ -30,6 +30,7 @@ var (
 	ErrNoRwInstance      = errors.New("can't find rw instance in pool")
 	ErrNoRoInstance      = errors.New("can't find ro instance in pool")
 	ErrNoHealthyInstance = errors.New("can't find healthy instance in pool")
+	ErrClosed            = errors.New("pool is closed")
 )
 
 // ConnectionHandler provides callbacks for components interested in handling
@@ -87,7 +88,9 @@ Main features:
 - Automatic master discovery by mode parameter.
 */
 type ConnectionPool struct {
-	addrs    map[string]*endpointState
+	addrs      map[string]*endpoint
+	addrsMutex sync.RWMutex
+
 	connOpts tarantool.Opts
 	opts     OptsPool
@@ -102,7 +105,7 @@ type ConnectionPool struct {
 
 var _ Pooler = (*ConnectionPool)(nil)
 
-type endpointState struct {
+type endpoint struct {
 	addr   string
 	notify chan tarantool.ConnEvent
 	conn   *tarantool.Connection
@@ -114,6 +117,18 @@ type endpointState struct {
 	closeErr error
 }
 
+func newEndpoint(addr string) *endpoint {
+	return &endpoint{
+		addr:     addr,
+		notify:   make(chan tarantool.ConnEvent, 100),
+		conn:     nil,
+		role:     UnknownRole,
+		shutdown: make(chan struct{}),
+		close:    make(chan struct{}),
+		closed:   make(chan struct{}),
+	}
+}
+
 // ConnectWithOpts creates pool for instances with addresses addrs
 // with options opts.
 func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (connPool *ConnectionPool, err error) {
@@ -130,7 +145,7 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (co
 	anyPool := NewEmptyRoundRobin(size)
 
 	connPool = &ConnectionPool{
-		addrs:    make(map[string]*endpointState),
+		addrs:    make(map[string]*endpoint),
 		connOpts: connOpts.Clone(),
 		opts:     opts,
 		state:    unknownState,
@@ -205,12 +220,75 @@ func (connPool *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, err
 	return conn.ConfiguredTimeout(), nil
 }
 
+// Add adds a new endpoint with the address into the pool. This function
+// adds the endpoint only after a successful connection.
+func (pool *ConnectionPool) Add(addr string) error {
+	e := newEndpoint(addr)
+
+	pool.addrsMutex.Lock()
+	// Ensure that Close()/CloseGraceful() is not in progress/done.
+	if pool.state.get() != connectedState {
+		pool.addrsMutex.Unlock()
+		return ErrClosed
+	}
+	if _, ok := pool.addrs[addr]; ok {
+		pool.addrsMutex.Unlock()
+		return errors.New("endpoint exist")
+	}
+	pool.addrs[addr] = e
+	pool.addrsMutex.Unlock()
+
+	if err := pool.tryConnect(e); err != nil {
+		pool.addrsMutex.Lock()
+		delete(pool.addrs, addr)
+		pool.addrsMutex.Unlock()
+		close(e.closed)
+		return err
+	}
+
+	go pool.controller(e)
+	return nil
+}
+
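A usage sketch for the new Add()/Remove() pair (illustrative only: the addresses, credentials, and error handling below are placeholders, not part of this patch):

```go
package main

import (
	"log"

	"github.com/tarantool/go-tarantool"
	"github.com/tarantool/go-tarantool/connection_pool"
)

func main() {
	// Placeholder credentials and addresses.
	connOpts := tarantool.Opts{User: "test", Pass: "test"}
	pool, err := connection_pool.Connect([]string{"127.0.0.1:3301"}, connOpts)
	if err != nil {
		log.Fatalf("connect failed: %s", err)
	}
	defer pool.Close()

	// Scale out: Add() registers the endpoint only after it connects.
	if err := pool.Add("127.0.0.1:3302"); err != nil {
		log.Printf("add failed: %s", err)
	}

	// Scale in: Remove() closes the endpoint's connection gracefully
	// and waits for it to finish.
	if err := pool.Remove("127.0.0.1:3302"); err != nil {
		log.Printf("remove failed: %s", err)
	}
}
```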
+// Remove removes an endpoint with the address from the pool. The call
+// closes an active connection gracefully.
+func (pool *ConnectionPool) Remove(addr string) error {
+	pool.addrsMutex.Lock()
+	endpoint, ok := pool.addrs[addr]
+	if !ok {
+		pool.addrsMutex.Unlock()
+		return errors.New("endpoint not exist")
+	}
+
+	select {
+	case <-endpoint.close:
+		// Close() in progress/done.
+	case <-endpoint.shutdown:
+		// CloseGraceful()/Remove() in progress/done.
+	default:
+		close(endpoint.shutdown)
+	}
+
+	delete(pool.addrs, addr)
+	pool.addrsMutex.Unlock()
+
+	<-endpoint.closed
+	return nil
+}
+
 func (pool *ConnectionPool) waitClose() []error {
-	errs := make([]error, 0, len(pool.addrs))
-	for _, s := range pool.addrs {
-		<-s.closed
-		if s.closeErr != nil {
-			errs = append(errs, s.closeErr)
+	pool.addrsMutex.RLock()
+	endpoints := make([]*endpoint, 0, len(pool.addrs))
+	for _, e := range pool.addrs {
+		endpoints = append(endpoints, e)
+	}
+	pool.addrsMutex.RUnlock()
+
+	errs := make([]error, 0, len(endpoints))
+	for _, e := range endpoints {
+		<-e.closed
+		if e.closeErr != nil {
+			errs = append(errs, e.closeErr)
 		}
 	}
 	return errs
@@ -220,11 +298,11 @@ func (pool *ConnectionPool) waitClose() []error {
 func (pool *ConnectionPool) Close() []error {
 	if pool.state.cas(connectedState, closedState) ||
 		pool.state.cas(shutdownState, closedState) {
-		pool.poolsMutex.Lock()
+		pool.addrsMutex.RLock()
 		for _, s := range pool.addrs {
 			close(s.close)
 		}
-		pool.poolsMutex.Unlock()
+		pool.addrsMutex.RUnlock()
 	}
 
 	return pool.waitClose()
@@ -234,22 +312,25 @@ func (pool *ConnectionPool) Close() []error {
 // for all requests to complete.
 func (pool *ConnectionPool) CloseGraceful() []error {
 	if pool.state.cas(connectedState, shutdownState) {
-		pool.poolsMutex.Lock()
+		pool.addrsMutex.RLock()
 		for _, s := range pool.addrs {
 			close(s.shutdown)
 		}
-		pool.poolsMutex.Unlock()
+		pool.addrsMutex.RUnlock()
 	}
 
 	return pool.waitClose()
 }
 
 // GetAddrs gets addresses of connections in pool.
-func (connPool *ConnectionPool) GetAddrs() []string {
-	cpy := make([]string, len(connPool.addrs))
+func (pool *ConnectionPool) GetAddrs() []string {
+	pool.addrsMutex.RLock()
+	defer pool.addrsMutex.RUnlock()
+
+	cpy := make([]string, len(pool.addrs))
 
 	i := 0
-	for addr := range connPool.addrs {
+	for addr := range pool.addrs {
 		cpy[i] = addr
 		i++
 	}
@@ -258,18 +339,20 @@ func (connPool *ConnectionPool) GetAddrs() []string {
 }
 
 // GetPoolInfo gets information of connections (connected status, ro/rw role).
-func (connPool *ConnectionPool) GetPoolInfo() map[string]*ConnectionInfo {
+func (pool *ConnectionPool) GetPoolInfo() map[string]*ConnectionInfo {
 	info := make(map[string]*ConnectionInfo)
 
-	connPool.poolsMutex.RLock()
-	defer connPool.poolsMutex.RUnlock()
+	pool.addrsMutex.RLock()
+	defer pool.addrsMutex.RUnlock()
+	pool.poolsMutex.RLock()
+	defer pool.poolsMutex.RUnlock()
 
-	if connPool.state.get() != connectedState {
+	if pool.state.get() != connectedState {
 		return info
 	}
 
-	for addr := range connPool.addrs {
-		conn, role := connPool.getConnectionFromPool(addr)
+	for addr := range pool.addrs {
+		conn, role := pool.getConnectionFromPool(addr)
 		if conn != nil {
 			info[addr] = &ConnectionInfo{ConnectedNow: conn.ConnectedNow(), ConnRole: role}
 		}
 	}
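Since GetAddrs() and GetPoolInfo() now take addrsMutex, they are safe to call while Add()/Remove() run in other goroutines. A monitoring sketch (a fragment that assumes the pool variable and log import from the previous example):

```go
// Snapshot of the endpoint list; the returned slice is a copy.
for _, addr := range pool.GetAddrs() {
	log.Printf("endpoint: %s", addr)
}

// Per-endpoint status and role; the map is empty unless the pool
// is in the connected state.
for addr, info := range pool.GetPoolInfo() {
	log.Printf("%s: connected=%t role=%v", addr, info.ConnectedNow, info.ConnRole)
}
```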
@@ -900,19 +983,11 @@ func (connPool *ConnectionPool) fillPools() bool {
 	// It is called before controller() goroutines so we don't expect
 	// concurrency issues here.
 	for addr := range connPool.addrs {
-		state := &endpointState{
-			addr:     addr,
-			notify:   make(chan tarantool.ConnEvent, 10),
-			conn:     nil,
-			role:     UnknownRole,
-			shutdown: make(chan struct{}),
-			close:    make(chan struct{}),
-			closed:   make(chan struct{}),
-		}
-		connPool.addrs[addr] = state
+		end := newEndpoint(addr)
+		connPool.addrs[addr] = end
 
 		connOpts := connPool.connOpts
-		connOpts.Notify = state.notify
+		connOpts.Notify = end.notify
 		conn, err := tarantool.Connect(addr, connOpts)
 		if err != nil {
 			log.Printf("tarantool: connect to %s failed: %s\n", addr, err.Error())
@@ -931,8 +1006,8 @@ func (connPool *ConnectionPool) fillPools() bool {
 		}
 
 		if conn.ConnectedNow() {
-			state.conn = conn
-			state.role = role
+			end.conn = conn
+			end.role = role
 			somebodyAlive = true
 		} else {
 			connPool.deleteConnection(addr)
@@ -948,7 +1023,7 @@ func (connPool *ConnectionPool) fillPools() bool {
 	return somebodyAlive
 }
 
-func (pool *ConnectionPool) updateConnection(s *endpointState) {
+func (pool *ConnectionPool) updateConnection(e *endpoint) {
 	pool.poolsMutex.Lock()
 
 	if pool.state.get() != connectedState {
@@ -956,17 +1031,17 @@ func (pool *ConnectionPool) updateConnection(s *endpointState) {
 		return
 	}
 
-	if role, err := pool.getConnectionRole(s.conn); err == nil {
-		if s.role != role {
-			pool.deleteConnection(s.addr)
+	if role, err := pool.getConnectionRole(e.conn); err == nil {
+		if e.role != role {
+			pool.deleteConnection(e.addr)
 			pool.poolsMutex.Unlock()
 
-			pool.handlerDeactivated(s.conn, s.role)
-			opened := pool.handlerDiscovered(s.conn, role)
+			pool.handlerDeactivated(e.conn, e.role)
+			opened := pool.handlerDiscovered(e.conn, role)
 			if !opened {
-				s.conn.Close()
-				s.conn = nil
-				s.role = UnknownRole
+				e.conn.Close()
+				e.conn = nil
+				e.role = UnknownRole
 
 				return
 			}
@@ -974,66 +1049,66 @@ func (pool *ConnectionPool) updateConnection(s *endpointState) {
 
 			if pool.state.get() != connectedState {
 				pool.poolsMutex.Unlock()
 
-				s.conn.Close()
-				pool.handlerDeactivated(s.conn, role)
-				s.conn = nil
-				s.role = UnknownRole
+				e.conn.Close()
+				pool.handlerDeactivated(e.conn, role)
+				e.conn = nil
+				e.role = UnknownRole
 
 				return
 			}
 
-			if pool.addConnection(s.addr, s.conn, role) != nil {
+			if pool.addConnection(e.addr, e.conn, role) != nil {
 				pool.poolsMutex.Unlock()
 
-				s.conn.Close()
-				pool.handlerDeactivated(s.conn, role)
-				s.conn = nil
-				s.role = UnknownRole
+				e.conn.Close()
+				pool.handlerDeactivated(e.conn, role)
+				e.conn = nil
+				e.role = UnknownRole
 
 				return
 			}
-			s.role = role
+			e.role = role
 		}
 		pool.poolsMutex.Unlock()
 		return
 	} else {
-		pool.deleteConnection(s.addr)
+		pool.deleteConnection(e.addr)
 		pool.poolsMutex.Unlock()
 
-		s.conn.Close()
-		pool.handlerDeactivated(s.conn, s.role)
-		s.conn = nil
-		s.role = UnknownRole
+		e.conn.Close()
+		pool.handlerDeactivated(e.conn, e.role)
+		e.conn = nil
+		e.role = UnknownRole
 		return
 	}
 }
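The endpoint type coordinates its controller goroutine through three channels: close (force stop), shutdown (graceful stop), and closed (acknowledgement). A stripped-down, standalone sketch of that pattern (a generic illustration under assumed names, not this package's exact code):

```go
package main

import "time"

type worker struct {
	close    chan struct{} // request an immediate stop
	shutdown chan struct{} // request a graceful stop
	closed   chan struct{} // closed by the worker once it has stopped
}

func (w *worker) run(tick time.Duration) {
	defer close(w.closed) // always acknowledge on exit
	t := time.NewTicker(tick)
	defer t.Stop()
	for {
		select {
		case <-w.close:
			return // drop everything and stop
		case <-w.shutdown:
			// Finish in-flight work here, then stop.
			return
		case <-t.C:
			// Periodic maintenance, e.g. a health check.
		}
	}
}

func main() {
	w := &worker{
		close:    make(chan struct{}),
		shutdown: make(chan struct{}),
		closed:   make(chan struct{}),
	}
	go w.run(time.Second)
	close(w.shutdown) // ask for a graceful stop...
	<-w.closed        // ...and wait for the acknowledgement.
}
```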
 
-func (pool *ConnectionPool) tryConnect(s *endpointState) {
+func (pool *ConnectionPool) tryConnect(e *endpoint) error {
 	pool.poolsMutex.Lock()
 
 	if pool.state.get() != connectedState {
 		pool.poolsMutex.Unlock()
-		return
+		return ErrClosed
 	}
 
-	s.conn = nil
-	s.role = UnknownRole
+	e.conn = nil
+	e.role = UnknownRole
 
 	connOpts := pool.connOpts
-	connOpts.Notify = s.notify
-	conn, _ := tarantool.Connect(s.addr, connOpts)
-	if conn != nil {
+	connOpts.Notify = e.notify
+	conn, err := tarantool.Connect(e.addr, connOpts)
+	if err == nil {
 		role, err := pool.getConnectionRole(conn)
 		pool.poolsMutex.Unlock()
 
 		if err != nil {
 			conn.Close()
-			log.Printf("tarantool: storing connection to %s failed: %s\n", s.addr, err)
-			return
+			log.Printf("tarantool: storing connection to %s failed: %s\n", e.addr, err)
+			return err
 		}
 
 		opened := pool.handlerDiscovered(conn, role)
 		if !opened {
 			conn.Close()
-			return
+			return errors.New("storing connection canceled")
 		}
 
 		pool.poolsMutex.Lock()
@@ -1041,23 +1116,24 @@
 			pool.poolsMutex.Unlock()
 			conn.Close()
 			pool.handlerDeactivated(conn, role)
-			return
+			return ErrClosed
 		}
 
-		if pool.addConnection(s.addr, conn, role) != nil {
+		if err = pool.addConnection(e.addr, conn, role); err != nil {
 			pool.poolsMutex.Unlock()
 			conn.Close()
 			pool.handlerDeactivated(conn, role)
-			return
+			return err
 		}
-		s.conn = conn
-		s.role = role
+		e.conn = conn
+		e.role = role
 	}
 
 	pool.poolsMutex.Unlock()
+	return err
 }
 
-func (pool *ConnectionPool) reconnect(s *endpointState) {
+func (pool *ConnectionPool) reconnect(e *endpoint) {
 	pool.poolsMutex.Lock()
 
 	if pool.state.get() != connectedState {
@@ -1065,17 +1141,17 @@
 		return
 	}
 
-	pool.deleteConnection(s.addr)
+	pool.deleteConnection(e.addr)
 	pool.poolsMutex.Unlock()
 
-	pool.handlerDeactivated(s.conn, s.role)
-	s.conn = nil
-	s.role = UnknownRole
+	pool.handlerDeactivated(e.conn, e.role)
+	e.conn = nil
+	e.role = UnknownRole
 
-	pool.tryConnect(s)
+	pool.tryConnect(e)
 }
 
-func (pool *ConnectionPool) controller(s *endpointState) {
+func (pool *ConnectionPool) controller(e *endpoint) {
 	timer := time.NewTicker(pool.opts.CheckTimeout)
 	defer timer.Stop()
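The controller body below needs e.close to beat a simultaneously ready e.shutdown, but Go's select picks among ready cases at random. The nested selects implement the usual "priority select" idiom; a minimal standalone demonstration of it (assumed channel names, not the package's own code):

```go
package main

import "fmt"

// prioritized always reports "close" when both channels are ready,
// because the outer select consults only the high-priority channel
// before falling through to the mixed select.
func prioritized(closeCh, shutdownCh <-chan struct{}) string {
	select {
	case <-closeCh:
		return "close"
	default:
		select {
		case <-closeCh:
			return "close"
		case <-shutdownCh:
			return "shutdown"
		}
	}
}

func main() {
	closeCh := make(chan struct{})
	shutdownCh := make(chan struct{})
	close(closeCh)
	close(shutdownCh)
	fmt.Println(prioritized(closeCh, shutdownCh)) // always "close"
}
```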
@@ -1085,71 +1161,71 @@ func (pool *ConnectionPool) controller(s *endpointState) {
 			// Graceful shutdown in progress. We need to wait for a finish or
 			// to force close.
 			select {
-			case <-s.closed:
-			case <-s.close:
+			case <-e.closed:
+			case <-e.close:
 			}
 		}
 
 		select {
-		case <-s.closed:
+		case <-e.closed:
 			return
 		default:
 		}
 
 		select {
-		// s.close has priority to avoid concurrency with s.shutdown.
-		case <-s.close:
-			if s.conn != nil {
+		// e.close has priority to avoid concurrency with e.shutdown.
+		case <-e.close:
+			if e.conn != nil {
 				pool.poolsMutex.Lock()
-				pool.deleteConnection(s.addr)
+				pool.deleteConnection(e.addr)
 				pool.poolsMutex.Unlock()
 
 				if !shutdown {
-					s.closeErr = s.conn.Close()
-					pool.handlerDeactivated(s.conn, s.role)
-					close(s.closed)
+					e.closeErr = e.conn.Close()
+					pool.handlerDeactivated(e.conn, e.role)
+					close(e.closed)
 				} else {
 					// Force close the connection.
-					s.conn.Close()
+					e.conn.Close()
 					// And wait for a finish.
-					<-s.closed
+					<-e.closed
 				}
 			} else {
-				close(s.closed)
+				close(e.closed)
 			}
 		default:
 			select {
-			case <-s.shutdown:
+			case <-e.shutdown:
 				shutdown = true
-				if s.conn != nil {
+				if e.conn != nil {
 					pool.poolsMutex.Lock()
-					pool.deleteConnection(s.addr)
+					pool.deleteConnection(e.addr)
 					pool.poolsMutex.Unlock()
 
 					// We need to catch s.close in the current goroutine, so
 					// we need to start another one for the shutdown.
 					go func() {
-						s.closeErr = s.conn.CloseGraceful()
-						close(s.closed)
+						e.closeErr = e.conn.CloseGraceful()
+						close(e.closed)
 					}()
 				} else {
-					close(s.closed)
+					close(e.closed)
 				}
 			default:
 				select {
-				case <-s.close:
+				case <-e.close:
 					// Will be processed at an upper level.
-				case <-s.shutdown:
+				case <-e.shutdown:
 					// Will be processed at an upper level.
-				case <-s.notify:
-					if s.conn != nil && s.conn.ClosedNow() {
+				case <-e.notify:
+					if e.conn != nil && e.conn.ClosedNow() {
 						pool.poolsMutex.Lock()
 						if pool.state.get() == connectedState {
-							pool.deleteConnection(s.addr)
+							pool.deleteConnection(e.addr)
 							pool.poolsMutex.Unlock()
-							pool.handlerDeactivated(s.conn, s.role)
-							s.conn = nil
-							s.role = UnknownRole
+							pool.handlerDeactivated(e.conn, e.role)
+							e.conn = nil
+							e.role = UnknownRole
 						} else {
 							pool.poolsMutex.Unlock()
 						}
@@ -1158,12 +1234,12 @@ func (pool *ConnectionPool) controller(s *endpointState) {
 					// Reopen connection.
 					// Relocate connection between subpools
 					// if ro/rw was updated.
-					if s.conn == nil {
-						pool.tryConnect(s)
-					} else if !s.conn.ClosedNow() {
-						pool.updateConnection(s)
+					if e.conn == nil {
+						pool.tryConnect(e)
+					} else if !e.conn.ClosedNow() {
+						pool.updateConnection(e)
 					} else {
-						pool.reconnect(s)
+						pool.reconnect(e)
 					}
 				}
 			}
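The controller's ticker runs on OptsPool.CheckTimeout, which therefore bounds how quickly a dead endpoint is retried or a changed ro/rw role is noticed. A configuration sketch (placeholder address, credentials, and timeout value):

```go
package main

import (
	"log"
	"time"

	"github.com/tarantool/go-tarantool"
	"github.com/tarantool/go-tarantool/connection_pool"
)

func main() {
	connOpts := tarantool.Opts{User: "test", Pass: "test"} // placeholders
	opts := connection_pool.OptsPool{
		// Wake-up period of each endpoint's controller goroutine.
		CheckTimeout: time.Second,
	}
	pool, err := connection_pool.ConnectWithOpts(
		[]string{"127.0.0.1:3301"}, connOpts, opts)
	if err != nil {
		log.Fatalf("connect failed: %s", err)
	}
	defer pool.Close()
}
```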
diff --git a/connection_pool/connection_pool_test.go b/connection_pool/connection_pool_test.go
index 763df3e64..57441ee37 100644
--- a/connection_pool/connection_pool_test.go
+++ b/connection_pool/connection_pool_test.go
@@ -11,6 +11,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/tarantool/go-tarantool"
 	"github.com/tarantool/go-tarantool/connection_pool"
@@ -242,6 +243,305 @@ func TestDisconnectAll(t *testing.T) {
 	require.Nil(t, err)
 }
 
+func TestAdd(t *testing.T) {
+	connPool, err := connection_pool.Connect([]string{servers[0]}, connOpts)
+	require.Nilf(t, err, "failed to connect")
+	require.NotNilf(t, connPool, "conn is nil after Connect")
+
+	defer connPool.Close()
+
+	for _, server := range servers[1:] {
+		err = connPool.Add(server)
+		require.Nil(t, err)
+	}
+
+	args := test_helpers.CheckStatusesArgs{
+		ConnPool:           connPool,
+		Mode:               connection_pool.ANY,
+		Servers:            servers,
+		ExpectedPoolStatus: true,
+		ExpectedStatuses: map[string]bool{
+			servers[0]: true,
+			servers[1]: true,
+			servers[2]: true,
+			servers[3]: true,
+			servers[4]: true,
+		},
+	}
+
+	err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args,
+		defaultCountRetry, defaultTimeoutRetry)
+	require.Nil(t, err)
+}
+
+func TestAdd_exist(t *testing.T) {
+	connPool, err := connection_pool.Connect([]string{servers[0]}, connOpts)
+	require.Nilf(t, err, "failed to connect")
+	require.NotNilf(t, connPool, "conn is nil after Connect")
+
+	defer connPool.Close()
+
+	err = connPool.Add(servers[0])
+	require.ErrorContains(t, err, "endpoint exist")
+
+	args := test_helpers.CheckStatusesArgs{
+		ConnPool:           connPool,
+		Mode:               connection_pool.ANY,
+		Servers:            servers,
+		ExpectedPoolStatus: true,
+		ExpectedStatuses: map[string]bool{
+			servers[0]: true,
+		},
+	}
+
+	err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args,
+		defaultCountRetry, defaultTimeoutRetry)
+	require.Nil(t, err)
+}
+
+func TestAdd_unreachable(t *testing.T) {
+	connPool, err := connection_pool.Connect([]string{servers[0]}, connOpts)
+	require.Nilf(t, err, "failed to connect")
+	require.NotNilf(t, connPool, "conn is nil after Connect")
+
+	defer connPool.Close()
+
+	err = connPool.Add("127.0.0.2:6667")
+	// The error is OS-dependent, so we just check that it exists.
+	require.NotNil(t, err)
+
+	args := test_helpers.CheckStatusesArgs{
+		ConnPool:           connPool,
+		Mode:               connection_pool.ANY,
+		Servers:            servers,
+		ExpectedPoolStatus: true,
+		ExpectedStatuses: map[string]bool{
+			servers[0]: true,
+		},
+	}
+
+	err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args,
+		defaultCountRetry, defaultTimeoutRetry)
+	require.Nil(t, err)
+}
+
+func TestAdd_afterClose(t *testing.T) {
+	connPool, err := connection_pool.Connect([]string{servers[0]}, connOpts)
+	require.Nilf(t, err, "failed to connect")
+	require.NotNilf(t, connPool, "conn is nil after Connect")
+
+	connPool.Close()
+	err = connPool.Add(servers[0])
+	assert.Equal(t, connection_pool.ErrClosed, err)
+}
+
+func TestAdd_Close_concurrent(t *testing.T) {
+	connPool, err := connection_pool.Connect([]string{servers[0]}, connOpts)
+	require.Nilf(t, err, "failed to connect")
+	require.NotNilf(t, connPool, "conn is nil after Connect")
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+
+		err = connPool.Add(servers[1])
+		if err != nil {
+			assert.Equal(t, connection_pool.ErrClosed, err)
+		}
+	}()
+
+	connPool.Close()
+
+	wg.Wait()
+}
+
+func TestAdd_CloseGraceful_concurrent(t *testing.T) {
+	connPool, err := connection_pool.Connect([]string{servers[0]}, connOpts)
+	require.Nilf(t, err, "failed to connect")
+	require.NotNilf(t, connPool, "conn is nil after Connect")
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+
+		err = connPool.Add(servers[1])
+		if err != nil {
+			assert.Equal(t, connection_pool.ErrClosed, err)
+		}
+	}()
+
+	connPool.CloseGraceful()
+
+	wg.Wait()
+}
+
+func TestRemove(t *testing.T) {
+	connPool, err := connection_pool.Connect(servers, connOpts)
+	require.Nilf(t, err, "failed to connect")
+	require.NotNilf(t, connPool, "conn is nil after Connect")
+
+	defer connPool.Close()
+
+	for _, server := range servers[1:] {
+		err = connPool.Remove(server)
+		require.Nil(t, err)
+	}
+
+	args := test_helpers.CheckStatusesArgs{
+		ConnPool:           connPool,
+		Mode:               connection_pool.ANY,
+		Servers:            servers,
+		ExpectedPoolStatus: true,
+		ExpectedStatuses: map[string]bool{
+			servers[0]: true,
+		},
+	}
+
+	err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args,
+		defaultCountRetry, defaultTimeoutRetry)
+	require.Nil(t, err)
+}
+
+func TestRemove_double(t *testing.T) {
+	connPool, err := connection_pool.Connect([]string{servers[0], servers[1]}, connOpts)
+	require.Nilf(t, err, "failed to connect")
+	require.NotNilf(t, connPool, "conn is nil after Connect")
+
+	defer connPool.Close()
+
+	err = connPool.Remove(servers[1])
+	require.Nil(t, err)
+	err = connPool.Remove(servers[1])
+	require.ErrorContains(t, err, "endpoint not exist")
+
+	args := test_helpers.CheckStatusesArgs{
+		ConnPool:           connPool,
+		Mode:               connection_pool.ANY,
+		Servers:            servers,
+		ExpectedPoolStatus: true,
+		ExpectedStatuses: map[string]bool{
+			servers[0]: true,
+		},
+	}
+
+	err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args,
+		defaultCountRetry, defaultTimeoutRetry)
+	require.Nil(t, err)
+}
+
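TestRemove_double above pins down that Remove() is deliberately not idempotent. Callers that prefer tolerant semantics can wrap it; a hypothetical helper, not part of this patch (matching on the error text is brittle and shown only for illustration):

```go
import (
	"strings"

	"github.com/tarantool/go-tarantool/connection_pool"
)

// removeIfPresent treats "endpoint not exist" as success so that a
// second removal of the same address is a no-op.
func removeIfPresent(pool *connection_pool.ConnectionPool, addr string) error {
	err := pool.Remove(addr)
	if err != nil && strings.Contains(err.Error(), "endpoint not exist") {
		return nil
	}
	return err
}
```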
+func TestRemove_unknown(t *testing.T) {
+	connPool, err := connection_pool.Connect([]string{servers[0], servers[1]}, connOpts)
+	require.Nilf(t, err, "failed to connect")
+	require.NotNilf(t, connPool, "conn is nil after Connect")
+
+	defer connPool.Close()
+
+	err = connPool.Remove("not_exist:6667")
+	require.ErrorContains(t, err, "endpoint not exist")
+
+	args := test_helpers.CheckStatusesArgs{
+		ConnPool:           connPool,
+		Mode:               connection_pool.ANY,
+		Servers:            servers,
+		ExpectedPoolStatus: true,
+		ExpectedStatuses: map[string]bool{
+			servers[0]: true,
+			servers[1]: true,
+		},
+	}
+
+	err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args,
+		defaultCountRetry, defaultTimeoutRetry)
+	require.Nil(t, err)
+}
+
+func TestRemove_concurrent(t *testing.T) {
+	connPool, err := connection_pool.Connect([]string{servers[0], servers[1]}, connOpts)
+	require.Nilf(t, err, "failed to connect")
+	require.NotNilf(t, connPool, "conn is nil after Connect")
+
+	defer connPool.Close()
+
+	const concurrency = 10
+	var (
+		wg   sync.WaitGroup
+		ok   uint32
+		errs uint32
+	)
+
+	wg.Add(concurrency)
+	for i := 0; i < concurrency; i++ {
+		go func() {
+			defer wg.Done()
+			err := connPool.Remove(servers[1])
+			if err == nil {
+				atomic.AddUint32(&ok, 1)
+			} else {
+				assert.ErrorContains(t, err, "endpoint not exist")
+				atomic.AddUint32(&errs, 1)
+			}
+		}()
+	}
+
+	wg.Wait()
+	assert.Equal(t, uint32(1), ok)
+	assert.Equal(t, uint32(concurrency-1), errs)
+
+	args := test_helpers.CheckStatusesArgs{
+		ConnPool:           connPool,
+		Mode:               connection_pool.ANY,
+		Servers:            servers,
+		ExpectedPoolStatus: true,
+		ExpectedStatuses: map[string]bool{
+			servers[0]: true,
+		},
+	}
+
+	err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args,
+		defaultCountRetry, defaultTimeoutRetry)
+	require.Nil(t, err)
+}
+
+func TestRemove_Close_concurrent(t *testing.T) {
+	connPool, err := connection_pool.Connect([]string{servers[0], servers[1]}, connOpts)
+	require.Nilf(t, err, "failed to connect")
+	require.NotNilf(t, connPool, "conn is nil after Connect")
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+
+		err = connPool.Remove(servers[1])
+		assert.Nil(t, err)
+	}()
+
+	connPool.Close()
+
+	wg.Wait()
+}
+
+func TestRemove_CloseGraceful_concurrent(t *testing.T) {
+	connPool, err := connection_pool.Connect([]string{servers[0], servers[1]}, connOpts)
+	require.Nilf(t, err, "failed to connect")
+	require.NotNilf(t, connPool, "conn is nil after Connect")
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+
+		err = connPool.Remove(servers[1])
+		assert.Nil(t, err)
+	}()
+
+	connPool.CloseGraceful()
+
+	wg.Wait()
+}
+
 func TestClose(t *testing.T) {
 	server1 := servers[0]
 	server2 := servers[1]