Skip to content

Commit bfba548

Browse files
authored
PYTHON-4789 Migrate test_retryable_reads.py to async (#1877)
1 parent 8791aa0 commit bfba548

File tree

3 files changed

+194
-4
lines changed

3 files changed

+194
-4
lines changed
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
# Copyright 2019-present MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Test retryable reads spec."""
16+
from __future__ import annotations
17+
18+
import os
19+
import pprint
20+
import sys
21+
import threading
22+
23+
from pymongo.errors import AutoReconnect
24+
25+
sys.path[0:0] = [""]
26+
27+
from test.asynchronous import (
28+
AsyncIntegrationTest,
29+
AsyncPyMongoTestCase,
30+
async_client_context,
31+
client_knobs,
32+
unittest,
33+
)
34+
from test.utils import (
35+
CMAPListener,
36+
OvertCommandListener,
37+
async_set_fail_point,
38+
)
39+
40+
from pymongo.monitoring import (
41+
ConnectionCheckedOutEvent,
42+
ConnectionCheckOutFailedEvent,
43+
ConnectionCheckOutFailedReason,
44+
PoolClearedEvent,
45+
)
46+
47+
_IS_SYNC = False
48+
49+
50+
class TestClientOptions(AsyncPyMongoTestCase):
51+
async def test_default(self):
52+
client = self.simple_client(connect=False)
53+
self.assertEqual(client.options.retry_reads, True)
54+
55+
async def test_kwargs(self):
56+
client = self.simple_client(retryReads=True, connect=False)
57+
self.assertEqual(client.options.retry_reads, True)
58+
client = self.simple_client(retryReads=False, connect=False)
59+
self.assertEqual(client.options.retry_reads, False)
60+
61+
async def test_uri(self):
62+
client = self.simple_client("mongodb://h/?retryReads=true", connect=False)
63+
self.assertEqual(client.options.retry_reads, True)
64+
client = self.simple_client("mongodb://h/?retryReads=false", connect=False)
65+
self.assertEqual(client.options.retry_reads, False)
66+
67+
68+
class FindThread(threading.Thread):
69+
def __init__(self, collection):
70+
super().__init__()
71+
self.daemon = True
72+
self.collection = collection
73+
self.passed = False
74+
75+
async def run(self):
76+
await self.collection.find_one({})
77+
self.passed = True
78+
79+
80+
class TestPoolPausedError(AsyncIntegrationTest):
81+
# Pools don't get paused in load balanced mode.
82+
RUN_ON_LOAD_BALANCER = False
83+
RUN_ON_SERVERLESS = False
84+
85+
@async_client_context.require_sync
86+
@async_client_context.require_failCommand_blockConnection
87+
@client_knobs(heartbeat_frequency=0.05, min_heartbeat_interval=0.05)
88+
async def test_pool_paused_error_is_retryable(self):
89+
if "PyPy" in sys.version:
90+
# Tracked in PYTHON-3519
91+
self.skipTest("Test is flakey on PyPy")
92+
cmap_listener = CMAPListener()
93+
cmd_listener = OvertCommandListener()
94+
client = await self.async_rs_or_single_client(
95+
maxPoolSize=1, event_listeners=[cmap_listener, cmd_listener]
96+
)
97+
for _ in range(10):
98+
cmap_listener.reset()
99+
cmd_listener.reset()
100+
threads = [FindThread(client.pymongo_test.test) for _ in range(2)]
101+
fail_command = {
102+
"mode": {"times": 1},
103+
"data": {
104+
"failCommands": ["find"],
105+
"blockConnection": True,
106+
"blockTimeMS": 1000,
107+
"errorCode": 91,
108+
},
109+
}
110+
async with self.fail_point(fail_command):
111+
for thread in threads:
112+
thread.start()
113+
for thread in threads:
114+
thread.join()
115+
for thread in threads:
116+
self.assertTrue(thread.passed)
117+
118+
# It's possible that SDAM can rediscover the server and mark the
119+
# pool ready before the thread in the wait queue has a chance
120+
# to run. Repeat the test until the thread actually encounters
121+
# a PoolClearedError.
122+
if cmap_listener.event_count(ConnectionCheckOutFailedEvent):
123+
break
124+
125+
# Via CMAP monitoring, assert that the first check out succeeds.
126+
cmap_events = cmap_listener.events_by_type(
127+
(ConnectionCheckedOutEvent, ConnectionCheckOutFailedEvent, PoolClearedEvent)
128+
)
129+
msg = pprint.pformat(cmap_listener.events)
130+
self.assertIsInstance(cmap_events[0], ConnectionCheckedOutEvent, msg)
131+
self.assertIsInstance(cmap_events[1], PoolClearedEvent, msg)
132+
self.assertIsInstance(cmap_events[2], ConnectionCheckOutFailedEvent, msg)
133+
self.assertEqual(cmap_events[2].reason, ConnectionCheckOutFailedReason.CONN_ERROR, msg)
134+
self.assertIsInstance(cmap_events[3], ConnectionCheckedOutEvent, msg)
135+
136+
# Connection check out failures are not reflected in command
137+
# monitoring because we only publish command events _after_ checking
138+
# out a connection.
139+
started = cmd_listener.started_events
140+
msg = pprint.pformat(cmd_listener.results)
141+
self.assertEqual(3, len(started), msg)
142+
succeeded = cmd_listener.succeeded_events
143+
self.assertEqual(2, len(succeeded), msg)
144+
failed = cmd_listener.failed_events
145+
self.assertEqual(1, len(failed), msg)
146+
147+
148+
class TestRetryableReads(AsyncIntegrationTest):
149+
@async_client_context.require_multiple_mongoses
150+
@async_client_context.require_failCommand_fail_point
151+
async def test_retryable_reads_in_sharded_cluster_multiple_available(self):
152+
fail_command = {
153+
"configureFailPoint": "failCommand",
154+
"mode": {"times": 1},
155+
"data": {
156+
"failCommands": ["find"],
157+
"closeConnection": True,
158+
"appName": "retryableReadTest",
159+
},
160+
}
161+
162+
mongos_clients = []
163+
164+
for mongos in async_client_context.mongos_seeds().split(","):
165+
client = await self.async_rs_or_single_client(mongos)
166+
await async_set_fail_point(client, fail_command)
167+
mongos_clients.append(client)
168+
169+
listener = OvertCommandListener()
170+
client = await self.async_rs_or_single_client(
171+
async_client_context.mongos_seeds(),
172+
appName="retryableReadTest",
173+
event_listeners=[listener],
174+
retryReads=True,
175+
)
176+
177+
async with self.fail_point(fail_command):
178+
with self.assertRaises(AutoReconnect):
179+
await client.t.t.find_one({})
180+
181+
# Disable failpoints on each mongos
182+
for client in mongos_clients:
183+
fail_command["mode"] = "off"
184+
await async_set_fail_point(client, fail_command)
185+
186+
self.assertEqual(len(listener.failed_events), 2)
187+
self.assertEqual(len(listener.succeeded_events), 0)
188+
189+
190+
if __name__ == "__main__":
191+
unittest.main()

test/test_retryable_reads.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,7 @@
4444
PoolClearedEvent,
4545
)
4646

47-
# Location of JSON test specifications.
48-
_TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "retryable_reads", "legacy")
47+
_IS_SYNC = True
4948

5049

5150
class TestClientOptions(PyMongoTestCase):
@@ -83,6 +82,7 @@ class TestPoolPausedError(IntegrationTest):
8382
RUN_ON_LOAD_BALANCER = False
8483
RUN_ON_SERVERLESS = False
8584

85+
@client_context.require_sync
8686
@client_context.require_failCommand_blockConnection
8787
@client_knobs(heartbeat_frequency=0.05, min_heartbeat_interval=0.05)
8888
def test_pool_paused_error_is_retryable(self):
@@ -94,7 +94,6 @@ def test_pool_paused_error_is_retryable(self):
9494
client = self.rs_or_single_client(
9595
maxPoolSize=1, event_listeners=[cmap_listener, cmd_listener]
9696
)
97-
self.addCleanup(client.close)
9897
for _ in range(10):
9998
cmap_listener.reset()
10099
cmd_listener.reset()
@@ -165,7 +164,6 @@ def test_retryable_reads_in_sharded_cluster_multiple_available(self):
165164
for mongos in client_context.mongos_seeds().split(","):
166165
client = self.rs_or_single_client(mongos)
167166
set_fail_point(client, fail_command)
168-
self.addCleanup(client.close)
169167
mongos_clients.append(client)
170168

171169
listener = OvertCommandListener()

tools/synchro.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ def async_only_test(f: str) -> bool:
193193
"test_logger.py",
194194
"test_monitoring.py",
195195
"test_raw_bson.py",
196+
"test_retryable_reads.py",
196197
"test_retryable_writes.py",
197198
"test_session.py",
198199
"test_transactions.py",

0 commit comments

Comments
 (0)