Skip to content

Commit 7a539f2

Browse files
authored
PYTHON-2294 Resync SDAM spec tests to workaround slow elections Windows and macOS (#468)
PYTHON-2296 Test behavior of connectTimeoutMS=0 with streaming protocol PYTHON-2311 Workaround inherent race in flaky streaming protocol test
1 parent 98f8470 commit 7a539f2

File tree

7 files changed

+230
-21
lines changed

7 files changed

+230
-21
lines changed

pymongo/common.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,16 @@ def validate_timeout_or_zero(option, value):
333333
return validate_positive_float(option, value) / 1000.0
334334

335335

336+
def validate_timeout_or_none_or_zero(option, value):
337+
"""Validates a timeout specified in milliseconds returning
338+
a value in floating point seconds. value=0 and value="0" are treated the
339+
same as value=None which means unlimited timeout.
340+
"""
341+
if value is None or value == 0 or value == "0":
342+
return None
343+
return validate_positive_float(option, value) / 1000.0
344+
345+
336346
def validate_max_staleness(option, value):
337347
"""Validates maxStalenessSeconds according to the Max Staleness Spec."""
338348
if value == -1 or value == "-1":
@@ -593,7 +603,7 @@ def validate_tzinfo(dummy, value):
593603
'authmechanismproperties': validate_auth_mechanism_properties,
594604
'authsource': validate_string,
595605
'compressors': validate_compressors,
596-
'connecttimeoutms': validate_timeout_or_none,
606+
'connecttimeoutms': validate_timeout_or_none_or_zero,
597607
'directconnection': validate_boolean_or_string,
598608
'heartbeatfrequencyms': validate_timeout_or_none,
599609
'journal': validate_boolean_or_string,
@@ -608,7 +618,7 @@ def validate_tzinfo(dummy, value):
608618
'retryreads': validate_boolean_or_string,
609619
'retrywrites': validate_boolean_or_string,
610620
'serverselectiontimeoutms': validate_timeout_or_zero,
611-
'sockettimeoutms': validate_timeout_or_none,
621+
'sockettimeoutms': validate_timeout_or_none_or_zero,
612622
'ssl_keyfile': validate_readable,
613623
'tls': validate_boolean_or_string,
614624
'tlsallowinvalidcertificates': validate_allow_invalid_certs,

pymongo/mongo_client.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,11 +227,13 @@ def __init__(
227227
- `socketTimeoutMS`: (integer or None) Controls how long (in
228228
milliseconds) the driver will wait for a response after sending an
229229
ordinary (non-monitoring) database operation before concluding that
230-
a network error has occurred. Defaults to ``None`` (no timeout).
230+
a network error has occurred. ``0`` or ``None`` means no timeout.
231+
Defaults to ``None`` (no timeout).
231232
- `connectTimeoutMS`: (integer or None) Controls how long (in
232233
milliseconds) the driver will wait during server monitoring when
233234
connecting a new socket to a server before concluding the server
234-
is unavailable. Defaults to ``20000`` (20 seconds).
235+
is unavailable. ``0`` or ``None`` means no timeout.
236+
Defaults to ``20000`` (20 seconds).
235237
- `server_selector`: (callable or None) Optional, user-provided
236238
function that augments server selection rules. The function should
237239
accept as an argument a list of
@@ -631,7 +633,7 @@ def __init__(
631633
# Determine connection timeout from kwargs.
632634
timeout = keyword_opts.get("connecttimeoutms")
633635
if timeout is not None:
634-
timeout = common.validate_timeout_or_none(
636+
timeout = common.validate_timeout_or_none_or_zero(
635637
keyword_opts.cased_key("connecttimeoutms"), timeout)
636638
res = uri_parser.parse_uri(
637639
entity, port, validate=True, warn=True, normalize=False,
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
{
2+
"runOn": [
3+
{
4+
"minServerVersion": "4.4"
5+
}
6+
],
7+
"database_name": "sdam-tests",
8+
"collection_name": "connectTimeoutMS",
9+
"data": [],
10+
"tests": [
11+
{
12+
"description": "connectTimeoutMS=0",
13+
"clientOptions": {
14+
"retryWrites": false,
15+
"connectTimeoutMS": 0,
16+
"heartbeatFrequencyMS": 500,
17+
"appname": "connectTimeoutMS=0"
18+
},
19+
"operations": [
20+
{
21+
"name": "insertMany",
22+
"object": "collection",
23+
"arguments": {
24+
"documents": [
25+
{
26+
"_id": 1
27+
},
28+
{
29+
"_id": 2
30+
}
31+
]
32+
}
33+
},
34+
{
35+
"name": "configureFailPoint",
36+
"object": "testRunner",
37+
"arguments": {
38+
"failPoint": {
39+
"configureFailPoint": "failCommand",
40+
"mode": {
41+
"times": 2
42+
},
43+
"data": {
44+
"failCommands": [
45+
"isMaster"
46+
],
47+
"appName": "connectTimeoutMS=0",
48+
"blockConnection": true,
49+
"blockTimeMS": 550
50+
}
51+
}
52+
}
53+
},
54+
{
55+
"name": "wait",
56+
"object": "testRunner",
57+
"arguments": {
58+
"ms": 750
59+
}
60+
},
61+
{
62+
"name": "insertMany",
63+
"object": "collection",
64+
"arguments": {
65+
"documents": [
66+
{
67+
"_id": 3
68+
},
69+
{
70+
"_id": 4
71+
}
72+
]
73+
}
74+
},
75+
{
76+
"name": "assertEventCount",
77+
"object": "testRunner",
78+
"arguments": {
79+
"event": "ServerMarkedUnknownEvent",
80+
"count": 0
81+
}
82+
},
83+
{
84+
"name": "assertEventCount",
85+
"object": "testRunner",
86+
"arguments": {
87+
"event": "PoolClearedEvent",
88+
"count": 0
89+
}
90+
}
91+
],
92+
"expectations": [
93+
{
94+
"command_started_event": {
95+
"command": {
96+
"insert": "connectTimeoutMS",
97+
"documents": [
98+
{
99+
"_id": 1
100+
},
101+
{
102+
"_id": 2
103+
}
104+
]
105+
},
106+
"command_name": "insert",
107+
"database_name": "sdam-tests"
108+
}
109+
},
110+
{
111+
"command_started_event": {
112+
"command": {
113+
"insert": "connectTimeoutMS",
114+
"documents": [
115+
{
116+
"_id": 3
117+
},
118+
{
119+
"_id": 4
120+
}
121+
]
122+
},
123+
"command_name": "insert",
124+
"database_name": "sdam-tests"
125+
}
126+
}
127+
],
128+
"outcome": {
129+
"collection": {
130+
"data": [
131+
{
132+
"_id": 1
133+
},
134+
{
135+
"_id": 2
136+
},
137+
{
138+
"_id": 3
139+
},
140+
{
141+
"_id": 4
142+
}
143+
]
144+
}
145+
}
146+
}
147+
]
148+
}

test/discovery_and_monitoring_integration/rediscover-quickly-after-step-down.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@
6464
"command_name": "replSetStepDown",
6565
"arguments": {
6666
"command": {
67-
"replSetStepDown": 20,
68-
"secondaryCatchUpPeriodSecs": 20,
67+
"replSetStepDown": 30,
68+
"secondaryCatchUpPeriodSecs": 30,
6969
"force": false
7070
}
7171
}
@@ -74,7 +74,7 @@
7474
"name": "waitForPrimaryChange",
7575
"object": "testRunner",
7676
"arguments": {
77-
"timeoutMS": 5000
77+
"timeoutMS": 15000
7878
}
7979
},
8080
{

test/test_client.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,24 @@ def test_keyword_arg_defaults(self):
134134
self.assertEqual(ReadPreference.PRIMARY, client.read_preference)
135135
self.assertAlmostEqual(12, client.server_selection_timeout)
136136

137+
def test_connect_timeout(self):
138+
client = MongoClient(connect=False, connectTimeoutMS=None,
139+
socketTimeoutMS=None)
140+
pool_opts = client._MongoClient__options.pool_options
141+
self.assertEqual(None, pool_opts.socket_timeout)
142+
self.assertEqual(None, pool_opts.connect_timeout)
143+
client = MongoClient(connect=False, connectTimeoutMS=0,
144+
socketTimeoutMS=0)
145+
pool_opts = client._MongoClient__options.pool_options
146+
self.assertEqual(None, pool_opts.socket_timeout)
147+
self.assertEqual(None, pool_opts.connect_timeout)
148+
client = MongoClient(
149+
'mongodb://localhost/?connectTimeoutMS=0&socketTimeoutMS=0',
150+
connect=False)
151+
pool_opts = client._MongoClient__options.pool_options
152+
self.assertEqual(None, pool_opts.socket_timeout)
153+
self.assertEqual(None, pool_opts.connect_timeout)
154+
137155
def test_types(self):
138156
self.assertRaises(TypeError, MongoClient, 1)
139157
self.assertRaises(TypeError, MongoClient, 1.14)
@@ -996,8 +1014,8 @@ def test_socket_timeout_ms_validation(self):
9961014
c = connected(rs_or_single_client(socketTimeoutMS=None))
9971015
self.assertEqual(None, get_pool(c).opts.socket_timeout)
9981016

999-
self.assertRaises(ValueError,
1000-
rs_or_single_client, socketTimeoutMS=0)
1017+
c = connected(rs_or_single_client(socketTimeoutMS=0))
1018+
self.assertEqual(None, get_pool(c).opts.socket_timeout)
10011019

10021020
self.assertRaises(ValueError,
10031021
rs_or_single_client, socketTimeoutMS=-1)

test/test_session.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1249,6 +1249,7 @@ def test_cluster_time(self):
12491249
# Prevent heartbeats from updating $clusterTime between operations.
12501250
client = rs_or_single_client(event_listeners=[listener],
12511251
heartbeatFrequencyMS=999999)
1252+
self.addCleanup(client.close)
12521253
collection = client.pymongo_test.collection
12531254
# Prepare for tests of find() and aggregate().
12541255
collection.insert_many([{} for _ in range(10)])

test/test_streaming_protocol.py

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
unittest)
2727
from test.utils import (HeartbeatEventListener,
2828
rs_or_single_client,
29+
single_client,
2930
ServerEventListener,
3031
wait_until)
3132

@@ -160,14 +161,6 @@ def hb_started(event):
160161
return (isinstance(event, monitoring.ServerHeartbeatStartedEvent)
161162
and event.connection_id == address)
162163

163-
def hb_succeeded(event):
164-
return (isinstance(event, monitoring.ServerHeartbeatSucceededEvent)
165-
and event.connection_id == address)
166-
167-
def hb_failed(event):
168-
return (isinstance(event, monitoring.ServerHeartbeatFailedEvent)
169-
and event.connection_id == address)
170-
171164
hb_started_events = hb_listener.matching(hb_started)
172165
# Explanation of the expected heartbeat events:
173166
# Time: event
@@ -186,13 +179,50 @@ def hb_failed(event):
186179
# This can be reduced to ~15 after SERVER-49220 is fixed.
187180
self.assertLess(len(hb_started_events), 40)
188181

189-
# Check the awaited flag.
182+
@client_context.require_failCommand_appName
183+
def test_heartbeat_awaited_flag(self):
184+
hb_listener = HeartbeatEventListener()
185+
client = single_client(
186+
event_listeners=[hb_listener], heartbeatFrequencyMS=500,
187+
appName='heartbeatEventAwaitedFlag')
188+
self.addCleanup(client.close)
189+
# Force a connection.
190+
client.admin.command('ping')
191+
192+
def hb_succeeded(event):
193+
return isinstance(event, monitoring.ServerHeartbeatSucceededEvent)
194+
195+
def hb_failed(event):
196+
return isinstance(event, monitoring.ServerHeartbeatFailedEvent)
197+
198+
fail_heartbeat = {
199+
'mode': {'times': 2},
200+
'data': {
201+
'failCommands': ['isMaster'],
202+
'closeConnection': True,
203+
'appName': 'heartbeatEventAwaitedFlag',
204+
},
205+
}
206+
with self.fail_point(fail_heartbeat):
207+
wait_until(lambda: hb_listener.matching(hb_failed),
208+
"published failed event")
209+
# Reconnect.
210+
client.admin.command('ping')
211+
190212
hb_succeeded_events = hb_listener.matching(hb_succeeded)
191213
hb_failed_events = hb_listener.matching(hb_failed)
192214
self.assertFalse(hb_succeeded_events[0].awaited)
193-
self.assertTrue(hb_succeeded_events[1].awaited)
194215
self.assertTrue(hb_failed_events[0].awaited)
195-
self.assertFalse(hb_failed_events[1].awaited)
216+
# Depending on thread scheduling, the failed heartbeat could occur on
217+
# the second or third check.
218+
events = [type(e) for e in hb_listener.results[:4]]
219+
if events == [monitoring.ServerHeartbeatStartedEvent,
220+
monitoring.ServerHeartbeatSucceededEvent,
221+
monitoring.ServerHeartbeatStartedEvent,
222+
monitoring.ServerHeartbeatFailedEvent]:
223+
self.assertFalse(hb_succeeded_events[1].awaited)
224+
else:
225+
self.assertTrue(hb_succeeded_events[1].awaited)
196226

197227

198228
if __name__ == "__main__":

0 commit comments

Comments
 (0)