From 5a1e7cb93e38e9677409ec2d95ef1278fef1e46d Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Fri, 17 Jan 2025 13:50:43 -0500 Subject: [PATCH 1/6] PYTHON-5044 - Successive AsyncMongoClients on a single loop always timeout on server selection --- pymongo/asynchronous/mongo_client.py | 2 ++ pymongo/network_layer.py | 31 +++++++++++++++++----------- pymongo/periodic_executor.py | 2 ++ pymongo/synchronous/mongo_client.py | 1 + 4 files changed, 24 insertions(+), 12 deletions(-) diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index 1600e50628..1f75353112 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -1565,6 +1565,8 @@ async def close(self) -> None: # TODO: PYTHON-1921 Encrypted MongoClients cannot be re-opened. await self._encrypter.close() self._closed = True + # Yield to the asyncio event loop so all executor tasks properly exit after cancellation + await asyncio.sleep(0) if not _IS_SYNC: # Add support for contextlib.aclosing. diff --git a/pymongo/network_layer.py b/pymongo/network_layer.py index c1db31f89c..11c66bf16e 100644 --- a/pymongo/network_layer.py +++ b/pymongo/network_layer.py @@ -267,18 +267,25 @@ async def async_receive_data( else: read_task = create_task(_async_receive(sock, length, loop)) # type: ignore[arg-type] tasks = [read_task, cancellation_task] - done, pending = await asyncio.wait( - tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED - ) - for task in pending: - task.cancel() - if pending: - await asyncio.wait(pending) - if len(done) == 0: - raise socket.timeout("timed out") - if read_task in done: - return read_task.result() - raise _OperationCancelled("operation cancelled") + try: + done, pending = await asyncio.wait( + tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED + ) + for task in pending: + task.cancel() + if pending: + await asyncio.wait(pending) + if len(done) == 0: + raise socket.timeout("timed out") + if read_task in done: + return read_task.result() + raise _OperationCancelled("operation cancelled") + except asyncio.CancelledError: + for task in tasks: + task.cancel() + await asyncio.wait(tasks) + raise + finally: sock.settimeout(sock_timeout) diff --git a/pymongo/periodic_executor.py b/pymongo/periodic_executor.py index 2f89b91deb..ca22a21d14 100644 --- a/pymongo/periodic_executor.py +++ b/pymongo/periodic_executor.py @@ -75,6 +75,8 @@ def close(self, dummy: Any = None) -> None: callback; see monitor.py. """ self._stopped = True + if self._task: + self._task.cancel() async def join(self, timeout: Optional[int] = None) -> None: if self._task is not None: diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index a694a58c1e..92cfe78713 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -1559,6 +1559,7 @@ def close(self) -> None: # TODO: PYTHON-1921 Encrypted MongoClients cannot be re-opened. self._encrypter.close() self._closed = True + # Yield to the asyncio event loop so all executor tasks properly exit after cancellation if not _IS_SYNC: # Add support for contextlib.closing. From cdc2cf0f4c8234cc68ef5d3db9439925e765cd95 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Tue, 21 Jan 2025 11:08:53 -0500 Subject: [PATCH 2/6] Only join executors on async --- pymongo/asynchronous/mongo_client.py | 4 ++-- pymongo/asynchronous/monitor.py | 4 ++++ pymongo/periodic_executor.py | 11 ++--------- pymongo/synchronous/mongo_client.py | 3 ++- pymongo/synchronous/monitor.py | 4 ++++ test/__init__.py | 2 -- test/asynchronous/__init__.py | 2 -- 7 files changed, 14 insertions(+), 16 deletions(-) diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index 1f75353112..0026827108 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -1559,14 +1559,14 @@ async def close(self) -> None: # Stop the periodic task thread and then send pending killCursor # requests before closing the topology. self._kill_cursors_executor.close() + if not _IS_SYNC: + await self._kill_cursors_executor.join() await self._process_kill_cursors() await self._topology.close() if self._encrypter: # TODO: PYTHON-1921 Encrypted MongoClients cannot be re-opened. await self._encrypter.close() self._closed = True - # Yield to the asyncio event loop so all executor tasks properly exit after cancellation - await asyncio.sleep(0) if not _IS_SYNC: # Add support for contextlib.aclosing. diff --git a/pymongo/asynchronous/monitor.py b/pymongo/asynchronous/monitor.py index ad1bc70aba..de22a30780 100644 --- a/pymongo/asynchronous/monitor.py +++ b/pymongo/asynchronous/monitor.py @@ -191,6 +191,8 @@ def gc_safe_close(self) -> None: async def close(self) -> None: self.gc_safe_close() + if not _IS_SYNC: + await self._executor.join() await self._rtt_monitor.close() # Increment the generation and maybe close the socket. If the executor # thread has the socket checked out, it will be closed when checked in. @@ -458,6 +460,8 @@ def __init__(self, topology: Topology, topology_settings: TopologySettings, pool async def close(self) -> None: self.gc_safe_close() + if not _IS_SYNC: + await self._executor.join() # Increment the generation and maybe close the socket. If the executor # thread has the socket checked out, it will be closed when checked in. await self._pool.reset() diff --git a/pymongo/periodic_executor.py b/pymongo/periodic_executor.py index ca22a21d14..f51a988728 100644 --- a/pymongo/periodic_executor.py +++ b/pymongo/periodic_executor.py @@ -75,19 +75,12 @@ def close(self, dummy: Any = None) -> None: callback; see monitor.py. """ self._stopped = True - if self._task: + if self._task is not None: self._task.cancel() async def join(self, timeout: Optional[int] = None) -> None: if self._task is not None: - try: - await asyncio.wait_for(self._task, timeout=timeout) # type-ignore: [arg-type] - except asyncio.TimeoutError: - # Task timed out - pass - except asyncio.exceptions.CancelledError: - # Task was already finished, or not yet started. - raise + await asyncio.wait([self._task], timeout=timeout) # type-ignore: [arg-type] def wake(self) -> None: """Execute the target function soon.""" diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index 92cfe78713..1b8f9dc5f7 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -1553,13 +1553,14 @@ def close(self) -> None: # Stop the periodic task thread and then send pending killCursor # requests before closing the topology. self._kill_cursors_executor.close() + if not _IS_SYNC: + self._kill_cursors_executor.join() self._process_kill_cursors() self._topology.close() if self._encrypter: # TODO: PYTHON-1921 Encrypted MongoClients cannot be re-opened. self._encrypter.close() self._closed = True - # Yield to the asyncio event loop so all executor tasks properly exit after cancellation if not _IS_SYNC: # Add support for contextlib.closing. diff --git a/pymongo/synchronous/monitor.py b/pymongo/synchronous/monitor.py index df4130d4ab..5558c5fd07 100644 --- a/pymongo/synchronous/monitor.py +++ b/pymongo/synchronous/monitor.py @@ -191,6 +191,8 @@ def gc_safe_close(self) -> None: def close(self) -> None: self.gc_safe_close() + if not _IS_SYNC: + self._executor.join() self._rtt_monitor.close() # Increment the generation and maybe close the socket. If the executor # thread has the socket checked out, it will be closed when checked in. @@ -458,6 +460,8 @@ def __init__(self, topology: Topology, topology_settings: TopologySettings, pool def close(self) -> None: self.gc_safe_close() + if not _IS_SYNC: + self._executor.join() # Increment the generation and maybe close the socket. If the executor # thread has the socket checked out, it will be closed when checked in. self._pool.reset() diff --git a/test/__init__.py b/test/__init__.py index d3a63db2d5..f165a7cc72 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -1136,8 +1136,6 @@ class IntegrationTest(PyMongoTestCase): @client_context.require_connection def setUp(self) -> None: - if not _IS_SYNC: - reset_client_context() if client_context.load_balancer and not getattr(self, "RUN_ON_LOAD_BALANCER", False): raise SkipTest("this test does not support load balancers") if client_context.serverless and not getattr(self, "RUN_ON_SERVERLESS", False): diff --git a/test/asynchronous/__init__.py b/test/asynchronous/__init__.py index 73e2824742..3d82d90792 100644 --- a/test/asynchronous/__init__.py +++ b/test/asynchronous/__init__.py @@ -1154,8 +1154,6 @@ class AsyncIntegrationTest(AsyncPyMongoTestCase): @async_client_context.require_connection async def asyncSetUp(self) -> None: - if not _IS_SYNC: - await reset_client_context() if async_client_context.load_balancer and not getattr(self, "RUN_ON_LOAD_BALANCER", False): raise SkipTest("this test does not support load balancers") if async_client_context.serverless and not getattr(self, "RUN_ON_SERVERLESS", False): From b898826069a4ef50ba5a4ff3b576e89d89e6c72d Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Tue, 21 Jan 2025 11:19:19 -0500 Subject: [PATCH 3/6] Remove unneeded reset_async_client_context --- test/__init__.py | 10 ---------- test/asynchronous/__init__.py | 10 ---------- .../test_connections_survive_primary_stepdown_spec.py | 1 - test/test_connections_survive_primary_stepdown_spec.py | 1 - 4 files changed, 22 deletions(-) diff --git a/test/__init__.py b/test/__init__.py index f165a7cc72..ed7966f718 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -864,16 +864,6 @@ def max_message_size_bytes(self): client_context = ClientContext() -def reset_client_context(): - if _IS_SYNC: - # sync tests don't need to reset a client context - return - elif client_context.client is not None: - client_context.client.close() - client_context.client = None - client_context._init_client() - - class PyMongoTestCase(unittest.TestCase): def assertEqualCommand(self, expected, actual, msg=None): self.assertEqual(sanitize_cmd(expected), sanitize_cmd(actual), msg) diff --git a/test/asynchronous/__init__.py b/test/asynchronous/__init__.py index 3d82d90792..45db6fcd9a 100644 --- a/test/asynchronous/__init__.py +++ b/test/asynchronous/__init__.py @@ -866,16 +866,6 @@ async def max_message_size_bytes(self): async_client_context = AsyncClientContext() -async def reset_client_context(): - if _IS_SYNC: - # sync tests don't need to reset a client context - return - elif async_client_context.client is not None: - await async_client_context.client.close() - async_client_context.client = None - await async_client_context._init_client() - - class AsyncPyMongoTestCase(unittest.IsolatedAsyncioTestCase): def assertEqualCommand(self, expected, actual, msg=None): self.assertEqual(sanitize_cmd(expected), sanitize_cmd(actual), msg) diff --git a/test/asynchronous/test_connections_survive_primary_stepdown_spec.py b/test/asynchronous/test_connections_survive_primary_stepdown_spec.py index 4795d3937a..7c11742a90 100644 --- a/test/asynchronous/test_connections_survive_primary_stepdown_spec.py +++ b/test/asynchronous/test_connections_survive_primary_stepdown_spec.py @@ -22,7 +22,6 @@ from test.asynchronous import ( AsyncIntegrationTest, async_client_context, - reset_client_context, unittest, ) from test.asynchronous.helpers import async_repl_set_step_down diff --git a/test/test_connections_survive_primary_stepdown_spec.py b/test/test_connections_survive_primary_stepdown_spec.py index 1fb08cbed5..9cac633301 100644 --- a/test/test_connections_survive_primary_stepdown_spec.py +++ b/test/test_connections_survive_primary_stepdown_spec.py @@ -22,7 +22,6 @@ from test import ( IntegrationTest, client_context, - reset_client_context, unittest, ) from test.helpers import repl_set_step_down From 6547c3cb06c01ac46ee28ed39b5044efed1f7ca5 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Tue, 21 Jan 2025 14:02:22 -0500 Subject: [PATCH 4/6] async spec runner should only call kill_all_sessions after runs --- test/asynchronous/utils_spec_runner.py | 5 +++-- test/utils_spec_runner.py | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/test/asynchronous/utils_spec_runner.py b/test/asynchronous/utils_spec_runner.py index b79e5258b5..ee135f3dbc 100644 --- a/test/asynchronous/utils_spec_runner.py +++ b/test/asynchronous/utils_spec_runner.py @@ -647,10 +647,11 @@ async def setup_scenario(self, scenario_def): async def run_scenario(self, scenario_def, test): self.maybe_skip_scenario(test) - # Kill all sessions before and after each test to prevent an open + # Kill all sessions before (sync only) and after each test to prevent an open # transaction (from a test failure) from blocking collection/database # operations during test set up and tear down. - await self.kill_all_sessions() + if _IS_SYNC: + await self.kill_all_sessions() self.addAsyncCleanup(self.kill_all_sessions) await self.setup_scenario(scenario_def) database_name = self.get_scenario_db_name(scenario_def) diff --git a/test/utils_spec_runner.py b/test/utils_spec_runner.py index 4508502cd0..6695714d71 100644 --- a/test/utils_spec_runner.py +++ b/test/utils_spec_runner.py @@ -647,10 +647,11 @@ def setup_scenario(self, scenario_def): def run_scenario(self, scenario_def, test): self.maybe_skip_scenario(test) - # Kill all sessions before and after each test to prevent an open + # Kill all sessions before (sync only) and after each test to prevent an open # transaction (from a test failure) from blocking collection/database # operations during test set up and tear down. - self.kill_all_sessions() + if _IS_SYNC: + self.kill_all_sessions() self.addCleanup(self.kill_all_sessions) self.setup_scenario(scenario_def) database_name = self.get_scenario_db_name(scenario_def) From f1f8d62d0aa3c035028cf84a3b214297fc7ce55b Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Tue, 21 Jan 2025 15:43:43 -0500 Subject: [PATCH 5/6] Revert all cleanup changes --- pymongo/asynchronous/mongo_client.py | 2 -- pymongo/asynchronous/monitor.py | 4 ---- pymongo/synchronous/mongo_client.py | 2 -- pymongo/synchronous/monitor.py | 4 ---- test/__init__.py | 12 ++++++++++++ test/asynchronous/__init__.py | 12 ++++++++++++ test/asynchronous/utils_spec_runner.py | 5 ++--- test/utils_spec_runner.py | 5 ++--- 8 files changed, 28 insertions(+), 18 deletions(-) diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index 0026827108..1600e50628 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -1559,8 +1559,6 @@ async def close(self) -> None: # Stop the periodic task thread and then send pending killCursor # requests before closing the topology. self._kill_cursors_executor.close() - if not _IS_SYNC: - await self._kill_cursors_executor.join() await self._process_kill_cursors() await self._topology.close() if self._encrypter: diff --git a/pymongo/asynchronous/monitor.py b/pymongo/asynchronous/monitor.py index de22a30780..ad1bc70aba 100644 --- a/pymongo/asynchronous/monitor.py +++ b/pymongo/asynchronous/monitor.py @@ -191,8 +191,6 @@ def gc_safe_close(self) -> None: async def close(self) -> None: self.gc_safe_close() - if not _IS_SYNC: - await self._executor.join() await self._rtt_monitor.close() # Increment the generation and maybe close the socket. If the executor # thread has the socket checked out, it will be closed when checked in. @@ -460,8 +458,6 @@ def __init__(self, topology: Topology, topology_settings: TopologySettings, pool async def close(self) -> None: self.gc_safe_close() - if not _IS_SYNC: - await self._executor.join() # Increment the generation and maybe close the socket. If the executor # thread has the socket checked out, it will be closed when checked in. await self._pool.reset() diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index 1b8f9dc5f7..a694a58c1e 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -1553,8 +1553,6 @@ def close(self) -> None: # Stop the periodic task thread and then send pending killCursor # requests before closing the topology. self._kill_cursors_executor.close() - if not _IS_SYNC: - self._kill_cursors_executor.join() self._process_kill_cursors() self._topology.close() if self._encrypter: diff --git a/pymongo/synchronous/monitor.py b/pymongo/synchronous/monitor.py index 5558c5fd07..df4130d4ab 100644 --- a/pymongo/synchronous/monitor.py +++ b/pymongo/synchronous/monitor.py @@ -191,8 +191,6 @@ def gc_safe_close(self) -> None: def close(self) -> None: self.gc_safe_close() - if not _IS_SYNC: - self._executor.join() self._rtt_monitor.close() # Increment the generation and maybe close the socket. If the executor # thread has the socket checked out, it will be closed when checked in. @@ -460,8 +458,6 @@ def __init__(self, topology: Topology, topology_settings: TopologySettings, pool def close(self) -> None: self.gc_safe_close() - if not _IS_SYNC: - self._executor.join() # Increment the generation and maybe close the socket. If the executor # thread has the socket checked out, it will be closed when checked in. self._pool.reset() diff --git a/test/__init__.py b/test/__init__.py index ed7966f718..d3a63db2d5 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -864,6 +864,16 @@ def max_message_size_bytes(self): client_context = ClientContext() +def reset_client_context(): + if _IS_SYNC: + # sync tests don't need to reset a client context + return + elif client_context.client is not None: + client_context.client.close() + client_context.client = None + client_context._init_client() + + class PyMongoTestCase(unittest.TestCase): def assertEqualCommand(self, expected, actual, msg=None): self.assertEqual(sanitize_cmd(expected), sanitize_cmd(actual), msg) @@ -1126,6 +1136,8 @@ class IntegrationTest(PyMongoTestCase): @client_context.require_connection def setUp(self) -> None: + if not _IS_SYNC: + reset_client_context() if client_context.load_balancer and not getattr(self, "RUN_ON_LOAD_BALANCER", False): raise SkipTest("this test does not support load balancers") if client_context.serverless and not getattr(self, "RUN_ON_SERVERLESS", False): diff --git a/test/asynchronous/__init__.py b/test/asynchronous/__init__.py index 45db6fcd9a..73e2824742 100644 --- a/test/asynchronous/__init__.py +++ b/test/asynchronous/__init__.py @@ -866,6 +866,16 @@ async def max_message_size_bytes(self): async_client_context = AsyncClientContext() +async def reset_client_context(): + if _IS_SYNC: + # sync tests don't need to reset a client context + return + elif async_client_context.client is not None: + await async_client_context.client.close() + async_client_context.client = None + await async_client_context._init_client() + + class AsyncPyMongoTestCase(unittest.IsolatedAsyncioTestCase): def assertEqualCommand(self, expected, actual, msg=None): self.assertEqual(sanitize_cmd(expected), sanitize_cmd(actual), msg) @@ -1144,6 +1154,8 @@ class AsyncIntegrationTest(AsyncPyMongoTestCase): @async_client_context.require_connection async def asyncSetUp(self) -> None: + if not _IS_SYNC: + await reset_client_context() if async_client_context.load_balancer and not getattr(self, "RUN_ON_LOAD_BALANCER", False): raise SkipTest("this test does not support load balancers") if async_client_context.serverless and not getattr(self, "RUN_ON_SERVERLESS", False): diff --git a/test/asynchronous/utils_spec_runner.py b/test/asynchronous/utils_spec_runner.py index ee135f3dbc..b79e5258b5 100644 --- a/test/asynchronous/utils_spec_runner.py +++ b/test/asynchronous/utils_spec_runner.py @@ -647,11 +647,10 @@ async def setup_scenario(self, scenario_def): async def run_scenario(self, scenario_def, test): self.maybe_skip_scenario(test) - # Kill all sessions before (sync only) and after each test to prevent an open + # Kill all sessions before and after each test to prevent an open # transaction (from a test failure) from blocking collection/database # operations during test set up and tear down. - if _IS_SYNC: - await self.kill_all_sessions() + await self.kill_all_sessions() self.addAsyncCleanup(self.kill_all_sessions) await self.setup_scenario(scenario_def) database_name = self.get_scenario_db_name(scenario_def) diff --git a/test/utils_spec_runner.py b/test/utils_spec_runner.py index 6695714d71..4508502cd0 100644 --- a/test/utils_spec_runner.py +++ b/test/utils_spec_runner.py @@ -647,11 +647,10 @@ def setup_scenario(self, scenario_def): def run_scenario(self, scenario_def, test): self.maybe_skip_scenario(test) - # Kill all sessions before (sync only) and after each test to prevent an open + # Kill all sessions before and after each test to prevent an open # transaction (from a test failure) from blocking collection/database # operations during test set up and tear down. - if _IS_SYNC: - self.kill_all_sessions() + self.kill_all_sessions() self.addCleanup(self.kill_all_sessions) self.setup_scenario(scenario_def) database_name = self.get_scenario_db_name(scenario_def) From a4eedb3210b392cc7efb7429dfa27b78b71b4021 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Tue, 21 Jan 2025 16:54:34 -0500 Subject: [PATCH 6/6] Remove task cancelling from AsyncPeriodicExecutor.close --- pymongo/periodic_executor.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pymongo/periodic_executor.py b/pymongo/periodic_executor.py index f51a988728..9b10f6e7e3 100644 --- a/pymongo/periodic_executor.py +++ b/pymongo/periodic_executor.py @@ -75,8 +75,6 @@ def close(self, dummy: Any = None) -> None: callback; see monitor.py. """ self._stopped = True - if self._task is not None: - self._task.cancel() async def join(self, timeout: Optional[int] = None) -> None: if self._task is not None: