Skip to content

Commit af23139

Browse files
authored
PYTHON-4805 Migrate test_connections_survive_primary_stepdown_spec.py to async (#1889)
1 parent 2a83349 commit af23139

File tree

6 files changed

+217
-14
lines changed

6 files changed

+217
-14
lines changed

test/asynchronous/helpers.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242

4343
from bson.son import SON
4444
from pymongo import common, message
45+
from pymongo.read_preferences import ReadPreference
4546
from pymongo.ssl_support import HAVE_SSL, _ssl # type:ignore[attr-defined]
4647
from pymongo.uri_parser import parse_uri
4748

@@ -150,6 +151,16 @@ def _create_user(authdb, user, pwd=None, roles=None, **kwargs):
150151
return authdb.command(cmd)
151152

152153

154+
async def async_repl_set_step_down(client, **kwargs):
155+
"""Run replSetStepDown, first unfreezing a secondary with replSetFreeze."""
156+
cmd = SON([("replSetStepDown", 1)])
157+
cmd.update(kwargs)
158+
159+
# Unfreeze a secondary to ensure a speedy election.
160+
await client.admin.command("replSetFreeze", 0, read_preference=ReadPreference.SECONDARY)
161+
await client.admin.command(cmd)
162+
163+
153164
class client_knobs:
154165
def __init__(
155166
self,
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
# Copyright 2019-present MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Test compliance with the connections survive primary step down spec."""
16+
from __future__ import annotations
17+
18+
import sys
19+
20+
sys.path[0:0] = [""]
21+
22+
from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest
23+
from test.asynchronous.helpers import async_repl_set_step_down
24+
from test.utils import (
25+
CMAPListener,
26+
async_ensure_all_connected,
27+
)
28+
29+
from bson import SON
30+
from pymongo import monitoring
31+
from pymongo.asynchronous.collection import AsyncCollection
32+
from pymongo.errors import NotPrimaryError
33+
from pymongo.write_concern import WriteConcern
34+
35+
_IS_SYNC = False
36+
37+
38+
class TestAsyncConnectionsSurvivePrimaryStepDown(AsyncIntegrationTest):
39+
listener: CMAPListener
40+
coll: AsyncCollection
41+
42+
@classmethod
43+
@async_client_context.require_replica_set
44+
async def _setup_class(cls):
45+
await super()._setup_class()
46+
cls.listener = CMAPListener()
47+
cls.client = await cls.unmanaged_async_rs_or_single_client(
48+
event_listeners=[cls.listener], retryWrites=False, heartbeatFrequencyMS=500
49+
)
50+
51+
# Ensure connections to all servers in replica set. This is to test
52+
# that the is_writable flag is properly updated for connections that
53+
# survive a replica set election.
54+
await async_ensure_all_connected(cls.client)
55+
cls.listener.reset()
56+
57+
cls.db = cls.client.get_database("step-down", write_concern=WriteConcern("majority"))
58+
cls.coll = cls.db.get_collection("step-down", write_concern=WriteConcern("majority"))
59+
60+
@classmethod
61+
async def _tearDown_class(cls):
62+
await cls.client.close()
63+
64+
async def asyncSetUp(self):
65+
# Note that all ops use same write-concern as self.db (majority).
66+
await self.db.drop_collection("step-down")
67+
await self.db.create_collection("step-down")
68+
self.listener.reset()
69+
70+
async def set_fail_point(self, command_args):
71+
cmd = SON([("configureFailPoint", "failCommand")])
72+
cmd.update(command_args)
73+
await self.client.admin.command(cmd)
74+
75+
def verify_pool_cleared(self):
76+
self.assertEqual(self.listener.event_count(monitoring.PoolClearedEvent), 1)
77+
78+
def verify_pool_not_cleared(self):
79+
self.assertEqual(self.listener.event_count(monitoring.PoolClearedEvent), 0)
80+
81+
@async_client_context.require_version_min(4, 2, -1)
82+
async def test_get_more_iteration(self):
83+
# Insert 5 documents with WC majority.
84+
await self.coll.insert_many([{"data": k} for k in range(5)])
85+
# Start a find operation and retrieve first batch of results.
86+
batch_size = 2
87+
cursor = self.coll.find(batch_size=batch_size)
88+
for _ in range(batch_size):
89+
await cursor.next()
90+
# Force step-down the primary.
91+
await async_repl_set_step_down(self.client, replSetStepDown=5, force=True)
92+
# Get await anext batch of results.
93+
for _ in range(batch_size):
94+
await cursor.next()
95+
# Verify pool not cleared.
96+
self.verify_pool_not_cleared()
97+
# Attempt insertion to mark server description as stale and prevent a
98+
# NotPrimaryError on the subsequent operation.
99+
try:
100+
await self.coll.insert_one({})
101+
except NotPrimaryError:
102+
pass
103+
# Next insert should succeed on the new primary without clearing pool.
104+
await self.coll.insert_one({})
105+
self.verify_pool_not_cleared()
106+
107+
async def run_scenario(self, error_code, retry, pool_status_checker):
108+
# Set fail point.
109+
await self.set_fail_point(
110+
{"mode": {"times": 1}, "data": {"failCommands": ["insert"], "errorCode": error_code}}
111+
)
112+
self.addAsyncCleanup(self.set_fail_point, {"mode": "off"})
113+
# Insert record and verify failure.
114+
with self.assertRaises(NotPrimaryError) as exc:
115+
await self.coll.insert_one({"test": 1})
116+
self.assertEqual(exc.exception.details["code"], error_code) # type: ignore[call-overload]
117+
# Retry before CMAPListener assertion if retry_before=True.
118+
if retry:
119+
await self.coll.insert_one({"test": 1})
120+
# Verify pool cleared/not cleared.
121+
pool_status_checker()
122+
# Always retry here to ensure discovery of new primary.
123+
await self.coll.insert_one({"test": 1})
124+
125+
@async_client_context.require_version_min(4, 2, -1)
126+
@async_client_context.require_test_commands
127+
async def test_not_primary_keep_connection_pool(self):
128+
await self.run_scenario(10107, True, self.verify_pool_not_cleared)
129+
130+
@async_client_context.require_version_min(4, 0, 0)
131+
@async_client_context.require_version_max(4, 1, 0, -1)
132+
@async_client_context.require_test_commands
133+
async def test_not_primary_reset_connection_pool(self):
134+
await self.run_scenario(10107, False, self.verify_pool_cleared)
135+
136+
@async_client_context.require_version_min(4, 0, 0)
137+
@async_client_context.require_test_commands
138+
async def test_shutdown_in_progress(self):
139+
await self.run_scenario(91, False, self.verify_pool_cleared)
140+
141+
@async_client_context.require_version_min(4, 0, 0)
142+
@async_client_context.require_test_commands
143+
async def test_interrupted_at_shutdown(self):
144+
await self.run_scenario(11600, False, self.verify_pool_cleared)
145+
146+
147+
if __name__ == "__main__":
148+
unittest.main()

test/helpers.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242

4343
from bson.son import SON
4444
from pymongo import common, message
45+
from pymongo.read_preferences import ReadPreference
4546
from pymongo.ssl_support import HAVE_SSL, _ssl # type:ignore[attr-defined]
4647
from pymongo.uri_parser import parse_uri
4748

@@ -150,6 +151,16 @@ def _create_user(authdb, user, pwd=None, roles=None, **kwargs):
150151
return authdb.command(cmd)
151152

152153

154+
def repl_set_step_down(client, **kwargs):
155+
"""Run replSetStepDown, first unfreezing a secondary with replSetFreeze."""
156+
cmd = SON([("replSetStepDown", 1)])
157+
cmd.update(kwargs)
158+
159+
# Unfreeze a secondary to ensure a speedy election.
160+
client.admin.command("replSetFreeze", 0, read_preference=ReadPreference.SECONDARY)
161+
client.admin.command(cmd)
162+
163+
153164
class client_knobs:
154165
def __init__(
155166
self,

test/test_connections_survive_primary_stepdown_spec.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
sys.path[0:0] = [""]
2121

2222
from test import IntegrationTest, client_context, unittest
23+
from test.helpers import repl_set_step_down
2324
from test.utils import (
2425
CMAPListener,
2526
ensure_all_connected,
26-
repl_set_step_down,
2727
)
2828

2929
from bson import SON
@@ -32,15 +32,17 @@
3232
from pymongo.synchronous.collection import Collection
3333
from pymongo.write_concern import WriteConcern
3434

35+
_IS_SYNC = True
36+
3537

3638
class TestConnectionsSurvivePrimaryStepDown(IntegrationTest):
3739
listener: CMAPListener
3840
coll: Collection
3941

4042
@classmethod
4143
@client_context.require_replica_set
42-
def setUpClass(cls):
43-
super().setUpClass()
44+
def _setup_class(cls):
45+
super()._setup_class()
4446
cls.listener = CMAPListener()
4547
cls.client = cls.unmanaged_rs_or_single_client(
4648
event_listeners=[cls.listener], retryWrites=False, heartbeatFrequencyMS=500
@@ -56,7 +58,7 @@ def setUpClass(cls):
5658
cls.coll = cls.db.get_collection("step-down", write_concern=WriteConcern("majority"))
5759

5860
@classmethod
59-
def tearDownClass(cls):
61+
def _tearDown_class(cls):
6062
cls.client.close()
6163

6264
def setUp(self):

test/utils.py

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,44 @@ def discover():
599599
)
600600

601601

602+
async def async_ensure_all_connected(client: AsyncMongoClient) -> None:
603+
"""Ensure that the client's connection pool has socket connections to all
604+
members of a replica set. Raises ConfigurationError when called with a
605+
non-replica set client.
606+
607+
Depending on the use-case, the caller may need to clear any event listeners
608+
that are configured on the client.
609+
"""
610+
hello: dict = await client.admin.command(HelloCompat.LEGACY_CMD)
611+
if "setName" not in hello:
612+
raise ConfigurationError("cluster is not a replica set")
613+
614+
target_host_list = set(hello["hosts"] + hello.get("passives", []))
615+
connected_host_list = {hello["me"]}
616+
617+
# Run hello until we have connected to each host at least once.
618+
async def discover():
619+
i = 0
620+
while i < 100 and connected_host_list != target_host_list:
621+
hello: dict = await client.admin.command(
622+
HelloCompat.LEGACY_CMD, read_preference=ReadPreference.SECONDARY
623+
)
624+
connected_host_list.update([hello["me"]])
625+
i += 1
626+
return connected_host_list
627+
628+
try:
629+
630+
async def predicate():
631+
return target_host_list == await discover()
632+
633+
await async_wait_until(predicate, "connected to all hosts")
634+
except AssertionError as exc:
635+
raise AssertionError(
636+
f"{exc}, {connected_host_list} != {target_host_list}, {client.topology_description}"
637+
)
638+
639+
602640
def one(s):
603641
"""Get one element of a set"""
604642
return next(iter(s))
@@ -761,16 +799,6 @@ async def async_wait_until(predicate, success_description, timeout=10):
761799
await asyncio.sleep(interval)
762800

763801

764-
def repl_set_step_down(client, **kwargs):
765-
"""Run replSetStepDown, first unfreezing a secondary with replSetFreeze."""
766-
cmd = SON([("replSetStepDown", 1)])
767-
cmd.update(kwargs)
768-
769-
# Unfreeze a secondary to ensure a speedy election.
770-
client.admin.command("replSetFreeze", 0, read_preference=ReadPreference.SECONDARY)
771-
client.admin.command(cmd)
772-
773-
774802
def is_mongos(client):
775803
res = client.admin.command(HelloCompat.LEGACY_CMD)
776804
return res.get("msg", "") == "isdbgrid"

tools/synchro.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@
105105
"AsyncTestGridFile": "TestGridFile",
106106
"AsyncTestGridFileNoConnect": "TestGridFileNoConnect",
107107
"async_set_fail_point": "set_fail_point",
108+
"async_ensure_all_connected": "ensure_all_connected",
109+
"async_repl_set_step_down": "repl_set_step_down",
108110
}
109111

110112
docstring_replacements: dict[tuple[str, str], str] = {
@@ -186,6 +188,7 @@ def async_only_test(f: str) -> bool:
186188
"test_client_bulk_write.py",
187189
"test_client_context.py",
188190
"test_collection.py",
191+
"test_connections_survive_primary_stepdown_spec.py",
189192
"test_cursor.py",
190193
"test_database.py",
191194
"test_encryption.py",

0 commit comments

Comments
 (0)