diff --git a/x/mongo/driver/topology/connection.go b/x/mongo/driver/topology/connection.go index 28ed3581e8..b1bf1d13f1 100644 --- a/x/mongo/driver/topology/connection.go +++ b/x/mongo/driver/topology/connection.go @@ -14,7 +14,6 @@ import ( "fmt" "io" "net" - "os" "strings" "sync" "sync/atomic" @@ -526,48 +525,6 @@ func (c *connection) closed() bool { return atomic.LoadInt64(&c.state) == connDisconnected } -// isAlive returns true if the connection is alive and ready to be used for an -// operation. -// -// Note that the liveness check can be slow (at least 1ms), so isAlive only -// checks the liveness of the connection if it's been idle for at least 10 -// seconds. For frequently in-use connections, a network error during an -// operation will be the first indication of a dead connection. -func (c *connection) isAlive() bool { - if c.nc == nil { - return false - } - - // If the connection has been idle for less than 10 seconds, skip the - // liveness check. - // - // The 10-seconds idle bypass is based on the liveness check implementation - // in the Python Driver. That implementation uses 1 second as the idle - // threshold, but we chose to be more conservative in the Go Driver because - // this is new behavior with unknown side-effects. See - // https://github.com/mongodb/mongo-python-driver/blob/e6b95f65953e01e435004af069a6976473eaf841/pymongo/synchronous/pool.py#L983-L985 - idleStart, ok := c.idleStart.Load().(time.Time) - if !ok || idleStart.Add(10*time.Second).After(time.Now()) { - return true - } - - // Set a 1ms read deadline and attempt to read 1 byte from the connection. - // Expect it to block for 1ms then return a deadline exceeded error. If it - // returns any other error, the connection is not usable, so return false. - // If it doesn't return an error and actually reads data, the connection is - // also not usable, so return false. - // - // Note that we don't need to un-set the read deadline because the "read" - // and "write" methods always reset the deadlines. - err := c.nc.SetReadDeadline(time.Now().Add(1 * time.Millisecond)) - if err != nil { - return false - } - var b [1]byte - _, err = c.nc.Read(b[:]) - return errors.Is(err, os.ErrDeadlineExceeded) -} - func (c *connection) idleTimeoutExpired() bool { if c.idleTimeout == 0 { return false diff --git a/x/mongo/driver/topology/connection_test.go b/x/mongo/driver/topology/connection_test.go index 4780959a97..5b2f39f272 100644 --- a/x/mongo/driver/topology/connection_test.go +++ b/x/mongo/driver/topology/connection_test.go @@ -1161,85 +1161,3 @@ func TestConnectionError(t *testing.T) { assert.ErrorContains(t, err, "client timed out waiting for server response") }) } - -func TestConnection_IsAlive(t *testing.T) { - t.Parallel() - - t.Run("uninitialized", func(t *testing.T) { - t.Parallel() - - conn := newConnection("") - assert.False(t, - conn.isAlive(), - "expected isAlive for an uninitialized connection to always return false") - }) - - t.Run("connection open", func(t *testing.T) { - t.Parallel() - - cleanup := make(chan struct{}) - defer close(cleanup) - addr := bootstrapConnections(t, 1, func(nc net.Conn) { - // Keep the connection open until the end of the test. - <-cleanup - _ = nc.Close() - }) - - conn := newConnection(address.Address(addr.String())) - err := conn.connect(context.Background()) - require.NoError(t, err) - - conn.idleStart.Store(time.Now().Add(-11 * time.Second)) - assert.True(t, - conn.isAlive(), - "expected isAlive for an open connection to return true") - }) - - t.Run("connection closed", func(t *testing.T) { - t.Parallel() - - conns := make(chan net.Conn) - addr := bootstrapConnections(t, 1, func(nc net.Conn) { - conns <- nc - }) - - conn := newConnection(address.Address(addr.String())) - err := conn.connect(context.Background()) - require.NoError(t, err) - - // Close the connection before calling isAlive. - nc := <-conns - err = nc.Close() - require.NoError(t, err) - - conn.idleStart.Store(time.Now().Add(-11 * time.Second)) - assert.False(t, - conn.isAlive(), - "expected isAlive for a closed connection to return false") - }) - - t.Run("connection reads data", func(t *testing.T) { - t.Parallel() - - cleanup := make(chan struct{}) - defer close(cleanup) - addr := bootstrapConnections(t, 1, func(nc net.Conn) { - // Write some data to the connection before calling isAlive. - _, err := nc.Write([]byte{5, 0, 0, 0, 0}) - require.NoError(t, err) - - // Keep the connection open until the end of the test. - <-cleanup - _ = nc.Close() - }) - - conn := newConnection(address.Address(addr.String())) - err := conn.connect(context.Background()) - require.NoError(t, err) - - conn.idleStart.Store(time.Now().Add(-11 * time.Second)) - assert.False(t, - conn.isAlive(), - "expected isAlive for an open connection that reads data to return false") - }) -} diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index d6568e844f..162bb9c1af 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -169,11 +169,9 @@ type reason struct { // connectionPerished checks if a given connection is perished and should be removed from the pool. func connectionPerished(conn *connection) (reason, bool) { switch { - case conn.closed() || !conn.isAlive(): + case conn.closed(): // A connection would only be closed if it encountered a network error - // during an operation and closed itself. If a connection is not alive - // (e.g. the connection was closed by the server-side), it's also - // considered a network error. + // during an operation and closed itself. return reason{ loggerConn: logger.ReasonConnClosedError, event: event.ReasonError, diff --git a/x/mongo/driver/topology/pool_test.go b/x/mongo/driver/topology/pool_test.go index 94d4de285d..f58e1cf204 100644 --- a/x/mongo/driver/topology/pool_test.go +++ b/x/mongo/driver/topology/pool_test.go @@ -843,79 +843,6 @@ func TestPool_checkOut(t *testing.T) { assert.Containsf(t, err.Error(), "canceled", `expected error message to contain "canceled"`) } - p.close(context.Background()) - }) - t.Run("discards connections closed by the server side", func(t *testing.T) { - t.Parallel() - - cleanup := make(chan struct{}) - defer close(cleanup) - - ncs := make(chan net.Conn, 2) - addr := bootstrapConnections(t, 2, func(nc net.Conn) { - // Send all "server-side" connections to a channel so we can - // interact with them during the test. - ncs <- nc - - <-cleanup - _ = nc.Close() - }) - - d := newdialer(&net.Dialer{}) - p := newPool(poolConfig{ - Address: address.Address(addr.String()), - }, WithDialer(func(Dialer) Dialer { return d })) - err := p.ready() - require.NoError(t, err) - - // Add 1 idle connection to the pool by checking-out and checking-in - // a connection. - conn, err := p.checkOut(context.Background()) - require.NoError(t, err) - err = p.checkIn(conn) - require.NoError(t, err) - assertConnectionsOpened(t, d, 1) - assert.Equalf(t, 1, p.availableConnectionCount(), "should be 1 idle connections in pool") - assert.Equalf(t, 1, p.totalConnectionCount(), "should be 1 total connection in pool") - - // Make that connection appear as if it's been idle for a minute. - conn.idleStart.Store(time.Now().Add(-1 * time.Minute)) - - // Close the "server-side" of the connection we just created. The idle - // connection in the pool is now unusable because the "server-side" - // closed it. - nc := <-ncs - err = nc.Close() - require.NoError(t, err) - - // In a separate goroutine, write a valid wire message to the 2nd - // connection that's about to be created. Stop waiting for a 2nd - // connection after 100ms to prevent leaking a goroutine. - go func() { - select { - case nc := <-ncs: - _, err := nc.Write([]byte{5, 0, 0, 0, 0}) - require.NoError(t, err, "Write error") - case <-time.After(100 * time.Millisecond): - } - }() - - // Check out a connection and try to read from it. Expect the pool to - // discard the connection that was closed by the "server-side" and - // return a newly created connection instead. - conn, err = p.checkOut(context.Background()) - require.NoError(t, err) - msg, err := conn.readWireMessage(context.Background()) - require.NoError(t, err) - assert.Equal(t, []byte{5, 0, 0, 0, 0}, msg) - - err = p.checkIn(conn) - require.NoError(t, err) - - assertConnectionsOpened(t, d, 2) - assert.Equalf(t, 1, p.availableConnectionCount(), "should be 1 idle connections in pool") - assert.Equalf(t, 1, p.totalConnectionCount(), "should be 1 total connection in pool") - p.close(context.Background()) }) }