Skip to content

Commit 5f4922f

Browse files
committed
make an async ensure all connected
1 parent cacf350 commit 5f4922f

File tree

3 files changed

+38
-4
lines changed

3 files changed

+38
-4
lines changed

test/asynchronous/test_connections_survive_primary_stepdown_spec.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest
2323
from test.utils import (
2424
CMAPListener,
25-
ensure_all_connected,
25+
async_ensure_all_connected,
2626
repl_set_step_down,
2727
)
2828

@@ -51,7 +51,7 @@ async def _setup_class(cls):
5151
# Ensure connections to all servers in replica set. This is to test
5252
# that the is_writable flag is properly updated for connections that
5353
# survive a replica set election.
54-
ensure_all_connected(cls.client)
54+
async_ensure_all_connected(cls.client)
5555
cls.listener.reset()
5656

5757
cls.db = cls.client.get_database("step-down", write_concern=WriteConcern("majority"))

test/test_connections_survive_primary_stepdown_spec.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from test import IntegrationTest, client_context, unittest
2323
from test.utils import (
2424
CMAPListener,
25-
ensure_all_connected,
25+
async_ensure_all_connected,
2626
repl_set_step_down,
2727
)
2828

@@ -51,7 +51,7 @@ def _setup_class(cls):
5151
# Ensure connections to all servers in replica set. This is to test
5252
# that the is_writable flag is properly updated for connections that
5353
# survive a replica set election.
54-
ensure_all_connected(cls.client)
54+
async_ensure_all_connected(cls.client)
5555
cls.listener.reset()
5656

5757
cls.db = cls.client.get_database("step-down", write_concern=WriteConcern("majority"))

test/utils.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,40 @@ def discover():
599599
)
600600

601601

602+
async def async_ensure_all_connected(client: AsyncMongoClient) -> None:
603+
"""Ensure that the client's connection pool has socket connections to all
604+
members of a replica set. Raises ConfigurationError when called with a
605+
non-replica set client.
606+
607+
Depending on the use-case, the caller may need to clear any event listeners
608+
that are configured on the client.
609+
"""
610+
hello: dict = await client.admin.command(HelloCompat.LEGACY_CMD)
611+
if "setName" not in hello:
612+
raise ConfigurationError("cluster is not a replica set")
613+
614+
target_host_list = set(hello["hosts"] + hello.get("passives", []))
615+
connected_host_list = {hello["me"]}
616+
617+
# Run hello until we have connected to each host at least once.
618+
def discover():
619+
i = 0
620+
while i < 100 and connected_host_list != target_host_list:
621+
hello: dict = client.admin.command(
622+
HelloCompat.LEGACY_CMD, read_preference=ReadPreference.SECONDARY
623+
)
624+
connected_host_list.update([hello["me"]])
625+
i += 1
626+
return connected_host_list
627+
628+
try:
629+
async_wait_until(lambda: target_host_list == discover(), "connected to all hosts")
630+
except AssertionError as exc:
631+
raise AssertionError(
632+
f"{exc}, {connected_host_list} != {target_host_list}, {client.topology_description}"
633+
)
634+
635+
602636
def one(s):
603637
"""Get one element of a set"""
604638
return next(iter(s))

0 commit comments

Comments
 (0)