diff --git a/CHANGELOG.md b/CHANGELOG.md
index 28a20ed8f..0c3b026ec 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -42,6 +42,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
   check (#301)
 - `GreetingDialer` type for creating a dialer, that fills `Greeting` of
   a connection (#301)
+- New method `Pool.DoInstance` to execute a request on a target instance in
+  a pool (#376).
 
 ### Changed
 
diff --git a/pool/connection_pool.go b/pool/connection_pool.go
index 798a43af2..9a79f9116 100644
--- a/pool/connection_pool.go
+++ b/pool/connection_pool.go
@@ -1002,6 +1002,16 @@ func (p *ConnectionPool) Do(req tarantool.Request, userMode Mode) *tarantool.Fut
 	return conn.Do(req)
 }
 
+// DoInstance sends the request to a target instance and returns a future.
+func (p *ConnectionPool) DoInstance(req tarantool.Request, name string) *tarantool.Future {
+	conn := p.anyPool.GetConnection(name)
+	if conn == nil {
+		return newErrorFuture(ErrNoHealthyInstance)
+	}
+
+	return conn.Do(req)
+}
+
 //
 // private
 //
diff --git a/pool/connection_pool_test.go b/pool/connection_pool_test.go
index 954277922..cadd83564 100644
--- a/pool/connection_pool_test.go
+++ b/pool/connection_pool_test.go
@@ -2540,6 +2540,76 @@ func TestDo_concurrent(t *testing.T) {
 	wg.Wait()
 }
 
+func TestDoInstance(t *testing.T) {
+	ctx, cancel := test_helpers.GetPoolConnectContext()
+	defer cancel()
+
+	connPool, err := pool.Connect(ctx, instances)
+	require.Nilf(t, err, "failed to connect")
+	require.NotNilf(t, connPool, "conn is nil after Connect")
+
+	defer connPool.Close()
+
+	req := tarantool.NewEvalRequest("return box.cfg.listen")
+	for _, server := range servers {
+		data, err := connPool.DoInstance(req, server).Get()
+		require.NoError(t, err)
+		assert.Equal(t, []interface{}{server}, data)
+	}
+}
+
+func TestDoInstance_not_found(t *testing.T) {
+	roles := []bool{true, true, false, true, false}
+
+	err := test_helpers.SetClusterRO(dialers, connOpts, roles)
+	require.Nilf(t, err, "failed to set roles for cluster")
+
+	ctx, cancel := test_helpers.GetPoolConnectContext()
+	defer cancel()
+
+	connPool, err := pool.Connect(ctx, []pool.Instance{})
+	require.Nilf(t, err, "failed to connect")
+	require.NotNilf(t, connPool, "conn is nil after Connect")
+
+	defer connPool.Close()
+
+	data, err := connPool.DoInstance(tarantool.NewPingRequest(), "not_exist").Get()
+	assert.Nil(t, data)
+	require.ErrorIs(t, err, pool.ErrNoHealthyInstance)
+}
+
+func TestDoInstance_concurrent(t *testing.T) {
+	ctx, cancel := test_helpers.GetPoolConnectContext()
+	defer cancel()
+	connPool, err := pool.Connect(ctx, instances)
+	require.Nilf(t, err, "failed to connect")
+	require.NotNilf(t, connPool, "conn is nil after Connect")
+
+	defer connPool.Close()
+
+	eval := tarantool.NewEvalRequest("return box.cfg.listen")
+	ping := tarantool.NewPingRequest()
+	const concurrency = 100
+	var wg sync.WaitGroup
+	wg.Add(concurrency)
+
+	for i := 0; i < concurrency; i++ {
+		go func() {
+			defer wg.Done()
+
+			for _, server := range servers {
+				data, err := connPool.DoInstance(eval, server).Get()
+				require.NoError(t, err)
+				assert.Equal(t, []interface{}{server}, data)
+			}
+			_, err := connPool.DoInstance(ping, "not_exist").Get()
+			require.ErrorIs(t, err, pool.ErrNoHealthyInstance)
+		}()
+	}
+
+	wg.Wait()
+}
+
 func TestNewPrepared(t *testing.T) {
 	test_helpers.SkipIfSQLUnsupported(t)
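
For reference, a minimal sketch of how the new DoInstance method could be called from client code. The instance name, address, and credentials below are placeholders, and the pool is built with the public go-tarantool v2 API (pool.Instance, tarantool.NetDialer) rather than the test helpers used above:

package main

import (
	"context"
	"log"
	"time"

	"github.com/tarantool/go-tarantool/v2"
	"github.com/tarantool/go-tarantool/v2/pool"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// A single-instance pool; the name is what DoInstance routes by.
	instances := []pool.Instance{{
		Name:   "instance-1", // hypothetical instance name
		Dialer: tarantool.NetDialer{Address: "127.0.0.1:3301", User: "guest"},
		Opts:   tarantool.Opts{Timeout: time.Second},
	}}

	connPool, err := pool.Connect(ctx, instances)
	if err != nil {
		log.Fatalf("failed to connect: %s", err)
	}
	defer connPool.Close()

	// Route a request to a specific instance by name instead of by mode.
	data, err := connPool.DoInstance(tarantool.NewPingRequest(), "instance-1").Get()
	if err != nil {
		log.Fatalf("ping failed: %s", err)
	}
	log.Printf("ping response: %v", data)

	// An unknown name fails with pool.ErrNoHealthyInstance, as the tests verify.
	if _, err := connPool.DoInstance(tarantool.NewPingRequest(), "unknown").Get(); err != nil {
		log.Printf("expected error: %s", err)
	}
}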