diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c9def351..82275e1db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,8 +52,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. connection objects (#136). This function now does not attempt to reconnect and tries to establish a connection only once. Function might be canceled via context. Context accepted as first argument. - `pool.Connect` and `pool.Add` now accept context as first argument, which - user may cancel in process. If `pool.Connect` is canceled in progress, an + `pool.Connect` and `pool.Add` now accept context as the first argument, which + user may cancel in process. If `pool.Connect` is canceled in progress, an error will be returned. All created connections will be closed. - `iproto.Feature` type now used instead of `ProtocolFeature` (#337) - `iproto.IPROTO_FEATURE_` constants now used instead of local `Feature` @@ -95,6 +95,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. - Renamed `StrangerResponse` to `MockResponse` (#237) - `pool.Connect`, `pool.ConnetcWithOpts` and `pool.Add` use a new type `pool.Instance` to determinate connection options (#356) +- `pool.Connect`, `pool.ConnectWithOpts` and `pool.Add` add connections to + the pool even it is unable to connect to it (#372) ### Deprecated diff --git a/README.md b/README.md index f37b3be64..bc3df007c 100644 --- a/README.md +++ b/README.md @@ -199,6 +199,9 @@ The subpackage has been deleted. You could use `pool` instead. the second argument instead of a list of addresses. Each instance is associated with a unique string name, `Dialer` and connection options which allows instances to be independently configured. +* `pool.Connect`, `pool.ConnectWithOpts` and `pool.Add` add instances into + the pool even it is unable to connect to it. The pool will try to connect to + the instance later. * `pool.Add` now accepts context as the first argument, which user may cancel in process. * `pool.Add` now accepts `pool.Instance` as the second argument instead of diff --git a/pool/connection_pool.go b/pool/connection_pool.go index 7dd6e0c46..798a43af2 100644 --- a/pool/connection_pool.go +++ b/pool/connection_pool.go @@ -24,9 +24,7 @@ import ( ) var ( - ErrEmptyInstances = errors.New("instances (second argument) should not be empty") ErrWrongCheckTimeout = errors.New("wrong check timeout, must be greater than 0") - ErrNoConnection = errors.New("no active connections") ErrTooManyArgs = errors.New("too many arguments") ErrIncorrectResponse = errors.New("incorrect response format") ErrIncorrectStatus = errors.New("incorrect instance status: status should be `running`") @@ -155,9 +153,6 @@ func newEndpoint(name string, dialer tarantool.Dialer, opts tarantool.Opts) *end // opts. Instances must have unique names. func ConnectWithOpts(ctx context.Context, instances []Instance, opts Opts) (*ConnectionPool, error) { - if len(instances) == 0 { - return nil, ErrEmptyInstances - } unique := make(map[string]bool) for _, instance := range instances { if _, ok := unique[instance.Name]; ok { @@ -178,28 +173,23 @@ func ConnectWithOpts(ctx context.Context, instances []Instance, connPool := &ConnectionPool{ ends: make(map[string]*endpoint), opts: opts, - state: unknownState, + state: connectedState, done: make(chan struct{}), rwPool: rwPool, roPool: roPool, anyPool: anyPool, } - somebodyAlive, ctxCanceled := connPool.fillPools(ctx, instances) - if !somebodyAlive { + canceled := connPool.fillPools(ctx, instances) + if canceled { connPool.state.set(closedState) - if ctxCanceled { - return nil, ErrContextCanceled - } - return nil, ErrNoConnection + return nil, ErrContextCanceled } - connPool.state.set(connectedState) - - for _, s := range connPool.ends { + for _, endpoint := range connPool.ends { endpointCtx, cancel := context.WithCancel(context.Background()) - s.cancel = cancel - go connPool.controller(endpointCtx, s) + endpoint.cancel = cancel + go connPool.controller(endpointCtx, endpoint) } return connPool, nil @@ -252,8 +242,12 @@ func (p *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, error) { return conn.ConfiguredTimeout(), nil } -// Add adds a new instance into the pool. This function adds the instance -// only after successful connection. +// Add adds a new instance into the pool. The pool will try to connect to the +// instance later if it is unable to establish a connection. +// +// The function may return an error and don't add the instance into the pool +// if the context has been cancelled or on concurrent Close()/CloseGraceful() +// call. func (p *ConnectionPool) Add(ctx context.Context, instance Instance) error { e := newEndpoint(instance.Name, instance.Dialer, instance.Opts) @@ -268,19 +262,34 @@ func (p *ConnectionPool) Add(ctx context.Context, instance Instance) error { return ErrExists } - endpointCtx, cancel := context.WithCancel(context.Background()) - e.cancel = cancel + endpointCtx, endpointCancel := context.WithCancel(context.Background()) + connectCtx, connectCancel := context.WithCancel(ctx) + e.cancel = func() { + connectCancel() + endpointCancel() + } p.ends[instance.Name] = e p.endsMutex.Unlock() - if err := p.tryConnect(ctx, e); err != nil { - p.endsMutex.Lock() - delete(p.ends, instance.Name) - p.endsMutex.Unlock() - e.cancel() - close(e.closed) - return err + if err := p.tryConnect(connectCtx, e); err != nil { + var canceled bool + select { + case <-connectCtx.Done(): + canceled = true + case <-endpointCtx.Done(): + canceled = true + default: + canceled = false + } + if canceled { + p.endsMutex.Lock() + delete(p.ends, instance.Name) + p.endsMutex.Unlock() + e.cancel() + close(e.closed) + return err + } } go p.controller(endpointCtx, e) @@ -1145,64 +1154,30 @@ func (p *ConnectionPool) deactivateConnections() { } } -func (p *ConnectionPool) processConnection(conn *tarantool.Connection, - name string, end *endpoint) bool { - role, err := p.getConnectionRole(conn) - if err != nil { - conn.Close() - log.Printf("tarantool: storing connection to %s failed: %s\n", name, err) - return false - } - - if !p.handlerDiscovered(name, conn, role) { - conn.Close() - return false - } - if p.addConnection(name, conn, role) != nil { - conn.Close() - p.handlerDeactivated(name, conn, role) - return false - } - - end.conn = conn - end.role = role - return true -} - -func (p *ConnectionPool) fillPools(ctx context.Context, - instances []Instance) (bool, bool) { - somebodyAlive := false - ctxCanceled := false - +func (p *ConnectionPool) fillPools(ctx context.Context, instances []Instance) bool { // It is called before controller() goroutines, so we don't expect // concurrency issues here. for _, instance := range instances { end := newEndpoint(instance.Name, instance.Dialer, instance.Opts) p.ends[instance.Name] = end - connOpts := instance.Opts - connOpts.Notify = end.notify - conn, err := tarantool.Connect(ctx, instance.Dialer, connOpts) - if err != nil { + + if err := p.tryConnect(ctx, end); err != nil { log.Printf("tarantool: connect to %s failed: %s\n", instance.Name, err) select { case <-ctx.Done(): - ctxCanceled = true - p.ends[instance.Name] = nil log.Printf("tarantool: operation was canceled") p.deactivateConnections() - return false, ctxCanceled + return true default: } - } else if p.processConnection(conn, instance.Name, end) { - somebodyAlive = true } } - return somebodyAlive, ctxCanceled + return false } func (p *ConnectionPool) updateConnection(e *endpoint) { diff --git a/pool/connection_pool_test.go b/pool/connection_pool_test.go index ea19225c8..954277922 100644 --- a/pool/connection_pool_test.go +++ b/pool/connection_pool_test.go @@ -86,22 +86,6 @@ var defaultTimeoutRetry = 500 * time.Millisecond var helpInstances []test_helpers.TarantoolInstance -func TestConnect_error_empty_instances(t *testing.T) { - ctx, cancel := test_helpers.GetPoolConnectContext() - connPool, err := pool.Connect(ctx, []pool.Instance{}) - cancel() - require.Nilf(t, connPool, "conn is not nil with incorrect param") - require.ErrorIs(t, err, pool.ErrEmptyInstances) -} - -func TestConnect_error_unavailable(t *testing.T) { - ctx, cancel := test_helpers.GetPoolConnectContext() - connPool, err := pool.Connect(ctx, makeInstances([]string{"err1", "err2"}, connOpts)) - cancel() - require.Nilf(t, connPool, "conn is not nil with incorrect param") - require.ErrorIs(t, err, pool.ErrNoConnection) -} - func TestConnect_error_duplicate(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() connPool, err := pool.Connect(ctx, makeInstances([]string{"foo", "foo"}, connOpts)) @@ -138,6 +122,7 @@ func TestConnSuccessfully(t *testing.T) { ExpectedPoolStatus: true, ExpectedStatuses: map[string]bool{ healthyServ: true, + "err": false, }, } @@ -145,6 +130,48 @@ func TestConnSuccessfully(t *testing.T) { require.Nil(t, err) } +func TestConnect_empty(t *testing.T) { + cases := []struct { + Name string + Instances []pool.Instance + }{ + {"nil", nil}, + {"empty", []pool.Instance{}}, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + ctx, cancel := test_helpers.GetPoolConnectContext() + defer cancel() + connPool, err := pool.Connect(ctx, tc.Instances) + if connPool != nil { + defer connPool.Close() + } + require.NoError(t, err, "failed to create a pool") + require.NotNilf(t, connPool, "pool is nil after Connect") + require.Lenf(t, connPool.GetInfo(), 0, "empty pool expected") + }) + } +} + +func TestConnect_unavailable(t *testing.T) { + servers := []string{"err1", "err2"} + ctx, cancel := test_helpers.GetPoolConnectContext() + connPool, err := pool.Connect(ctx, makeInstances([]string{"err1", "err2"}, connOpts)) + cancel() + + if connPool != nil { + defer connPool.Close() + } + + require.NoError(t, err, "failed to create a pool") + require.NotNilf(t, connPool, "pool is nil after Connect") + require.Equal(t, map[string]pool.ConnectionInfo{ + servers[0]: pool.ConnectionInfo{ConnectedNow: false, ConnRole: pool.UnknownRole}, + servers[1]: pool.ConnectionInfo{ConnectedNow: false, ConnRole: pool.UnknownRole}, + }, connPool.GetInfo()) +} + func TestConnErrorAfterCtxCancel(t *testing.T) { var connLongReconnectOpts = tarantool.Opts{ Timeout: 5 * time.Second, @@ -410,16 +437,17 @@ func TestDisconnectAll(t *testing.T) { func TestAdd(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, makeInstances(servers[:1], connOpts)) + connPool, err := pool.Connect(ctx, []pool.Instance{}) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() - for _, server := range servers[1:] { + for _, server := range servers { ctx, cancel := test_helpers.GetConnectContext() + defer cancel() + err = connPool.Add(ctx, makeInstance(server, connOpts)) - cancel() require.Nil(t, err) } @@ -442,6 +470,22 @@ func TestAdd(t *testing.T) { require.Nil(t, err) } +func TestAdd_canceled_ctx(t *testing.T) { + ctx, cancel := test_helpers.GetPoolConnectContext() + defer cancel() + connPool, err := pool.Connect(ctx, []pool.Instance{}) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + ctx, cancel = test_helpers.GetConnectContext() + cancel() + + err = connPool.Add(ctx, makeInstance(servers[0], connOpts)) + require.Error(t, err) +} + func TestAdd_exist(t *testing.T) { server := servers[0] ctx, cancel := test_helpers.GetPoolConnectContext() @@ -453,8 +497,9 @@ func TestAdd_exist(t *testing.T) { defer connPool.Close() ctx, cancel = test_helpers.GetConnectContext() + defer cancel() + err = connPool.Add(ctx, makeInstance(server, connOpts)) - cancel() require.Equal(t, pool.ErrExists, err) args := test_helpers.CheckStatusesArgs{ @@ -483,18 +528,15 @@ func TestAdd_unreachable(t *testing.T) { defer connPool.Close() - unhealthyServ := "127.0.0.2:6667" - ctx, cancel = test_helpers.GetConnectContext() - err = connPool.Add(ctx, pool.Instance{ + unhealthyServ := "unreachable:6667" + err = connPool.Add(context.Background(), pool.Instance{ Name: unhealthyServ, Dialer: tarantool.NetDialer{ Address: unhealthyServ, }, Opts: connOpts, }) - cancel() - // The OS-dependent error so we just check for existence. - require.NotNil(t, err) + require.NoError(t, err) args := test_helpers.CheckStatusesArgs{ ConnPool: connPool, @@ -502,7 +544,8 @@ func TestAdd_unreachable(t *testing.T) { Servers: servers, ExpectedPoolStatus: true, ExpectedStatuses: map[string]bool{ - server: true, + server: true, + unhealthyServ: false, }, } @@ -520,9 +563,11 @@ func TestAdd_afterClose(t *testing.T) { require.NotNilf(t, connPool, "conn is nil after Connect") connPool.Close() + ctx, cancel = test_helpers.GetConnectContext() + defer cancel() + err = connPool.Add(ctx, makeInstance(server, connOpts)) - cancel() assert.Equal(t, err, pool.ErrClosed) } @@ -541,9 +586,10 @@ func TestAdd_Close_concurrent(t *testing.T) { go func() { defer wg.Done() - ctx, cancel := test_helpers.GetConnectContext() + ctx, cancel = test_helpers.GetConnectContext() + defer cancel() + err = connPool.Add(ctx, makeInstance(serv1, connOpts)) - cancel() if err != nil { assert.Equal(t, pool.ErrClosed, err) } @@ -569,9 +615,10 @@ func TestAdd_CloseGraceful_concurrent(t *testing.T) { go func() { defer wg.Done() - ctx, cancel := test_helpers.GetConnectContext() + ctx, cancel = test_helpers.GetConnectContext() + defer cancel() + err = connPool.Add(ctx, makeInstance(serv1, connOpts)) - cancel() if err != nil { assert.Equal(t, pool.ErrClosed, err) } @@ -1028,8 +1075,17 @@ func TestConnectionHandlerOpenError(t *testing.T) { if err == nil { defer connPool.Close() } - require.NotNilf(t, err, "success to connect") - require.Equalf(t, 2, h.discovered, "unexpected discovered count") + require.NoError(t, err, "failed to connect") + require.NotNil(t, connPool, "pool expected") + require.Equal(t, map[string]pool.ConnectionInfo{ + servers[0]: pool.ConnectionInfo{ConnectedNow: false, ConnRole: pool.UnknownRole}, + servers[1]: pool.ConnectionInfo{ConnectedNow: false, ConnRole: pool.UnknownRole}, + }, connPool.GetInfo()) + connPool.Close() + + // It could happen additional reconnect attempts in the background, but + // at least 2 connects on start. + require.GreaterOrEqualf(t, h.discovered, 2, "unexpected discovered count") require.Equalf(t, 0, h.deactivated, "unexpected deactivated count") }