Skip to content

Commit cacf350

Browse files
committed
Migrate test_connections_survive_primary_stepdown_spec.py to async
1 parent 0279407 commit cacf350

File tree

3 files changed

+154
-3
lines changed

3 files changed

+154
-3
lines changed
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
# Copyright 2019-present MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Test compliance with the connections survive primary step down spec."""
16+
from __future__ import annotations
17+
18+
import sys
19+
20+
sys.path[0:0] = [""]
21+
22+
from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest
23+
from test.utils import (
24+
CMAPListener,
25+
ensure_all_connected,
26+
repl_set_step_down,
27+
)
28+
29+
from bson import SON
30+
from pymongo import monitoring
31+
from pymongo.asynchronous.collection import AsyncCollection
32+
from pymongo.errors import NotPrimaryError
33+
from pymongo.write_concern import WriteConcern
34+
35+
_IS_SYNC = False
36+
37+
38+
class TestAsyncConnectionsSurvivePrimaryStepDown(AsyncIntegrationTest):
39+
listener: CMAPListener
40+
coll: AsyncCollection
41+
42+
@classmethod
43+
@async_client_context.require_replica_set
44+
async def _setup_class(cls):
45+
await super()._setup_class()
46+
cls.listener = CMAPListener()
47+
cls.client = await cls.unmanaged_async_rs_or_single_client(
48+
event_listeners=[cls.listener], retryWrites=False, heartbeatFrequencyMS=500
49+
)
50+
51+
# Ensure connections to all servers in replica set. This is to test
52+
# that the is_writable flag is properly updated for connections that
53+
# survive a replica set election.
54+
ensure_all_connected(cls.client)
55+
cls.listener.reset()
56+
57+
cls.db = cls.client.get_database("step-down", write_concern=WriteConcern("majority"))
58+
cls.coll = cls.db.get_collection("step-down", write_concern=WriteConcern("majority"))
59+
60+
@classmethod
61+
async def _tearDown_class(cls):
62+
await cls.client.close()
63+
64+
async def asyncSetUp(self):
65+
# Note that all ops use same write-concern as self.db (majority).
66+
await self.db.drop_collection("step-down")
67+
await self.db.create_collection("step-down")
68+
self.listener.reset()
69+
70+
async def set_fail_point(self, command_args):
71+
cmd = SON([("configureFailPoint", "failCommand")])
72+
cmd.update(command_args)
73+
await self.client.admin.command(cmd)
74+
75+
def verify_pool_cleared(self):
76+
self.assertEqual(self.listener.event_count(monitoring.PoolClearedEvent), 1)
77+
78+
def verify_pool_not_cleared(self):
79+
self.assertEqual(self.listener.event_count(monitoring.PoolClearedEvent), 0)
80+
81+
@async_client_context.require_version_min(4, 2, -1)
82+
async def test_get_more_iteration(self):
83+
# Insert 5 documents with WC majority.
84+
await self.coll.insert_many([{"data": k} for k in range(5)])
85+
# Start a find operation and retrieve first batch of results.
86+
batch_size = 2
87+
cursor = self.coll.find(batch_size=batch_size)
88+
for _ in range(batch_size):
89+
await cursor.next()
90+
# Force step-down the primary.
91+
repl_set_step_down(self.client, replSetStepDown=5, force=True)
92+
# Get await anext batch of results.
93+
for _ in range(batch_size):
94+
await cursor.next()
95+
# Verify pool not cleared.
96+
self.verify_pool_not_cleared()
97+
# Attempt insertion to mark server description as stale and prevent a
98+
# NotPrimaryError on the subsequent operation.
99+
try:
100+
await self.coll.insert_one({})
101+
except NotPrimaryError:
102+
pass
103+
# Next insert should succeed on the new primary without clearing pool.
104+
await self.coll.insert_one({})
105+
self.verify_pool_not_cleared()
106+
107+
async def run_scenario(self, error_code, retry, pool_status_checker):
108+
# Set fail point.
109+
self.set_fail_point(
110+
{"mode": {"times": 1}, "data": {"failCommands": ["insert"], "errorCode": error_code}}
111+
)
112+
self.addAsyncCleanup(self.set_fail_point, {"mode": "off"})
113+
# Insert record and verify failure.
114+
with self.assertRaises(NotPrimaryError) as exc:
115+
await self.coll.insert_one({"test": 1})
116+
self.assertEqual(exc.exception.details["code"], error_code) # type: ignore[call-overload]
117+
# Retry before CMAPListener assertion if retry_before=True.
118+
if retry:
119+
await self.coll.insert_one({"test": 1})
120+
# Verify pool cleared/not cleared.
121+
pool_status_checker()
122+
# Always retry here to ensure discovery of new primary.
123+
await self.coll.insert_one({"test": 1})
124+
125+
@async_client_context.require_version_min(4, 2, -1)
126+
@async_client_context.require_test_commands
127+
async def test_not_primary_keep_connection_pool(self):
128+
self.run_scenario(10107, True, self.verify_pool_not_cleared)
129+
130+
@async_client_context.require_version_min(4, 0, 0)
131+
@async_client_context.require_version_max(4, 1, 0, -1)
132+
@async_client_context.require_test_commands
133+
async def test_not_primary_reset_connection_pool(self):
134+
self.run_scenario(10107, False, self.verify_pool_cleared)
135+
136+
@async_client_context.require_version_min(4, 0, 0)
137+
@async_client_context.require_test_commands
138+
async def test_shutdown_in_progress(self):
139+
self.run_scenario(91, False, self.verify_pool_cleared)
140+
141+
@async_client_context.require_version_min(4, 0, 0)
142+
@async_client_context.require_test_commands
143+
async def test_interrupted_at_shutdown(self):
144+
self.run_scenario(11600, False, self.verify_pool_cleared)
145+
146+
147+
if __name__ == "__main__":
148+
unittest.main()

test/test_connections_survive_primary_stepdown_spec.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,17 @@
3232
from pymongo.synchronous.collection import Collection
3333
from pymongo.write_concern import WriteConcern
3434

35+
_IS_SYNC = True
36+
3537

3638
class TestConnectionsSurvivePrimaryStepDown(IntegrationTest):
3739
listener: CMAPListener
3840
coll: Collection
3941

4042
@classmethod
4143
@client_context.require_replica_set
42-
def setUpClass(cls):
43-
super().setUpClass()
44+
def _setup_class(cls):
45+
super()._setup_class()
4446
cls.listener = CMAPListener()
4547
cls.client = cls.unmanaged_rs_or_single_client(
4648
event_listeners=[cls.listener], retryWrites=False, heartbeatFrequencyMS=500
@@ -56,7 +58,7 @@ def setUpClass(cls):
5658
cls.coll = cls.db.get_collection("step-down", write_concern=WriteConcern("majority"))
5759

5860
@classmethod
59-
def tearDownClass(cls):
61+
def _tearDown_class(cls):
6062
cls.client.close()
6163

6264
def setUp(self):

tools/synchro.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ def async_only_test(f: str) -> bool:
186186
"test_client_bulk_write.py",
187187
"test_client_context.py",
188188
"test_collection.py",
189+
"test_connections_survive_primary_stepdown_spec.py",
189190
"test_cursor.py",
190191
"test_database.py",
191192
"test_encryption.py",

0 commit comments

Comments
 (0)