From 5141c5e3b95cf67835099d0162eec20ab41266e1 Mon Sep 17 00:00:00 2001 From: Iris Date: Thu, 26 Sep 2024 17:12:16 -0700 Subject: [PATCH 1/7] Migrate test_retryable_reads.py to async --- test/asynchronous/test_retryable_reads.py | 196 ++++++++++++++++++++++ test/test_retryable_reads.py | 3 + tools/synchro.py | 1 + 3 files changed, 200 insertions(+) create mode 100644 test/asynchronous/test_retryable_reads.py diff --git a/test/asynchronous/test_retryable_reads.py b/test/asynchronous/test_retryable_reads.py new file mode 100644 index 0000000000..0fb0f82964 --- /dev/null +++ b/test/asynchronous/test_retryable_reads.py @@ -0,0 +1,196 @@ +# Copyright 2019-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test retryable reads spec.""" +from __future__ import annotations + +import os +import pprint +import sys +import threading + +from pymongo.errors import AutoReconnect + +sys.path[0:0] = [""] + +from test.asynchronous import ( + AsyncIntegrationTest, + AsyncPyMongoTestCase, + async_client_context, + client_knobs, + unittest, +) +from test.utils import ( + CMAPListener, + OvertCommandListener, + set_fail_point, +) + +from pymongo.monitoring import ( + ConnectionCheckedOutEvent, + ConnectionCheckOutFailedEvent, + ConnectionCheckOutFailedReason, + PoolClearedEvent, +) + +_IS_SYNC = False + +# Location of JSON test specifications. +_TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "retryable_reads", "legacy") + + +class TestClientOptions(AsyncPyMongoTestCase): + async def test_default(self): + client = self.simple_client(connect=False) + self.assertEqual(client.options.retry_reads, True) + + async def test_kwargs(self): + client = self.simple_client(retryReads=True, connect=False) + self.assertEqual(client.options.retry_reads, True) + client = self.simple_client(retryReads=False, connect=False) + self.assertEqual(client.options.retry_reads, False) + + async def test_uri(self): + client = self.simple_client("mongodb://h/?retryReads=true", connect=False) + self.assertEqual(client.options.retry_reads, True) + client = self.simple_client("mongodb://h/?retryReads=false", connect=False) + self.assertEqual(client.options.retry_reads, False) + + +class FindThread(threading.Thread): + def __init__(self, collection): + super().__init__() + self.daemon = True + self.collection = collection + self.passed = False + + async def run(self): + await self.collection.find_one({}) + self.passed = True + + +class TestPoolPausedError(AsyncIntegrationTest): + # Pools don't get paused in load balanced mode. + RUN_ON_LOAD_BALANCER = False + RUN_ON_SERVERLESS = False + + @async_client_context.require_sync + @async_client_context.require_failCommand_blockConnection + @client_knobs(heartbeat_frequency=0.05, min_heartbeat_interval=0.05) + async def test_pool_paused_error_is_retryable(self): + if "PyPy" in sys.version: + # Tracked in PYTHON-3519 + self.skipTest("Test is flakey on PyPy") + cmap_listener = CMAPListener() + cmd_listener = OvertCommandListener() + client = await self.async_rs_or_single_client( + maxPoolSize=1, event_listeners=[cmap_listener, cmd_listener] + ) + self.addAsyncCleanup(client.close) + for _ in range(10): + cmap_listener.reset() + cmd_listener.reset() + threads = [FindThread(client.pymongo_test.test) for _ in range(2)] + fail_command = { + "mode": {"times": 1}, + "data": { + "failCommands": ["find"], + "blockConnection": True, + "blockTimeMS": 1000, + "errorCode": 91, + }, + } + async with self.fail_point(fail_command): + for thread in threads: + thread.start() + for thread in threads: + thread.join() + for thread in threads: + self.assertTrue(thread.passed) + + # It's possible that SDAM can rediscover the server and mark the + # pool ready before the thread in the wait queue has a chance + # to run. Repeat the test until the thread actually encounters + # a PoolClearedError. + if cmap_listener.event_count(ConnectionCheckOutFailedEvent): + break + + # Via CMAP monitoring, assert that the first check out succeeds. + cmap_events = cmap_listener.events_by_type( + (ConnectionCheckedOutEvent, ConnectionCheckOutFailedEvent, PoolClearedEvent) + ) + msg = pprint.pformat(cmap_listener.events) + self.assertIsInstance(cmap_events[0], ConnectionCheckedOutEvent, msg) + self.assertIsInstance(cmap_events[1], PoolClearedEvent, msg) + self.assertIsInstance(cmap_events[2], ConnectionCheckOutFailedEvent, msg) + self.assertEqual(cmap_events[2].reason, ConnectionCheckOutFailedReason.CONN_ERROR, msg) + self.assertIsInstance(cmap_events[3], ConnectionCheckedOutEvent, msg) + + # Connection check out failures are not reflected in command + # monitoring because we only publish command events _after_ checking + # out a connection. + started = cmd_listener.started_events + msg = pprint.pformat(cmd_listener.results) + self.assertEqual(3, len(started), msg) + succeeded = cmd_listener.succeeded_events + self.assertEqual(2, len(succeeded), msg) + failed = cmd_listener.failed_events + self.assertEqual(1, len(failed), msg) + + +class TestRetryableReads(AsyncIntegrationTest): + @async_client_context.require_multiple_mongoses + @async_client_context.require_failCommand_fail_point + async def test_retryable_reads_in_sharded_cluster_multiple_available(self): + fail_command = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "failCommands": ["find"], + "closeConnection": True, + "appName": "retryableReadTest", + }, + } + + mongos_clients = [] + + for mongos in async_client_context.mongos_seeds().split(","): + client = await self.async_rs_or_single_client(mongos) + set_fail_point(client, fail_command) + self.addAsyncCleanup(client.close) + mongos_clients.append(client) + + listener = OvertCommandListener() + client = self.async_rs_or_single_client( + async_client_context.mongos_seeds(), + appName="retryableReadTest", + event_listeners=[listener], + retryReads=True, + ) + + async with self.fail_point(fail_command): + with self.assertRaises(AutoReconnect): + await client.t.t.find_one({}) + + # Disable failpoints on each mongos + for client in mongos_clients: + fail_command["mode"] = "off" + set_fail_point(client, fail_command) + + self.assertEqual(len(listener.failed_events), 2) + self.assertEqual(len(listener.succeeded_events), 0) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_retryable_reads.py b/test/test_retryable_reads.py index b4fafe4652..a6d56c8fad 100644 --- a/test/test_retryable_reads.py +++ b/test/test_retryable_reads.py @@ -44,6 +44,8 @@ PoolClearedEvent, ) +_IS_SYNC = True + # Location of JSON test specifications. _TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "retryable_reads", "legacy") @@ -83,6 +85,7 @@ class TestPoolPausedError(IntegrationTest): RUN_ON_LOAD_BALANCER = False RUN_ON_SERVERLESS = False + @client_context.require_sync @client_context.require_failCommand_blockConnection @client_knobs(heartbeat_frequency=0.05, min_heartbeat_interval=0.05) def test_pool_paused_error_is_retryable(self): diff --git a/tools/synchro.py b/tools/synchro.py index c5b0afb643..3333b0de2e 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -193,6 +193,7 @@ def async_only_test(f: str) -> bool: "test_logger.py", "test_monitoring.py", "test_raw_bson.py", + "test_retryable_reads.py", "test_retryable_writes.py", "test_session.py", "test_transactions.py", From e343b5cda05cc3110ed7cf15eb271477ca1bebf7 Mon Sep 17 00:00:00 2001 From: Iris Date: Thu, 26 Sep 2024 17:37:35 -0700 Subject: [PATCH 2/7] add missing await --- test/asynchronous/test_retryable_reads.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/asynchronous/test_retryable_reads.py b/test/asynchronous/test_retryable_reads.py index 0fb0f82964..f649f7d6f7 100644 --- a/test/asynchronous/test_retryable_reads.py +++ b/test/asynchronous/test_retryable_reads.py @@ -172,7 +172,7 @@ async def test_retryable_reads_in_sharded_cluster_multiple_available(self): mongos_clients.append(client) listener = OvertCommandListener() - client = self.async_rs_or_single_client( + client = await self.async_rs_or_single_client( async_client_context.mongos_seeds(), appName="retryableReadTest", event_listeners=[listener], From 9d5790da3898a8241dfd2626c742e55dda606ca1 Mon Sep 17 00:00:00 2001 From: Iris Date: Fri, 27 Sep 2024 09:04:36 -0700 Subject: [PATCH 3/7] add async_set_fail_point --- test/asynchronous/test_retryable_reads.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/asynchronous/test_retryable_reads.py b/test/asynchronous/test_retryable_reads.py index f649f7d6f7..d576f8987e 100644 --- a/test/asynchronous/test_retryable_reads.py +++ b/test/asynchronous/test_retryable_reads.py @@ -34,7 +34,7 @@ from test.utils import ( CMAPListener, OvertCommandListener, - set_fail_point, + async_set_fail_point, ) from pymongo.monitoring import ( @@ -167,7 +167,7 @@ async def test_retryable_reads_in_sharded_cluster_multiple_available(self): for mongos in async_client_context.mongos_seeds().split(","): client = await self.async_rs_or_single_client(mongos) - set_fail_point(client, fail_command) + await async_set_fail_point(client, fail_command) self.addAsyncCleanup(client.close) mongos_clients.append(client) @@ -186,7 +186,7 @@ async def test_retryable_reads_in_sharded_cluster_multiple_available(self): # Disable failpoints on each mongos for client in mongos_clients: fail_command["mode"] = "off" - set_fail_point(client, fail_command) + await async_set_fail_point(client, fail_command) self.assertEqual(len(listener.failed_events), 2) self.assertEqual(len(listener.succeeded_events), 0) From ba6d9f3f43265dcfbaedfdebf0a336d2d187189c Mon Sep 17 00:00:00 2001 From: Iris Date: Sun, 29 Sep 2024 22:18:21 -0700 Subject: [PATCH 4/7] remove redundant clean up --- test/asynchronous/test_retryable_reads.py | 2 -- test/test_retryable_reads.py | 2 -- 2 files changed, 4 deletions(-) diff --git a/test/asynchronous/test_retryable_reads.py b/test/asynchronous/test_retryable_reads.py index d576f8987e..dbb5f5d4be 100644 --- a/test/asynchronous/test_retryable_reads.py +++ b/test/asynchronous/test_retryable_reads.py @@ -97,7 +97,6 @@ async def test_pool_paused_error_is_retryable(self): client = await self.async_rs_or_single_client( maxPoolSize=1, event_listeners=[cmap_listener, cmd_listener] ) - self.addAsyncCleanup(client.close) for _ in range(10): cmap_listener.reset() cmd_listener.reset() @@ -168,7 +167,6 @@ async def test_retryable_reads_in_sharded_cluster_multiple_available(self): for mongos in async_client_context.mongos_seeds().split(","): client = await self.async_rs_or_single_client(mongos) await async_set_fail_point(client, fail_command) - self.addAsyncCleanup(client.close) mongos_clients.append(client) listener = OvertCommandListener() diff --git a/test/test_retryable_reads.py b/test/test_retryable_reads.py index a6d56c8fad..87bb830c22 100644 --- a/test/test_retryable_reads.py +++ b/test/test_retryable_reads.py @@ -97,7 +97,6 @@ def test_pool_paused_error_is_retryable(self): client = self.rs_or_single_client( maxPoolSize=1, event_listeners=[cmap_listener, cmd_listener] ) - self.addCleanup(client.close) for _ in range(10): cmap_listener.reset() cmd_listener.reset() @@ -168,7 +167,6 @@ def test_retryable_reads_in_sharded_cluster_multiple_available(self): for mongos in client_context.mongos_seeds().split(","): client = self.rs_or_single_client(mongos) set_fail_point(client, fail_command) - self.addCleanup(client.close) mongos_clients.append(client) listener = OvertCommandListener() From 620831b9ff2a7ee894bec0f7391ef22c6e72af81 Mon Sep 17 00:00:00 2001 From: Iris Date: Mon, 30 Sep 2024 14:57:34 -0700 Subject: [PATCH 5/7] change test dir --- test/asynchronous/test_retryable_reads.py | 11 ++++++++++- test/test_retryable_reads.py | 11 ++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/test/asynchronous/test_retryable_reads.py b/test/asynchronous/test_retryable_reads.py index dbb5f5d4be..9cc0259186 100644 --- a/test/asynchronous/test_retryable_reads.py +++ b/test/asynchronous/test_retryable_reads.py @@ -47,7 +47,16 @@ _IS_SYNC = False # Location of JSON test specifications. -_TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "retryable_reads", "legacy") +if _IS_SYNC: + _TEST_PATH = os.path.join( + os.path.dirname(os.path.realpath(__file__)), "retryable_reads", "legacy" + ) +else: + _TEST_PATH = os.path.join( + os.path.abspath(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir)), + "retryable_reads", + "legacy", + ) class TestClientOptions(AsyncPyMongoTestCase): diff --git a/test/test_retryable_reads.py b/test/test_retryable_reads.py index 87bb830c22..a4b62330d3 100644 --- a/test/test_retryable_reads.py +++ b/test/test_retryable_reads.py @@ -47,7 +47,16 @@ _IS_SYNC = True # Location of JSON test specifications. -_TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "retryable_reads", "legacy") +if _IS_SYNC: + _TEST_PATH = os.path.join( + os.path.dirname(os.path.realpath(__file__)), "retryable_reads", "legacy" + ) +else: + _TEST_PATH = os.path.join( + os.path.abspath(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir)), + "retryable_reads", + "legacy", + ) class TestClientOptions(PyMongoTestCase): From 0438ba2fe85aff414b0ff6e33cac3b55b320d620 Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 1 Oct 2024 08:36:14 -0700 Subject: [PATCH 6/7] remove _TEST_PATH --- test/asynchronous/test_retryable_reads.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/test/asynchronous/test_retryable_reads.py b/test/asynchronous/test_retryable_reads.py index 9cc0259186..b2d86f5d84 100644 --- a/test/asynchronous/test_retryable_reads.py +++ b/test/asynchronous/test_retryable_reads.py @@ -46,18 +46,6 @@ _IS_SYNC = False -# Location of JSON test specifications. -if _IS_SYNC: - _TEST_PATH = os.path.join( - os.path.dirname(os.path.realpath(__file__)), "retryable_reads", "legacy" - ) -else: - _TEST_PATH = os.path.join( - os.path.abspath(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir)), - "retryable_reads", - "legacy", - ) - class TestClientOptions(AsyncPyMongoTestCase): async def test_default(self): From 61591fdb09d6b1e89d6a9e4e3d984c60656717fd Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 1 Oct 2024 08:44:07 -0700 Subject: [PATCH 7/7] forgot to run precommit --- test/test_retryable_reads.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/test/test_retryable_reads.py b/test/test_retryable_reads.py index a4b62330d3..d4951db5ee 100644 --- a/test/test_retryable_reads.py +++ b/test/test_retryable_reads.py @@ -46,18 +46,6 @@ _IS_SYNC = True -# Location of JSON test specifications. -if _IS_SYNC: - _TEST_PATH = os.path.join( - os.path.dirname(os.path.realpath(__file__)), "retryable_reads", "legacy" - ) -else: - _TEST_PATH = os.path.join( - os.path.abspath(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir)), - "retryable_reads", - "legacy", - ) - class TestClientOptions(PyMongoTestCase): def test_default(self):