From b631c5a3838478bf9d544a9d4b09fd1d8a71b6d8 Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 1 Oct 2024 13:35:50 -0700 Subject: [PATCH 1/9] Migrate test_connections_survive_primary_stepdown_spec.py to async --- ...nnections_survive_primary_stepdown_spec.py | 148 ++++++++++++++++++ ...nnections_survive_primary_stepdown_spec.py | 8 +- tools/synchro.py | 1 + 3 files changed, 154 insertions(+), 3 deletions(-) create mode 100644 test/asynchronous/test_connections_survive_primary_stepdown_spec.py diff --git a/test/asynchronous/test_connections_survive_primary_stepdown_spec.py b/test/asynchronous/test_connections_survive_primary_stepdown_spec.py new file mode 100644 index 0000000000..5686fafd56 --- /dev/null +++ b/test/asynchronous/test_connections_survive_primary_stepdown_spec.py @@ -0,0 +1,148 @@ +# Copyright 2019-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test compliance with the connections survive primary step down spec.""" +from __future__ import annotations + +import sys + +sys.path[0:0] = [""] + +from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest +from test.utils import ( + CMAPListener, + ensure_all_connected, + repl_set_step_down, +) + +from bson import SON +from pymongo import monitoring +from pymongo.asynchronous.collection import AsyncCollection +from pymongo.errors import NotPrimaryError +from pymongo.write_concern import WriteConcern + +_IS_SYNC = False + + +class TestAsyncConnectionsSurvivePrimaryStepDown(AsyncIntegrationTest): + listener: CMAPListener + coll: AsyncCollection + + @classmethod + @async_client_context.require_replica_set + async def _setup_class(cls): + await super()._setup_class() + cls.listener = CMAPListener() + cls.client = await cls.unmanaged_async_rs_or_single_client( + event_listeners=[cls.listener], retryWrites=False, heartbeatFrequencyMS=500 + ) + + # Ensure connections to all servers in replica set. This is to test + # that the is_writable flag is properly updated for connections that + # survive a replica set election. + ensure_all_connected(cls.client) + cls.listener.reset() + + cls.db = cls.client.get_database("step-down", write_concern=WriteConcern("majority")) + cls.coll = cls.db.get_collection("step-down", write_concern=WriteConcern("majority")) + + @classmethod + async def _tearDown_class(cls): + await cls.client.close() + + async def asyncSetUp(self): + # Note that all ops use same write-concern as self.db (majority). + await self.db.drop_collection("step-down") + await self.db.create_collection("step-down") + self.listener.reset() + + async def set_fail_point(self, command_args): + cmd = SON([("configureFailPoint", "failCommand")]) + cmd.update(command_args) + await self.client.admin.command(cmd) + + def verify_pool_cleared(self): + self.assertEqual(self.listener.event_count(monitoring.PoolClearedEvent), 1) + + def verify_pool_not_cleared(self): + self.assertEqual(self.listener.event_count(monitoring.PoolClearedEvent), 0) + + @async_client_context.require_version_min(4, 2, -1) + async def test_get_more_iteration(self): + # Insert 5 documents with WC majority. + await self.coll.insert_many([{"data": k} for k in range(5)]) + # Start a find operation and retrieve first batch of results. + batch_size = 2 + cursor = self.coll.find(batch_size=batch_size) + for _ in range(batch_size): + await cursor.next() + # Force step-down the primary. + repl_set_step_down(self.client, replSetStepDown=5, force=True) + # Get await anext batch of results. + for _ in range(batch_size): + await cursor.next() + # Verify pool not cleared. + self.verify_pool_not_cleared() + # Attempt insertion to mark server description as stale and prevent a + # NotPrimaryError on the subsequent operation. + try: + await self.coll.insert_one({}) + except NotPrimaryError: + pass + # Next insert should succeed on the new primary without clearing pool. + await self.coll.insert_one({}) + self.verify_pool_not_cleared() + + async def run_scenario(self, error_code, retry, pool_status_checker): + # Set fail point. + self.set_fail_point( + {"mode": {"times": 1}, "data": {"failCommands": ["insert"], "errorCode": error_code}} + ) + self.addAsyncCleanup(self.set_fail_point, {"mode": "off"}) + # Insert record and verify failure. + with self.assertRaises(NotPrimaryError) as exc: + await self.coll.insert_one({"test": 1}) + self.assertEqual(exc.exception.details["code"], error_code) # type: ignore[call-overload] + # Retry before CMAPListener assertion if retry_before=True. + if retry: + await self.coll.insert_one({"test": 1}) + # Verify pool cleared/not cleared. + pool_status_checker() + # Always retry here to ensure discovery of new primary. + await self.coll.insert_one({"test": 1}) + + @async_client_context.require_version_min(4, 2, -1) + @async_client_context.require_test_commands + async def test_not_primary_keep_connection_pool(self): + self.run_scenario(10107, True, self.verify_pool_not_cleared) + + @async_client_context.require_version_min(4, 0, 0) + @async_client_context.require_version_max(4, 1, 0, -1) + @async_client_context.require_test_commands + async def test_not_primary_reset_connection_pool(self): + self.run_scenario(10107, False, self.verify_pool_cleared) + + @async_client_context.require_version_min(4, 0, 0) + @async_client_context.require_test_commands + async def test_shutdown_in_progress(self): + self.run_scenario(91, False, self.verify_pool_cleared) + + @async_client_context.require_version_min(4, 0, 0) + @async_client_context.require_test_commands + async def test_interrupted_at_shutdown(self): + self.run_scenario(11600, False, self.verify_pool_cleared) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_connections_survive_primary_stepdown_spec.py b/test/test_connections_survive_primary_stepdown_spec.py index fba7675743..51df01e709 100644 --- a/test/test_connections_survive_primary_stepdown_spec.py +++ b/test/test_connections_survive_primary_stepdown_spec.py @@ -32,6 +32,8 @@ from pymongo.synchronous.collection import Collection from pymongo.write_concern import WriteConcern +_IS_SYNC = True + class TestConnectionsSurvivePrimaryStepDown(IntegrationTest): listener: CMAPListener @@ -39,8 +41,8 @@ class TestConnectionsSurvivePrimaryStepDown(IntegrationTest): @classmethod @client_context.require_replica_set - def setUpClass(cls): - super().setUpClass() + def _setup_class(cls): + super()._setup_class() cls.listener = CMAPListener() cls.client = cls.unmanaged_rs_or_single_client( event_listeners=[cls.listener], retryWrites=False, heartbeatFrequencyMS=500 @@ -56,7 +58,7 @@ def setUpClass(cls): cls.coll = cls.db.get_collection("step-down", write_concern=WriteConcern("majority")) @classmethod - def tearDownClass(cls): + def _tearDown_class(cls): cls.client.close() def setUp(self): diff --git a/tools/synchro.py b/tools/synchro.py index 3333b0de2e..5a5826fd02 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -186,6 +186,7 @@ def async_only_test(f: str) -> bool: "test_client_bulk_write.py", "test_client_context.py", "test_collection.py", + "test_connections_survive_primary_stepdown_spec.py", "test_cursor.py", "test_database.py", "test_encryption.py", From 6137f3262805cb1d717387b4d138c10ecf752201 Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 1 Oct 2024 13:51:38 -0700 Subject: [PATCH 2/9] make an async ensure all connected --- ...nnections_survive_primary_stepdown_spec.py | 4 +-- ...nnections_survive_primary_stepdown_spec.py | 4 +-- test/utils.py | 34 +++++++++++++++++++ 3 files changed, 38 insertions(+), 4 deletions(-) diff --git a/test/asynchronous/test_connections_survive_primary_stepdown_spec.py b/test/asynchronous/test_connections_survive_primary_stepdown_spec.py index 5686fafd56..e9d646b2eb 100644 --- a/test/asynchronous/test_connections_survive_primary_stepdown_spec.py +++ b/test/asynchronous/test_connections_survive_primary_stepdown_spec.py @@ -22,7 +22,7 @@ from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest from test.utils import ( CMAPListener, - ensure_all_connected, + async_ensure_all_connected, repl_set_step_down, ) @@ -51,7 +51,7 @@ async def _setup_class(cls): # Ensure connections to all servers in replica set. This is to test # that the is_writable flag is properly updated for connections that # survive a replica set election. - ensure_all_connected(cls.client) + async_ensure_all_connected(cls.client) cls.listener.reset() cls.db = cls.client.get_database("step-down", write_concern=WriteConcern("majority")) diff --git a/test/test_connections_survive_primary_stepdown_spec.py b/test/test_connections_survive_primary_stepdown_spec.py index 51df01e709..a932946882 100644 --- a/test/test_connections_survive_primary_stepdown_spec.py +++ b/test/test_connections_survive_primary_stepdown_spec.py @@ -22,7 +22,7 @@ from test import IntegrationTest, client_context, unittest from test.utils import ( CMAPListener, - ensure_all_connected, + async_ensure_all_connected, repl_set_step_down, ) @@ -51,7 +51,7 @@ def _setup_class(cls): # Ensure connections to all servers in replica set. This is to test # that the is_writable flag is properly updated for connections that # survive a replica set election. - ensure_all_connected(cls.client) + async_ensure_all_connected(cls.client) cls.listener.reset() cls.db = cls.client.get_database("step-down", write_concern=WriteConcern("majority")) diff --git a/test/utils.py b/test/utils.py index 9615034899..ae5c255f75 100644 --- a/test/utils.py +++ b/test/utils.py @@ -599,6 +599,40 @@ def discover(): ) +async def async_ensure_all_connected(client: AsyncMongoClient) -> None: + """Ensure that the client's connection pool has socket connections to all + members of a replica set. Raises ConfigurationError when called with a + non-replica set client. + + Depending on the use-case, the caller may need to clear any event listeners + that are configured on the client. + """ + hello: dict = await client.admin.command(HelloCompat.LEGACY_CMD) + if "setName" not in hello: + raise ConfigurationError("cluster is not a replica set") + + target_host_list = set(hello["hosts"] + hello.get("passives", [])) + connected_host_list = {hello["me"]} + + # Run hello until we have connected to each host at least once. + def discover(): + i = 0 + while i < 100 and connected_host_list != target_host_list: + hello: dict = client.admin.command( + HelloCompat.LEGACY_CMD, read_preference=ReadPreference.SECONDARY + ) + connected_host_list.update([hello["me"]]) + i += 1 + return connected_host_list + + try: + async_wait_until(lambda: target_host_list == discover(), "connected to all hosts") + except AssertionError as exc: + raise AssertionError( + f"{exc}, {connected_host_list} != {target_host_list}, {client.topology_description}" + ) + + def one(s): """Get one element of a set""" return next(iter(s)) From 467c99ae8e66aa5bba7c7ee70e7b019ab6351d4d Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 1 Oct 2024 14:18:15 -0700 Subject: [PATCH 3/9] update synchro --- test/test_connections_survive_primary_stepdown_spec.py | 4 ++-- tools/synchro.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/test/test_connections_survive_primary_stepdown_spec.py b/test/test_connections_survive_primary_stepdown_spec.py index a932946882..51df01e709 100644 --- a/test/test_connections_survive_primary_stepdown_spec.py +++ b/test/test_connections_survive_primary_stepdown_spec.py @@ -22,7 +22,7 @@ from test import IntegrationTest, client_context, unittest from test.utils import ( CMAPListener, - async_ensure_all_connected, + ensure_all_connected, repl_set_step_down, ) @@ -51,7 +51,7 @@ def _setup_class(cls): # Ensure connections to all servers in replica set. This is to test # that the is_writable flag is properly updated for connections that # survive a replica set election. - async_ensure_all_connected(cls.client) + ensure_all_connected(cls.client) cls.listener.reset() cls.db = cls.client.get_database("step-down", write_concern=WriteConcern("majority")) diff --git a/tools/synchro.py b/tools/synchro.py index 5a5826fd02..84f7779586 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -105,6 +105,7 @@ "AsyncTestGridFile": "TestGridFile", "AsyncTestGridFileNoConnect": "TestGridFileNoConnect", "async_set_fail_point": "set_fail_point", + "async_ensure_all_connected": "ensure_all_connected", } docstring_replacements: dict[tuple[str, str], str] = { From fa9606fe1e75ebf6338e5fe4ee4be88123b57210 Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 1 Oct 2024 14:37:36 -0700 Subject: [PATCH 4/9] add await *facepalm* --- .../test_connections_survive_primary_stepdown_spec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/asynchronous/test_connections_survive_primary_stepdown_spec.py b/test/asynchronous/test_connections_survive_primary_stepdown_spec.py index e9d646b2eb..3edb3f5a89 100644 --- a/test/asynchronous/test_connections_survive_primary_stepdown_spec.py +++ b/test/asynchronous/test_connections_survive_primary_stepdown_spec.py @@ -51,7 +51,7 @@ async def _setup_class(cls): # Ensure connections to all servers in replica set. This is to test # that the is_writable flag is properly updated for connections that # survive a replica set election. - async_ensure_all_connected(cls.client) + await async_ensure_all_connected(cls.client) cls.listener.reset() cls.db = cls.client.get_database("step-down", write_concern=WriteConcern("majority")) From c6f835f7141748d1b98f6d0799efa0a11fb92c27 Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 1 Oct 2024 14:40:47 -0700 Subject: [PATCH 5/9] add await *double facepalm* --- .../test_connections_survive_primary_stepdown_spec.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/asynchronous/test_connections_survive_primary_stepdown_spec.py b/test/asynchronous/test_connections_survive_primary_stepdown_spec.py index 3edb3f5a89..53e35ed94c 100644 --- a/test/asynchronous/test_connections_survive_primary_stepdown_spec.py +++ b/test/asynchronous/test_connections_survive_primary_stepdown_spec.py @@ -125,23 +125,23 @@ async def run_scenario(self, error_code, retry, pool_status_checker): @async_client_context.require_version_min(4, 2, -1) @async_client_context.require_test_commands async def test_not_primary_keep_connection_pool(self): - self.run_scenario(10107, True, self.verify_pool_not_cleared) + await self.run_scenario(10107, True, self.verify_pool_not_cleared) @async_client_context.require_version_min(4, 0, 0) @async_client_context.require_version_max(4, 1, 0, -1) @async_client_context.require_test_commands async def test_not_primary_reset_connection_pool(self): - self.run_scenario(10107, False, self.verify_pool_cleared) + await self.run_scenario(10107, False, self.verify_pool_cleared) @async_client_context.require_version_min(4, 0, 0) @async_client_context.require_test_commands async def test_shutdown_in_progress(self): - self.run_scenario(91, False, self.verify_pool_cleared) + await self.run_scenario(91, False, self.verify_pool_cleared) @async_client_context.require_version_min(4, 0, 0) @async_client_context.require_test_commands async def test_interrupted_at_shutdown(self): - self.run_scenario(11600, False, self.verify_pool_cleared) + await self.run_scenario(11600, False, self.verify_pool_cleared) if __name__ == "__main__": From b16a024636ee22e4a37f0ed967a49a67960f54b3 Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 1 Oct 2024 15:03:35 -0700 Subject: [PATCH 6/9] add await *triple facepalm* --- test/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/utils.py b/test/utils.py index ae5c255f75..bbc6851153 100644 --- a/test/utils.py +++ b/test/utils.py @@ -626,7 +626,7 @@ def discover(): return connected_host_list try: - async_wait_until(lambda: target_host_list == discover(), "connected to all hosts") + await async_wait_until(lambda: target_host_list == discover(), "connected to all hosts") except AssertionError as exc: raise AssertionError( f"{exc}, {connected_host_list} != {target_host_list}, {client.topology_description}" From daa394536f09858eee29885f53437e88bcf1d409 Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 1 Oct 2024 15:19:57 -0700 Subject: [PATCH 7/9] make discover async --- test/utils.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/test/utils.py b/test/utils.py index bbc6851153..dd30255be4 100644 --- a/test/utils.py +++ b/test/utils.py @@ -615,10 +615,10 @@ async def async_ensure_all_connected(client: AsyncMongoClient) -> None: connected_host_list = {hello["me"]} # Run hello until we have connected to each host at least once. - def discover(): + async def discover(): i = 0 while i < 100 and connected_host_list != target_host_list: - hello: dict = client.admin.command( + hello: dict = await client.admin.command( HelloCompat.LEGACY_CMD, read_preference=ReadPreference.SECONDARY ) connected_host_list.update([hello["me"]]) @@ -626,7 +626,11 @@ def discover(): return connected_host_list try: - await async_wait_until(lambda: target_host_list == discover(), "connected to all hosts") + + async def predicate(): + return target_host_list == await discover() + + await async_wait_until(predicate, "connected to all hosts") except AssertionError as exc: raise AssertionError( f"{exc}, {connected_host_list} != {target_host_list}, {client.topology_description}" From 04e02e327d98019ba2ebca304bb489c2b68d303a Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 1 Oct 2024 15:40:27 -0700 Subject: [PATCH 8/9] add missing await --- .../test_connections_survive_primary_stepdown_spec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/asynchronous/test_connections_survive_primary_stepdown_spec.py b/test/asynchronous/test_connections_survive_primary_stepdown_spec.py index 53e35ed94c..4926e548c7 100644 --- a/test/asynchronous/test_connections_survive_primary_stepdown_spec.py +++ b/test/asynchronous/test_connections_survive_primary_stepdown_spec.py @@ -106,7 +106,7 @@ async def test_get_more_iteration(self): async def run_scenario(self, error_code, retry, pool_status_checker): # Set fail point. - self.set_fail_point( + await self.set_fail_point( {"mode": {"times": 1}, "data": {"failCommands": ["insert"], "errorCode": error_code}} ) self.addAsyncCleanup(self.set_fail_point, {"mode": "off"}) From f8710fab99b5e01732f3b7317e3fff9d395ebee5 Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 1 Oct 2024 16:08:13 -0700 Subject: [PATCH 9/9] make repl_set_step_down aysnc --- test/asynchronous/helpers.py | 11 +++++++++++ .../test_connections_survive_primary_stepdown_spec.py | 4 ++-- test/helpers.py | 11 +++++++++++ .../test_connections_survive_primary_stepdown_spec.py | 2 +- test/utils.py | 10 ---------- tools/synchro.py | 1 + 6 files changed, 26 insertions(+), 13 deletions(-) diff --git a/test/asynchronous/helpers.py b/test/asynchronous/helpers.py index 46f66af62d..b5fc5d8ac4 100644 --- a/test/asynchronous/helpers.py +++ b/test/asynchronous/helpers.py @@ -42,6 +42,7 @@ from bson.son import SON from pymongo import common, message +from pymongo.read_preferences import ReadPreference from pymongo.ssl_support import HAVE_SSL, _ssl # type:ignore[attr-defined] from pymongo.uri_parser import parse_uri @@ -150,6 +151,16 @@ def _create_user(authdb, user, pwd=None, roles=None, **kwargs): return authdb.command(cmd) +async def async_repl_set_step_down(client, **kwargs): + """Run replSetStepDown, first unfreezing a secondary with replSetFreeze.""" + cmd = SON([("replSetStepDown", 1)]) + cmd.update(kwargs) + + # Unfreeze a secondary to ensure a speedy election. + await client.admin.command("replSetFreeze", 0, read_preference=ReadPreference.SECONDARY) + await client.admin.command(cmd) + + class client_knobs: def __init__( self, diff --git a/test/asynchronous/test_connections_survive_primary_stepdown_spec.py b/test/asynchronous/test_connections_survive_primary_stepdown_spec.py index 4926e548c7..289cf49751 100644 --- a/test/asynchronous/test_connections_survive_primary_stepdown_spec.py +++ b/test/asynchronous/test_connections_survive_primary_stepdown_spec.py @@ -20,10 +20,10 @@ sys.path[0:0] = [""] from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest +from test.asynchronous.helpers import async_repl_set_step_down from test.utils import ( CMAPListener, async_ensure_all_connected, - repl_set_step_down, ) from bson import SON @@ -88,7 +88,7 @@ async def test_get_more_iteration(self): for _ in range(batch_size): await cursor.next() # Force step-down the primary. - repl_set_step_down(self.client, replSetStepDown=5, force=True) + await async_repl_set_step_down(self.client, replSetStepDown=5, force=True) # Get await anext batch of results. for _ in range(batch_size): await cursor.next() diff --git a/test/helpers.py b/test/helpers.py index bf6186d1a0..11d5ab0374 100644 --- a/test/helpers.py +++ b/test/helpers.py @@ -42,6 +42,7 @@ from bson.son import SON from pymongo import common, message +from pymongo.read_preferences import ReadPreference from pymongo.ssl_support import HAVE_SSL, _ssl # type:ignore[attr-defined] from pymongo.uri_parser import parse_uri @@ -150,6 +151,16 @@ def _create_user(authdb, user, pwd=None, roles=None, **kwargs): return authdb.command(cmd) +def repl_set_step_down(client, **kwargs): + """Run replSetStepDown, first unfreezing a secondary with replSetFreeze.""" + cmd = SON([("replSetStepDown", 1)]) + cmd.update(kwargs) + + # Unfreeze a secondary to ensure a speedy election. + client.admin.command("replSetFreeze", 0, read_preference=ReadPreference.SECONDARY) + client.admin.command(cmd) + + class client_knobs: def __init__( self, diff --git a/test/test_connections_survive_primary_stepdown_spec.py b/test/test_connections_survive_primary_stepdown_spec.py index 51df01e709..54cc4e0482 100644 --- a/test/test_connections_survive_primary_stepdown_spec.py +++ b/test/test_connections_survive_primary_stepdown_spec.py @@ -20,10 +20,10 @@ sys.path[0:0] = [""] from test import IntegrationTest, client_context, unittest +from test.helpers import repl_set_step_down from test.utils import ( CMAPListener, ensure_all_connected, - repl_set_step_down, ) from bson import SON diff --git a/test/utils.py b/test/utils.py index dd30255be4..9c78cff3ad 100644 --- a/test/utils.py +++ b/test/utils.py @@ -799,16 +799,6 @@ async def async_wait_until(predicate, success_description, timeout=10): await asyncio.sleep(interval) -def repl_set_step_down(client, **kwargs): - """Run replSetStepDown, first unfreezing a secondary with replSetFreeze.""" - cmd = SON([("replSetStepDown", 1)]) - cmd.update(kwargs) - - # Unfreeze a secondary to ensure a speedy election. - client.admin.command("replSetFreeze", 0, read_preference=ReadPreference.SECONDARY) - client.admin.command(cmd) - - def is_mongos(client): res = client.admin.command(HelloCompat.LEGACY_CMD) return res.get("msg", "") == "isdbgrid" diff --git a/tools/synchro.py b/tools/synchro.py index 84f7779586..d8ec9ae46f 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -106,6 +106,7 @@ "AsyncTestGridFileNoConnect": "TestGridFileNoConnect", "async_set_fail_point": "set_fail_point", "async_ensure_all_connected": "ensure_all_connected", + "async_repl_set_step_down": "repl_set_step_down", } docstring_replacements: dict[tuple[str, str], str] = {