Skip to content

PYTHON-5090 Convert test.test_monitor to async #2106

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions test/asynchronous/test_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# Copyright 2014-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Test the monitor module."""
from __future__ import annotations

import asyncio
import gc
import subprocess
import sys
import warnings
from functools import partial

sys.path[0:0] = [""]

from test.asynchronous import AsyncIntegrationTest, async_client_context, connected, unittest
from test.utils import (
ServerAndTopologyEventListener,
async_wait_until,
)

from pymongo.periodic_executor import _EXECUTORS

_IS_SYNC = False


def unregistered(ref):
gc.collect()
return ref not in _EXECUTORS


def get_executors(client):
executors = []
for server in client._topology._servers.values():
executors.append(server._monitor._executor)
executors.append(server._monitor._rtt_monitor._executor)
executors.append(client._kill_cursors_executor)
executors.append(client._topology._Topology__events_executor)
return [e for e in executors if e is not None]


class TestMonitor(AsyncIntegrationTest):
async def create_client(self):
listener = ServerAndTopologyEventListener()
client = await self.unmanaged_async_single_client(event_listeners=[listener])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
client = await self.unmanaged_async_single_client(event_listeners=[listener])
client = await self.async_single_client(event_listeners=[listener])

Avoid using the unmanaged helper methods wherever possible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking at the tests, i thought it was intentional to use unmanaged because the tests want to explicitly close the client and test behavior after client close (or client del)?

Copy link
Contributor

@NoahStapp NoahStapp Jan 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it cause failures to switch the call from unmanaged? Calling close multiple times shouldn't affect the client here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we have to used unmanaged here, otherwise the client will never get garbage collected.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding client.close to the test cleanup will keep a reference to the client and prevent it from being GCd.

Copy link
Contributor

@NoahStapp NoahStapp Jan 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, the reference inside the cleanups list created by the managed helper will cause the del client call to not GC the client?

Copy link
Contributor

@NoahStapp NoahStapp Jan 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh wait: a GC'd AsyncMongoClient will trigger the unclosed client warning, as seen in the failing test.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still want to make sure that a GC'd async client's tasks eventually get cleaned up.

await connected(client)
return client

async def test_cleanup_executors_on_client_del(self):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
client = await self.create_client()
executors = get_executors(client)
self.assertEqual(len(executors), 4)

# Each executor stores a weakref to itself in _EXECUTORS.
executor_refs = [(r, r()._name) for r in _EXECUTORS.copy() if r() in executors]

del executors
del client

for ref, name in executor_refs:
await async_wait_until(
partial(unregistered, ref), f"unregister executor: {name}", timeout=5
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add an extra async_wait_until here that waits for the __del__ warning to be raised? That would avoid the warning being accidentally raised later on in a subsequent test like we see in the most recent run: https://github.com/mongodb/mongo-python-driver/actions/runs/13466188001/job/37632412774?pr=2106

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like:

with warnings.catch_warnings(record=True) as w:
   ...
   wait until the "Call AsyncMongoClient.close() to safely shut down your client and free up resources" warning appears in w.


def resource_warning_caught():
gc.collect()
for warning in w:
if (
issubclass(warning.category, ResourceWarning)
and "Call AsyncMongoClient.close() to safely shut down your client and free up resources."
in str(warning.message)
):
return True
return False

await async_wait_until(resource_warning_caught, "catch resource warning")

async def test_cleanup_executors_on_client_close(self):
client = await self.create_client()
executors = get_executors(client)
self.assertEqual(len(executors), 4)

await client.close()

for executor in executors:
await async_wait_until(
lambda: executor._stopped, f"closed executor: {executor._name}", timeout=5
)

@async_client_context.require_sync
def test_no_thread_start_runtime_err_on_shutdown(self):
"""Test we silence noisy runtime errors fired when the AsyncMongoClient spawns a new thread
on process shutdown."""
command = [
sys.executable,
"-c",
"from pymongo import AsyncMongoClient; c = AsyncMongoClient()",
]
completed_process: subprocess.CompletedProcess = subprocess.run(
command, capture_output=True
)

self.assertFalse(completed_process.stderr)
self.assertFalse(completed_process.stdout)


if __name__ == "__main__":
unittest.main()
29 changes: 25 additions & 4 deletions test/test_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Test the monitor module."""
from __future__ import annotations

import asyncio
import gc
import subprocess
import sys
Expand All @@ -23,14 +24,16 @@

sys.path[0:0] = [""]

from test import IntegrationTest, connected, unittest
from test import IntegrationTest, client_context, connected, unittest
from test.utils import (
ServerAndTopologyEventListener,
wait_until,
)

from pymongo.periodic_executor import _EXECUTORS

_IS_SYNC = True


def unregistered(ref):
gc.collect()
Expand All @@ -55,8 +58,8 @@ def create_client(self):
return client

def test_cleanup_executors_on_client_del(self):
with warnings.catch_warnings():
warnings.simplefilter("ignore")
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
client = self.create_client()
executors = get_executors(client)
self.assertEqual(len(executors), 4)
Expand All @@ -70,6 +73,19 @@ def test_cleanup_executors_on_client_del(self):
for ref, name in executor_refs:
wait_until(partial(unregistered, ref), f"unregister executor: {name}", timeout=5)

def resource_warning_caught():
gc.collect()
for warning in w:
if (
issubclass(warning.category, ResourceWarning)
and "Call MongoClient.close() to safely shut down your client and free up resources."
in str(warning.message)
):
return True
return False

wait_until(resource_warning_caught, "catch resource warning")

def test_cleanup_executors_on_client_close(self):
client = self.create_client()
executors = get_executors(client)
Expand All @@ -80,10 +96,15 @@ def test_cleanup_executors_on_client_close(self):
for executor in executors:
wait_until(lambda: executor._stopped, f"closed executor: {executor._name}", timeout=5)

@client_context.require_sync
def test_no_thread_start_runtime_err_on_shutdown(self):
"""Test we silence noisy runtime errors fired when the MongoClient spawns a new thread
on process shutdown."""
command = [sys.executable, "-c", "from pymongo import MongoClient; c = MongoClient()"]
command = [
sys.executable,
"-c",
"from pymongo import MongoClient; c = MongoClient()",
]
completed_process: subprocess.CompletedProcess = subprocess.run(
command, capture_output=True
)
Expand Down
1 change: 1 addition & 0 deletions tools/synchro.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ def async_only_test(f: str) -> bool:
"test_load_balancer.py",
"test_logger.py",
"test_max_staleness.py",
"test_monitor.py",
"test_monitoring.py",
"test_mongos_load_balancing.py",
"test_on_demand_csfle.py",
Expand Down
Loading