Skip to content

Commit 702c86c

Browse files
authored
PYTHON-5095 - Convert test_read_write_concern_spec to async (#2111)
1 parent 6b141d1 commit 702c86c

File tree

4 files changed

+364
-11
lines changed

4 files changed

+364
-11
lines changed

test/asynchronous/__init__.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1176,15 +1176,15 @@ def unmanaged_simple_client(
11761176

11771177
async def disable_replication(self, client):
11781178
"""Disable replication on all secondaries."""
1179-
for h, p in client.secondaries:
1179+
for h, p in await client.secondaries:
11801180
secondary = await self.async_single_client(h, p)
1181-
secondary.admin.command("configureFailPoint", "stopReplProducer", mode="alwaysOn")
1181+
await secondary.admin.command("configureFailPoint", "stopReplProducer", mode="alwaysOn")
11821182

11831183
async def enable_replication(self, client):
11841184
"""Enable replication on all secondaries."""
1185-
for h, p in client.secondaries:
1185+
for h, p in await client.secondaries:
11861186
secondary = await self.async_single_client(h, p)
1187-
secondary.admin.command("configureFailPoint", "stopReplProducer", mode="off")
1187+
await secondary.admin.command("configureFailPoint", "stopReplProducer", mode="off")
11881188

11891189

11901190
class AsyncUnitTest(AsyncPyMongoTestCase):
Lines changed: 344 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,344 @@
1+
# Copyright 2018-present MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Run the read and write concern tests."""
16+
from __future__ import annotations
17+
18+
import json
19+
import os
20+
import sys
21+
import warnings
22+
from pathlib import Path
23+
24+
sys.path[0:0] = [""]
25+
26+
from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest
27+
from test.asynchronous.unified_format import generate_test_classes
28+
from test.utils import OvertCommandListener
29+
30+
from pymongo import DESCENDING
31+
from pymongo.asynchronous.mongo_client import AsyncMongoClient
32+
from pymongo.errors import (
33+
BulkWriteError,
34+
ConfigurationError,
35+
WriteConcernError,
36+
WriteError,
37+
WTimeoutError,
38+
)
39+
from pymongo.operations import IndexModel, InsertOne
40+
from pymongo.read_concern import ReadConcern
41+
from pymongo.write_concern import WriteConcern
42+
43+
_IS_SYNC = False
44+
45+
# Location of JSON test specifications.
46+
if _IS_SYNC:
47+
TEST_PATH = os.path.join(Path(__file__).resolve().parent, "read_write_concern")
48+
else:
49+
TEST_PATH = os.path.join(Path(__file__).resolve().parent.parent, "read_write_concern")
50+
51+
52+
class TestReadWriteConcernSpec(AsyncIntegrationTest):
53+
async def test_omit_default_read_write_concern(self):
54+
listener = OvertCommandListener()
55+
# Client with default readConcern and writeConcern
56+
client = await self.async_rs_or_single_client(event_listeners=[listener])
57+
collection = client.pymongo_test.collection
58+
# Prepare for tests of find() and aggregate().
59+
await collection.insert_many([{} for _ in range(10)])
60+
self.addAsyncCleanup(collection.drop)
61+
self.addAsyncCleanup(client.pymongo_test.collection2.drop)
62+
# Commands MUST NOT send the default read/write concern to the server.
63+
64+
async def rename_and_drop():
65+
# Ensure collection exists.
66+
await collection.insert_one({})
67+
await collection.rename("collection2")
68+
await client.pymongo_test.collection2.drop()
69+
70+
async def insert_command_default_write_concern():
71+
await collection.database.command(
72+
"insert", "collection", documents=[{}], write_concern=WriteConcern()
73+
)
74+
75+
async def aggregate_op():
76+
await (await collection.aggregate([])).to_list()
77+
78+
ops = [
79+
("aggregate", aggregate_op),
80+
("find", lambda: collection.find().to_list()),
81+
("insert_one", lambda: collection.insert_one({})),
82+
("update_one", lambda: collection.update_one({}, {"$set": {"x": 1}})),
83+
("update_many", lambda: collection.update_many({}, {"$set": {"x": 1}})),
84+
("delete_one", lambda: collection.delete_one({})),
85+
("delete_many", lambda: collection.delete_many({})),
86+
("bulk_write", lambda: collection.bulk_write([InsertOne({})])),
87+
("rename_and_drop", rename_and_drop),
88+
("command", insert_command_default_write_concern),
89+
]
90+
91+
for name, f in ops:
92+
listener.reset()
93+
await f()
94+
95+
self.assertGreaterEqual(len(listener.started_events), 1)
96+
for _i, event in enumerate(listener.started_events):
97+
self.assertNotIn(
98+
"readConcern",
99+
event.command,
100+
f"{name} sent default readConcern with {event.command_name}",
101+
)
102+
self.assertNotIn(
103+
"writeConcern",
104+
event.command,
105+
f"{name} sent default writeConcern with {event.command_name}",
106+
)
107+
108+
async def assertWriteOpsRaise(self, write_concern, expected_exception):
109+
wc = write_concern.document
110+
# Set socket timeout to avoid indefinite stalls
111+
client = await self.async_rs_or_single_client(
112+
w=wc["w"], wTimeoutMS=wc["wtimeout"], socketTimeoutMS=30000
113+
)
114+
db = client.get_database("pymongo_test")
115+
coll = db.test
116+
117+
async def insert_command():
118+
await coll.database.command(
119+
"insert",
120+
"new_collection",
121+
documents=[{}],
122+
writeConcern=write_concern.document,
123+
parse_write_concern_error=True,
124+
)
125+
126+
ops = [
127+
("insert_one", lambda: coll.insert_one({})),
128+
("insert_many", lambda: coll.insert_many([{}, {}])),
129+
("update_one", lambda: coll.update_one({}, {"$set": {"x": 1}})),
130+
("update_many", lambda: coll.update_many({}, {"$set": {"x": 1}})),
131+
("delete_one", lambda: coll.delete_one({})),
132+
("delete_many", lambda: coll.delete_many({})),
133+
("bulk_write", lambda: coll.bulk_write([InsertOne({})])),
134+
("command", insert_command),
135+
("aggregate", lambda: coll.aggregate([{"$out": "out"}])),
136+
# SERVER-46668 Delete all the documents in the collection to
137+
# workaround a hang in createIndexes.
138+
("delete_many", lambda: coll.delete_many({})),
139+
("create_index", lambda: coll.create_index([("a", DESCENDING)])),
140+
("create_indexes", lambda: coll.create_indexes([IndexModel("b")])),
141+
("drop_index", lambda: coll.drop_index([("a", DESCENDING)])),
142+
("create", lambda: db.create_collection("new")),
143+
("rename", lambda: coll.rename("new")),
144+
("drop", lambda: db.new.drop()),
145+
]
146+
# SERVER-47194: dropDatabase does not respect wtimeout in 3.6.
147+
if async_client_context.version[:2] != (3, 6):
148+
ops.append(("drop_database", lambda: client.drop_database(db)))
149+
150+
for name, f in ops:
151+
# Ensure insert_many and bulk_write still raise BulkWriteError.
152+
if name in ("insert_many", "bulk_write"):
153+
expected = BulkWriteError
154+
else:
155+
expected = expected_exception
156+
with self.assertRaises(expected, msg=name) as cm:
157+
await f()
158+
if expected == BulkWriteError:
159+
bulk_result = cm.exception.details
160+
assert bulk_result is not None
161+
wc_errors = bulk_result["writeConcernErrors"]
162+
self.assertTrue(wc_errors)
163+
164+
@async_client_context.require_replica_set
165+
async def test_raise_write_concern_error(self):
166+
self.addAsyncCleanup(async_client_context.client.drop_database, "pymongo_test")
167+
assert async_client_context.w is not None
168+
await self.assertWriteOpsRaise(
169+
WriteConcern(w=async_client_context.w + 1, wtimeout=1), WriteConcernError
170+
)
171+
172+
@async_client_context.require_secondaries_count(1)
173+
@async_client_context.require_test_commands
174+
async def test_raise_wtimeout(self):
175+
self.addAsyncCleanup(async_client_context.client.drop_database, "pymongo_test")
176+
self.addAsyncCleanup(self.enable_replication, async_client_context.client)
177+
# Disable replication to guarantee a wtimeout error.
178+
await self.disable_replication(async_client_context.client)
179+
await self.assertWriteOpsRaise(
180+
WriteConcern(w=async_client_context.w, wtimeout=1), WTimeoutError
181+
)
182+
183+
@async_client_context.require_failCommand_fail_point
184+
async def test_error_includes_errInfo(self):
185+
expected_wce = {
186+
"code": 100,
187+
"codeName": "UnsatisfiableWriteConcern",
188+
"errmsg": "Not enough data-bearing nodes",
189+
"errInfo": {"writeConcern": {"w": 2, "wtimeout": 0, "provenance": "clientSupplied"}},
190+
}
191+
cause_wce = {
192+
"configureFailPoint": "failCommand",
193+
"mode": {"times": 2},
194+
"data": {"failCommands": ["insert"], "writeConcernError": expected_wce},
195+
}
196+
async with self.fail_point(cause_wce):
197+
# Write concern error on insert includes errInfo.
198+
with self.assertRaises(WriteConcernError) as ctx:
199+
await self.db.test.insert_one({})
200+
self.assertEqual(ctx.exception.details, expected_wce)
201+
202+
# Test bulk_write as well.
203+
with self.assertRaises(BulkWriteError) as ctx:
204+
await self.db.test.bulk_write([InsertOne({})])
205+
expected_details = {
206+
"writeErrors": [],
207+
"writeConcernErrors": [expected_wce],
208+
"nInserted": 1,
209+
"nUpserted": 0,
210+
"nMatched": 0,
211+
"nModified": 0,
212+
"nRemoved": 0,
213+
"upserted": [],
214+
}
215+
self.assertEqual(ctx.exception.details, expected_details)
216+
217+
@async_client_context.require_version_min(4, 9)
218+
async def test_write_error_details_exposes_errinfo(self):
219+
listener = OvertCommandListener()
220+
client = await self.async_rs_or_single_client(event_listeners=[listener])
221+
db = client.errinfotest
222+
self.addAsyncCleanup(client.drop_database, "errinfotest")
223+
validator = {"x": {"$type": "string"}}
224+
await db.create_collection("test", validator=validator)
225+
with self.assertRaises(WriteError) as ctx:
226+
await db.test.insert_one({"x": 1})
227+
self.assertEqual(ctx.exception.code, 121)
228+
self.assertIsNotNone(ctx.exception.details)
229+
assert ctx.exception.details is not None
230+
self.assertIsNotNone(ctx.exception.details.get("errInfo"))
231+
for event in listener.succeeded_events:
232+
if event.command_name == "insert":
233+
self.assertEqual(event.reply["writeErrors"][0], ctx.exception.details)
234+
break
235+
else:
236+
self.fail("Couldn't find insert event.")
237+
238+
239+
def normalize_write_concern(concern):
240+
result = {}
241+
for key in concern:
242+
if key.lower() == "wtimeoutms":
243+
result["wtimeout"] = concern[key]
244+
elif key == "journal":
245+
result["j"] = concern[key]
246+
else:
247+
result[key] = concern[key]
248+
return result
249+
250+
251+
def create_connection_string_test(test_case):
252+
def run_test(self):
253+
uri = test_case["uri"]
254+
valid = test_case["valid"]
255+
warning = test_case["warning"]
256+
257+
if not valid:
258+
if warning is False:
259+
self.assertRaises(
260+
(ConfigurationError, ValueError), AsyncMongoClient, uri, connect=False
261+
)
262+
else:
263+
with warnings.catch_warnings():
264+
warnings.simplefilter("error", UserWarning)
265+
self.assertRaises(UserWarning, AsyncMongoClient, uri, connect=False)
266+
else:
267+
client = AsyncMongoClient(uri, connect=False)
268+
if "writeConcern" in test_case:
269+
document = client.write_concern.document
270+
self.assertEqual(document, normalize_write_concern(test_case["writeConcern"]))
271+
if "readConcern" in test_case:
272+
document = client.read_concern.document
273+
self.assertEqual(document, test_case["readConcern"])
274+
275+
return run_test
276+
277+
278+
def create_document_test(test_case):
279+
def run_test(self):
280+
valid = test_case["valid"]
281+
282+
if "writeConcern" in test_case:
283+
normalized = normalize_write_concern(test_case["writeConcern"])
284+
if not valid:
285+
self.assertRaises((ConfigurationError, ValueError), WriteConcern, **normalized)
286+
else:
287+
write_concern = WriteConcern(**normalized)
288+
self.assertEqual(write_concern.document, test_case["writeConcernDocument"])
289+
self.assertEqual(write_concern.acknowledged, test_case["isAcknowledged"])
290+
self.assertEqual(write_concern.is_server_default, test_case["isServerDefault"])
291+
if "readConcern" in test_case:
292+
# Any string for 'level' is equally valid
293+
read_concern = ReadConcern(**test_case["readConcern"])
294+
self.assertEqual(read_concern.document, test_case["readConcernDocument"])
295+
self.assertEqual(not bool(read_concern.level), test_case["isServerDefault"])
296+
297+
return run_test
298+
299+
300+
def create_tests():
301+
for dirpath, _, filenames in os.walk(TEST_PATH):
302+
dirname = os.path.split(dirpath)[-1]
303+
304+
if dirname == "operation":
305+
# This directory is tested by TestOperations.
306+
continue
307+
elif dirname == "connection-string":
308+
create_test = create_connection_string_test
309+
else:
310+
create_test = create_document_test
311+
312+
for filename in filenames:
313+
with open(os.path.join(dirpath, filename)) as test_stream:
314+
test_cases = json.load(test_stream)["tests"]
315+
316+
fname = os.path.splitext(filename)[0]
317+
for test_case in test_cases:
318+
new_test = create_test(test_case)
319+
test_name = "test_{}_{}_{}".format(
320+
dirname.replace("-", "_"),
321+
fname.replace("-", "_"),
322+
str(test_case["description"].lower().replace(" ", "_")),
323+
)
324+
325+
new_test.__name__ = test_name
326+
setattr(TestReadWriteConcernSpec, new_test.__name__, new_test)
327+
328+
329+
create_tests()
330+
331+
332+
# Generate unified tests.
333+
# PyMongo does not support MapReduce.
334+
globals().update(
335+
generate_test_classes(
336+
os.path.join(TEST_PATH, "operation"),
337+
module=__name__,
338+
expected_failures=["MapReduce .*"],
339+
)
340+
)
341+
342+
343+
if __name__ == "__main__":
344+
unittest.main()

0 commit comments

Comments
 (0)