From 3fa15525bc048f6e5d358060e21c934f705cd95f Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Thu, 30 Jan 2025 15:57:41 -0800 Subject: [PATCH 01/11] Convert test.test_monitor to async --- test/asynchronous/test_monitor.py | 97 +++++++++++++++++++++++++++++++ test/test_monitor.py | 1 + tools/synchro.py | 1 + 3 files changed, 99 insertions(+) create mode 100644 test/asynchronous/test_monitor.py diff --git a/test/asynchronous/test_monitor.py b/test/asynchronous/test_monitor.py new file mode 100644 index 0000000000..8846c32d30 --- /dev/null +++ b/test/asynchronous/test_monitor.py @@ -0,0 +1,97 @@ +# Copyright 2014-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test the monitor module.""" +from __future__ import annotations + +import gc +import subprocess +import sys +import warnings +from functools import partial + +sys.path[0:0] = [""] + +from test.asynchronous import AsyncIntegrationTest, connected, unittest +from test.utils import ( + ServerAndTopologyEventListener, + async_wait_until, +) + +from pymongo.periodic_executor import _EXECUTORS + +_IS_SYNC = False + +def unregistered(ref): + gc.collect() + return ref not in _EXECUTORS + + +def get_executors(client): + executors = [] + for server in client._topology._servers.values(): + executors.append(server._monitor._executor) + executors.append(server._monitor._rtt_monitor._executor) + executors.append(client._kill_cursors_executor) + executors.append(client._topology._Topology__events_executor) + return [e for e in executors if e is not None] + + +class TestMonitor(AsyncIntegrationTest): + async def create_client(self): + listener = ServerAndTopologyEventListener() + client = await self.unmanaged_async_single_client(event_listeners=[listener]) + await connected(client) + return client + + async def test_cleanup_executors_on_client_del(self): + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + client = await self.create_client() + executors = get_executors(client) + self.assertEqual(len(executors), 4) + + # Each executor stores a weakref to itself in _EXECUTORS. + executor_refs = [(r, r()._name) for r in _EXECUTORS.copy() if r() in executors] + + del executors + del client + + for ref, name in executor_refs: + await async_wait_until(partial(unregistered, ref), f"unregister executor: {name}", timeout=5) + + async def test_cleanup_executors_on_client_close(self): + client = await self.create_client() + executors = get_executors(client) + self.assertEqual(len(executors), 4) + + await client.close() + + for executor in executors: + await async_wait_until(lambda: executor._stopped, f"closed executor: {executor._name}", timeout=5) + + async def test_no_thread_start_runtime_err_on_shutdown(self): + """Test we silence noisy runtime errors fired when the AsyncMongoClient spawns a new thread + on process shutdown.""" + command = [sys.executable, "-c", "from pymongo import AsyncMongoClient; c = AsyncMongoClient()"] + completed_process: subprocess.CompletedProcess = subprocess.run( + command, capture_output=True + ) + + self.assertFalse(completed_process.stderr) + self.assertFalse(completed_process.stdout) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_monitor.py b/test/test_monitor.py index a704f3d8cb..7081307ada 100644 --- a/test/test_monitor.py +++ b/test/test_monitor.py @@ -31,6 +31,7 @@ from pymongo.periodic_executor import _EXECUTORS +_IS_SYNC = True def unregistered(ref): gc.collect() diff --git a/tools/synchro.py b/tools/synchro.py index dc272929ad..f771107494 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -214,6 +214,7 @@ def async_only_test(f: str) -> bool: "test_json_util_integration.py", "test_gridfs_spec.py", "test_logger.py", + "test_monitor.py", "test_monitoring.py", "test_raw_bson.py", "test_retryable_reads.py", From 16943a441e81031551700f19921ad53ec78d71cf Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Fri, 31 Jan 2025 09:42:15 -0800 Subject: [PATCH 02/11] edit test to require sync --- test/asynchronous/test_monitor.py | 21 ++++++++++++++++----- test/test_monitor.py | 5 ++++- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/test/asynchronous/test_monitor.py b/test/asynchronous/test_monitor.py index 8846c32d30..bc6ad57862 100644 --- a/test/asynchronous/test_monitor.py +++ b/test/asynchronous/test_monitor.py @@ -15,6 +15,7 @@ """Test the monitor module.""" from __future__ import annotations +import asyncio import gc import subprocess import sys @@ -23,7 +24,7 @@ sys.path[0:0] = [""] -from test.asynchronous import AsyncIntegrationTest, connected, unittest +from test.asynchronous import AsyncIntegrationTest, async_client_context, connected, unittest from test.utils import ( ServerAndTopologyEventListener, async_wait_until, @@ -33,6 +34,7 @@ _IS_SYNC = False + def unregistered(ref): gc.collect() return ref not in _EXECUTORS @@ -69,7 +71,9 @@ async def test_cleanup_executors_on_client_del(self): del client for ref, name in executor_refs: - await async_wait_until(partial(unregistered, ref), f"unregister executor: {name}", timeout=5) + await async_wait_until( + partial(unregistered, ref), f"unregister executor: {name}", timeout=5 + ) async def test_cleanup_executors_on_client_close(self): client = await self.create_client() @@ -79,12 +83,19 @@ async def test_cleanup_executors_on_client_close(self): await client.close() for executor in executors: - await async_wait_until(lambda: executor._stopped, f"closed executor: {executor._name}", timeout=5) + await async_wait_until( + lambda: executor._stopped, f"closed executor: {executor._name}", timeout=5 + ) - async def test_no_thread_start_runtime_err_on_shutdown(self): + @async_client_context.require_sync + def test_no_thread_start_runtime_err_on_shutdown(self): """Test we silence noisy runtime errors fired when the AsyncMongoClient spawns a new thread on process shutdown.""" - command = [sys.executable, "-c", "from pymongo import AsyncMongoClient; c = AsyncMongoClient()"] + command = [ + sys.executable, + "-c", + "from pymongo import AsyncMongoClient; c = AsyncMongoClient()", + ] completed_process: subprocess.CompletedProcess = subprocess.run( command, capture_output=True ) diff --git a/test/test_monitor.py b/test/test_monitor.py index 7081307ada..bc554e5013 100644 --- a/test/test_monitor.py +++ b/test/test_monitor.py @@ -15,6 +15,7 @@ """Test the monitor module.""" from __future__ import annotations +import asyncio import gc import subprocess import sys @@ -23,7 +24,7 @@ sys.path[0:0] = [""] -from test import IntegrationTest, connected, unittest +from test import IntegrationTest, client_context, connected, unittest from test.utils import ( ServerAndTopologyEventListener, wait_until, @@ -33,6 +34,7 @@ _IS_SYNC = True + def unregistered(ref): gc.collect() return ref not in _EXECUTORS @@ -81,6 +83,7 @@ def test_cleanup_executors_on_client_close(self): for executor in executors: wait_until(lambda: executor._stopped, f"closed executor: {executor._name}", timeout=5) + @client_context.require_sync def test_no_thread_start_runtime_err_on_shutdown(self): """Test we silence noisy runtime errors fired when the MongoClient spawns a new thread on process shutdown.""" From 69a5aa773fefeaa7056da9ecf96b3db1d5bf3739 Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Fri, 31 Jan 2025 09:45:42 -0800 Subject: [PATCH 03/11] run pre-commit --- test/test_monitor.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/test/test_monitor.py b/test/test_monitor.py index bc554e5013..22b483455b 100644 --- a/test/test_monitor.py +++ b/test/test_monitor.py @@ -87,7 +87,11 @@ def test_cleanup_executors_on_client_close(self): def test_no_thread_start_runtime_err_on_shutdown(self): """Test we silence noisy runtime errors fired when the MongoClient spawns a new thread on process shutdown.""" - command = [sys.executable, "-c", "from pymongo import MongoClient; c = MongoClient()"] + command = [ + sys.executable, + "-c", + "from pymongo import MongoClient; c = MongoClient()", + ] completed_process: subprocess.CompletedProcess = subprocess.run( command, capture_output=True ) From 646a73a5fb680d0b3919ec085b83020ebdeeb7bf Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Fri, 21 Feb 2025 13:47:28 -0800 Subject: [PATCH 04/11] use managed client --- test/asynchronous/test_monitor.py | 2 +- test/test_monitor.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/asynchronous/test_monitor.py b/test/asynchronous/test_monitor.py index bc6ad57862..18a182de09 100644 --- a/test/asynchronous/test_monitor.py +++ b/test/asynchronous/test_monitor.py @@ -53,7 +53,7 @@ def get_executors(client): class TestMonitor(AsyncIntegrationTest): async def create_client(self): listener = ServerAndTopologyEventListener() - client = await self.unmanaged_async_single_client(event_listeners=[listener]) + client = await self.async_single_client(event_listeners=[listener]) await connected(client) return client diff --git a/test/test_monitor.py b/test/test_monitor.py index 22b483455b..1440eed240 100644 --- a/test/test_monitor.py +++ b/test/test_monitor.py @@ -53,7 +53,7 @@ def get_executors(client): class TestMonitor(IntegrationTest): def create_client(self): listener = ServerAndTopologyEventListener() - client = self.unmanaged_single_client(event_listeners=[listener]) + client = self.single_client(event_listeners=[listener]) connected(client) return client From a2248e720068a9d6df953c00f51cc2c447d85d37 Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Fri, 21 Feb 2025 14:18:58 -0800 Subject: [PATCH 05/11] no we need unmanaged client --- test/asynchronous/test_monitor.py | 2 +- test/test_monitor.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/asynchronous/test_monitor.py b/test/asynchronous/test_monitor.py index 18a182de09..bc6ad57862 100644 --- a/test/asynchronous/test_monitor.py +++ b/test/asynchronous/test_monitor.py @@ -53,7 +53,7 @@ def get_executors(client): class TestMonitor(AsyncIntegrationTest): async def create_client(self): listener = ServerAndTopologyEventListener() - client = await self.async_single_client(event_listeners=[listener]) + client = await self.unmanaged_async_single_client(event_listeners=[listener]) await connected(client) return client diff --git a/test/test_monitor.py b/test/test_monitor.py index 1440eed240..22b483455b 100644 --- a/test/test_monitor.py +++ b/test/test_monitor.py @@ -53,7 +53,7 @@ def get_executors(client): class TestMonitor(IntegrationTest): def create_client(self): listener = ServerAndTopologyEventListener() - client = self.single_client(event_listeners=[listener]) + client = self.unmanaged_single_client(event_listeners=[listener]) connected(client) return client From 07d4852bc33c0ba11cc94ec04aa0b502a151cafc Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Fri, 21 Feb 2025 15:44:42 -0800 Subject: [PATCH 06/11] wait for resource warning --- test/asynchronous/test_monitor.py | 12 ++++++++++-- test/test_monitor.py | 12 ++++++++++-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/test/asynchronous/test_monitor.py b/test/asynchronous/test_monitor.py index bc6ad57862..1c23f92d3d 100644 --- a/test/asynchronous/test_monitor.py +++ b/test/asynchronous/test_monitor.py @@ -58,8 +58,8 @@ async def create_client(self): return client async def test_cleanup_executors_on_client_del(self): - with warnings.catch_warnings(): - warnings.simplefilter("ignore") + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") client = await self.create_client() executors = get_executors(client) self.assertEqual(len(executors), 4) @@ -75,6 +75,14 @@ async def test_cleanup_executors_on_client_del(self): partial(unregistered, ref), f"unregister executor: {name}", timeout=5 ) + def resource_warning_caught(): + for warning in w: + if isinstance(warning.message, ResourceWarning): + return True + return False + + await async_wait_until(resource_warning_caught, "catch resource warning") + async def test_cleanup_executors_on_client_close(self): client = await self.create_client() executors = get_executors(client) diff --git a/test/test_monitor.py b/test/test_monitor.py index 22b483455b..bbf28bca3b 100644 --- a/test/test_monitor.py +++ b/test/test_monitor.py @@ -58,8 +58,8 @@ def create_client(self): return client def test_cleanup_executors_on_client_del(self): - with warnings.catch_warnings(): - warnings.simplefilter("ignore") + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") client = self.create_client() executors = get_executors(client) self.assertEqual(len(executors), 4) @@ -73,6 +73,14 @@ def test_cleanup_executors_on_client_del(self): for ref, name in executor_refs: wait_until(partial(unregistered, ref), f"unregister executor: {name}", timeout=5) + def resource_warning_caught(): + for warning in w: + if isinstance(warning.message, ResourceWarning): + return True + return False + + wait_until(resource_warning_caught, "catch resource warning") + def test_cleanup_executors_on_client_close(self): client = self.create_client() executors = get_executors(client) From fc7aa14c838728e40e6d718d174f8fede0813bda Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Fri, 21 Feb 2025 16:04:48 -0800 Subject: [PATCH 07/11] attempt 2 --- test/asynchronous/test_monitor.py | 6 +++++- test/test_monitor.py | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/test/asynchronous/test_monitor.py b/test/asynchronous/test_monitor.py index 1c23f92d3d..45580feb1b 100644 --- a/test/asynchronous/test_monitor.py +++ b/test/asynchronous/test_monitor.py @@ -77,7 +77,11 @@ async def test_cleanup_executors_on_client_del(self): def resource_warning_caught(): for warning in w: - if isinstance(warning.message, ResourceWarning): + if ( + issubclass(warning.category, ResourceWarning) + and "Call AsyncMongoClient.close() to safely shut down your client and free up resources." + in str(warning.message) + ): return True return False diff --git a/test/test_monitor.py b/test/test_monitor.py index bbf28bca3b..10493258fa 100644 --- a/test/test_monitor.py +++ b/test/test_monitor.py @@ -75,7 +75,11 @@ def test_cleanup_executors_on_client_del(self): def resource_warning_caught(): for warning in w: - if isinstance(warning.message, ResourceWarning): + if ( + issubclass(warning.category, ResourceWarning) + and "Call MongoClient.close() to safely shut down your client and free up resources." + in str(warning.message) + ): return True return False From d76acfbc435f67ba6aafa9e064eccc0c5ef98df0 Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Fri, 21 Feb 2025 17:26:57 -0800 Subject: [PATCH 08/11] test? --- test/asynchronous/test_monitor.py | 13 +++++++++---- test/test_monitor.py | 13 +++++++++---- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/test/asynchronous/test_monitor.py b/test/asynchronous/test_monitor.py index 45580feb1b..d90225ae4c 100644 --- a/test/asynchronous/test_monitor.py +++ b/test/asynchronous/test_monitor.py @@ -76,16 +76,21 @@ async def test_cleanup_executors_on_client_del(self): ) def resource_warning_caught(): + count = 0 for warning in w: if ( issubclass(warning.category, ResourceWarning) and "Call AsyncMongoClient.close() to safely shut down your client and free up resources." in str(warning.message) ): - return True - return False - - await async_wait_until(resource_warning_caught, "catch resource warning") + count += 1 + return count >= 2 + + try: + await async_wait_until(resource_warning_caught, "catch resource warning") + except AssertionError as exc: + if "catch resource warning" not in str(exc): + raise async def test_cleanup_executors_on_client_close(self): client = await self.create_client() diff --git a/test/test_monitor.py b/test/test_monitor.py index 10493258fa..90544106f0 100644 --- a/test/test_monitor.py +++ b/test/test_monitor.py @@ -74,16 +74,21 @@ def test_cleanup_executors_on_client_del(self): wait_until(partial(unregistered, ref), f"unregister executor: {name}", timeout=5) def resource_warning_caught(): + count = 0 for warning in w: if ( issubclass(warning.category, ResourceWarning) and "Call MongoClient.close() to safely shut down your client and free up resources." in str(warning.message) ): - return True - return False - - wait_until(resource_warning_caught, "catch resource warning") + count += 1 + return count >= 2 + + try: + wait_until(resource_warning_caught, "catch resource warning") + except AssertionError as exc: + if "catch resource warning" not in str(exc): + raise def test_cleanup_executors_on_client_close(self): client = self.create_client() From 9d430b6d5e1b22a646997c08d85a1dc1b1ffa4a8 Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Fri, 21 Feb 2025 19:07:37 -0800 Subject: [PATCH 09/11] test? pt 2 --- test/asynchronous/test_monitor.py | 4 +++- test/test_monitor.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/test/asynchronous/test_monitor.py b/test/asynchronous/test_monitor.py index d90225ae4c..de9996bd47 100644 --- a/test/asynchronous/test_monitor.py +++ b/test/asynchronous/test_monitor.py @@ -87,7 +87,9 @@ def resource_warning_caught(): return count >= 2 try: - await async_wait_until(resource_warning_caught, "catch resource warning") + await async_wait_until( + resource_warning_caught, "catch resource warning", timeout=30 + ) except AssertionError as exc: if "catch resource warning" not in str(exc): raise diff --git a/test/test_monitor.py b/test/test_monitor.py index 90544106f0..9d16f83e12 100644 --- a/test/test_monitor.py +++ b/test/test_monitor.py @@ -85,7 +85,7 @@ def resource_warning_caught(): return count >= 2 try: - wait_until(resource_warning_caught, "catch resource warning") + wait_until(resource_warning_caught, "catch resource warning", timeout=30) except AssertionError as exc: if "catch resource warning" not in str(exc): raise From 1c676519966e932b40e71d5e9d294d95a2761e9a Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Mon, 24 Feb 2025 09:25:04 -0800 Subject: [PATCH 10/11] ahh i need to trigger garbage collect --- test/asynchronous/test_monitor.py | 16 ++++++---------- test/test_monitor.py | 16 +++++++--------- 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/test/asynchronous/test_monitor.py b/test/asynchronous/test_monitor.py index de9996bd47..58653bc96b 100644 --- a/test/asynchronous/test_monitor.py +++ b/test/asynchronous/test_monitor.py @@ -76,23 +76,19 @@ async def test_cleanup_executors_on_client_del(self): ) def resource_warning_caught(): - count = 0 + gc.collect() + print(w) for warning in w: + print(warning) if ( issubclass(warning.category, ResourceWarning) and "Call AsyncMongoClient.close() to safely shut down your client and free up resources." in str(warning.message) ): - count += 1 - return count >= 2 + return True + return False - try: - await async_wait_until( - resource_warning_caught, "catch resource warning", timeout=30 - ) - except AssertionError as exc: - if "catch resource warning" not in str(exc): - raise + await async_wait_until(resource_warning_caught, "catch resource warning") async def test_cleanup_executors_on_client_close(self): client = await self.create_client() diff --git a/test/test_monitor.py b/test/test_monitor.py index 9d16f83e12..6bfe293dc9 100644 --- a/test/test_monitor.py +++ b/test/test_monitor.py @@ -74,21 +74,19 @@ def test_cleanup_executors_on_client_del(self): wait_until(partial(unregistered, ref), f"unregister executor: {name}", timeout=5) def resource_warning_caught(): - count = 0 + gc.collect() + print(w) for warning in w: + print(warning) if ( issubclass(warning.category, ResourceWarning) and "Call MongoClient.close() to safely shut down your client and free up resources." in str(warning.message) ): - count += 1 - return count >= 2 - - try: - wait_until(resource_warning_caught, "catch resource warning", timeout=30) - except AssertionError as exc: - if "catch resource warning" not in str(exc): - raise + return True + return False + + wait_until(resource_warning_caught, "catch resource warning") def test_cleanup_executors_on_client_close(self): client = self.create_client() From 65486cc2d79fae841415d06ed1e9494bd15fdd5a Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Mon, 24 Feb 2025 09:28:08 -0800 Subject: [PATCH 11/11] ah forgot to delete print statements --- test/asynchronous/test_monitor.py | 2 -- test/test_monitor.py | 2 -- 2 files changed, 4 deletions(-) diff --git a/test/asynchronous/test_monitor.py b/test/asynchronous/test_monitor.py index 58653bc96b..2705fbda3b 100644 --- a/test/asynchronous/test_monitor.py +++ b/test/asynchronous/test_monitor.py @@ -77,9 +77,7 @@ async def test_cleanup_executors_on_client_del(self): def resource_warning_caught(): gc.collect() - print(w) for warning in w: - print(warning) if ( issubclass(warning.category, ResourceWarning) and "Call AsyncMongoClient.close() to safely shut down your client and free up resources." diff --git a/test/test_monitor.py b/test/test_monitor.py index 6bfe293dc9..0fb7eb9cae 100644 --- a/test/test_monitor.py +++ b/test/test_monitor.py @@ -75,9 +75,7 @@ def test_cleanup_executors_on_client_del(self): def resource_warning_caught(): gc.collect() - print(w) for warning in w: - print(warning) if ( issubclass(warning.category, ResourceWarning) and "Call MongoClient.close() to safely shut down your client and free up resources."