Skip to content

Commit 02d6cc9

Browse files
authored
PYTHON-5107 - Convert test.test_streaming_protocol to async (#2126)
1 parent 7a4150a commit 02d6cc9

File tree

3 files changed

+232
-5
lines changed

3 files changed

+232
-5
lines changed
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
# Copyright 2020-present MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Test the database module."""
16+
from __future__ import annotations
17+
18+
import sys
19+
import time
20+
21+
sys.path[0:0] = [""]
22+
23+
from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest
24+
from test.utils import (
25+
HeartbeatEventListener,
26+
ServerEventListener,
27+
async_wait_until,
28+
)
29+
30+
from pymongo import monitoring
31+
from pymongo.hello import HelloCompat
32+
33+
_IS_SYNC = False
34+
35+
36+
class TestStreamingProtocol(AsyncIntegrationTest):
37+
@async_client_context.require_failCommand_appName
38+
async def test_failCommand_streaming(self):
39+
listener = ServerEventListener()
40+
hb_listener = HeartbeatEventListener()
41+
client = await self.async_rs_or_single_client(
42+
event_listeners=[listener, hb_listener],
43+
heartbeatFrequencyMS=500,
44+
appName="failingHeartbeatTest",
45+
)
46+
# Force a connection.
47+
await client.admin.command("ping")
48+
address = await client.address
49+
listener.reset()
50+
51+
fail_hello = {
52+
"configureFailPoint": "failCommand",
53+
"mode": {"times": 4},
54+
"data": {
55+
"failCommands": [HelloCompat.LEGACY_CMD, "hello"],
56+
"closeConnection": False,
57+
"errorCode": 10107,
58+
"appName": "failingHeartbeatTest",
59+
},
60+
}
61+
async with self.fail_point(fail_hello):
62+
63+
def _marked_unknown(event):
64+
return (
65+
event.server_address == address
66+
and not event.new_description.is_server_type_known
67+
)
68+
69+
def _discovered_node(event):
70+
return (
71+
event.server_address == address
72+
and not event.previous_description.is_server_type_known
73+
and event.new_description.is_server_type_known
74+
)
75+
76+
def marked_unknown():
77+
return len(listener.matching(_marked_unknown)) >= 1
78+
79+
def rediscovered():
80+
return len(listener.matching(_discovered_node)) >= 1
81+
82+
# Topology events are not published synchronously
83+
await async_wait_until(marked_unknown, "mark node unknown")
84+
await async_wait_until(rediscovered, "rediscover node")
85+
86+
# Server should be selectable.
87+
await client.admin.command("ping")
88+
89+
@async_client_context.require_failCommand_appName
90+
async def test_streaming_rtt(self):
91+
listener = ServerEventListener()
92+
hb_listener = HeartbeatEventListener()
93+
# On Windows, RTT can actually be 0.0 because time.time() only has
94+
# 1-15 millisecond resolution. We need to delay the initial hello
95+
# to ensure that RTT is never zero.
96+
name = "streamingRttTest"
97+
delay_hello: dict = {
98+
"configureFailPoint": "failCommand",
99+
"mode": {"times": 1000},
100+
"data": {
101+
"failCommands": [HelloCompat.LEGACY_CMD, "hello"],
102+
"blockConnection": True,
103+
"blockTimeMS": 20,
104+
# This can be uncommented after SERVER-49220 is fixed.
105+
# 'appName': name,
106+
},
107+
}
108+
async with self.fail_point(delay_hello):
109+
client = await self.async_rs_or_single_client(
110+
event_listeners=[listener, hb_listener], heartbeatFrequencyMS=500, appName=name
111+
)
112+
# Force a connection.
113+
await client.admin.command("ping")
114+
address = await client.address
115+
116+
delay_hello["data"]["blockTimeMS"] = 500
117+
delay_hello["data"]["appName"] = name
118+
async with self.fail_point(delay_hello):
119+
120+
def rtt_exceeds_250_ms():
121+
# XXX: Add a public TopologyDescription getter to MongoClient?
122+
topology = client._topology
123+
sd = topology.description.server_descriptions()[address]
124+
assert sd.round_trip_time is not None
125+
return sd.round_trip_time > 0.250
126+
127+
await async_wait_until(rtt_exceeds_250_ms, "exceed 250ms RTT")
128+
129+
# Server should be selectable.
130+
await client.admin.command("ping")
131+
132+
def changed_event(event):
133+
return event.server_address == address and isinstance(
134+
event, monitoring.ServerDescriptionChangedEvent
135+
)
136+
137+
# There should only be one event published, for the initial discovery.
138+
events = listener.matching(changed_event)
139+
self.assertEqual(1, len(events))
140+
self.assertGreater(events[0].new_description.round_trip_time, 0)
141+
142+
@async_client_context.require_failCommand_appName
143+
async def test_monitor_waits_after_server_check_error(self):
144+
# This test implements:
145+
# https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring-tests.md#monitors-sleep-at-least-minheartbeatfreqencyms-between-checks
146+
fail_hello = {
147+
"mode": {"times": 5},
148+
"data": {
149+
"failCommands": [HelloCompat.LEGACY_CMD, "hello"],
150+
"errorCode": 1234,
151+
"appName": "SDAMMinHeartbeatFrequencyTest",
152+
},
153+
}
154+
async with self.fail_point(fail_hello):
155+
start = time.time()
156+
client = await self.async_single_client(
157+
appName="SDAMMinHeartbeatFrequencyTest", serverSelectionTimeoutMS=5000
158+
)
159+
# Force a connection.
160+
await client.admin.command("ping")
161+
duration = time.time() - start
162+
# Explanation of the expected events:
163+
# 0ms: run configureFailPoint
164+
# 1ms: create MongoClient
165+
# 2ms: failed monitor handshake, 1
166+
# 502ms: failed monitor handshake, 2
167+
# 1002ms: failed monitor handshake, 3
168+
# 1502ms: failed monitor handshake, 4
169+
# 2002ms: failed monitor handshake, 5
170+
# 2502ms: monitor handshake succeeds
171+
# 2503ms: run awaitable hello
172+
# 2504ms: application handshake succeeds
173+
# 2505ms: ping command succeeds
174+
self.assertGreaterEqual(duration, 2)
175+
self.assertLessEqual(duration, 3.5)
176+
177+
@async_client_context.require_failCommand_appName
178+
async def test_heartbeat_awaited_flag(self):
179+
hb_listener = HeartbeatEventListener()
180+
client = await self.async_single_client(
181+
event_listeners=[hb_listener],
182+
heartbeatFrequencyMS=500,
183+
appName="heartbeatEventAwaitedFlag",
184+
)
185+
# Force a connection.
186+
await client.admin.command("ping")
187+
188+
def hb_succeeded(event):
189+
return isinstance(event, monitoring.ServerHeartbeatSucceededEvent)
190+
191+
def hb_failed(event):
192+
return isinstance(event, monitoring.ServerHeartbeatFailedEvent)
193+
194+
fail_heartbeat = {
195+
"mode": {"times": 2},
196+
"data": {
197+
"failCommands": [HelloCompat.LEGACY_CMD, "hello"],
198+
"closeConnection": True,
199+
"appName": "heartbeatEventAwaitedFlag",
200+
},
201+
}
202+
async with self.fail_point(fail_heartbeat):
203+
await async_wait_until(
204+
lambda: hb_listener.matching(hb_failed), "published failed event"
205+
)
206+
# Reconnect.
207+
await client.admin.command("ping")
208+
209+
hb_succeeded_events = hb_listener.matching(hb_succeeded)
210+
hb_failed_events = hb_listener.matching(hb_failed)
211+
self.assertFalse(hb_succeeded_events[0].awaited)
212+
self.assertTrue(hb_failed_events[0].awaited)
213+
# Depending on thread scheduling, the failed heartbeat could occur on
214+
# the second or third check.
215+
events = [type(e) for e in hb_listener.events[:4]]
216+
if events == [
217+
monitoring.ServerHeartbeatStartedEvent,
218+
monitoring.ServerHeartbeatSucceededEvent,
219+
monitoring.ServerHeartbeatStartedEvent,
220+
monitoring.ServerHeartbeatFailedEvent,
221+
]:
222+
self.assertFalse(hb_succeeded_events[1].awaited)
223+
else:
224+
self.assertTrue(hb_succeeded_events[1].awaited)
225+
226+
227+
if __name__ == "__main__":
228+
unittest.main()

test/test_streaming_protocol.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
from pymongo import monitoring
3131
from pymongo.hello import HelloCompat
3232

33+
_IS_SYNC = True
34+
3335

3436
class TestStreamingProtocol(IntegrationTest):
3537
@client_context.require_failCommand_appName
@@ -41,7 +43,6 @@ def test_failCommand_streaming(self):
4143
heartbeatFrequencyMS=500,
4244
appName="failingHeartbeatTest",
4345
)
44-
self.addCleanup(client.close)
4546
# Force a connection.
4647
client.admin.command("ping")
4748
address = client.address
@@ -78,7 +79,7 @@ def marked_unknown():
7879
def rediscovered():
7980
return len(listener.matching(_discovered_node)) >= 1
8081

81-
# Topology events are published asynchronously
82+
# Topology events are not published synchronously
8283
wait_until(marked_unknown, "mark node unknown")
8384
wait_until(rediscovered, "rediscover node")
8485

@@ -108,7 +109,6 @@ def test_streaming_rtt(self):
108109
client = self.rs_or_single_client(
109110
event_listeners=[listener, hb_listener], heartbeatFrequencyMS=500, appName=name
110111
)
111-
self.addCleanup(client.close)
112112
# Force a connection.
113113
client.admin.command("ping")
114114
address = client.address
@@ -156,7 +156,6 @@ def test_monitor_waits_after_server_check_error(self):
156156
client = self.single_client(
157157
appName="SDAMMinHeartbeatFrequencyTest", serverSelectionTimeoutMS=5000
158158
)
159-
self.addCleanup(client.close)
160159
# Force a connection.
161160
client.admin.command("ping")
162161
duration = time.time() - start
@@ -183,7 +182,6 @@ def test_heartbeat_awaited_flag(self):
183182
heartbeatFrequencyMS=500,
184183
appName="heartbeatEventAwaitedFlag",
185184
)
186-
self.addCleanup(client.close)
187185
# Force a connection.
188186
client.admin.command("ping")
189187

tools/synchro.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ def async_only_test(f: str) -> bool:
234234
"test_sessions_unified.py",
235235
"test_srv_polling.py",
236236
"test_ssl.py",
237+
"test_streaming_protocol.py",
237238
"test_transactions.py",
238239
"test_unified_format.py",
239240
"unified_format.py",

0 commit comments

Comments
 (0)