From 15c4a177c2cdcff1e2396816be208c471db9f52e Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 3 Feb 2025 10:04:39 -0500 Subject: [PATCH 1/3] PYTHON-5107 - Convert test.test_streaming_protocol to async --- test/asynchronous/test_streaming_protocol.py | 228 +++++++++++++++++++ test/test_streaming_protocol.py | 8 +- 2 files changed, 231 insertions(+), 5 deletions(-) create mode 100644 test/asynchronous/test_streaming_protocol.py diff --git a/test/asynchronous/test_streaming_protocol.py b/test/asynchronous/test_streaming_protocol.py new file mode 100644 index 0000000000..141f4bf961 --- /dev/null +++ b/test/asynchronous/test_streaming_protocol.py @@ -0,0 +1,228 @@ +# Copyright 2020-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test the database module.""" +from __future__ import annotations + +import sys +import time + +sys.path[0:0] = [""] + +from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest +from test.utils import ( + HeartbeatEventListener, + ServerEventListener, + async_wait_until, +) + +from pymongo import monitoring +from pymongo.hello import HelloCompat + +_IS_SYNC = False + + +class TestStreamingProtocol(AsyncIntegrationTest): + @async_client_context.require_failCommand_appName + async def test_failCommand_streaming(self): + listener = ServerEventListener() + hb_listener = HeartbeatEventListener() + client = await self.async_rs_or_single_client( + event_listeners=[listener, hb_listener], + heartbeatFrequencyMS=500, + appName="failingHeartbeatTest", + ) + # Force a connection. + await client.admin.command("ping") + address = await client.address + listener.reset() + + fail_hello = { + "configureFailPoint": "failCommand", + "mode": {"times": 4}, + "data": { + "failCommands": [HelloCompat.LEGACY_CMD, "hello"], + "closeConnection": False, + "errorCode": 10107, + "appName": "failingHeartbeatTest", + }, + } + async with self.fail_point(fail_hello): + + def _marked_unknown(event): + return ( + event.server_address == address + and not event.new_description.is_server_type_known + ) + + def _discovered_node(event): + return ( + event.server_address == address + and not event.previous_description.is_server_type_known + and event.new_description.is_server_type_known + ) + + def marked_unknown(): + return len(listener.matching(_marked_unknown)) >= 1 + + def rediscovered(): + return len(listener.matching(_discovered_node)) >= 1 + + # Topology events are published asynchronously + await async_wait_until(marked_unknown, "mark node unknown") + await async_wait_until(rediscovered, "rediscover node") + + # Server should be selectable. + await client.admin.command("ping") + + @async_client_context.require_failCommand_appName + async def test_streaming_rtt(self): + listener = ServerEventListener() + hb_listener = HeartbeatEventListener() + # On Windows, RTT can actually be 0.0 because time.time() only has + # 1-15 millisecond resolution. We need to delay the initial hello + # to ensure that RTT is never zero. + name = "streamingRttTest" + delay_hello: dict = { + "configureFailPoint": "failCommand", + "mode": {"times": 1000}, + "data": { + "failCommands": [HelloCompat.LEGACY_CMD, "hello"], + "blockConnection": True, + "blockTimeMS": 20, + # This can be uncommented after SERVER-49220 is fixed. + # 'appName': name, + }, + } + async with self.fail_point(delay_hello): + client = await self.async_rs_or_single_client( + event_listeners=[listener, hb_listener], heartbeatFrequencyMS=500, appName=name + ) + # Force a connection. + await client.admin.command("ping") + address = await client.address + + delay_hello["data"]["blockTimeMS"] = 500 + delay_hello["data"]["appName"] = name + async with self.fail_point(delay_hello): + + def rtt_exceeds_250_ms(): + # XXX: Add a public TopologyDescription getter to MongoClient? + topology = client._topology + sd = topology.description.server_descriptions()[address] + assert sd.round_trip_time is not None + return sd.round_trip_time > 0.250 + + await async_wait_until(rtt_exceeds_250_ms, "exceed 250ms RTT") + + # Server should be selectable. + await client.admin.command("ping") + + def changed_event(event): + return event.server_address == address and isinstance( + event, monitoring.ServerDescriptionChangedEvent + ) + + # There should only be one event published, for the initial discovery. + events = listener.matching(changed_event) + self.assertEqual(1, len(events)) + self.assertGreater(events[0].new_description.round_trip_time, 0) + + @async_client_context.require_failCommand_appName + async def test_monitor_waits_after_server_check_error(self): + # This test implements: + # https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring-tests.md#monitors-sleep-at-least-minheartbeatfreqencyms-between-checks + fail_hello = { + "mode": {"times": 5}, + "data": { + "failCommands": [HelloCompat.LEGACY_CMD, "hello"], + "errorCode": 1234, + "appName": "SDAMMinHeartbeatFrequencyTest", + }, + } + async with self.fail_point(fail_hello): + start = time.time() + client = await self.async_single_client( + appName="SDAMMinHeartbeatFrequencyTest", serverSelectionTimeoutMS=5000 + ) + # Force a connection. + await client.admin.command("ping") + duration = time.time() - start + # Explanation of the expected events: + # 0ms: run configureFailPoint + # 1ms: create MongoClient + # 2ms: failed monitor handshake, 1 + # 502ms: failed monitor handshake, 2 + # 1002ms: failed monitor handshake, 3 + # 1502ms: failed monitor handshake, 4 + # 2002ms: failed monitor handshake, 5 + # 2502ms: monitor handshake succeeds + # 2503ms: run awaitable hello + # 2504ms: application handshake succeeds + # 2505ms: ping command succeeds + self.assertGreaterEqual(duration, 2) + self.assertLessEqual(duration, 3.5) + + @async_client_context.require_failCommand_appName + async def test_heartbeat_awaited_flag(self): + hb_listener = HeartbeatEventListener() + client = await self.async_single_client( + event_listeners=[hb_listener], + heartbeatFrequencyMS=500, + appName="heartbeatEventAwaitedFlag", + ) + # Force a connection. + await client.admin.command("ping") + + def hb_succeeded(event): + return isinstance(event, monitoring.ServerHeartbeatSucceededEvent) + + def hb_failed(event): + return isinstance(event, monitoring.ServerHeartbeatFailedEvent) + + fail_heartbeat = { + "mode": {"times": 2}, + "data": { + "failCommands": [HelloCompat.LEGACY_CMD, "hello"], + "closeConnection": True, + "appName": "heartbeatEventAwaitedFlag", + }, + } + async with self.fail_point(fail_heartbeat): + await async_wait_until( + lambda: hb_listener.matching(hb_failed), "published failed event" + ) + # Reconnect. + await client.admin.command("ping") + + hb_succeeded_events = hb_listener.matching(hb_succeeded) + hb_failed_events = hb_listener.matching(hb_failed) + self.assertFalse(hb_succeeded_events[0].awaited) + self.assertTrue(hb_failed_events[0].awaited) + # Depending on thread scheduling, the failed heartbeat could occur on + # the second or third check. + events = [type(e) for e in hb_listener.events[:4]] + if events == [ + monitoring.ServerHeartbeatStartedEvent, + monitoring.ServerHeartbeatSucceededEvent, + monitoring.ServerHeartbeatStartedEvent, + monitoring.ServerHeartbeatFailedEvent, + ]: + self.assertFalse(hb_succeeded_events[1].awaited) + else: + self.assertTrue(hb_succeeded_events[1].awaited) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_streaming_protocol.py b/test/test_streaming_protocol.py index d782aa1dd7..e87d49deab 100644 --- a/test/test_streaming_protocol.py +++ b/test/test_streaming_protocol.py @@ -20,7 +20,7 @@ sys.path[0:0] = [""] -from test import IntegrationTest, client_context, unittest +from test.synchronous import IntegrationTest, client_context, unittest from test.utils import ( HeartbeatEventListener, ServerEventListener, @@ -30,6 +30,8 @@ from pymongo import monitoring from pymongo.hello import HelloCompat +_IS_SYNC = False + class TestStreamingProtocol(IntegrationTest): @client_context.require_failCommand_appName @@ -41,7 +43,6 @@ def test_failCommand_streaming(self): heartbeatFrequencyMS=500, appName="failingHeartbeatTest", ) - self.addCleanup(client.close) # Force a connection. client.admin.command("ping") address = client.address @@ -108,7 +109,6 @@ def test_streaming_rtt(self): client = self.rs_or_single_client( event_listeners=[listener, hb_listener], heartbeatFrequencyMS=500, appName=name ) - self.addCleanup(client.close) # Force a connection. client.admin.command("ping") address = client.address @@ -156,7 +156,6 @@ def test_monitor_waits_after_server_check_error(self): client = self.single_client( appName="SDAMMinHeartbeatFrequencyTest", serverSelectionTimeoutMS=5000 ) - self.addCleanup(client.close) # Force a connection. client.admin.command("ping") duration = time.time() - start @@ -183,7 +182,6 @@ def test_heartbeat_awaited_flag(self): heartbeatFrequencyMS=500, appName="heartbeatEventAwaitedFlag", ) - self.addCleanup(client.close) # Force a connection. client.admin.command("ping") From d2d7f175582feaef260916808e64e4e8c96ffec9 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 3 Feb 2025 11:54:37 -0500 Subject: [PATCH 2/3] Fix synchro --- test/test_streaming_protocol.py | 6 +++--- tools/synchro.py | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/test/test_streaming_protocol.py b/test/test_streaming_protocol.py index e87d49deab..7fc158163b 100644 --- a/test/test_streaming_protocol.py +++ b/test/test_streaming_protocol.py @@ -20,7 +20,7 @@ sys.path[0:0] = [""] -from test.synchronous import IntegrationTest, client_context, unittest +from test import IntegrationTest, client_context, unittest from test.utils import ( HeartbeatEventListener, ServerEventListener, @@ -30,7 +30,7 @@ from pymongo import monitoring from pymongo.hello import HelloCompat -_IS_SYNC = False +_IS_SYNC = True class TestStreamingProtocol(IntegrationTest): @@ -79,7 +79,7 @@ def marked_unknown(): def rediscovered(): return len(listener.matching(_discovered_node)) >= 1 - # Topology events are published asynchronously + # Topology events are published synchronously wait_until(marked_unknown, "mark node unknown") wait_until(rediscovered, "rediscover node") diff --git a/tools/synchro.py b/tools/synchro.py index ba0a4712e3..31159a82ee 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -226,6 +226,7 @@ def async_only_test(f: str) -> bool: "test_retryable_writes.py", "test_retryable_writes_unified.py", "test_session.py", + "test_streaming_protocol.py", "test_transactions.py", "unified_format.py", ] From 9b2a6fc7c887e285903696268e60ae5035cc9a5e Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 3 Feb 2025 15:06:26 -0500 Subject: [PATCH 3/3] Fix comment --- test/asynchronous/test_streaming_protocol.py | 2 +- test/test_streaming_protocol.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/asynchronous/test_streaming_protocol.py b/test/asynchronous/test_streaming_protocol.py index 141f4bf961..fd890d29fb 100644 --- a/test/asynchronous/test_streaming_protocol.py +++ b/test/asynchronous/test_streaming_protocol.py @@ -79,7 +79,7 @@ def marked_unknown(): def rediscovered(): return len(listener.matching(_discovered_node)) >= 1 - # Topology events are published asynchronously + # Topology events are not published synchronously await async_wait_until(marked_unknown, "mark node unknown") await async_wait_until(rediscovered, "rediscover node") diff --git a/test/test_streaming_protocol.py b/test/test_streaming_protocol.py index 7fc158163b..894e89e208 100644 --- a/test/test_streaming_protocol.py +++ b/test/test_streaming_protocol.py @@ -79,7 +79,7 @@ def marked_unknown(): def rediscovered(): return len(listener.matching(_discovered_node)) >= 1 - # Topology events are published synchronously + # Topology events are not published synchronously wait_until(marked_unknown, "mark node unknown") wait_until(rediscovered, "rediscover node")