Skip to content

Commit 0787a81

Browse files
committed
Migrate test_retryable_reads.py to async
1 parent 1e395de commit 0787a81

File tree

3 files changed

+200
-0
lines changed

3 files changed

+200
-0
lines changed
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
# Copyright 2019-present MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Test retryable reads spec."""
16+
from __future__ import annotations
17+
18+
import os
19+
import pprint
20+
import sys
21+
import threading
22+
23+
from pymongo.errors import AutoReconnect
24+
25+
sys.path[0:0] = [""]
26+
27+
from test.asynchronous import (
28+
AsyncIntegrationTest,
29+
AsyncPyMongoTestCase,
30+
async_client_context,
31+
client_knobs,
32+
unittest,
33+
)
34+
from test.utils import (
35+
CMAPListener,
36+
OvertCommandListener,
37+
set_fail_point,
38+
)
39+
40+
from pymongo.monitoring import (
41+
ConnectionCheckedOutEvent,
42+
ConnectionCheckOutFailedEvent,
43+
ConnectionCheckOutFailedReason,
44+
PoolClearedEvent,
45+
)
46+
47+
_IS_SYNC = False
48+
49+
# Location of JSON test specifications.
50+
_TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "retryable_reads", "legacy")
51+
52+
53+
class TestClientOptions(AsyncPyMongoTestCase):
54+
async def test_default(self):
55+
client = self.simple_client(connect=False)
56+
self.assertEqual(client.options.retry_reads, True)
57+
58+
async def test_kwargs(self):
59+
client = self.simple_client(retryReads=True, connect=False)
60+
self.assertEqual(client.options.retry_reads, True)
61+
client = self.simple_client(retryReads=False, connect=False)
62+
self.assertEqual(client.options.retry_reads, False)
63+
64+
async def test_uri(self):
65+
client = self.simple_client("mongodb://h/?retryReads=true", connect=False)
66+
self.assertEqual(client.options.retry_reads, True)
67+
client = self.simple_client("mongodb://h/?retryReads=false", connect=False)
68+
self.assertEqual(client.options.retry_reads, False)
69+
70+
71+
class FindThread(threading.Thread):
72+
def __init__(self, collection):
73+
super().__init__()
74+
self.daemon = True
75+
self.collection = collection
76+
self.passed = False
77+
78+
async def run(self):
79+
await self.collection.find_one({})
80+
self.passed = True
81+
82+
83+
class TestPoolPausedError(AsyncIntegrationTest):
84+
# Pools don't get paused in load balanced mode.
85+
RUN_ON_LOAD_BALANCER = False
86+
RUN_ON_SERVERLESS = False
87+
88+
@async_client_context.require_sync
89+
@async_client_context.require_failCommand_blockConnection
90+
@client_knobs(heartbeat_frequency=0.05, min_heartbeat_interval=0.05)
91+
async def test_pool_paused_error_is_retryable(self):
92+
if "PyPy" in sys.version:
93+
# Tracked in PYTHON-3519
94+
self.skipTest("Test is flakey on PyPy")
95+
cmap_listener = CMAPListener()
96+
cmd_listener = OvertCommandListener()
97+
client = await self.async_rs_or_single_client(
98+
maxPoolSize=1, event_listeners=[cmap_listener, cmd_listener]
99+
)
100+
self.addAsyncCleanup(client.close)
101+
for _ in range(10):
102+
cmap_listener.reset()
103+
cmd_listener.reset()
104+
threads = [FindThread(client.pymongo_test.test) for _ in range(2)]
105+
fail_command = {
106+
"mode": {"times": 1},
107+
"data": {
108+
"failCommands": ["find"],
109+
"blockConnection": True,
110+
"blockTimeMS": 1000,
111+
"errorCode": 91,
112+
},
113+
}
114+
async with self.fail_point(fail_command):
115+
for thread in threads:
116+
thread.start()
117+
for thread in threads:
118+
thread.join()
119+
for thread in threads:
120+
self.assertTrue(thread.passed)
121+
122+
# It's possible that SDAM can rediscover the server and mark the
123+
# pool ready before the thread in the wait queue has a chance
124+
# to run. Repeat the test until the thread actually encounters
125+
# a PoolClearedError.
126+
if cmap_listener.event_count(ConnectionCheckOutFailedEvent):
127+
break
128+
129+
# Via CMAP monitoring, assert that the first check out succeeds.
130+
cmap_events = cmap_listener.events_by_type(
131+
(ConnectionCheckedOutEvent, ConnectionCheckOutFailedEvent, PoolClearedEvent)
132+
)
133+
msg = pprint.pformat(cmap_listener.events)
134+
self.assertIsInstance(cmap_events[0], ConnectionCheckedOutEvent, msg)
135+
self.assertIsInstance(cmap_events[1], PoolClearedEvent, msg)
136+
self.assertIsInstance(cmap_events[2], ConnectionCheckOutFailedEvent, msg)
137+
self.assertEqual(cmap_events[2].reason, ConnectionCheckOutFailedReason.CONN_ERROR, msg)
138+
self.assertIsInstance(cmap_events[3], ConnectionCheckedOutEvent, msg)
139+
140+
# Connection check out failures are not reflected in command
141+
# monitoring because we only publish command events _after_ checking
142+
# out a connection.
143+
started = cmd_listener.started_events
144+
msg = pprint.pformat(cmd_listener.results)
145+
self.assertEqual(3, len(started), msg)
146+
succeeded = cmd_listener.succeeded_events
147+
self.assertEqual(2, len(succeeded), msg)
148+
failed = cmd_listener.failed_events
149+
self.assertEqual(1, len(failed), msg)
150+
151+
152+
class TestRetryableReads(AsyncIntegrationTest):
153+
@async_client_context.require_multiple_mongoses
154+
@async_client_context.require_failCommand_fail_point
155+
async def test_retryable_reads_in_sharded_cluster_multiple_available(self):
156+
fail_command = {
157+
"configureFailPoint": "failCommand",
158+
"mode": {"times": 1},
159+
"data": {
160+
"failCommands": ["find"],
161+
"closeConnection": True,
162+
"appName": "retryableReadTest",
163+
},
164+
}
165+
166+
mongos_clients = []
167+
168+
for mongos in async_client_context.mongos_seeds().split(","):
169+
client = await self.async_rs_or_single_client(mongos)
170+
set_fail_point(client, fail_command)
171+
self.addAsyncCleanup(client.close)
172+
mongos_clients.append(client)
173+
174+
listener = OvertCommandListener()
175+
client = self.async_rs_or_single_client(
176+
async_client_context.mongos_seeds(),
177+
appName="retryableReadTest",
178+
event_listeners=[listener],
179+
retryReads=True,
180+
)
181+
182+
async with self.fail_point(fail_command):
183+
with self.assertRaises(AutoReconnect):
184+
await client.t.t.find_one({})
185+
186+
# Disable failpoints on each mongos
187+
for client in mongos_clients:
188+
fail_command["mode"] = "off"
189+
set_fail_point(client, fail_command)
190+
191+
self.assertEqual(len(listener.failed_events), 2)
192+
self.assertEqual(len(listener.succeeded_events), 0)
193+
194+
195+
if __name__ == "__main__":
196+
unittest.main()

test/test_retryable_reads.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@
4444
PoolClearedEvent,
4545
)
4646

47+
_IS_SYNC = True
48+
4749
# Location of JSON test specifications.
4850
_TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "retryable_reads", "legacy")
4951

@@ -83,6 +85,7 @@ class TestPoolPausedError(IntegrationTest):
8385
RUN_ON_LOAD_BALANCER = False
8486
RUN_ON_SERVERLESS = False
8587

88+
@client_context.require_sync
8689
@client_context.require_failCommand_blockConnection
8790
@client_knobs(heartbeat_frequency=0.05, min_heartbeat_interval=0.05)
8891
def test_pool_paused_error_is_retryable(self):

tools/synchro.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@
180180
"test_logger.py",
181181
"test_monitoring.py",
182182
"test_raw_bson.py",
183+
"test_retryable_reads.py",
183184
"test_session.py",
184185
"test_transactions.py",
185186
]

0 commit comments

Comments
 (0)