diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index c4516e5bd..05fee7835 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -97,16 +97,24 @@ jobs: run: make deps - name: Run regression tests - run: make test + run: | + make test + make testrace - name: Run regression tests with call_17 - run: make test TAGS="go_tarantool_call_17" + run: | + make test TAGS="go_tarantool_call_17" + make testrace TAGS="go_tarantool_call_17" - name: Run regression tests with msgpack.v5 - run: make test TAGS="go_tarantool_msgpack_v5" + run: | + make test TAGS="go_tarantool_msgpack_v5" + make testrace TAGS="go_tarantool_msgpack_v5" - name: Run regression tests with msgpack.v5 and call_17 - run: make test TAGS="go_tarantool_msgpack_v5,go_tarantool_call_17" + run: | + make test TAGS="go_tarantool_msgpack_v5,go_tarantool_call_17" + make testrace TAGS="go_tarantool_msgpack_v5,go_tarantool_call_17" - name: Run fuzzing tests if: ${{ matrix.fuzzing }} @@ -189,6 +197,7 @@ jobs: run: | source tarantool-enterprise/env.sh make test + make testrace env: TEST_TNT_SSL: ${{matrix.ssl}} @@ -196,6 +205,7 @@ jobs: run: | source tarantool-enterprise/env.sh make test TAGS="go_tarantool_call_17" + make testrace TAGS="go_tarantool_call_17" env: TEST_TNT_SSL: ${{matrix.ssl}} @@ -203,6 +213,7 @@ jobs: run: | source tarantool-enterprise/env.sh make test TAGS="go_tarantool_msgpack_v5" + make testrace TAGS="go_tarantool_msgpack_v5" env: TEST_TNT_SSL: ${{matrix.ssl}} @@ -210,6 +221,7 @@ jobs: run: | source tarantool-enterprise/env.sh make test TAGS="go_tarantool_msgpack_v5,go_tarantool_call_17" + make testrace TAGS="go_tarantool_msgpack_v5,go_tarantool_call_17" env: TEST_TNT_SSL: ${{matrix.ssl}} diff --git a/CHANGELOG.md b/CHANGELOG.md index acfe2a914..48b38049e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,11 +11,14 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. ### Added - Support pagination (#246) +- A Makefile target to test with race detector (#218) ### Changed ### Fixed +- Several non-critical data race issues (#218) + ## [1.10.0] - 2022-12-31 The release improves compatibility with new Tarantool versions. diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 15bccd3a6..ba0f0c7ab 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -26,12 +26,18 @@ To run tests for the main package and each subpackage: make test ``` +To run tests for the main package and each subpackage with race detector: +```bash +make testrace +``` + The tests set up all required `tarantool` processes before run and clean up afterwards. If you want to run the tests with specific build tags: ```bash make test TAGS=go_tarantool_ssl_disable,go_tarantool_msgpack_v5 +make testrace TAGS=go_tarantool_ssl_disable,go_tarantool_msgpack_v5 ``` If you have Tarantool Enterprise Edition 2.10 or newer, you can run additional @@ -39,6 +45,7 @@ SSL tests. To do this, you need to set an environment variable 'TEST_TNT_SSL': ```bash TEST_TNT_SSL=true make test +TEST_TNT_SSL=true make testrace ``` If you want to run the tests for a specific package: diff --git a/Makefile b/Makefile index be1f6757f..34de54707 100644 --- a/Makefile +++ b/Makefile @@ -50,6 +50,12 @@ test: testdata: (cd ./testdata; ./generate.sh) +.PHONY: testrace +testrace: + @echo "Running all packages tests with data race detector" + go clean -testcache + go test -race -tags "$(TAGS)" ./... -v -p 1 + .PHONY: test-connection-pool test-connection-pool: @echo "Running tests in connection_pool package" @@ -117,7 +123,13 @@ ${BENCH_PATH} bench-deps: rm -rf ${BENCH_PATH} mkdir ${BENCH_PATH} go clean -testcache - cd ${BENCH_PATH} && git clone https://go.googlesource.com/perf && cd perf && go install ./cmd/benchstat + # It is unable to build a latest version of benchstat with go 1.13. So + # we need to switch to an old commit. + cd ${BENCH_PATH} && \ + git clone https://go.googlesource.com/perf && \ + cd perf && \ + git checkout 91a04616dc65ba76dbe9e5cf746b923b1402d303 && \ + go install ./cmd/benchstat rm -rf ${BENCH_PATH}/perf .PHONY: bench diff --git a/connection.go b/connection.go index 331e1c805..efdd09e65 100644 --- a/connection.go +++ b/connection.go @@ -702,7 +702,7 @@ func pack(h *smallWBuf, enc *encoder, reqid uint32, func (conn *Connection) writeRequest(w *bufio.Writer, req Request) error { var packet smallWBuf - err := pack(&packet, newEncoder(&packet), 0, req, ignoreStreamId, conn.Schema) + err := pack(&packet, newEncoder(&packet), 0, req, ignoreStreamId, nil) if err != nil { return fmt.Errorf("pack error: %w", err) @@ -1033,7 +1033,7 @@ func (conn *Connection) newFuture(ctx context.Context) (fut *Future) { shardn := fut.requestId & (conn.opts.Concurrency - 1) shard := &conn.shard[shardn] shard.rmut.Lock() - switch conn.state { + switch atomic.LoadUint32(&conn.state) { case connClosed: fut.err = ClientError{ ErrConnectionClosed, @@ -1368,6 +1368,9 @@ func (conn *Connection) OverrideSchema(s *Schema) { if s != nil { conn.mutex.Lock() defer conn.mutex.Unlock() + conn.lockShards() + defer conn.unlockShards() + conn.Schema = s } } @@ -1730,9 +1733,10 @@ func (conn *Connection) shutdown() { conn.mutex.Lock() defer conn.mutex.Unlock() - if !atomic.CompareAndSwapUint32(&(conn.state), connConnected, connShutdown) { + if !atomic.CompareAndSwapUint32(&conn.state, connConnected, connShutdown) { return } + conn.cond.Broadcast() conn.notify(Shutdown) diff --git a/connection_pool/connection_pool_test.go b/connection_pool/connection_pool_test.go index 970612b5e..860786fbe 100644 --- a/connection_pool/connection_pool_test.go +++ b/connection_pool/connection_pool_test.go @@ -7,6 +7,7 @@ import ( "reflect" "strings" "sync" + "sync/atomic" "testing" "time" @@ -235,9 +236,8 @@ func TestClose(t *testing.T) { } type testHandler struct { - discovered, deactivated int + discovered, deactivated uint32 errs []error - mutex sync.Mutex } func (h *testHandler) addErr(err error) { @@ -246,10 +246,7 @@ func (h *testHandler) addErr(err error) { func (h *testHandler) Discovered(conn *tarantool.Connection, role connection_pool.Role) error { - h.mutex.Lock() - defer h.mutex.Unlock() - - h.discovered++ + discovered := atomic.AddUint32(&h.discovered, 1) if conn == nil { h.addErr(fmt.Errorf("discovered conn == nil")) @@ -260,14 +257,14 @@ func (h *testHandler) Discovered(conn *tarantool.Connection, // discovered >= 3 - update a connection after a role update addr := conn.Addr() if addr == servers[0] { - if h.discovered < 3 && role != connection_pool.MasterRole { + if discovered < 3 && role != connection_pool.MasterRole { h.addErr(fmt.Errorf("unexpected init role %d for addr %s", role, addr)) } - if h.discovered >= 3 && role != connection_pool.ReplicaRole { + if discovered >= 3 && role != connection_pool.ReplicaRole { h.addErr(fmt.Errorf("unexpected updated role %d for addr %s", role, addr)) } } else if addr == servers[1] { - if h.discovered >= 3 { + if discovered >= 3 { h.addErr(fmt.Errorf("unexpected discovery for addr %s", addr)) } if role != connection_pool.ReplicaRole { @@ -282,10 +279,7 @@ func (h *testHandler) Discovered(conn *tarantool.Connection, func (h *testHandler) Deactivated(conn *tarantool.Connection, role connection_pool.Role) error { - h.mutex.Lock() - defer h.mutex.Unlock() - - h.deactivated++ + deactivated := atomic.AddUint32(&h.deactivated, 1) if conn == nil { h.addErr(fmt.Errorf("removed conn == nil")) @@ -293,7 +287,7 @@ func (h *testHandler) Deactivated(conn *tarantool.Connection, } addr := conn.Addr() - if h.deactivated == 1 && addr == servers[0] { + if deactivated == 1 && addr == servers[0] { // A first close is a role update. if role != connection_pool.MasterRole { h.addErr(fmt.Errorf("unexpected removed role %d for addr %s", role, addr)) @@ -337,19 +331,24 @@ func TestConnectionHandlerOpenUpdateClose(t *testing.T) { for i := 0; i < 100; i++ { // Wait for read_only update, it should report about close connection // with old role. - if h.discovered >= 3 { + if atomic.LoadUint32(&h.discovered) >= 3 { break } time.Sleep(poolOpts.CheckTimeout) } - require.Equalf(t, h.deactivated, 1, "updated not reported as deactivated") - require.Equalf(t, h.discovered, 3, "updated not reported as discovered") + + discovered := atomic.LoadUint32(&h.discovered) + deactivated := atomic.LoadUint32(&h.deactivated) + require.Equalf(t, uint32(3), discovered, + "updated not reported as discovered") + require.Equalf(t, uint32(1), deactivated, + "updated not reported as deactivated") pool.Close() for i := 0; i < 100; i++ { // Wait for close of all connections. - if h.deactivated >= 3 { + if atomic.LoadUint32(&h.deactivated) >= 3 { break } time.Sleep(poolOpts.CheckTimeout) @@ -361,8 +360,13 @@ func TestConnectionHandlerOpenUpdateClose(t *testing.T) { connected, err := pool.ConnectedNow(connection_pool.ANY) require.Nilf(t, err, "failed to get connected state") require.Falsef(t, connected, "connection pool still be connected") - require.Equalf(t, len(poolServers)+1, h.discovered, "unexpected discovered count") - require.Equalf(t, len(poolServers)+1, h.deactivated, "unexpected deactivated count") + + discovered = atomic.LoadUint32(&h.discovered) + deactivated = atomic.LoadUint32(&h.deactivated) + require.Equalf(t, uint32(len(poolServers)+1), discovered, + "unexpected discovered count") + require.Equalf(t, uint32(len(poolServers)+1), deactivated, + "unexpected deactivated count") } type testAddErrorHandler struct { @@ -399,18 +403,14 @@ func TestConnectionHandlerOpenError(t *testing.T) { } type testUpdateErrorHandler struct { - discovered, deactivated int - mutex sync.Mutex + discovered, deactivated uint32 } func (h *testUpdateErrorHandler) Discovered(conn *tarantool.Connection, role connection_pool.Role) error { - h.mutex.Lock() - defer h.mutex.Unlock() - - h.discovered++ + atomic.AddUint32(&h.discovered, 1) - if h.deactivated != 0 { + if atomic.LoadUint32(&h.deactivated) != 0 { // Don't add a connection into a pool again after it was deleted. return fmt.Errorf("any error") } @@ -419,10 +419,7 @@ func (h *testUpdateErrorHandler) Discovered(conn *tarantool.Connection, func (h *testUpdateErrorHandler) Deactivated(conn *tarantool.Connection, role connection_pool.Role) error { - h.mutex.Lock() - defer h.mutex.Unlock() - - h.deactivated++ + atomic.AddUint32(&h.deactivated, 1) return nil } @@ -473,7 +470,9 @@ func TestConnectionHandlerUpdateError(t *testing.T) { require.Nilf(t, err, "failed to get ConnectedNow()") require.Falsef(t, connected, "should be deactivated") - require.GreaterOrEqualf(t, h.discovered, h.deactivated, "discovered < deactivated") + discovered := atomic.LoadUint32(&h.discovered) + deactivated := atomic.LoadUint32(&h.deactivated) + require.GreaterOrEqualf(t, discovered, deactivated, "discovered < deactivated") require.Nilf(t, err, "failed to get ConnectedNow()") } diff --git a/connection_pool/watcher.go b/connection_pool/watcher.go index 2876f90bc..d60cfa171 100644 --- a/connection_pool/watcher.go +++ b/connection_pool/watcher.go @@ -33,7 +33,7 @@ func (c *watcherContainer) remove(watcher *poolWatcher) bool { if watcher == c.head { c.head = watcher.next return true - } else { + } else if c.head != nil { cur := c.head for cur.next != nil { if cur.next == watcher { @@ -85,7 +85,11 @@ type poolWatcher struct { // Unregister unregisters the pool watcher. func (w *poolWatcher) Unregister() { - if !w.unregistered && w.container.remove(w) { + w.mutex.Lock() + unregistered := w.unregistered + w.mutex.Unlock() + + if !unregistered && w.container.remove(w) { w.mutex.Lock() w.unregistered = true for _, watcher := range w.watchers { diff --git a/future_test.go b/future_test.go index bae974158..e7e9ab507 100644 --- a/future_test.go +++ b/future_test.go @@ -237,13 +237,13 @@ func TestFutureGetIteratorError(t *testing.T) { func TestFutureSetStateRaceCondition(t *testing.T) { err := errors.New("any error") resp := &Response{} - respAppend := &Response{} for i := 0; i < 1000; i++ { fut := NewFuture() for j := 0; j < 9; j++ { go func(opt int) { if opt%3 == 0 { + respAppend := &Response{} fut.AppendPush(respAppend) } else if opt%3 == 1 { fut.SetError(err) diff --git a/multi/multi.go b/multi/multi.go index ea950cfdf..6aba2a426 100644 --- a/multi/multi.go +++ b/multi/multi.go @@ -212,7 +212,9 @@ func (connMulti *ConnectionMulti) checker() { connMulti.deleteConnectionFromPool(v) } } + connMulti.mutex.Lock() connMulti.addrs = addrs + connMulti.mutex.Unlock() } case <-timer.C: for _, addr := range connMulti.addrs { @@ -261,7 +263,7 @@ func (connMulti *ConnectionMulti) Close() (err error) { defer connMulti.mutex.Unlock() close(connMulti.control) - connMulti.state = connClosed + atomic.StoreUint32(&connMulti.state, connClosed) for _, conn := range connMulti.pool { if err == nil { diff --git a/multi/multi_test.go b/multi/multi_test.go index ad4fd1962..ad98912c2 100644 --- a/multi/multi_test.go +++ b/multi/multi_test.go @@ -200,13 +200,18 @@ func TestRefresh(t *testing.T) { t.Errorf("conn is nil after Connect") return } + + multiConn.mutex.RLock() curAddr := multiConn.addrs[0] + multiConn.mutex.RUnlock() // Wait for refresh timer. // Scenario 1 nodeload, 1 refresh, 1 nodeload. time.Sleep(10 * time.Second) + multiConn.mutex.RLock() newAddr := multiConn.addrs[0] + multiConn.mutex.RUnlock() if curAddr == newAddr { t.Errorf("Expect address refresh") diff --git a/queue/example_connection_pool_test.go b/queue/example_connection_pool_test.go index 214e67274..e41cdc639 100644 --- a/queue/example_connection_pool_test.go +++ b/queue/example_connection_pool_test.go @@ -89,8 +89,8 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection, } } - atomic.AddInt32(&h.masterCnt, 1) fmt.Printf("Master %s is ready to work!\n", conn.Addr()) + atomic.AddInt32(&h.masterCnt, 1) return nil } @@ -185,8 +185,12 @@ func Example_connectionPool() { // Wait for a new master instance re-identification. <-h.masterUpdated - if h.err != nil { - fmt.Printf("Unable to re-identify in the pool: %s", h.err) + h.mutex.Lock() + err = h.err + h.mutex.Unlock() + + if err != nil { + fmt.Printf("Unable to re-identify in the pool: %s", err) return } diff --git a/schema.go b/schema.go index 35f65eb28..a182e8b10 100644 --- a/schema.go +++ b/schema.go @@ -329,7 +329,10 @@ func (conn *Connection) loadSchema() (err error) { schema.SpacesById[index.SpaceId].Indexes[index.Name] = index } + conn.lockShards() conn.Schema = schema + conn.unlockShards() + return nil }