From d1cc4f956d98a1c1f58b0a372275f8a94d71bb7a Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Thu, 12 Jun 2025 15:56:44 -0400 Subject: [PATCH 1/9] PYTHON 5212 - Use asyncio.loop.sock_connect in _async_create_connection --- pymongo/pool_shared.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pymongo/pool_shared.py b/pymongo/pool_shared.py index 308ecef349..385b09fd74 100644 --- a/pymongo/pool_shared.py +++ b/pymongo/pool_shared.py @@ -244,7 +244,8 @@ async def _async_create_connection(address: _Address, options: PoolOptions) -> s sock.settimeout(timeout) sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True) _set_keepalive_times(sock) - sock.connect(sa) + sock.setblocking(False) + await asyncio.get_running_loop().sock_connect(sock, sa) return sock except OSError as e: err = e From 5a87b9134f0757b977802e0f0043eaa685f585be Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Thu, 12 Jun 2025 16:10:11 -0400 Subject: [PATCH 2/9] Update unix sockets --- pymongo/pool_shared.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pymongo/pool_shared.py b/pymongo/pool_shared.py index 385b09fd74..216403a1f0 100644 --- a/pymongo/pool_shared.py +++ b/pymongo/pool_shared.py @@ -206,7 +206,8 @@ async def _async_create_connection(address: _Address, options: PoolOptions) -> s # SOCK_CLOEXEC not supported for Unix sockets. _set_non_inheritable_non_atomic(sock.fileno()) try: - sock.connect(host) + sock.setblocking(False) + await asyncio.get_running_loop().sock_connect(sock, host) return sock except OSError: sock.close() From 3521829136bf79fc4be9da7bd4679ae9fcecc735 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Thu, 12 Jun 2025 16:22:38 -0400 Subject: [PATCH 3/9] Asynchronous timeout --- pymongo/pool_shared.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pymongo/pool_shared.py b/pymongo/pool_shared.py index 216403a1f0..dbf9a4b221 100644 --- a/pymongo/pool_shared.py +++ b/pymongo/pool_shared.py @@ -242,13 +242,14 @@ async def _async_create_connection(address: _Address, options: PoolOptions) -> s timeout = options.connect_timeout elif timeout <= 0: raise socket.timeout("timed out") - sock.settimeout(timeout) sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True) _set_keepalive_times(sock) sock.setblocking(False) - await asyncio.get_running_loop().sock_connect(sock, sa) + await asyncio.wait_for( + asyncio.get_running_loop().sock_connect(sock, sa), timeout=timeout + ) return sock - except OSError as e: + except (OSError, asyncio.TimeoutError) as e: err = e sock.close() From 9ce66b91720903cf39e1154cd81022b4063e96dd Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Thu, 12 Jun 2025 16:44:25 -0400 Subject: [PATCH 4/9] Fix asyncio.TimeoutError handling --- pymongo/pool_shared.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pymongo/pool_shared.py b/pymongo/pool_shared.py index dbf9a4b221..0f51d4f6a7 100644 --- a/pymongo/pool_shared.py +++ b/pymongo/pool_shared.py @@ -249,9 +249,12 @@ async def _async_create_connection(address: _Address, options: PoolOptions) -> s asyncio.get_running_loop().sock_connect(sock, sa), timeout=timeout ) return sock - except (OSError, asyncio.TimeoutError) as e: - err = e + except asyncio.TimeoutError as e: + sock.close() + raise socket.timeout("timed out") from e + except OSError as e: sock.close() + err = e if err is not None: raise err From 4e21b6322ced51473bf60f24d8a6c15420cecd6f Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Fri, 13 Jun 2025 09:54:42 -0400 Subject: [PATCH 5/9] Restore socket timeout after connection + add changelog --- doc/changelog.rst | 17 +++++++++++++++++ pymongo/pool_shared.py | 2 ++ 2 files changed, 19 insertions(+) diff --git a/doc/changelog.rst b/doc/changelog.rst index 3f5df8df6c..ca4784f919 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -8,6 +8,23 @@ PyMongo 4.14 brings a number of changes including: - Added :attr:`bson.codec_options.TypeRegistry.codecs` and :attr:`bson.codec_options.TypeRegistry.fallback_encoder` properties to allow users to directly access the type codecs and fallback encoder for a given :class:`bson.codec_options.TypeRegistry`. +Changes in Version 4.13.2 (2025/06/17) +-------------------------------------- + +Version 4.13.2 is a bug fix release. + +- Fixed a bug where ``AsyncMongoClient`` would block the event loop while creating new connections, + potentially significantly increasing latency for ongoing operations. + +Issues Resolved +............... + +See the `PyMongo 4.13.2 release notes in JIRA`_ for the list of resolved issues +in this release. + +.. _PyMongo 4.13.2 release notes in JIRA: https://jira.mongodb.org/secure/ReleaseNote.jspa?projectId=10004&version=43937 + + Changes in Version 4.13.1 (2025/06/10) -------------------------------------- diff --git a/pymongo/pool_shared.py b/pymongo/pool_shared.py index 0f51d4f6a7..d543075225 100644 --- a/pymongo/pool_shared.py +++ b/pymongo/pool_shared.py @@ -244,10 +244,12 @@ async def _async_create_connection(address: _Address, options: PoolOptions) -> s raise socket.timeout("timed out") sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True) _set_keepalive_times(sock) + # Socket needs to be non-blocking during connection to not block the event loop sock.setblocking(False) await asyncio.wait_for( asyncio.get_running_loop().sock_connect(sock, sa), timeout=timeout ) + sock.settimeout(timeout) return sock except asyncio.TimeoutError as e: sock.close() From af944e7d5870a46d8185ff2e00b014003a8dda5d Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Fri, 13 Jun 2025 10:57:45 -0400 Subject: [PATCH 6/9] Add regression test --- .../asynchronous/test_async_loop_unblocked.py | 57 +++++++++++++++++++ tools/synchro.py | 1 + 2 files changed, 58 insertions(+) create mode 100644 test/asynchronous/test_async_loop_unblocked.py diff --git a/test/asynchronous/test_async_loop_unblocked.py b/test/asynchronous/test_async_loop_unblocked.py new file mode 100644 index 0000000000..06d367aaf8 --- /dev/null +++ b/test/asynchronous/test_async_loop_unblocked.py @@ -0,0 +1,57 @@ +# Copyright 2025-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test that the asynchronous API does not block the event loop.""" +from __future__ import annotations + +import asyncio +import time +from test.asynchronous import AsyncIntegrationTest + +from pymongo.errors import ServerSelectionTimeoutError + + +class TestClientLoopUnblocked(AsyncIntegrationTest): + async def test_client_does_not_block_loop(self): + # Use an unreachable TEST-NET host to ensure that the client times out attempting to create a connection. + client = self.simple_client("192.0.2.1", serverSelectionTimeoutMS=500) + latencies = [] + + # If the loop is being blocked, at least one iteration will have a latency much more than 0.1 seconds + async def background_task(): + last_run = None + try: + while True: + if last_run: + latencies.append(time.monotonic() - last_run) + last_run = time.monotonic() + await asyncio.sleep(0.1) + except asyncio.CancelledError: + latencies.append(time.monotonic() - last_run) + raise + + t = asyncio.create_task(background_task()) + + with self.assertRaisesRegex(ServerSelectionTimeoutError, "No servers found yet"): + await client.admin.command("ping") + + t.cancel() + with self.assertRaises(asyncio.CancelledError): + await t + + self.assertLessEqual( + sorted(latencies, reverse=True)[0], + 0.2, + "Task took longer than twice its sleep time to run again", + ) diff --git a/tools/synchro.py b/tools/synchro.py index aaf7c6836a..541231cf71 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -186,6 +186,7 @@ def async_only_test(f: str) -> bool: "test_async_cancellation.py", "test_async_loop_safety.py", "test_async_contextvars_reset.py", + "test_async_loop_unblocked.py", ] From 92fae2c334597a75b6c26613cc5c5b47be5ec390 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Fri, 13 Jun 2025 11:04:38 -0400 Subject: [PATCH 7/9] Fix typing --- test/asynchronous/test_async_loop_unblocked.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/asynchronous/test_async_loop_unblocked.py b/test/asynchronous/test_async_loop_unblocked.py index 06d367aaf8..afc80f7633 100644 --- a/test/asynchronous/test_async_loop_unblocked.py +++ b/test/asynchronous/test_async_loop_unblocked.py @@ -33,12 +33,13 @@ async def background_task(): last_run = None try: while True: - if last_run: + if last_run is not None: latencies.append(time.monotonic() - last_run) last_run = time.monotonic() await asyncio.sleep(0.1) except asyncio.CancelledError: - latencies.append(time.monotonic() - last_run) + if last_run is not None: + latencies.append(time.monotonic() - last_run) raise t = asyncio.create_task(background_task()) From e08da843f4193cd86a49f911f27373ec9b8e3cbc Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Fri, 13 Jun 2025 11:09:36 -0400 Subject: [PATCH 8/9] Cleanup --- test/asynchronous/test_async_loop_unblocked.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/test/asynchronous/test_async_loop_unblocked.py b/test/asynchronous/test_async_loop_unblocked.py index afc80f7633..6f69b7a5ab 100644 --- a/test/asynchronous/test_async_loop_unblocked.py +++ b/test/asynchronous/test_async_loop_unblocked.py @@ -30,16 +30,14 @@ async def test_client_does_not_block_loop(self): # If the loop is being blocked, at least one iteration will have a latency much more than 0.1 seconds async def background_task(): - last_run = None + start = time.monotonic() try: while True: - if last_run is not None: - latencies.append(time.monotonic() - last_run) - last_run = time.monotonic() + start = time.monotonic() await asyncio.sleep(0.1) + latencies.append(time.monotonic() - start) except asyncio.CancelledError: - if last_run is not None: - latencies.append(time.monotonic() - last_run) + latencies.append(time.monotonic() - start) raise t = asyncio.create_task(background_task()) @@ -54,5 +52,5 @@ async def background_task(): self.assertLessEqual( sorted(latencies, reverse=True)[0], 0.2, - "Task took longer than twice its sleep time to run again", + "Background task was blocked from running", ) From 73d41e1b04f3b9b45f001bb8cb71dfa30d73e6bc Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Fri, 13 Jun 2025 15:08:40 -0400 Subject: [PATCH 9/9] Address review --- pymongo/pool_shared.py | 5 +++-- test/asynchronous/test_async_loop_unblocked.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pymongo/pool_shared.py b/pymongo/pool_shared.py index d543075225..905f1a4d18 100644 --- a/pymongo/pool_shared.py +++ b/pymongo/pool_shared.py @@ -253,10 +253,11 @@ async def _async_create_connection(address: _Address, options: PoolOptions) -> s return sock except asyncio.TimeoutError as e: sock.close() - raise socket.timeout("timed out") from e + err = socket.timeout("timed out") + err.__cause__ = e except OSError as e: sock.close() - err = e + err = e # type: ignore[assignment] if err is not None: raise err diff --git a/test/asynchronous/test_async_loop_unblocked.py b/test/asynchronous/test_async_loop_unblocked.py index 6f69b7a5ab..86f934b798 100644 --- a/test/asynchronous/test_async_loop_unblocked.py +++ b/test/asynchronous/test_async_loop_unblocked.py @@ -51,6 +51,6 @@ async def background_task(): self.assertLessEqual( sorted(latencies, reverse=True)[0], - 0.2, + 1.0, "Background task was blocked from running", )