From 6e4da11e1630e83ea6f199594ff8c1fce24ae9aa Mon Sep 17 00:00:00 2001
From: Oleg Jukovec
Date: Mon, 9 Jan 2023 11:06:24 +0300
Subject: [PATCH 01/12] code health: do not pass a schema to init requests

Initial requests should not use a schema because it has not been
loaded yet.

Part of #218
---
 connection.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/connection.go b/connection.go
index 331e1c805..3e88ae4b7 100644
--- a/connection.go
+++ b/connection.go
@@ -702,7 +702,7 @@ func pack(h *smallWBuf, enc *encoder, reqid uint32,
 
 func (conn *Connection) writeRequest(w *bufio.Writer, req Request) error {
 	var packet smallWBuf
-	err := pack(&packet, newEncoder(&packet), 0, req, ignoreStreamId, conn.Schema)
+	err := pack(&packet, newEncoder(&packet), 0, req, ignoreStreamId, nil)
 
 	if err != nil {
 		return fmt.Errorf("pack error: %w", err)

From 58902ce1c1783522e5909f65826375b66ca9287a Mon Sep 17 00:00:00 2001
From: Oleg Jukovec
Date: Mon, 9 Jan 2023 14:49:37 +0300
Subject: [PATCH 02/12] bugfix: load and usage schema data race

We need to block shards to avoid schema usage by concurrent requests.
At the moment such a request can only be a ping or a watch request, so
the race does not look critical: we don't expect many of these
requests, and they do not use the schema at all.

Part of #218
---
 connection.go | 3 +++
 schema.go     | 3 +++
 2 files changed, 6 insertions(+)

diff --git a/connection.go b/connection.go
index 3e88ae4b7..9a4b6f3d6 100644
--- a/connection.go
+++ b/connection.go
@@ -1368,6 +1368,9 @@ func (conn *Connection) OverrideSchema(s *Schema) {
 	if s != nil {
 		conn.mutex.Lock()
 		defer conn.mutex.Unlock()
+		conn.lockShards()
+		defer conn.unlockShards()
+
 		conn.Schema = s
 	}
 }
diff --git a/schema.go b/schema.go
index 35f65eb28..a182e8b10 100644
--- a/schema.go
+++ b/schema.go
@@ -329,7 +329,10 @@ func (conn *Connection) loadSchema() (err error) {
 		schema.SpacesById[index.SpaceId].Indexes[index.Name] = index
 	}
 
+	conn.lockShards()
 	conn.Schema = schema
+	conn.unlockShards()
+
 	return nil
 }

From a1cd9d191dff069c024ed98a3ceceebf9db78937 Mon Sep 17 00:00:00 2001
From: Oleg Jukovec
Date: Mon, 9 Jan 2023 14:54:05 +0300
Subject: [PATCH 03/12] bugfix: read state data race

We need to use an atomic method to read the atomic value.

Part of #218
---
 connection.go | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/connection.go b/connection.go
index 9a4b6f3d6..efdd09e65 100644
--- a/connection.go
+++ b/connection.go
@@ -1033,7 +1033,7 @@ func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
 	shardn := fut.requestId & (conn.opts.Concurrency - 1)
 	shard := &conn.shard[shardn]
 	shard.rmut.Lock()
-	switch conn.state {
+	switch atomic.LoadUint32(&conn.state) {
 	case connClosed:
 		fut.err = ClientError{
 			ErrConnectionClosed,
@@ -1733,9 +1733,10 @@ func (conn *Connection) shutdown() {
 	conn.mutex.Lock()
 	defer conn.mutex.Unlock()
 
-	if !atomic.CompareAndSwapUint32(&(conn.state), connConnected, connShutdown) {
+	if !atomic.CompareAndSwapUint32(&conn.state, connConnected, connShutdown) {
 		return
 	}
+
 	conn.cond.Broadcast()
 	conn.notify(Shutdown)

From a1a74763b8018449b4c1c76e6462f07d334f5f24 Mon Sep 17 00:00:00 2001
From: Oleg Jukovec
Date: Mon, 9 Jan 2023 18:01:19 +0300
Subject: [PATCH 04/12] bugfix: TestFutureSetStateRaceCondition data race

The real use case in our code is one response per AppendPush() call,
so it is enough to change the test to allocate a fresh response inside
each goroutine.
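For illustration, a minimal sketch of the corrected pattern, reusing
the Future and Response types from this package as the test below does:

    fut := NewFuture()
    for j := 0; j < 9; j++ {
        go func(opt int) {
            if opt%3 == 0 {
                // Each pushing goroutine allocates its own response
                // instead of sharing one value across goroutines.
                fut.AppendPush(&Response{})
            }
        }(j)
    }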
Part of #218
---
 future_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/future_test.go b/future_test.go
index bae974158..e7e9ab507 100644
--- a/future_test.go
+++ b/future_test.go
@@ -237,13 +237,13 @@ func TestFutureGetIteratorError(t *testing.T) {
 func TestFutureSetStateRaceCondition(t *testing.T) {
 	err := errors.New("any error")
 	resp := &Response{}
-	respAppend := &Response{}
 
 	for i := 0; i < 1000; i++ {
 		fut := NewFuture()
 		for j := 0; j < 9; j++ {
 			go func(opt int) {
 				if opt%3 == 0 {
+					respAppend := &Response{}
 					fut.AppendPush(respAppend)
 				} else if opt%3 == 1 {
 					fut.SetError(err)

From 08d4b1f864075a0b64579e7919771bea450d118f Mon Sep 17 00:00:00 2001
From: Oleg Jukovec
Date: Mon, 9 Jan 2023 17:58:59 +0300
Subject: [PATCH 05/12] bugfix: ConnectionMulti update addresses data race

We need to update the addresses slice under a lock because we use the
slice in ConnectionMulti.getCurrentConnection().

Part of #218
---
 multi/multi.go      | 2 ++
 multi/multi_test.go | 5 +++++
 2 files changed, 7 insertions(+)

diff --git a/multi/multi.go b/multi/multi.go
index ea950cfdf..7b4b65839 100644
--- a/multi/multi.go
+++ b/multi/multi.go
@@ -212,7 +212,9 @@ func (connMulti *ConnectionMulti) checker() {
 					connMulti.deleteConnectionFromPool(v)
 				}
 			}
+			connMulti.mutex.Lock()
 			connMulti.addrs = addrs
+			connMulti.mutex.Unlock()
 		}
 	case <-timer.C:
 		for _, addr := range connMulti.addrs {
diff --git a/multi/multi_test.go b/multi/multi_test.go
index ad4fd1962..ad98912c2 100644
--- a/multi/multi_test.go
+++ b/multi/multi_test.go
@@ -200,13 +200,18 @@ func TestRefresh(t *testing.T) {
 		t.Errorf("conn is nil after Connect")
 		return
 	}
+
+	multiConn.mutex.RLock()
 	curAddr := multiConn.addrs[0]
+	multiConn.mutex.RUnlock()
 
 	// Wait for refresh timer.
 	// Scenario 1 nodeload, 1 refresh, 1 nodeload.
 	time.Sleep(10 * time.Second)
 
+	multiConn.mutex.RLock()
 	newAddr := multiConn.addrs[0]
+	multiConn.mutex.RUnlock()
 
 	if curAddr == newAddr {
 		t.Errorf("Expect address refresh")

From 62508712217eefc5fc860d8247e1003ae3a999bf Mon Sep 17 00:00:00 2001
From: Oleg Jukovec
Date: Mon, 9 Jan 2023 18:00:07 +0300
Subject: [PATCH 06/12] bugfix: ConnectionMulti.Close() data race

We need to use an atomic method to update the atomic value.

Part of #218
---
 multi/multi.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/multi/multi.go b/multi/multi.go
index 7b4b65839..6aba2a426 100644
--- a/multi/multi.go
+++ b/multi/multi.go
@@ -263,7 +263,7 @@ func (connMulti *ConnectionMulti) Close() (err error) {
 	defer connMulti.mutex.Unlock()
 
 	close(connMulti.control)
-	connMulti.state = connClosed
+	atomic.StoreUint32(&connMulti.state, connClosed)
 
 	for _, conn := range connMulti.pool {
 		if err == nil {

From 0d6e3979e3bbac14cce9e2bfdc8f73b6828007bb Mon Sep 17 00:00:00 2001
From: Oleg Jukovec
Date: Mon, 9 Jan 2023 18:00:59 +0300
Subject: [PATCH 07/12] bugfix: poolWatcher.Unregister() data race

We need to protect the poolWatcher.unregistered variable to avoid the
data race, which could also lead to crashes. Otherwise the race does
not look too critical: we don't expect the performance of
poolWatcher.Unregister() to matter for concurrent calls.
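A generic sketch of the locking pattern applied here (the names below
are illustrative, not the exact poolWatcher layout): every read and
write of the shared flag goes through the same mutex.

    import "sync"

    type watcher struct {
        mu           sync.Mutex
        unregistered bool
    }

    func (w *watcher) Unregister() {
        w.mu.Lock()
        already := w.unregistered
        w.unregistered = true
        w.mu.Unlock()

        if already {
            return // repeated calls become no-ops
        }
        // ... remove the watcher from its container ...
    }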
Part of #218
---
 connection_pool/watcher.go | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/connection_pool/watcher.go b/connection_pool/watcher.go
index 2876f90bc..d60cfa171 100644
--- a/connection_pool/watcher.go
+++ b/connection_pool/watcher.go
@@ -33,7 +33,7 @@ func (c *watcherContainer) remove(watcher *poolWatcher) bool {
 	if watcher == c.head {
 		c.head = watcher.next
 		return true
-	} else {
+	} else if c.head != nil {
 		cur := c.head
 		for cur.next != nil {
 			if cur.next == watcher {
@@ -85,7 +85,11 @@ type poolWatcher struct {
 
 // Unregister unregisters the pool watcher.
 func (w *poolWatcher) Unregister() {
-	if !w.unregistered && w.container.remove(w) {
+	w.mutex.Lock()
+	unregistered := w.unregistered
+	w.mutex.Unlock()
+
+	if !unregistered && w.container.remove(w) {
 		w.mutex.Lock()
 		w.unregistered = true
 		for _, watcher := range w.watchers {

From ae38068419948dba4dd5e04006bc137336eaf8fe Mon Sep 17 00:00:00 2001
From: Oleg Jukovec
Date: Mon, 9 Jan 2023 18:54:01 +0300
Subject: [PATCH 08/12] bugfix: TestConnectionHandlerOpenUpdateClose data race

It is better to use atomic counters.

Part of #218
---
 connection_pool/connection_pool_test.go | 44 ++++++++++++++-----------
 1 file changed, 24 insertions(+), 20 deletions(-)

diff --git a/connection_pool/connection_pool_test.go b/connection_pool/connection_pool_test.go
index 970612b5e..b4451074c 100644
--- a/connection_pool/connection_pool_test.go
+++ b/connection_pool/connection_pool_test.go
@@ -7,6 +7,7 @@ import (
 	"reflect"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -235,9 +236,8 @@ func TestClose(t *testing.T) {
 }
 
 type testHandler struct {
-	discovered, deactivated int
+	discovered, deactivated uint32
 	errs []error
-	mutex sync.Mutex
 }
 
 func (h *testHandler) addErr(err error) {
 	h.errs = append(h.errs, err)
 }
 
 func (h *testHandler) Discovered(conn *tarantool.Connection,
 	role connection_pool.Role) error {
-	h.mutex.Lock()
-	defer h.mutex.Unlock()
-
-	h.discovered++
+	discovered := atomic.AddUint32(&h.discovered, 1)
 
 	if conn == nil {
 		h.addErr(fmt.Errorf("discovered conn == nil"))
 		return nil
 	}
 
 	// discovered == 1 - a first connection, expected to be a master
 	// discovered == 2 - a first connection, expected to be a replica
 	// discovered >= 3 - update a connection after a role update
 	addr := conn.Addr()
 	if addr == servers[0] {
-		if h.discovered < 3 && role != connection_pool.MasterRole {
+		if discovered < 3 && role != connection_pool.MasterRole {
 			h.addErr(fmt.Errorf("unexpected init role %d for addr %s", role, addr))
 		}
-		if h.discovered >= 3 && role != connection_pool.ReplicaRole {
+		if discovered >= 3 && role != connection_pool.ReplicaRole {
 			h.addErr(fmt.Errorf("unexpected updated role %d for addr %s", role, addr))
 		}
 	} else if addr == servers[1] {
-		if h.discovered >= 3 {
+		if discovered >= 3 {
 			h.addErr(fmt.Errorf("unexpected discovery for addr %s", addr))
 		}
 		if role != connection_pool.ReplicaRole {
 			h.addErr(fmt.Errorf("unexpected role %d for addr %s", role, addr))
 		}
 	}
 
 	return nil
 }
 
 func (h *testHandler) Deactivated(conn *tarantool.Connection,
 	role connection_pool.Role) error {
-	h.mutex.Lock()
-	defer h.mutex.Unlock()
-
-	h.deactivated++
+	deactivated := atomic.AddUint32(&h.deactivated, 1)
 
 	if conn == nil {
 		h.addErr(fmt.Errorf("removed conn == nil"))
 		return nil
 	}
 
 	addr := conn.Addr()
-	if h.deactivated == 1 && addr == servers[0] {
+	if deactivated == 1 && addr == servers[0] {
 		// A first close is a role update.
 		if role != connection_pool.MasterRole {
 			h.addErr(fmt.Errorf("unexpected removed role %d for addr %s", role, addr))
 		}
@@ -337,19 +331,24 @@ func TestConnectionHandlerOpenUpdateClose(t *testing.T) {
 	for i := 0; i < 100; i++ {
 		// Wait for read_only update, it should report about close connection
 		// with old role.
-		if h.discovered >= 3 {
+		if atomic.LoadUint32(&h.discovered) >= 3 {
 			break
 		}
 		time.Sleep(poolOpts.CheckTimeout)
 	}
-	require.Equalf(t, h.deactivated, 1, "updated not reported as deactivated")
-	require.Equalf(t, h.discovered, 3, "updated not reported as discovered")
+
+	discovered := atomic.LoadUint32(&h.discovered)
+	deactivated := atomic.LoadUint32(&h.deactivated)
+	require.Equalf(t, uint32(3), discovered,
+		"updated not reported as discovered")
+	require.Equalf(t, uint32(1), deactivated,
+		"updated not reported as deactivated")
 
 	pool.Close()
 
 	for i := 0; i < 100; i++ {
 		// Wait for close of all connections.
-		if h.deactivated >= 3 {
+		if atomic.LoadUint32(&h.deactivated) >= 3 {
 			break
 		}
 		time.Sleep(poolOpts.CheckTimeout)
 	}
@@ -361,8 +360,13 @@ func TestConnectionHandlerOpenUpdateClose(t *testing.T) {
 	connected, err := pool.ConnectedNow(connection_pool.ANY)
 	require.Nilf(t, err, "failed to get connected state")
 	require.Falsef(t, connected, "connection pool still be connected")
-	require.Equalf(t, len(poolServers)+1, h.discovered, "unexpected discovered count")
-	require.Equalf(t, len(poolServers)+1, h.deactivated, "unexpected deactivated count")
+
+	discovered = atomic.LoadUint32(&h.discovered)
+	deactivated = atomic.LoadUint32(&h.deactivated)
+	require.Equalf(t, uint32(len(poolServers)+1), discovered,
+		"unexpected discovered count")
+	require.Equalf(t, uint32(len(poolServers)+1), deactivated,
+		"unexpected deactivated count")
 }
 
 type testAddErrorHandler struct {

From 63f7f9aba7a0153c952ffde2283fec1378c8575e Mon Sep 17 00:00:00 2001
From: Oleg Jukovec
Date: Mon, 9 Jan 2023 19:02:42 +0300
Subject: [PATCH 09/12] bugfix: TestConnectionHandlerUpdateError data race

It is better to use atomic counters.

Part of #218
---
 connection_pool/connection_pool_test.go | 19 +++++++------------
 1 file changed, 7 insertions(+), 12 deletions(-)

diff --git a/connection_pool/connection_pool_test.go b/connection_pool/connection_pool_test.go
index b4451074c..860786fbe 100644
--- a/connection_pool/connection_pool_test.go
+++ b/connection_pool/connection_pool_test.go
@@ -403,18 +403,14 @@ func TestConnectionHandlerOpenError(t *testing.T) {
 }
 
 type testUpdateErrorHandler struct {
-	discovered, deactivated int
-	mutex sync.Mutex
+	discovered, deactivated uint32
 }
 
 func (h *testUpdateErrorHandler) Discovered(conn *tarantool.Connection,
 	role connection_pool.Role) error {
-	h.mutex.Lock()
-	defer h.mutex.Unlock()
-
-	h.discovered++
+	atomic.AddUint32(&h.discovered, 1)
 
-	if h.deactivated != 0 {
+	if atomic.LoadUint32(&h.deactivated) != 0 {
 		// Don't add a connection into a pool again after it was deleted.
 		return fmt.Errorf("any error")
 	}
 	return nil
 }
 
 func (h *testUpdateErrorHandler) Deactivated(conn *tarantool.Connection,
 	role connection_pool.Role) error {
-	h.mutex.Lock()
-	defer h.mutex.Unlock()
-
-	h.deactivated++
+	atomic.AddUint32(&h.deactivated, 1)
 	return nil
 }
 
@@ -477,7 +470,9 @@ func TestConnectionHandlerUpdateError(t *testing.T) {
 	require.Nilf(t, err, "failed to get ConnectedNow()")
 	require.Falsef(t, connected, "should be deactivated")
 
-	require.GreaterOrEqualf(t, h.discovered, h.deactivated, "discovered < deactivated")
+	discovered := atomic.LoadUint32(&h.discovered)
+	deactivated := atomic.LoadUint32(&h.deactivated)
+	require.GreaterOrEqualf(t, discovered, deactivated, "discovered < deactivated")
 
 	require.Nilf(t, err, "failed to get ConnectedNow()")
 }

From 9ec835055315a5beee268ba35b108db26f509da9 Mon Sep 17 00:00:00 2001
From: Oleg Jukovec
Date: Mon, 16 Jan 2023 13:30:51 +0300
Subject: [PATCH 10/12] bugfix: queue/Example_connectionPool data race

Part of #218
---
 queue/example_connection_pool_test.go | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/queue/example_connection_pool_test.go b/queue/example_connection_pool_test.go
index 214e67274..e41cdc639 100644
--- a/queue/example_connection_pool_test.go
+++ b/queue/example_connection_pool_test.go
@@ -89,8 +89,8 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
 		}
 	}
 
-	atomic.AddInt32(&h.masterCnt, 1)
 	fmt.Printf("Master %s is ready to work!\n", conn.Addr())
+	atomic.AddInt32(&h.masterCnt, 1)
 
 	return nil
 }
@@ -185,8 +185,12 @@ func Example_connectionPool() {
 	// Wait for a new master instance re-identification.
 	<-h.masterUpdated
 
-	if h.err != nil {
-		fmt.Printf("Unable to re-identify in the pool: %s", h.err)
+	h.mutex.Lock()
+	err = h.err
+	h.mutex.Unlock()
+
+	if err != nil {
+		fmt.Printf("Unable to re-identify in the pool: %s", err)
 		return
 	}

From ba53a9532a11d74df851c0f42f396e412d350a7d Mon Sep 17 00:00:00 2001
From: Oleg Jukovec
Date: Mon, 9 Jan 2023 18:29:48 +0300
Subject: [PATCH 11/12] ci: add tests with data race detector

Test execution results may differ due to different timings, so it is
better to run the tests with the race detector in addition to the
regular tests rather than instead of them.
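For context, a toy example (not code from this repository) of the kind
of unsynchronized access `go test -race` reports, next to the atomic
pattern the series applies:

    import "sync/atomic"

    func racy(state *uint32) uint32 {
        go func() { *state = 1 }() // unsynchronized write
        return *state              // unsynchronized read, flagged by -race
    }

    func safe(state *uint32) uint32 {
        go func() { atomic.StoreUint32(state, 1) }() // atomic write
        return atomic.LoadUint32(state)              // atomic read
    }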
Closes #218
---
 .github/workflows/testing.yml | 20 ++++++++++++++++----
 CHANGELOG.md                  |  3 +++
 CONTRIBUTING.md               |  7 +++++++
 Makefile                      |  6 ++++++
 4 files changed, 32 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml
index c4516e5bd..05fee7835 100644
--- a/.github/workflows/testing.yml
+++ b/.github/workflows/testing.yml
@@ -97,16 +97,24 @@ jobs:
         run: make deps
 
       - name: Run regression tests
-        run: make test
+        run: |
+          make test
+          make testrace
 
       - name: Run regression tests with call_17
-        run: make test TAGS="go_tarantool_call_17"
+        run: |
+          make test TAGS="go_tarantool_call_17"
+          make testrace TAGS="go_tarantool_call_17"
 
       - name: Run regression tests with msgpack.v5
-        run: make test TAGS="go_tarantool_msgpack_v5"
+        run: |
+          make test TAGS="go_tarantool_msgpack_v5"
+          make testrace TAGS="go_tarantool_msgpack_v5"
 
       - name: Run regression tests with msgpack.v5 and call_17
-        run: make test TAGS="go_tarantool_msgpack_v5,go_tarantool_call_17"
+        run: |
+          make test TAGS="go_tarantool_msgpack_v5,go_tarantool_call_17"
+          make testrace TAGS="go_tarantool_msgpack_v5,go_tarantool_call_17"
 
       - name: Run fuzzing tests
         if: ${{ matrix.fuzzing }}
@@ -189,6 +197,7 @@ jobs:
         run: |
           source tarantool-enterprise/env.sh
           make test
+          make testrace
         env:
           TEST_TNT_SSL: ${{matrix.ssl}}
 
       - name: Run regression tests with call_17
        run: |
           source tarantool-enterprise/env.sh
           make test TAGS="go_tarantool_call_17"
+          make testrace TAGS="go_tarantool_call_17"
         env:
           TEST_TNT_SSL: ${{matrix.ssl}}
 
       - name: Run regression tests with msgpack.v5
         run: |
           source tarantool-enterprise/env.sh
           make test TAGS="go_tarantool_msgpack_v5"
+          make testrace TAGS="go_tarantool_msgpack_v5"
         env:
           TEST_TNT_SSL: ${{matrix.ssl}}
 
       - name: Run regression tests with msgpack.v5 and call_17
         run: |
           source tarantool-enterprise/env.sh
           make test TAGS="go_tarantool_msgpack_v5,go_tarantool_call_17"
+          make testrace TAGS="go_tarantool_msgpack_v5,go_tarantool_call_17"
         env:
           TEST_TNT_SSL: ${{matrix.ssl}}
diff --git a/CHANGELOG.md b/CHANGELOG.md
index acfe2a914..48b38049e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,11 +11,14 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
 ### Added
 
 - Support pagination (#246)
+- A Makefile target to test with race detector (#218)
 
 ### Changed
 
 ### Fixed
 
+- Several non-critical data race issues (#218)
+
 ## [1.10.0] - 2022-12-31
 
 The release improves compatibility with new Tarantool versions.
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 15bccd3a6..ba0f0c7ab 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -26,12 +26,18 @@ To run tests for the main package and each subpackage:
 ```bash
 make test
 ```
 
+To run tests for the main package and each subpackage with race detector:
+```bash
+make testrace
+```
+
 The tests set up all required `tarantool` processes before run and clean
 up afterwards. If you want to run the tests with specific build tags:
 ```bash
 make test TAGS=go_tarantool_ssl_disable,go_tarantool_msgpack_v5
+make testrace TAGS=go_tarantool_ssl_disable,go_tarantool_msgpack_v5
 ```
 
 If you have Tarantool Enterprise Edition 2.10 or newer, you can run additional
 SSL tests.
 To do this, you need to set an environment variable 'TEST_TNT_SSL':
 
 ```bash
 TEST_TNT_SSL=true make test
+TEST_TNT_SSL=true make testrace
 ```
 
 If you want to run the tests for a specific package:
diff --git a/Makefile b/Makefile
index be1f6757f..60016ee3c 100644
--- a/Makefile
+++ b/Makefile
@@ -50,6 +50,12 @@ test:
 testdata:
 	(cd ./testdata; ./generate.sh)
 
+.PHONY: testrace
+testrace:
+	@echo "Running all packages tests with data race detector"
+	go clean -testcache
+	go test -race -tags "$(TAGS)" ./... -v -p 1
+
 .PHONY: test-connection-pool
 test-connection-pool:
 	@echo "Running tests in connection_pool package"

From a6dd70ebf20735656c0e74d64d0f3fc1aa292cc2 Mon Sep 17 00:00:00 2001
From: Oleg Jukovec
Date: Mon, 16 Jan 2023 13:14:53 +0300
Subject: [PATCH 12/12] ci: fix benchstat build with go 1.13

---
 Makefile | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/Makefile b/Makefile
index 60016ee3c..34de54707 100644
--- a/Makefile
+++ b/Makefile
@@ -123,7 +123,13 @@ ${BENCH_PATH} bench-deps:
 	rm -rf ${BENCH_PATH}
 	mkdir ${BENCH_PATH}
 	go clean -testcache
-	cd ${BENCH_PATH} && git clone https://go.googlesource.com/perf && cd perf && go install ./cmd/benchstat
+	# It is not possible to build the latest version of benchstat with
+	# go 1.13, so we need to switch to an old commit.
+	cd ${BENCH_PATH} && \
+		git clone https://go.googlesource.com/perf && \
+		cd perf && \
+		git checkout 91a04616dc65ba76dbe9e5cf746b923b1402d303 && \
+		go install ./cmd/benchstat
 	rm -rf ${BENCH_PATH}/perf
 
 .PHONY: bench