diff --git a/test/asynchronous/test_monitor.py b/test/asynchronous/test_monitor.py new file mode 100644 index 0000000000..2705fbda3b --- /dev/null +++ b/test/asynchronous/test_monitor.py @@ -0,0 +1,121 @@ +# Copyright 2014-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test the monitor module.""" +from __future__ import annotations + +import asyncio +import gc +import subprocess +import sys +import warnings +from functools import partial + +sys.path[0:0] = [""] + +from test.asynchronous import AsyncIntegrationTest, async_client_context, connected, unittest +from test.utils import ( + ServerAndTopologyEventListener, + async_wait_until, +) + +from pymongo.periodic_executor import _EXECUTORS + +_IS_SYNC = False + + +def unregistered(ref): + gc.collect() + return ref not in _EXECUTORS + + +def get_executors(client): + executors = [] + for server in client._topology._servers.values(): + executors.append(server._monitor._executor) + executors.append(server._monitor._rtt_monitor._executor) + executors.append(client._kill_cursors_executor) + executors.append(client._topology._Topology__events_executor) + return [e for e in executors if e is not None] + + +class TestMonitor(AsyncIntegrationTest): + async def create_client(self): + listener = ServerAndTopologyEventListener() + client = await self.unmanaged_async_single_client(event_listeners=[listener]) + await connected(client) + return client + + async def test_cleanup_executors_on_client_del(self): + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + client = await self.create_client() + executors = get_executors(client) + self.assertEqual(len(executors), 4) + + # Each executor stores a weakref to itself in _EXECUTORS. + executor_refs = [(r, r()._name) for r in _EXECUTORS.copy() if r() in executors] + + del executors + del client + + for ref, name in executor_refs: + await async_wait_until( + partial(unregistered, ref), f"unregister executor: {name}", timeout=5 + ) + + def resource_warning_caught(): + gc.collect() + for warning in w: + if ( + issubclass(warning.category, ResourceWarning) + and "Call AsyncMongoClient.close() to safely shut down your client and free up resources." + in str(warning.message) + ): + return True + return False + + await async_wait_until(resource_warning_caught, "catch resource warning") + + async def test_cleanup_executors_on_client_close(self): + client = await self.create_client() + executors = get_executors(client) + self.assertEqual(len(executors), 4) + + await client.close() + + for executor in executors: + await async_wait_until( + lambda: executor._stopped, f"closed executor: {executor._name}", timeout=5 + ) + + @async_client_context.require_sync + def test_no_thread_start_runtime_err_on_shutdown(self): + """Test we silence noisy runtime errors fired when the AsyncMongoClient spawns a new thread + on process shutdown.""" + command = [ + sys.executable, + "-c", + "from pymongo import AsyncMongoClient; c = AsyncMongoClient()", + ] + completed_process: subprocess.CompletedProcess = subprocess.run( + command, capture_output=True + ) + + self.assertFalse(completed_process.stderr) + self.assertFalse(completed_process.stdout) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_monitor.py b/test/test_monitor.py index a704f3d8cb..0fb7eb9cae 100644 --- a/test/test_monitor.py +++ b/test/test_monitor.py @@ -15,6 +15,7 @@ """Test the monitor module.""" from __future__ import annotations +import asyncio import gc import subprocess import sys @@ -23,7 +24,7 @@ sys.path[0:0] = [""] -from test import IntegrationTest, connected, unittest +from test import IntegrationTest, client_context, connected, unittest from test.utils import ( ServerAndTopologyEventListener, wait_until, @@ -31,6 +32,8 @@ from pymongo.periodic_executor import _EXECUTORS +_IS_SYNC = True + def unregistered(ref): gc.collect() @@ -55,8 +58,8 @@ def create_client(self): return client def test_cleanup_executors_on_client_del(self): - with warnings.catch_warnings(): - warnings.simplefilter("ignore") + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") client = self.create_client() executors = get_executors(client) self.assertEqual(len(executors), 4) @@ -70,6 +73,19 @@ def test_cleanup_executors_on_client_del(self): for ref, name in executor_refs: wait_until(partial(unregistered, ref), f"unregister executor: {name}", timeout=5) + def resource_warning_caught(): + gc.collect() + for warning in w: + if ( + issubclass(warning.category, ResourceWarning) + and "Call MongoClient.close() to safely shut down your client and free up resources." + in str(warning.message) + ): + return True + return False + + wait_until(resource_warning_caught, "catch resource warning") + def test_cleanup_executors_on_client_close(self): client = self.create_client() executors = get_executors(client) @@ -80,10 +96,15 @@ def test_cleanup_executors_on_client_close(self): for executor in executors: wait_until(lambda: executor._stopped, f"closed executor: {executor._name}", timeout=5) + @client_context.require_sync def test_no_thread_start_runtime_err_on_shutdown(self): """Test we silence noisy runtime errors fired when the MongoClient spawns a new thread on process shutdown.""" - command = [sys.executable, "-c", "from pymongo import MongoClient; c = MongoClient()"] + command = [ + sys.executable, + "-c", + "from pymongo import MongoClient; c = MongoClient()", + ] completed_process: subprocess.CompletedProcess = subprocess.run( command, capture_output=True ) diff --git a/tools/synchro.py b/tools/synchro.py index 39c53b435f..877a683531 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -226,6 +226,7 @@ def async_only_test(f: str) -> bool: "test_load_balancer.py", "test_logger.py", "test_max_staleness.py", + "test_monitor.py", "test_monitoring.py", "test_mongos_load_balancing.py", "test_on_demand_csfle.py",