GODRIVER-3516 Fix the issue in getOrQueueForIdleConn where connectionPerished conn.… #2051

Open · wants to merge 7 commits into master
43 changes: 0 additions & 43 deletions x/mongo/driver/topology/connection.go
@@ -14,7 +14,6 @@ import (
"fmt"
"io"
"net"
"os"
"strings"
"sync"
"sync/atomic"
@@ -526,48 +525,6 @@ func (c *connection) closed() bool {
return atomic.LoadInt64(&c.state) == connDisconnected
}

// isAlive returns true if the connection is alive and ready to be used for an
// operation.
//
// Note that the liveness check can be slow (at least 1ms), so isAlive only
// checks the liveness of the connection if it's been idle for at least 10
// seconds. For frequently in-use connections, a network error during an
// operation will be the first indication of a dead connection.
func (c *connection) isAlive() bool {
if c.nc == nil {
return false
}

// If the connection has been idle for less than 10 seconds, skip the
// liveness check.
//
// The 10-seconds idle bypass is based on the liveness check implementation
// in the Python Driver. That implementation uses 1 second as the idle
// threshold, but we chose to be more conservative in the Go Driver because
// this is new behavior with unknown side-effects. See
// https://github.com/mongodb/mongo-python-driver/blob/e6b95f65953e01e435004af069a6976473eaf841/pymongo/synchronous/pool.py#L983-L985
idleStart, ok := c.idleStart.Load().(time.Time)
if !ok || idleStart.Add(10*time.Second).After(time.Now()) {
return true
}

// Set a 1ms read deadline and attempt to read 1 byte from the connection.
// Expect it to block for 1ms then return a deadline exceeded error. If it
// returns any other error, the connection is not usable, so return false.
// If it doesn't return an error and actually reads data, the connection is
// also not usable, so return false.
//
// Note that we don't need to un-set the read deadline because the "read"
// and "write" methods always reset the deadlines.
err := c.nc.SetReadDeadline(time.Now().Add(1 * time.Millisecond))
if err != nil {
return false
}
var b [1]byte
_, err = c.nc.Read(b[:])
return errors.Is(err, os.ErrDeadlineExceeded)
}

func (c *connection) idleTimeoutExpired() bool {
if c.idleTimeout == 0 {
return false
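
The removed isAlive above relies on a generic, non-destructive socket probe: set a 1 ms read deadline, attempt a one-byte read, and treat os.ErrDeadlineExceeded as a sign the connection is still healthy. Below is a minimal standalone sketch of that technique against a plain net.Conn; probeConn and the example.com endpoint are assumptions for illustration, not driver code.

// A minimal, standalone sketch of the probe technique used by the removed
// isAlive: set a short read deadline, attempt a one-byte read, and treat a
// deadline-exceeded error as "still alive". probeConn and the example.com
// endpoint are illustrative only, not part of the driver.
package main

import (
	"errors"
	"fmt"
	"net"
	"os"
	"time"
)

// probeConn reports whether nc still appears usable. A healthy idle socket has
// nothing to read, so the read blocks until the 1 ms deadline expires. Any
// other error (e.g. io.EOF after a server-side close), or unexpected data,
// means the connection should be discarded.
// Note: a real caller would clear or reset the read deadline before reusing nc.
func probeConn(nc net.Conn) bool {
	if err := nc.SetReadDeadline(time.Now().Add(1 * time.Millisecond)); err != nil {
		return false
	}
	var b [1]byte
	_, err := nc.Read(b[:])
	return errors.Is(err, os.ErrDeadlineExceeded)
}

func main() {
	// Dial an arbitrary TCP endpoint just to have a live connection to probe.
	nc, err := net.Dial("tcp", "example.com:80")
	if err != nil {
		panic(err)
	}
	defer nc.Close()
	fmt.Println("alive:", probeConn(nc))
}

The sketch leaves the deadline set; the removed driver code did not need to clear it because, per its own comment, the read and write paths always reset the deadlines.
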
82 changes: 0 additions & 82 deletions x/mongo/driver/topology/connection_test.go
@@ -1161,85 +1161,3 @@ func TestConnectionError(t *testing.T) {
assert.ErrorContains(t, err, "client timed out waiting for server response")
})
}

func TestConnection_IsAlive(t *testing.T) {
t.Parallel()

t.Run("uninitialized", func(t *testing.T) {
t.Parallel()

conn := newConnection("")
assert.False(t,
conn.isAlive(),
"expected isAlive for an uninitialized connection to always return false")
})

t.Run("connection open", func(t *testing.T) {
t.Parallel()

cleanup := make(chan struct{})
defer close(cleanup)
addr := bootstrapConnections(t, 1, func(nc net.Conn) {
// Keep the connection open until the end of the test.
<-cleanup
_ = nc.Close()
})

conn := newConnection(address.Address(addr.String()))
err := conn.connect(context.Background())
require.NoError(t, err)

conn.idleStart.Store(time.Now().Add(-11 * time.Second))
assert.True(t,
conn.isAlive(),
"expected isAlive for an open connection to return true")
})

t.Run("connection closed", func(t *testing.T) {
t.Parallel()

conns := make(chan net.Conn)
addr := bootstrapConnections(t, 1, func(nc net.Conn) {
conns <- nc
})

conn := newConnection(address.Address(addr.String()))
err := conn.connect(context.Background())
require.NoError(t, err)

// Close the connection before calling isAlive.
nc := <-conns
err = nc.Close()
require.NoError(t, err)

conn.idleStart.Store(time.Now().Add(-11 * time.Second))
assert.False(t,
conn.isAlive(),
"expected isAlive for a closed connection to return false")
})

t.Run("connection reads data", func(t *testing.T) {
t.Parallel()

cleanup := make(chan struct{})
defer close(cleanup)
addr := bootstrapConnections(t, 1, func(nc net.Conn) {
// Write some data to the connection before calling isAlive.
_, err := nc.Write([]byte{5, 0, 0, 0, 0})
require.NoError(t, err)

// Keep the connection open until the end of the test.
<-cleanup
_ = nc.Close()
})

conn := newConnection(address.Address(addr.String()))
err := conn.connect(context.Background())
require.NoError(t, err)

conn.idleStart.Store(time.Now().Add(-11 * time.Second))
assert.False(t,
conn.isAlive(),
"expected isAlive for an open connection that reads data to return false")
})
}
6 changes: 2 additions & 4 deletions x/mongo/driver/topology/pool.go
@@ -169,11 +169,9 @@ type reason struct {
 // connectionPerished checks if a given connection is perished and should be removed from the pool.
 func connectionPerished(conn *connection) (reason, bool) {
 	switch {
-	case conn.closed() || !conn.isAlive():
+	case conn.closed():
 		// A connection would only be closed if it encountered a network error
-		// during an operation and closed itself. If a connection is not alive
-		// (e.g. the connection was closed by the server-side), it's also
-		// considered a network error.
+		// during an operation and closed itself.
 		return reason{
 			loggerConn: logger.ReasonConnClosedError,
 			event:      event.ReasonError,

Review thread on this hunk:

qingyang-hu (Collaborator): @linfeip, I appreciate your contribution! Would you mind updating the comment to match the changes? Otherwise, it looks good to me!

linfeip (Author, May 21, 2025): @qingyang-hu Alright, I've already updated the comments.
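
For context on the call site named in the PR title, here is a simplified, hypothetical sketch of an idle-connection checkout loop that consults a connectionPerished-style check before handing a connection out. Every name in it (Conn, perished, popUsableIdle) is invented for illustration; it does not reproduce the driver's actual getOrQueueForIdleConn.

// A hypothetical sketch of discarding perished idle connections at checkout.
// Not the driver's implementation; all types and helpers are illustrative.
package main

import "fmt"

// Conn stands in for the driver's *connection; Closed is the only state
// modeled in this sketch.
type Conn struct {
	ID     int
	Closed bool
}

// perished mirrors the shape of connectionPerished after this PR: a closed
// connection is the only "perished" condition checked here.
func perished(c *Conn) (string, bool) {
	if c.Closed {
		return "error", true
	}
	return "", false
}

// popUsableIdle drains the idle stack, discarding perished connections, and
// returns the first usable one (or nil if none remain).
func popUsableIdle(idle []*Conn) (*Conn, []*Conn) {
	for len(idle) > 0 {
		c := idle[len(idle)-1]
		idle = idle[:len(idle)-1]
		if reason, bad := perished(c); bad {
			fmt.Printf("discarding conn %d (%s)\n", c.ID, reason)
			continue
		}
		return c, idle
	}
	return nil, idle
}

func main() {
	idle := []*Conn{{ID: 1}, {ID: 2, Closed: true}}
	// Conn 2 was closed, so it is discarded and conn 1 is returned.
	c, _ := popUsableIdle(idle)
	if c != nil {
		fmt.Println("checked out conn:", c.ID)
	}
}
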
73 changes: 0 additions & 73 deletions x/mongo/driver/topology/pool_test.go
@@ -843,79 +843,6 @@ func TestPool_checkOut(t *testing.T) {
assert.Containsf(t, err.Error(), "canceled", `expected error message to contain "canceled"`)
}

p.close(context.Background())
})
t.Run("discards connections closed by the server side", func(t *testing.T) {
t.Parallel()

cleanup := make(chan struct{})
defer close(cleanup)

ncs := make(chan net.Conn, 2)
addr := bootstrapConnections(t, 2, func(nc net.Conn) {
// Send all "server-side" connections to a channel so we can
// interact with them during the test.
ncs <- nc

<-cleanup
_ = nc.Close()
})

d := newdialer(&net.Dialer{})
p := newPool(poolConfig{
Address: address.Address(addr.String()),
}, WithDialer(func(Dialer) Dialer { return d }))
err := p.ready()
require.NoError(t, err)

// Add 1 idle connection to the pool by checking-out and checking-in
// a connection.
conn, err := p.checkOut(context.Background())
require.NoError(t, err)
err = p.checkIn(conn)
require.NoError(t, err)
assertConnectionsOpened(t, d, 1)
assert.Equalf(t, 1, p.availableConnectionCount(), "should be 1 idle connections in pool")
assert.Equalf(t, 1, p.totalConnectionCount(), "should be 1 total connection in pool")

// Make that connection appear as if it's been idle for a minute.
conn.idleStart.Store(time.Now().Add(-1 * time.Minute))

// Close the "server-side" of the connection we just created. The idle
// connection in the pool is now unusable because the "server-side"
// closed it.
nc := <-ncs
err = nc.Close()
require.NoError(t, err)

// In a separate goroutine, write a valid wire message to the 2nd
// connection that's about to be created. Stop waiting for a 2nd
// connection after 100ms to prevent leaking a goroutine.
go func() {
select {
case nc := <-ncs:
_, err := nc.Write([]byte{5, 0, 0, 0, 0})
require.NoError(t, err, "Write error")
case <-time.After(100 * time.Millisecond):
}
}()

// Check out a connection and try to read from it. Expect the pool to
// discard the connection that was closed by the "server-side" and
// return a newly created connection instead.
conn, err = p.checkOut(context.Background())
require.NoError(t, err)
msg, err := conn.readWireMessage(context.Background())
require.NoError(t, err)
assert.Equal(t, []byte{5, 0, 0, 0, 0}, msg)

err = p.checkIn(conn)
require.NoError(t, err)

assertConnectionsOpened(t, d, 2)
assert.Equalf(t, 1, p.availableConnectionCount(), "should be 1 idle connections in pool")
assert.Equalf(t, 1, p.totalConnectionCount(), "should be 1 total connection in pool")

p.close(context.Background())
})
}
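
One detail in the removed test above: []byte{5, 0, 0, 0, 0} passes as a valid wire message because the MongoDB wire protocol prefixes every message with a 4-byte little-endian total length that counts the prefix itself, and the test shows readWireMessage returning all 5 bytes. The sketch below reads such a length-prefixed frame from any io.Reader; readFrame is an illustrative helper, not the driver's readWireMessage.

// A sketch of reading a length-prefixed frame like the one the test writes.
// readFrame is an illustrative helper, not the driver's readWireMessage.
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// readFrame reads one frame: a 4-byte little-endian total length (which
// includes the prefix itself) followed by length-4 payload bytes.
func readFrame(r io.Reader) ([]byte, error) {
	var header [4]byte
	if _, err := io.ReadFull(r, header[:]); err != nil {
		return nil, err
	}
	length := binary.LittleEndian.Uint32(header[:])
	if length < 4 {
		return nil, errors.New("frame length too small")
	}
	msg := make([]byte, length)
	copy(msg, header[:])
	if _, err := io.ReadFull(r, msg[4:]); err != nil {
		return nil, err
	}
	return msg, nil
}

func main() {
	// The same bytes the test writes: length 5, then one payload byte.
	msg, err := readFrame(bytes.NewReader([]byte{5, 0, 0, 0, 0}))
	fmt.Println(msg, err) // [5 0 0 0 0] <nil>
}
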