From 13789ce134484b55e3ebb9166c3602624c17372b Mon Sep 17 00:00:00 2001
From: Oleg Jukovec
Date: Fri, 26 Jan 2024 16:40:08 +0300
Subject: [PATCH] pool: add a connection even on connection error
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

From a user's perspective, it is useful to add all target instances to
the pool, even ones that are currently unavailable. This way the user
does not have to keep track of which instances were actually added.
The patch makes this possible.

Closes #372
---
 CHANGELOG.md                 |   6 +-
 README.md                    |   3 +
 pool/connection_pool.go      | 109 ++++++++++++------------
 pool/connection_pool_test.go | 124 +++++++++++++++++++++++++----------
 4 files changed, 139 insertions(+), 103 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3c9def351..82275e1db 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -52,8 +52,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
   connection objects (#136). This function now does not attempt to reconnect
   and tries to establish a connection only once. Function might be canceled
   via context. Context accepted as first argument.
-  `pool.Connect` and `pool.Add` now accept context as first argument, which
-  user may cancel in process. If `pool.Connect` is canceled in progress, an
+  `pool.Connect` and `pool.Add` now accept context as the first argument, which
+  the user may cancel while connecting. If `pool.Connect` is canceled, an
   error will be returned. All created connections will be closed.
 - `iproto.Feature` type now used instead of `ProtocolFeature` (#337)
 - `iproto.IPROTO_FEATURE_` constants now used instead of local `Feature`
@@ -95,6 +95,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
 - Renamed `StrangerResponse` to `MockResponse` (#237)
 - `pool.Connect`, `pool.ConnetcWithOpts` and `pool.Add` use a new type
   `pool.Instance` to determinate connection options (#356)
+- `pool.Connect`, `pool.ConnectWithOpts` and `pool.Add` add connections to
+  the pool even if it is unable to connect to them (#372)
 
 ### Deprecated
 
diff --git a/README.md b/README.md
index f37b3be64..bc3df007c 100644
--- a/README.md
+++ b/README.md
@@ -199,6 +199,9 @@ The subpackage has been deleted. You could use `pool` instead.
   the second argument instead of a list of addresses. Each instance is
   associated with a unique string name, `Dialer` and connection options which
   allows instances to be independently configured.
+* `pool.Connect`, `pool.ConnectWithOpts` and `pool.Add` add instances into
+  the pool even if it is unable to connect to them. The pool will try to
+  connect to such instances later.
 * `pool.Add` now accepts context as the first argument, which user may cancel
   in process.
 * `pool.Add` now accepts `pool.Instance` as the second argument instead of
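The documented behavior change can be illustrated with a short sketch of
user code. This sketch is not part of the patch: the instance name
"storage-1" and its address are placeholders, and the import paths assume
the go-tarantool v2 layout.

    package main

    import (
        "context"
        "log"
        "time"

        "github.com/tarantool/go-tarantool/v2"
        "github.com/tarantool/go-tarantool/v2/pool"
    )

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()

        // The instance does not have to be reachable at Connect() time
        // anymore; the name and address here are made up.
        instances := []pool.Instance{{
            Name:   "storage-1",
            Dialer: tarantool.NetDialer{Address: "127.0.0.1:3301"},
            Opts:   tarantool.Opts{Timeout: time.Second},
        }}

        // Connect() no longer fails when no instance is reachable: it
        // returns a pool that keeps reconnecting in the background.
        p, err := pool.Connect(ctx, instances)
        if err != nil {
            // Only a canceled context or duplicate instance names get here.
            log.Fatal(err)
        }
        defer p.Close()

        // Every instance is registered immediately; ConnectedNow reports
        // whether a connection is currently established.
        for name, info := range p.GetInfo() {
            log.Printf("%s: connected=%v, role=%v", name, info.ConnectedNow, info.ConnRole)
        }
    }

With this patch, `pool.Connect` fails only on a canceled context or
duplicate instance names; unreachable instances stay in the pool and are
reconnected in the background.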
diff --git a/pool/connection_pool.go b/pool/connection_pool.go
index 7dd6e0c46..798a43af2 100644
--- a/pool/connection_pool.go
+++ b/pool/connection_pool.go
@@ -24,9 +24,7 @@ import (
 )
 
 var (
-	ErrEmptyInstances    = errors.New("instances (second argument) should not be empty")
 	ErrWrongCheckTimeout = errors.New("wrong check timeout, must be greater than 0")
-	ErrNoConnection      = errors.New("no active connections")
 	ErrTooManyArgs       = errors.New("too many arguments")
 	ErrIncorrectResponse = errors.New("incorrect response format")
 	ErrIncorrectStatus   = errors.New("incorrect instance status: status should be `running`")
@@ -155,9 +153,6 @@ func newEndpoint(name string, dialer tarantool.Dialer, opts tarantool.Opts) *endpoint {
 // opts. Instances must have unique names.
 func ConnectWithOpts(ctx context.Context, instances []Instance,
 	opts Opts) (*ConnectionPool, error) {
-	if len(instances) == 0 {
-		return nil, ErrEmptyInstances
-	}
 	unique := make(map[string]bool)
 	for _, instance := range instances {
 		if _, ok := unique[instance.Name]; ok {
@@ -178,28 +173,23 @@ func ConnectWithOpts(ctx context.Context, instances []Instance,
 	connPool := &ConnectionPool{
 		ends:    make(map[string]*endpoint),
 		opts:    opts,
-		state:   unknownState,
+		state:   connectedState,
 		done:    make(chan struct{}),
 		rwPool:  rwPool,
 		roPool:  roPool,
 		anyPool: anyPool,
 	}
 
-	somebodyAlive, ctxCanceled := connPool.fillPools(ctx, instances)
-	if !somebodyAlive {
+	canceled := connPool.fillPools(ctx, instances)
+	if canceled {
 		connPool.state.set(closedState)
-		if ctxCanceled {
-			return nil, ErrContextCanceled
-		}
-		return nil, ErrNoConnection
+		return nil, ErrContextCanceled
 	}
 
-	connPool.state.set(connectedState)
-
-	for _, s := range connPool.ends {
+	for _, endpoint := range connPool.ends {
 		endpointCtx, cancel := context.WithCancel(context.Background())
-		s.cancel = cancel
-		go connPool.controller(endpointCtx, s)
+		endpoint.cancel = cancel
+		go connPool.controller(endpointCtx, endpoint)
 	}
 
 	return connPool, nil
@@ -252,8 +242,12 @@ func (p *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, error) {
 	return conn.ConfiguredTimeout(), nil
 }
 
-// Add adds a new instance into the pool. This function adds the instance
-// only after successful connection.
+// Add adds a new instance into the pool. The pool will try to connect to the
+// instance later if it is unable to establish a connection.
+//
+// The function may return an error and not add the instance into the pool
+// if the context has been canceled or on a concurrent Close()/CloseGraceful()
+// call.
 func (p *ConnectionPool) Add(ctx context.Context, instance Instance) error {
 	e := newEndpoint(instance.Name, instance.Dialer, instance.Opts)
 
@@ -268,19 +262,34 @@ func (p *ConnectionPool) Add(ctx context.Context, instance Instance) error {
 		return ErrExists
 	}
 
-	endpointCtx, cancel := context.WithCancel(context.Background())
-	e.cancel = cancel
+	endpointCtx, endpointCancel := context.WithCancel(context.Background())
+	connectCtx, connectCancel := context.WithCancel(ctx)
+	e.cancel = func() {
+		connectCancel()
+		endpointCancel()
+	}
 
 	p.ends[instance.Name] = e
 	p.endsMutex.Unlock()
 
-	if err := p.tryConnect(ctx, e); err != nil {
-		p.endsMutex.Lock()
-		delete(p.ends, instance.Name)
-		p.endsMutex.Unlock()
-		e.cancel()
-		close(e.closed)
-		return err
+	if err := p.tryConnect(connectCtx, e); err != nil {
+		var canceled bool
+		select {
+		case <-connectCtx.Done():
+			canceled = true
+		case <-endpointCtx.Done():
+			canceled = true
+		default:
+			canceled = false
+		}
+		if canceled {
+			p.endsMutex.Lock()
+			delete(p.ends, instance.Name)
+			p.endsMutex.Unlock()
+			e.cancel()
+			close(e.closed)
+			return err
+		}
 	}
 
 	go p.controller(endpointCtx, e)
@@ -1145,64 +1154,30 @@ func (p *ConnectionPool) deactivateConnections() {
 	}
 }
 
-func (p *ConnectionPool) processConnection(conn *tarantool.Connection,
-	name string, end *endpoint) bool {
-	role, err := p.getConnectionRole(conn)
-	if err != nil {
-		conn.Close()
-		log.Printf("tarantool: storing connection to %s failed: %s\n", name, err)
-		return false
-	}
-
-	if !p.handlerDiscovered(name, conn, role) {
-		conn.Close()
-		return false
-	}
-	if p.addConnection(name, conn, role) != nil {
-		conn.Close()
-		p.handlerDeactivated(name, conn, role)
-		return false
-	}
-
-	end.conn = conn
-	end.role = role
-	return true
-}
-
-func (p *ConnectionPool) fillPools(ctx context.Context,
-	instances []Instance) (bool, bool) {
-	somebodyAlive := false
-	ctxCanceled := false
-
+func (p *ConnectionPool) fillPools(ctx context.Context, instances []Instance) bool {
 	// It is called before controller() goroutines, so we don't expect
 	// concurrency issues here.
 	for _, instance := range instances {
 		end := newEndpoint(instance.Name, instance.Dialer, instance.Opts)
 		p.ends[instance.Name] = end
-		connOpts := instance.Opts
-		connOpts.Notify = end.notify
-		conn, err := tarantool.Connect(ctx, instance.Dialer, connOpts)
-		if err != nil {
+
+		if err := p.tryConnect(ctx, end); err != nil {
 			log.Printf("tarantool: connect to %s failed: %s\n", instance.Name, err)
 
 			select {
 			case <-ctx.Done():
-				ctxCanceled = true
-
 				p.ends[instance.Name] = nil
 				log.Printf("tarantool: operation was canceled")
 
 				p.deactivateConnections()
 
-				return false, ctxCanceled
+				return true
 			default:
 			}
-		} else if p.processConnection(conn, instance.Name, end) {
-			somebodyAlive = true
 		}
 	}
 
-	return somebodyAlive, ctxCanceled
+	return false
 }
 
 func (p *ConnectionPool) updateConnection(e *endpoint) {
diff --git a/pool/connection_pool_test.go b/pool/connection_pool_test.go
index ea19225c8..954277922 100644
--- a/pool/connection_pool_test.go
+++ b/pool/connection_pool_test.go
@@ -86,22 +86,6 @@ var defaultTimeoutRetry = 500 * time.Millisecond
 
 var helpInstances []test_helpers.TarantoolInstance
 
-func TestConnect_error_empty_instances(t *testing.T) {
-	ctx, cancel := test_helpers.GetPoolConnectContext()
-	connPool, err := pool.Connect(ctx, []pool.Instance{})
-	cancel()
-	require.Nilf(t, connPool, "conn is not nil with incorrect param")
-	require.ErrorIs(t, err, pool.ErrEmptyInstances)
-}
-
-func TestConnect_error_unavailable(t *testing.T) {
-	ctx, cancel := test_helpers.GetPoolConnectContext()
-	connPool, err := pool.Connect(ctx, makeInstances([]string{"err1", "err2"}, connOpts))
-	cancel()
-	require.Nilf(t, connPool, "conn is not nil with incorrect param")
-	require.ErrorIs(t, err, pool.ErrNoConnection)
-}
-
 func TestConnect_error_duplicate(t *testing.T) {
 	ctx, cancel := test_helpers.GetPoolConnectContext()
 	connPool, err := pool.Connect(ctx, makeInstances([]string{"foo", "foo"}, connOpts))
@@ -138,6 +122,7 @@ func TestConnSuccessfully(t *testing.T) {
 		ExpectedPoolStatus: true,
 		ExpectedStatuses: map[string]bool{
 			healthyServ: true,
+			"err":       false,
 		},
 	}
 
@@ -145,6 +130,48 @@ func TestConnSuccessfully(t *testing.T) {
 	require.Nil(t, err)
 }
 
+func TestConnect_empty(t *testing.T) {
+	cases := []struct {
+		Name      string
+		Instances []pool.Instance
+	}{
+		{"nil", nil},
+		{"empty", []pool.Instance{}},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.Name, func(t *testing.T) {
+			ctx, cancel := test_helpers.GetPoolConnectContext()
+			defer cancel()
+			connPool, err := pool.Connect(ctx, tc.Instances)
+			if connPool != nil {
+				defer connPool.Close()
+			}
+			require.NoError(t, err, "failed to create a pool")
+			require.NotNilf(t, connPool, "pool is nil after Connect")
+			require.Lenf(t, connPool.GetInfo(), 0, "empty pool expected")
+		})
+	}
+}
+
+func TestConnect_unavailable(t *testing.T) {
+	servers := []string{"err1", "err2"}
+	ctx, cancel := test_helpers.GetPoolConnectContext()
+	connPool, err := pool.Connect(ctx, makeInstances(servers, connOpts))
+	cancel()
+
+	if connPool != nil {
+		defer connPool.Close()
+	}
+
+	require.NoError(t, err, "failed to create a pool")
+	require.NotNilf(t, connPool, "pool is nil after Connect")
+	require.Equal(t, map[string]pool.ConnectionInfo{
+		servers[0]: {ConnectedNow: false, ConnRole: pool.UnknownRole},
+		servers[1]: {ConnectedNow: false, ConnRole: pool.UnknownRole},
+	}, connPool.GetInfo())
+}
+
 func TestConnErrorAfterCtxCancel(t *testing.T) {
 	var connLongReconnectOpts = tarantool.Opts{
 		Timeout:   5 * time.Second,
@@ -410,16 +437,17 @@ func TestDisconnectAll(t *testing.T) {
 func TestAdd(t *testing.T) {
 	ctx, cancel := test_helpers.GetPoolConnectContext()
 	defer cancel()
-	connPool, err := pool.Connect(ctx, makeInstances(servers[:1], connOpts))
+	connPool, err := pool.Connect(ctx, []pool.Instance{})
 	require.Nilf(t, err, "failed to connect")
 	require.NotNilf(t, connPool, "conn is nil after Connect")
 
 	defer connPool.Close()
 
-	for _, server := range servers[1:] {
+	for _, server := range servers {
 		ctx, cancel := test_helpers.GetConnectContext()
+		defer cancel()
+
 		err = connPool.Add(ctx, makeInstance(server, connOpts))
-		cancel()
 		require.Nil(t, err)
 	}
 
@@ -442,6 +470,22 @@ func TestAdd(t *testing.T) {
 	require.Nil(t, err)
 }
 
+func TestAdd_canceled_ctx(t *testing.T) {
+	ctx, cancel := test_helpers.GetPoolConnectContext()
+	defer cancel()
+	connPool, err := pool.Connect(ctx, []pool.Instance{})
+	require.Nilf(t, err, "failed to connect")
+	require.NotNilf(t, connPool, "conn is nil after Connect")
+
+	defer connPool.Close()
+
+	ctx, cancel = test_helpers.GetConnectContext()
+	cancel()
+
+	err = connPool.Add(ctx, makeInstance(servers[0], connOpts))
+	require.Error(t, err)
+}
+
 func TestAdd_exist(t *testing.T) {
 	server := servers[0]
 	ctx, cancel := test_helpers.GetPoolConnectContext()
@@ -453,8 +497,9 @@ func TestAdd_exist(t *testing.T) {
 	defer connPool.Close()
 
 	ctx, cancel = test_helpers.GetConnectContext()
+	defer cancel()
+
 	err = connPool.Add(ctx, makeInstance(server, connOpts))
-	cancel()
 	require.Equal(t, pool.ErrExists, err)
 
 	args := test_helpers.CheckStatusesArgs{
@@ -483,18 +528,15 @@ func TestAdd_unreachable(t *testing.T) {
 
 	defer connPool.Close()
 
-	unhealthyServ := "127.0.0.2:6667"
-	ctx, cancel = test_helpers.GetConnectContext()
-	err = connPool.Add(ctx, pool.Instance{
+	unhealthyServ := "unreachable:6667"
+	err = connPool.Add(context.Background(), pool.Instance{
 		Name: unhealthyServ,
 		Dialer: tarantool.NetDialer{
 			Address: unhealthyServ,
 		},
 		Opts: connOpts,
 	})
-	cancel()
-	// The OS-dependent error so we just check for existence.
-	require.NotNil(t, err)
+	require.NoError(t, err)
 
 	args := test_helpers.CheckStatusesArgs{
 		ConnPool:           connPool,
@@ -502,7 +544,8 @@ func TestAdd_unreachable(t *testing.T) {
 		Servers:            servers,
 		ExpectedPoolStatus: true,
 		ExpectedStatuses: map[string]bool{
-			server: true,
+			server:        true,
+			unhealthyServ: false,
 		},
 	}
 
@@ -520,9 +563,11 @@ func TestAdd_afterClose(t *testing.T) {
 	require.NotNilf(t, connPool, "conn is nil after Connect")
 
 	connPool.Close()
+
 	ctx, cancel = test_helpers.GetConnectContext()
+	defer cancel()
+
 	err = connPool.Add(ctx, makeInstance(server, connOpts))
-	cancel()
 	assert.Equal(t, err, pool.ErrClosed)
 }
 
@@ -541,9 +586,10 @@ func TestAdd_Close_concurrent(t *testing.T) {
 	go func() {
 		defer wg.Done()
 
-		ctx, cancel := test_helpers.GetConnectContext()
+		ctx, cancel = test_helpers.GetConnectContext()
+		defer cancel()
+
 		err = connPool.Add(ctx, makeInstance(serv1, connOpts))
-		cancel()
 		if err != nil {
 			assert.Equal(t, pool.ErrClosed, err)
 		}
@@ -569,9 +615,10 @@ func TestAdd_CloseGraceful_concurrent(t *testing.T) {
 	go func() {
 		defer wg.Done()
 
-		ctx, cancel := test_helpers.GetConnectContext()
+		ctx, cancel = test_helpers.GetConnectContext()
+		defer cancel()
+
 		err = connPool.Add(ctx, makeInstance(serv1, connOpts))
-		cancel()
 		if err != nil {
 			assert.Equal(t, pool.ErrClosed, err)
 		}
@@ -1028,8 +1075,17 @@ func TestConnectionHandlerOpenError(t *testing.T) {
 	if err == nil {
 		defer connPool.Close()
 	}
-	require.NotNilf(t, err, "success to connect")
-	require.Equalf(t, 2, h.discovered, "unexpected discovered count")
+	require.NoError(t, err, "failed to connect")
+	require.NotNil(t, connPool, "pool expected")
+	require.Equal(t, map[string]pool.ConnectionInfo{
+		servers[0]: {ConnectedNow: false, ConnRole: pool.UnknownRole},
+		servers[1]: {ConnectedNow: false, ConnRole: pool.UnknownRole},
+	}, connPool.GetInfo())
+	connPool.Close()
+
+	// Additional reconnect attempts could happen in the background, but
+	// at least 2 connects are expected on start.
+	require.GreaterOrEqualf(t, h.discovered, 2, "unexpected discovered count")
 	require.Equalf(t, 0, h.deactivated, "unexpected deactivated count")
 }
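For completeness, a similar sketch of the new `Add` contract that the tests
above exercise. Again, this is not part of the patch: the instance name
"replica-1" and its address are placeholders, and the import paths assume
the go-tarantool v2 layout.

    package main

    import (
        "context"
        "log"
        "time"

        "github.com/tarantool/go-tarantool/v2"
        "github.com/tarantool/go-tarantool/v2/pool"
    )

    func main() {
        ctx := context.Background()

        // An empty instance list is valid now: start with an empty pool.
        p, err := pool.Connect(ctx, []pool.Instance{})
        if err != nil {
            log.Fatal(err)
        }
        defer p.Close()

        // Adding an unreachable instance succeeds as well; the pool
        // registers it and retries the connection in the background.
        err = p.Add(ctx, pool.Instance{
            Name:   "replica-1",
            Dialer: tarantool.NetDialer{Address: "example.com:3301"},
            Opts:   tarantool.Opts{Timeout: time.Second},
        })
        if err != nil {
            // Expected only for a canceled context, a duplicate name
            // (pool.ErrExists) or a closed pool (pool.ErrClosed).
            log.Fatal(err)
        }

        info := p.GetInfo()["replica-1"]
        log.Printf("replica-1: connected=%v, role=%v", info.ConnectedNow, info.ConnRole)
    }

Per the tests above, `Add` reports an error only for a canceled context, a
duplicate name (`pool.ErrExists`) or a closed pool (`pool.ErrClosed`); an
unreachable instance remains registered with `ConnectedNow == false` until a
reconnect succeeds.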