diff --git a/test/asynchronous/test_csot.py b/test/asynchronous/test_csot.py new file mode 100644 index 0000000000..9e928c2251 --- /dev/null +++ b/test/asynchronous/test_csot.py @@ -0,0 +1,118 @@ +# Copyright 2022-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test the CSOT unified spec tests.""" +from __future__ import annotations + +import os +import sys +from pathlib import Path + +sys.path[0:0] = [""] + +from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest +from test.asynchronous.unified_format import generate_test_classes + +import pymongo +from pymongo import _csot +from pymongo.errors import PyMongoError + +_IS_SYNC = False + +# Location of JSON test specifications. +if _IS_SYNC: + TEST_PATH = os.path.join(Path(__file__).resolve().parent, "csot") +else: + TEST_PATH = os.path.join(Path(__file__).resolve().parent.parent, "csot") + +# Generate unified tests. +globals().update(generate_test_classes(TEST_PATH, module=__name__)) + + +class TestCSOT(AsyncIntegrationTest): + RUN_ON_SERVERLESS = True + RUN_ON_LOAD_BALANCER = True + + async def test_timeout_nested(self): + if os.environ.get("SKIP_CSOT_TESTS", ""): + raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...") + coll = self.db.coll + self.assertEqual(_csot.get_timeout(), None) + self.assertEqual(_csot.get_deadline(), float("inf")) + self.assertEqual(_csot.get_rtt(), 0.0) + with pymongo.timeout(10): + await coll.find_one() + self.assertEqual(_csot.get_timeout(), 10) + deadline_10 = _csot.get_deadline() + + # Capped at the original 10 deadline. + with pymongo.timeout(15): + await coll.find_one() + self.assertEqual(_csot.get_timeout(), 15) + self.assertEqual(_csot.get_deadline(), deadline_10) + + # Should be reset to previous values + self.assertEqual(_csot.get_timeout(), 10) + self.assertEqual(_csot.get_deadline(), deadline_10) + await coll.find_one() + + with pymongo.timeout(5): + await coll.find_one() + self.assertEqual(_csot.get_timeout(), 5) + self.assertLess(_csot.get_deadline(), deadline_10) + + # Should be reset to previous values + self.assertEqual(_csot.get_timeout(), 10) + self.assertEqual(_csot.get_deadline(), deadline_10) + await coll.find_one() + + # Should be reset to previous values + self.assertEqual(_csot.get_timeout(), None) + self.assertEqual(_csot.get_deadline(), float("inf")) + self.assertEqual(_csot.get_rtt(), 0.0) + + @async_client_context.require_change_streams + async def test_change_stream_can_resume_after_timeouts(self): + if os.environ.get("SKIP_CSOT_TESTS", ""): + raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...") + coll = self.db.test + await coll.insert_one({}) + async with await coll.watch() as stream: + with pymongo.timeout(0.1): + with self.assertRaises(PyMongoError) as ctx: + await stream.next() + self.assertTrue(ctx.exception.timeout) + self.assertTrue(stream.alive) + with self.assertRaises(PyMongoError) as ctx: + await stream.try_next() + self.assertTrue(ctx.exception.timeout) + self.assertTrue(stream.alive) + # Resume before the insert on 3.6 because 4.0 is required to avoid skipping documents + if async_client_context.version < (4, 0): + await stream.try_next() + await coll.insert_one({}) + with pymongo.timeout(10): + self.assertTrue(await stream.next()) + self.assertTrue(stream.alive) + # Timeout applies to entire next() call, not only individual commands. + with pymongo.timeout(0.5): + with self.assertRaises(PyMongoError) as ctx: + await stream.next() + self.assertTrue(ctx.exception.timeout) + self.assertTrue(stream.alive) + self.assertFalse(stream.alive) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/asynchronous/unified_format.py b/test/asynchronous/unified_format.py index 149aad9786..695f58ee27 100644 --- a/test/asynchronous/unified_format.py +++ b/test/asynchronous/unified_format.py @@ -1387,7 +1387,6 @@ async def run_scenario(self, spec, uri=None): # transaction (from a test failure) from blocking collection/database # operations during test set up and tear down. await self.kill_all_sessions() - self.addAsyncCleanup(self.kill_all_sessions) if "csot" in self.id().lower(): # Retry CSOT tests up to 2 times to deal with flakey tests. @@ -1395,7 +1394,11 @@ async def run_scenario(self, spec, uri=None): for i in range(attempts): try: return await self._run_scenario(spec, uri) - except AssertionError: + except (AssertionError, OperationFailure) as exc: + if isinstance(exc, OperationFailure) and ( + _IS_SYNC or "failpoint" not in exc._message + ): + raise if i < attempts - 1: print( f"Retrying after attempt {i+1} of {self.id()} failed with:\n" diff --git a/test/test_csot.py b/test/test_csot.py index c075a07d5a..5201156a1d 100644 --- a/test/test_csot.py +++ b/test/test_csot.py @@ -17,6 +17,7 @@ import os import sys +from pathlib import Path sys.path[0:0] = [""] @@ -27,8 +28,13 @@ from pymongo import _csot from pymongo.errors import PyMongoError +_IS_SYNC = True + # Location of JSON test specifications. -TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "csot") +if _IS_SYNC: + TEST_PATH = os.path.join(Path(__file__).resolve().parent, "csot") +else: + TEST_PATH = os.path.join(Path(__file__).resolve().parent.parent, "csot") # Generate unified tests. globals().update(generate_test_classes(TEST_PATH, module=__name__)) diff --git a/test/unified_format.py b/test/unified_format.py index b2e6ae1e83..73dee10ddf 100644 --- a/test/unified_format.py +++ b/test/unified_format.py @@ -1374,7 +1374,6 @@ def run_scenario(self, spec, uri=None): # transaction (from a test failure) from blocking collection/database # operations during test set up and tear down. self.kill_all_sessions() - self.addCleanup(self.kill_all_sessions) if "csot" in self.id().lower(): # Retry CSOT tests up to 2 times to deal with flakey tests. @@ -1382,7 +1381,11 @@ def run_scenario(self, spec, uri=None): for i in range(attempts): try: return self._run_scenario(spec, uri) - except AssertionError: + except (AssertionError, OperationFailure) as exc: + if isinstance(exc, OperationFailure) and ( + _IS_SYNC or "failpoint" not in exc._message + ): + raise if i < attempts - 1: print( f"Retrying after attempt {i+1} of {self.id()} failed with:\n" diff --git a/tools/synchro.py b/tools/synchro.py index 519ebb102b..39c53b435f 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -208,6 +208,7 @@ def async_only_test(f: str) -> bool: "test_connections_survive_primary_stepdown_spec.py", "test_create_entities.py", "test_crud_unified.py", + "test_csot.py", "test_cursor.py", "test_custom_types.py", "test_database.py",