Skip to content

Commit 46e4660

Browse files
committed
PYTHON-4579 Stop gossiping $clusterTime on SDAM connections
1 parent cfe7784 commit 46e4660

File tree

5 files changed

+34
-30
lines changed

5 files changed

+34
-30
lines changed

pymongo/asynchronous/monitor.py

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@
2121
import logging
2222
import time
2323
import weakref
24-
from typing import TYPE_CHECKING, Any, Mapping, Optional, cast
24+
from typing import TYPE_CHECKING, Any, Optional
2525

2626
from pymongo import common, periodic_executor
2727
from pymongo._csot import MovingMinimum
28-
from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled
28+
from pymongo.errors import NetworkTimeout, _OperationCancelled
2929
from pymongo.hello import Hello
3030
from pymongo.lock import _async_create_lock
3131
from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage
@@ -250,13 +250,7 @@ async def _check_server(self) -> ServerDescription:
250250
self._conn_id = None
251251
start = time.monotonic()
252252
try:
253-
try:
254-
return await self._check_once()
255-
except (OperationFailure, NotPrimaryError) as exc:
256-
# Update max cluster time even when hello fails.
257-
details = cast(Mapping[str, Any], exc.details)
258-
await self._topology.receive_cluster_time(details.get("$clusterTime"))
259-
raise
253+
return await self._check_once()
260254
except asyncio.CancelledError:
261255
raise
262256
except ReferenceError:
@@ -355,7 +349,6 @@ async def _check_with_socket(self, conn: AsyncConnection) -> tuple[Hello, float]
355349
356350
Can raise ConnectionFailure or OperationFailure.
357351
"""
358-
cluster_time = self._topology.max_cluster_time()
359352
start = time.monotonic()
360353
if conn.more_to_come:
361354
# Read the next streaming hello (MongoDB 4.4+).
@@ -365,13 +358,13 @@ async def _check_with_socket(self, conn: AsyncConnection) -> tuple[Hello, float]
365358
):
366359
# Initiate streaming hello (MongoDB 4.4+).
367360
response = await conn._hello(
368-
cluster_time,
361+
None,
369362
self._server_description.topology_version,
370363
self._settings.heartbeat_frequency,
371364
)
372365
else:
373366
# New connection handshake or polling hello (MongoDB <4.4).
374-
response = await conn._hello(cluster_time, None, None)
367+
response = await conn._hello(None, None, None)
375368
duration = _monotonic_duration(start)
376369
return response, duration
377370

pymongo/asynchronous/topology.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,8 @@ async def _process_change(
493493

494494
self._description = new_td
495495
await self._update_servers()
496-
self._receive_cluster_time_no_lock(server_description.cluster_time)
496+
# TODO: Verify that app errors update the $clusterTime.
497+
# self._receive_cluster_time_no_lock(server_description.cluster_time)
497498

498499
if self._publish_tp and not suppress_event:
499500
assert self._events is not None

pymongo/synchronous/monitor.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@
2121
import logging
2222
import time
2323
import weakref
24-
from typing import TYPE_CHECKING, Any, Mapping, Optional, cast
24+
from typing import TYPE_CHECKING, Any, Optional
2525

2626
from pymongo import common, periodic_executor
2727
from pymongo._csot import MovingMinimum
28-
from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled
28+
from pymongo.errors import NetworkTimeout, _OperationCancelled
2929
from pymongo.hello import Hello
3030
from pymongo.lock import _create_lock
3131
from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage
@@ -250,6 +250,7 @@ def _check_server(self) -> ServerDescription:
250250
self._conn_id = None
251251
start = time.monotonic()
252252
try:
253+
<<<<<<< HEAD
253254
try:
254255
return self._check_once()
255256
except (OperationFailure, NotPrimaryError) as exc:
@@ -259,6 +260,17 @@ def _check_server(self) -> ServerDescription:
259260
raise
260261
except asyncio.CancelledError:
261262
raise
263+
||||||| parent of 14c8432bc (PYTHON-4579 Stop gossiping $clusterTime on SDAM connections)
264+
try:
265+
return self._check_once()
266+
except (OperationFailure, NotPrimaryError) as exc:
267+
# Update max cluster time even when hello fails.
268+
details = cast(Mapping[str, Any], exc.details)
269+
self._topology.receive_cluster_time(details.get("$clusterTime"))
270+
raise
271+
=======
272+
return self._check_once()
273+
>>>>>>> 14c8432bc (PYTHON-4579 Stop gossiping $clusterTime on SDAM connections)
262274
except ReferenceError:
263275
raise
264276
except Exception as error:
@@ -355,7 +367,6 @@ def _check_with_socket(self, conn: Connection) -> tuple[Hello, float]:
355367
356368
Can raise ConnectionFailure or OperationFailure.
357369
"""
358-
cluster_time = self._topology.max_cluster_time()
359370
start = time.monotonic()
360371
if conn.more_to_come:
361372
# Read the next streaming hello (MongoDB 4.4+).
@@ -365,13 +376,13 @@ def _check_with_socket(self, conn: Connection) -> tuple[Hello, float]:
365376
):
366377
# Initiate streaming hello (MongoDB 4.4+).
367378
response = conn._hello(
368-
cluster_time,
379+
None,
369380
self._server_description.topology_version,
370381
self._settings.heartbeat_frequency,
371382
)
372383
else:
373384
# New connection handshake or polling hello (MongoDB <4.4).
374-
response = conn._hello(cluster_time, None, None)
385+
response = conn._hello(None, None, None)
375386
duration = _monotonic_duration(start)
376387
return response, duration
377388

pymongo/synchronous/topology.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,8 @@ def _process_change(
493493

494494
self._description = new_td
495495
self._update_servers()
496-
self._receive_cluster_time_no_lock(server_description.cluster_time)
496+
# TODO: Verify that app errors update the $clusterTime.
497+
# self._receive_cluster_time_no_lock(server_description.cluster_time)
497498

498499
if self._publish_tp and not suppress_event:
499500
assert self._events is not None

test/test_discovery_and_monitoring.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ class TestClusterTimeComparison(unittest.TestCase):
244244
def test_cluster_time_comparison(self):
245245
t = create_mock_topology("mongodb://host")
246246

247-
def send_cluster_time(time, inc, should_update):
247+
def send_cluster_time(time, inc):
248248
old = t.max_cluster_time()
249249
new = {"clusterTime": Timestamp(time, inc)}
250250
got_hello(
@@ -259,16 +259,14 @@ def send_cluster_time(time, inc, should_update):
259259
)
260260

261261
actual = t.max_cluster_time()
262-
if should_update:
263-
self.assertEqual(actual, new)
264-
else:
265-
self.assertEqual(actual, old)
266-
267-
send_cluster_time(0, 1, True)
268-
send_cluster_time(2, 2, True)
269-
send_cluster_time(2, 1, False)
270-
send_cluster_time(1, 3, False)
271-
send_cluster_time(2, 3, True)
262+
# We never update $clusterTime from monitoring connections.
263+
self.assertEqual(actual, old)
264+
265+
send_cluster_time(0, 1)
266+
send_cluster_time(2, 2)
267+
send_cluster_time(2, 1)
268+
send_cluster_time(1, 3)
269+
send_cluster_time(2, 3)
272270

273271

274272
class TestIgnoreStaleErrors(IntegrationTest):

0 commit comments

Comments
 (0)