Skip to content

PYTHON-4585 Cursor.to_list does not apply client's timeoutMS setting #1860

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Sep 17, 2024
Merged
3 changes: 3 additions & 0 deletions pymongo/asynchronous/command_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
)

from bson import CodecOptions, _convert_raw_document_lists_to_streams
from pymongo import _csot
from pymongo.asynchronous.cursor import _ConnectionManager
from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS
from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure
Expand Down Expand Up @@ -77,6 +78,7 @@ def __init__(
self._address = address
self._batch_size = batch_size
self._max_await_time_ms = max_await_time_ms
self._timeout = self._collection.database.client.options.timeout
self._session = session
self._explicit_session = explicit_session
self._killed = self._id == 0
Expand Down Expand Up @@ -385,6 +387,7 @@ async def __aenter__(self) -> AsyncCommandCursor[_DocumentType]:
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
await self.close()

@_csot.apply
async def to_list(self, length: Optional[int] = None) -> list[_DocumentType]:
"""Converts the contents of this cursor to a list more efficiently than ``[doc async for doc in cursor]``.

Expand Down
4 changes: 3 additions & 1 deletion pymongo/asynchronous/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from bson import RE_TYPE, _convert_raw_document_lists_to_streams
from bson.code import Code
from bson.son import SON
from pymongo import helpers_shared
from pymongo import _csot, helpers_shared
from pymongo.asynchronous.helpers import anext
from pymongo.collation import validate_collation_or_none
from pymongo.common import (
Expand Down Expand Up @@ -196,6 +196,7 @@ def __init__(
self._explain = False
self._comment = comment
self._max_time_ms = max_time_ms
self._timeout = self._collection.database.client.options.timeout
self._max_await_time_ms: Optional[int] = None
self._max: Optional[Union[dict[Any, Any], _Sort]] = max
self._min: Optional[Union[dict[Any, Any], _Sort]] = min
Expand Down Expand Up @@ -1290,6 +1291,7 @@ async def __aenter__(self) -> AsyncCursor[_DocumentType]:
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
await self.close()

@_csot.apply
async def to_list(self, length: Optional[int] = None) -> list[_DocumentType]:
"""Converts the contents of this cursor to a list more efficiently than ``[doc async for doc in cursor]``.

Expand Down
3 changes: 3 additions & 0 deletions pymongo/synchronous/command_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
)

from bson import CodecOptions, _convert_raw_document_lists_to_streams
from pymongo import _csot
from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS
from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure
from pymongo.message import (
Expand Down Expand Up @@ -77,6 +78,7 @@ def __init__(
self._address = address
self._batch_size = batch_size
self._max_await_time_ms = max_await_time_ms
self._timeout = self._collection.database.client.options.timeout
self._session = session
self._explicit_session = explicit_session
self._killed = self._id == 0
Expand Down Expand Up @@ -385,6 +387,7 @@ def __enter__(self) -> CommandCursor[_DocumentType]:
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self.close()

@_csot.apply
def to_list(self, length: Optional[int] = None) -> list[_DocumentType]:
"""Converts the contents of this cursor to a list more efficiently than ``[doc for doc in cursor]``.

Expand Down
4 changes: 3 additions & 1 deletion pymongo/synchronous/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from bson import RE_TYPE, _convert_raw_document_lists_to_streams
from bson.code import Code
from bson.son import SON
from pymongo import helpers_shared
from pymongo import _csot, helpers_shared
from pymongo.collation import validate_collation_or_none
from pymongo.common import (
validate_is_document_type,
Expand Down Expand Up @@ -196,6 +196,7 @@ def __init__(
self._explain = False
self._comment = comment
self._max_time_ms = max_time_ms
self._timeout = self._collection.database.client.options.timeout
self._max_await_time_ms: Optional[int] = None
self._max: Optional[Union[dict[Any, Any], _Sort]] = max
self._min: Optional[Union[dict[Any, Any], _Sort]] = min
Expand Down Expand Up @@ -1288,6 +1289,7 @@ def __enter__(self) -> Cursor[_DocumentType]:
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self.close()

@_csot.apply
def to_list(self, length: Optional[int] = None) -> list[_DocumentType]:
"""Converts the contents of this cursor to a list more efficiently than ``[doc for doc in cursor]``.

Expand Down
34 changes: 33 additions & 1 deletion test/asynchronous/test_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
AllowListEventListener,
EventListener,
OvertCommandListener,
delay,
ignore_deprecations,
wait_until,
)
Expand All @@ -44,7 +45,7 @@
from pymongo.asynchronous.cursor import AsyncCursor, CursorType
from pymongo.asynchronous.helpers import anext
from pymongo.collation import Collation
from pymongo.errors import ExecutionTimeout, InvalidOperation, OperationFailure
from pymongo.errors import ExecutionTimeout, InvalidOperation, OperationFailure, PyMongoError
from pymongo.operations import _IndexList
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import ReadPreference
Expand Down Expand Up @@ -1410,6 +1411,18 @@ async def test_to_list_length(self):
docs = await c.to_list(3)
self.assertEqual(len(docs), 2)

async def test_to_list_csot_applied(self):
client = await self.async_single_client(timeoutMS=500)
# Initialize the client with a larger timeout to help make test less flakey
with pymongo.timeout(2):
await client.admin.command("ping")
coll = client.pymongo.test
await coll.insert_many([{} for _ in range(5)])
cursor = coll.find({"$where": delay(1)})
with self.assertRaises(PyMongoError) as ctx:
await cursor.to_list()
self.assertTrue(ctx.exception.timeout)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice.


@async_client_context.require_change_streams
async def test_command_cursor_to_list(self):
# Set maxAwaitTimeMS=1 to speed up the test.
Expand Down Expand Up @@ -1439,6 +1452,25 @@ async def test_command_cursor_to_list_length(self):
result = await db.test.aggregate([pipeline])
self.assertEqual(len(await result.to_list(1)), 1)

@async_client_context.require_failCommand_blockConnection
async def test_command_cursor_to_list_csot_applied(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 things:

  • Let's use regular aggregate here since there's nothing special about $changeStream here and it complicates the test.
  • The failCommands needs to be for "getMore", not "aggregate" since await client.db.test.aggregate() actually runs the aggregate command and to_list only runs getMore. We'll need to set batchSize to ensure at least 1 getMore even runs.

client = await self.async_single_client(timeoutMS=500)
# Initialize the client with a larger timeout to help make test less flakey
with pymongo.timeout(2):
await client.admin.command("ping")
coll = client.pymongo.test
await coll.insert_many([{} for _ in range(5)])
fail_command = {
"configureFailPoint": "failCommand",
"mode": {"times": 5},
"data": {"failCommands": ["getMore"], "blockConnection": True, "blockTimeMS": 1000},
}
cursor = await coll.aggregate([], batchSize=1)
async with self.fail_point(fail_command):
with self.assertRaises(PyMongoError) as ctx:
await cursor.to_list()
self.assertTrue(ctx.exception.timeout)


class TestRawBatchCursor(AsyncIntegrationTest):
async def test_find_raw(self):
Expand Down
34 changes: 33 additions & 1 deletion test/test_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
AllowListEventListener,
EventListener,
OvertCommandListener,
delay,
ignore_deprecations,
wait_until,
)
Expand All @@ -42,7 +43,7 @@
from bson.code import Code
from pymongo import ASCENDING, DESCENDING
from pymongo.collation import Collation
from pymongo.errors import ExecutionTimeout, InvalidOperation, OperationFailure
from pymongo.errors import ExecutionTimeout, InvalidOperation, OperationFailure, PyMongoError
from pymongo.operations import _IndexList
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import ReadPreference
Expand Down Expand Up @@ -1401,6 +1402,18 @@ def test_to_list_length(self):
docs = c.to_list(3)
self.assertEqual(len(docs), 2)

def test_to_list_csot_applied(self):
client = self.single_client(timeoutMS=500)
# Initialize the client with a larger timeout to help make test less flakey
with pymongo.timeout(2):
client.admin.command("ping")
coll = client.pymongo.test
coll.insert_many([{} for _ in range(5)])
cursor = coll.find({"$where": delay(1)})
with self.assertRaises(PyMongoError) as ctx:
cursor.to_list()
self.assertTrue(ctx.exception.timeout)

@client_context.require_change_streams
def test_command_cursor_to_list(self):
# Set maxAwaitTimeMS=1 to speed up the test.
Expand Down Expand Up @@ -1430,6 +1443,25 @@ def test_command_cursor_to_list_length(self):
result = db.test.aggregate([pipeline])
self.assertEqual(len(result.to_list(1)), 1)

@client_context.require_failCommand_blockConnection
def test_command_cursor_to_list_csot_applied(self):
client = self.single_client(timeoutMS=500)
# Initialize the client with a larger timeout to help make test less flakey
with pymongo.timeout(2):
client.admin.command("ping")
coll = client.pymongo.test
coll.insert_many([{} for _ in range(5)])
fail_command = {
"configureFailPoint": "failCommand",
"mode": {"times": 5},
"data": {"failCommands": ["getMore"], "blockConnection": True, "blockTimeMS": 1000},
}
cursor = coll.aggregate([], batchSize=1)
with self.fail_point(fail_command):
with self.assertRaises(PyMongoError) as ctx:
cursor.to_list()
self.assertTrue(ctx.exception.timeout)


class TestRawBatchCursor(IntegrationTest):
def test_find_raw(self):
Expand Down
Loading