Skip to content

PYTHON-4579 Stop gossiping $clusterTime on SDAM connections #1925

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 4 additions & 12 deletions pymongo/asynchronous/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import logging
import time
import weakref
from typing import TYPE_CHECKING, Any, Mapping, Optional, cast
from typing import TYPE_CHECKING, Any, Optional

from pymongo import common, periodic_executor
from pymongo._csot import MovingMinimum
from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled
from pymongo.errors import NetworkTimeout, _OperationCancelled
from pymongo.hello import Hello
from pymongo.lock import _async_create_lock
from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage
Expand Down Expand Up @@ -255,13 +255,7 @@ async def _check_server(self) -> ServerDescription:
self._conn_id = None
start = time.monotonic()
try:
try:
return await self._check_once()
except (OperationFailure, NotPrimaryError) as exc:
# Update max cluster time even when hello fails.
details = cast(Mapping[str, Any], exc.details)
await self._topology.receive_cluster_time(details.get("$clusterTime"))
raise
return await self._check_once()
except ReferenceError:
raise
except Exception as error:
Expand Down Expand Up @@ -358,7 +352,6 @@ async def _check_with_socket(self, conn: AsyncConnection) -> tuple[Hello, float]

Can raise ConnectionFailure or OperationFailure.
"""
cluster_time = self._topology.max_cluster_time()
start = time.monotonic()
if conn.more_to_come:
# Read the next streaming hello (MongoDB 4.4+).
Expand All @@ -368,13 +361,12 @@ async def _check_with_socket(self, conn: AsyncConnection) -> tuple[Hello, float]
):
# Initiate streaming hello (MongoDB 4.4+).
response = await conn._hello(
cluster_time,
self._server_description.topology_version,
self._settings.heartbeat_frequency,
)
else:
# New connection handshake or polling hello (MongoDB <4.4).
response = await conn._hello(cluster_time, None, None)
response = await conn._hello(None, None)
duration = _monotonic_duration(start)
return response, duration

Expand Down
4 changes: 4 additions & 0 deletions pymongo/asynchronous/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ async def command(
)

response_doc = unpacked_docs[0]
if not conn.ready:
cluster_time = response_doc.get("$clusterTime")
if cluster_time:
conn._cluster_time = cluster_time
if client:
await client._process_response(response_doc, session)
if check:
Expand Down
13 changes: 7 additions & 6 deletions pymongo/asynchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
from pymongo.pyopenssl_context import _sslConn
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import _ServerMode
from pymongo.typings import ClusterTime, _Address, _CollationIn
from pymongo.typings import _Address, _CollationIn
from pymongo.write_concern import WriteConcern

try:
Expand Down Expand Up @@ -310,6 +310,8 @@ def __init__(
self.connect_rtt = 0.0
self._client_id = pool._client_id
self.creation_time = time.monotonic()
# For gossiping $clusterTime from the connection handshake to the client.
self._cluster_time = None

def set_conn_timeout(self, timeout: Optional[float]) -> None:
"""Cache last timeout to avoid duplicate calls to conn.settimeout."""
Expand Down Expand Up @@ -374,11 +376,10 @@ def hello_cmd(self) -> dict[str, Any]:
return {HelloCompat.LEGACY_CMD: 1, "helloOk": True}

async def hello(self) -> Hello:
return await self._hello(None, None, None)
return await self._hello(None, None)

async def _hello(
self,
cluster_time: Optional[ClusterTime],
topology_version: Optional[Any],
heartbeat_frequency: Optional[int],
) -> Hello[dict[str, Any]]:
Expand All @@ -401,9 +402,6 @@ async def _hello(
if self.opts.connect_timeout:
self.set_conn_timeout(self.opts.connect_timeout + heartbeat_frequency)

if not performing_handshake and cluster_time is not None:
cmd["$clusterTime"] = cluster_time

creds = self.opts._credentials
if creds:
if creds.mechanism == "DEFAULT" and creds.username:
Expand Down Expand Up @@ -1316,6 +1314,9 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
conn.close_conn(ConnectionClosedReason.ERROR)
raise

if handler:
await handler.client._topology.receive_cluster_time(conn._cluster_time)

return conn

@contextlib.asynccontextmanager
Expand Down
1 change: 0 additions & 1 deletion pymongo/asynchronous/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,6 @@ async def _process_change(

self._description = new_td
await self._update_servers()
self._receive_cluster_time_no_lock(server_description.cluster_time)

if self._publish_tp and not suppress_event:
assert self._events is not None
Expand Down
16 changes: 4 additions & 12 deletions pymongo/synchronous/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import logging
import time
import weakref
from typing import TYPE_CHECKING, Any, Mapping, Optional, cast
from typing import TYPE_CHECKING, Any, Optional

from pymongo import common, periodic_executor
from pymongo._csot import MovingMinimum
from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled
from pymongo.errors import NetworkTimeout, _OperationCancelled
from pymongo.hello import Hello
from pymongo.lock import _create_lock
from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage
Expand Down Expand Up @@ -253,13 +253,7 @@ def _check_server(self) -> ServerDescription:
self._conn_id = None
start = time.monotonic()
try:
try:
return self._check_once()
except (OperationFailure, NotPrimaryError) as exc:
# Update max cluster time even when hello fails.
details = cast(Mapping[str, Any], exc.details)
self._topology.receive_cluster_time(details.get("$clusterTime"))
raise
return self._check_once()
except ReferenceError:
raise
except Exception as error:
Expand Down Expand Up @@ -356,7 +350,6 @@ def _check_with_socket(self, conn: Connection) -> tuple[Hello, float]:

Can raise ConnectionFailure or OperationFailure.
"""
cluster_time = self._topology.max_cluster_time()
start = time.monotonic()
if conn.more_to_come:
# Read the next streaming hello (MongoDB 4.4+).
Expand All @@ -366,13 +359,12 @@ def _check_with_socket(self, conn: Connection) -> tuple[Hello, float]:
):
# Initiate streaming hello (MongoDB 4.4+).
response = conn._hello(
cluster_time,
self._server_description.topology_version,
self._settings.heartbeat_frequency,
)
else:
# New connection handshake or polling hello (MongoDB <4.4).
response = conn._hello(cluster_time, None, None)
response = conn._hello(None, None)
duration = _monotonic_duration(start)
return response, duration

Expand Down
4 changes: 4 additions & 0 deletions pymongo/synchronous/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ def command(
)

response_doc = unpacked_docs[0]
if not conn.ready:
cluster_time = response_doc.get("$clusterTime")
if cluster_time:
conn._cluster_time = cluster_time
if client:
client._process_response(response_doc, session)
if check:
Expand Down
13 changes: 7 additions & 6 deletions pymongo/synchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
from pymongo.synchronous.auth import _AuthContext
from pymongo.synchronous.client_session import ClientSession
from pymongo.synchronous.mongo_client import MongoClient, _MongoClientErrorHandler
from pymongo.typings import ClusterTime, _Address, _CollationIn
from pymongo.typings import _Address, _CollationIn
from pymongo.write_concern import WriteConcern

try:
Expand Down Expand Up @@ -310,6 +310,8 @@ def __init__(
self.connect_rtt = 0.0
self._client_id = pool._client_id
self.creation_time = time.monotonic()
# For gossiping $clusterTime from the connection handshake to the client.
self._cluster_time = None

def set_conn_timeout(self, timeout: Optional[float]) -> None:
"""Cache last timeout to avoid duplicate calls to conn.settimeout."""
Expand Down Expand Up @@ -374,11 +376,10 @@ def hello_cmd(self) -> dict[str, Any]:
return {HelloCompat.LEGACY_CMD: 1, "helloOk": True}

def hello(self) -> Hello:
return self._hello(None, None, None)
return self._hello(None, None)

def _hello(
self,
cluster_time: Optional[ClusterTime],
topology_version: Optional[Any],
heartbeat_frequency: Optional[int],
) -> Hello[dict[str, Any]]:
Expand All @@ -401,9 +402,6 @@ def _hello(
if self.opts.connect_timeout:
self.set_conn_timeout(self.opts.connect_timeout + heartbeat_frequency)

if not performing_handshake and cluster_time is not None:
cmd["$clusterTime"] = cluster_time

creds = self.opts._credentials
if creds:
if creds.mechanism == "DEFAULT" and creds.username:
Expand Down Expand Up @@ -1310,6 +1308,9 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
conn.close_conn(ConnectionClosedReason.ERROR)
raise

if handler:
handler.client._topology.receive_cluster_time(conn._cluster_time)

return conn

@contextlib.contextmanager
Expand Down
1 change: 0 additions & 1 deletion pymongo/synchronous/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,6 @@ def _process_change(

self._description = new_td
self._update_servers()
self._receive_cluster_time_no_lock(server_description.cluster_time)

if self._publish_tp and not suppress_event:
assert self._events is not None
Expand Down
42 changes: 38 additions & 4 deletions test/asynchronous/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@
async_client_context,
unittest,
)
from test.asynchronous.helpers import client_knobs
from test.utils import (
EventListener,
HeartbeatEventListener,
OvertCommandListener,
async_wait_until,
)
Expand Down Expand Up @@ -1135,12 +1137,10 @@ async def asyncSetUp(self):
if "$clusterTime" not in (await async_client_context.hello):
raise SkipTest("$clusterTime not supported")

# Sessions prose test: 3) $clusterTime in commands
async def test_cluster_time(self):
listener = SessionTestListener()
# Prevent heartbeats from updating $clusterTime between operations.
client = await self.async_rs_or_single_client(
event_listeners=[listener], heartbeatFrequencyMS=999999
)
client = await self.async_rs_or_single_client(event_listeners=[listener])
collection = client.pymongo_test.collection
# Prepare for tests of find() and aggregate().
await collection.insert_many([{} for _ in range(10)])
Expand Down Expand Up @@ -1219,6 +1219,40 @@ async def aggregate():
f"{f.__name__} sent wrong $clusterTime with {event.command_name}",
)

# Sessions prose test: 20) Drivers do not gossip `$clusterTime` on SDAM commands
async def test_cluster_time_not_used_by_sdam(self):
    """Check that SDAM heartbeats do not change the $clusterTime the client gossips.

    Implements sessions prose test 20 ("Drivers do not gossip
    `$clusterTime` on SDAM commands"): record the $clusterTime returned
    by a ping on client C1, advance the server's $clusterTime through a
    different client, wait for C1 to complete a heartbeat, and assert
    that C1's next ping still sends the originally recorded value.
    """
    hb_listener = HeartbeatEventListener()
    command_listener = OvertCommandListener()
    # Use a very short heartbeat interval so a fresh heartbeat arrives quickly.
    with client_knobs(min_heartbeat_interval=0.01):
        c1 = await self.async_single_client(
            event_listeners=[hb_listener, command_listener], heartbeatFrequencyMS=10
        )
        ping_reply = await c1.admin.command({"ping": 1})
        cluster_time = ping_reply["$clusterTime"]
        self.assertEqual(c1._topology.max_cluster_time(), cluster_time)

        # An insert through another client moves the server's $clusterTime forward.
        await self.db.test.insert_one({"advance": "$clusterTime"})
        # Drop heartbeat events recorded so far; only heartbeats observed
        # after the insert should satisfy the wait below.
        hb_listener.reset()

        async def next_heartbeat():
            # True once a started event is immediately followed by a succeeded event.
            seen = hb_listener.events
            return any(
                isinstance(first, monitoring.ServerHeartbeatStartedEvent)
                and isinstance(second, monitoring.ServerHeartbeatSucceededEvent)
                for first, second in zip(seen, seen[1:])
            )

        await async_wait_until(
            next_heartbeat, "never found pair of heartbeat started + succeeded events"
        )
        # C1's max $clusterTime must be unchanged: the heartbeat must not have updated it.
        command_listener.reset()
        await c1.admin.command({"ping": 1})
        first_started = command_listener.started_events[0]
        self.assertEqual(first_started.command_name, "ping")
        self.assertEqual(first_started.command["$clusterTime"], cluster_time)


if __name__ == "__main__":
unittest.main()
20 changes: 9 additions & 11 deletions test/test_discovery_and_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ class TestClusterTimeComparison(unittest.TestCase):
def test_cluster_time_comparison(self):
t = create_mock_topology("mongodb://host")

def send_cluster_time(time, inc, should_update):
def send_cluster_time(time, inc):
old = t.max_cluster_time()
new = {"clusterTime": Timestamp(time, inc)}
got_hello(
Expand All @@ -259,16 +259,14 @@ def send_cluster_time(time, inc, should_update):
)

actual = t.max_cluster_time()
if should_update:
self.assertEqual(actual, new)
else:
self.assertEqual(actual, old)

send_cluster_time(0, 1, True)
send_cluster_time(2, 2, True)
send_cluster_time(2, 1, False)
send_cluster_time(1, 3, False)
send_cluster_time(2, 3, True)
# We never update $clusterTime from monitoring connections.
self.assertEqual(actual, old)

send_cluster_time(0, 1)
send_cluster_time(2, 2)
send_cluster_time(2, 1)
send_cluster_time(1, 3)
send_cluster_time(2, 3)


class TestIgnoreStaleErrors(IntegrationTest):
Expand Down
Loading
Loading