Skip to content

Commit 694a4a5

Browse files
committed
PYTHON-1866 Add support for $merge aggregation pipeline stage
1 parent 0400949 commit 694a4a5

File tree

6 files changed

+486
-69
lines changed

6 files changed

+486
-69
lines changed

doc/changelog.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ Version 3.9 adds support for MongoDB 4.2. Highlights include:
4949
- Support for publishing Connection Monitoring and Pooling events via the new
5050
:class:`~pymongo.monitoring.ConnectionPoolListener` class. See
5151
:mod:`~pymongo.monitoring` for an example.
52+
- :meth:`pymongo.collection.Collection.aggregate` and
53+
:meth:`pymongo.database.Database.aggregate` now support the ``$merge`` pipeline stage.
5254

5355
.. _URI options specification: https://github.com/mongodb/specifications/blob/master/source/uri-options/uri-options.rst
5456

pymongo/aggregation.py

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,25 @@ def __init__(self, target, cursor_class, pipeline, options,
3838

3939
common.validate_list('pipeline', pipeline)
4040
self._pipeline = pipeline
41+
self._performs_write = False
42+
if pipeline and ("$out" in pipeline[-1] or "$merge" in pipeline[-1]):
43+
self._performs_write = True
4144

4245
common.validate_is_mapping('options', options)
4346
self._options = options
4447

48+
# This is the batchSize that will be used for setting the initial
49+
# batchSize for the cursor, as well as the subsequent getMores.
50+
self._batch_size = common.validate_non_negative_integer_or_none(
51+
"batchSize", self._options.pop("batchSize", None))
52+
53+
# If the cursor option is already specified, avoid overriding it.
54+
self._options.setdefault("cursor", {})
55+
# If the pipeline performs a write, we ignore the initial batchSize
56+
# since the server doesn't return results in this case.
57+
if self._batch_size is not None and not self._performs_write:
58+
self._options["cursor"]["batchSize"] = self._batch_size
59+
4560
self._cursor_class = cursor_class
4661
self._explicit_session = explicit_session
4762
self._user_fields = user_fields
@@ -51,11 +66,6 @@ def __init__(self, target, cursor_class, pipeline, options,
5166
options.pop('collation', None))
5267

5368
self._max_await_time_ms = options.pop('maxAwaitTimeMS', None)
54-
self._batch_size = common.validate_non_negative_integer_or_none(
55-
"batchSize", options.pop("batchSize", None))
56-
57-
self._dollar_out = (self._pipeline and
58-
'$out' in self._pipeline[-1])
5969

6070
@property
6171
def _aggregation_target(self):
@@ -99,16 +109,17 @@ def get_cursor(self, session, server, sock_info, slave_ok):
99109
# - server version is >= 4.2 or
100110
# - server version is >= 3.2 and pipeline doesn't use $out or $merge
101111
if (('readConcern' not in cmd) and
102-
((sock_info.max_wire_version >= 4 and not self._dollar_out) or
112+
((sock_info.max_wire_version >= 4 and
113+
not self._performs_write) or
103114
(sock_info.max_wire_version >= 8))):
104115
read_concern = self._target.read_concern
105116
else:
106117
read_concern = None
107118

108119
# Apply this target's write concern if:
109120
# writeConcern has not been specified as a kwarg and pipeline does
110-
# use $out
111-
if 'writeConcern' not in cmd and self._dollar_out:
121+
# perform a write operation
122+
if 'writeConcern' not in cmd and self._performs_write:
112123
write_concern = self._target._write_concern_for(session)
113124
else:
114125
write_concern = None
@@ -159,6 +170,16 @@ def get_cursor(self, session, server, sock_info, slave_ok):
159170

160171

161172
class _CollectionAggregationCommand(_AggregationCommand):
173+
def __init__(self, *args, **kwargs):
174+
# Pop additional option and initialize parent class.
175+
use_cursor = kwargs.pop("use_cursor", True)
176+
super(_CollectionAggregationCommand, self).__init__(*args, **kwargs)
177+
178+
# Remove the cursor document if the user has set use_cursor to False.
179+
self._use_cursor = use_cursor
180+
if not self._use_cursor:
181+
self._options.pop("cursor", None)
182+
162183
@property
163184
def _aggregation_target(self):
164185
return self._target.name
@@ -172,6 +193,15 @@ def _database(self):
172193
return self._target.database
173194

174195

196+
class _CollectionRawAggregationCommand(_CollectionAggregationCommand):
197+
def __init__(self, *args, **kwargs):
198+
super(_CollectionRawAggregationCommand, self).__init__(*args, **kwargs)
199+
200+
# For raw-batches, we set the initial batchSize for the cursor to 0.
201+
if self._use_cursor and not self._performs_write:
202+
self._options["cursor"]["batchSize"] = 0
203+
204+
175205
class _DatabaseAggregationCommand(_AggregationCommand):
176206
@property
177207
def _aggregation_target(self):

pymongo/change_stream.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ def _client(self):
9898
raise NotImplementedError
9999

100100
def _change_stream_options(self):
101+
"""Return the options dict for the $changeStream pipeline stage."""
101102
options = {}
102103
if self._full_document is not None:
103104
options['fullDocument'] = self._full_document
@@ -110,6 +111,7 @@ def _change_stream_options(self):
110111
return options
111112

112113
def _command_options(self):
114+
"""Return the options dict for the aggregation command."""
113115
options = {'cursor': {}}
114116
if self._max_await_time_ms is not None:
115117
options["maxAwaitTimeMS"] = self._max_await_time_ms

pymongo/collection.py

Lines changed: 23 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@
2929
from pymongo import (common,
3030
helpers,
3131
message)
32-
from pymongo.aggregation import _CollectionAggregationCommand
32+
from pymongo.aggregation import (_CollectionAggregationCommand,
33+
_CollectionRawAggregationCommand)
3334
from pymongo.bulk import BulkOperationBuilder, _Bulk
3435
from pymongo.command_cursor import CommandCursor, RawBatchCommandCursor
3536
from pymongo.common import ORDERED_TYPES
@@ -2258,11 +2259,8 @@ def options(self, session=None):
22582259

22592260
return options
22602261

2261-
def _aggregate(self, pipeline, cursor_class, first_batch_size, session,
2262+
def _aggregate(self, aggregation_command, pipeline, cursor_class, session,
22622263
explicit_session, **kwargs):
2263-
# Check if we use the $out stage
2264-
dollar_out = pipeline and '$out' in pipeline[-1]
2265-
22662264
# Remove things that are not command options.
22672265
use_cursor = True
22682266
if "useCursor" in kwargs:
@@ -2271,25 +2269,14 @@ def _aggregate(self, pipeline, cursor_class, first_batch_size, session,
22712269
"and will be removed in PyMongo 4.0",
22722270
DeprecationWarning, stacklevel=2)
22732271
use_cursor = common.validate_boolean(
2274-
"useCursor", kwargs.pop("useCursor"))
2275-
2276-
# If the server does not support the "cursor" option we
2277-
# ignore useCursor and batchSize.
2278-
if use_cursor:
2279-
if "cursor" not in kwargs:
2280-
kwargs["cursor"] = {}
2281-
# Ignore batchSize when the $out pipeline stage is used.
2282-
# batchSize is meaningless in that case since the server
2283-
# doesn't return results. This also avoids SERVER-23923.
2284-
if first_batch_size is not None and not dollar_out:
2285-
kwargs["cursor"]["batchSize"] = first_batch_size
2286-
2287-
cmd = _CollectionAggregationCommand(
2272+
"useCursor", kwargs.pop("useCursor", True))
2273+
2274+
cmd = aggregation_command(
22882275
self, cursor_class, pipeline, kwargs, explicit_session,
2289-
user_fields={'cursor': {'firstBatch': 1}})
2276+
user_fields={'cursor': {'firstBatch': 1}}, use_cursor=use_cursor)
22902277
return self.__database.client._retryable_read(
22912278
cmd.get_cursor, self._read_preference_for(session), session,
2292-
retryable=not dollar_out)
2279+
retryable=not cmd._performs_write)
22932280

22942281
def aggregate(self, pipeline, session=None, **kwargs):
22952282
"""Perform an aggregation using the aggregation framework on this
@@ -2314,11 +2301,11 @@ def aggregate(self, pipeline, session=None, **kwargs):
23142301
- `useCursor` (bool): Deprecated. Will be removed in PyMongo 4.0.
23152302
23162303
The :meth:`aggregate` method obeys the :attr:`read_preference` of this
2317-
:class:`Collection`. Please note that using the ``$out`` pipeline stage
2318-
requires a read preference of
2304+
:class:`Collection`. Please note that using the ``$out`` or ``$merge``
2305+
pipeline stages requires a read preference of
23192306
:attr:`~pymongo.read_preferences.ReadPreference.PRIMARY` (the default).
2320-
The server will raise an error if the ``$out`` pipeline stage is used
2321-
with any other read preference.
2307+
The server will raise an error if the ``$out`` or ``$merge`` pipeline
2308+
stages are used with any other read preference.
23222309
23232310
.. note:: This method does not support the 'explain' option. Please
23242311
use :meth:`~pymongo.database.Database.command` instead. An
@@ -2338,6 +2325,8 @@ def aggregate(self, pipeline, session=None, **kwargs):
23382325
A :class:`~pymongo.command_cursor.CommandCursor` over the result
23392326
set.
23402327
2328+
.. versionchanged:: 3.9
2329+
Added support for the ``$merge`` pipeline stage.
23412330
.. versionchanged:: 3.9
23422331
Apply this collection's read concern to pipelines containing the
23432332
`$out` stage when connected to MongoDB >= 4.2.
@@ -2364,9 +2353,9 @@ def aggregate(self, pipeline, session=None, **kwargs):
23642353
https://docs.mongodb.com/manual/reference/command/aggregate
23652354
"""
23662355
with self.__database.client._tmp_session(session, close=False) as s:
2367-
return self._aggregate(pipeline,
2356+
return self._aggregate(_CollectionAggregationCommand,
2357+
pipeline,
23682358
CommandCursor,
2369-
kwargs.get('batchSize'),
23702359
session=s,
23712360
explicit_session=session is not None,
23722361
**kwargs)
@@ -2397,8 +2386,13 @@ def aggregate_raw_batches(self, pipeline, **kwargs):
23972386
if "session" in kwargs:
23982387
raise ConfigurationError(
23992388
"aggregate_raw_batches does not support sessions")
2400-
return self._aggregate(pipeline, RawBatchCommandCursor, 0,
2401-
None, False, **kwargs)
2389+
2390+
return self._aggregate(_CollectionRawAggregationCommand,
2391+
pipeline,
2392+
RawBatchCommandCursor,
2393+
session=None,
2394+
explicit_session=False,
2395+
**kwargs)
24022396

24032397
def watch(self, pipeline=None, full_document='default', resume_after=None,
24042398
max_await_time_ms=None, batch_size=None, collation=None,

pymongo/database.py

Lines changed: 10 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -446,29 +446,6 @@ def _fix_outgoing(self, son, collection):
446446
son = manipulator.transform_outgoing(son, collection)
447447
return son
448448

449-
def _aggregate(self, pipeline, cursor_class, session,
450-
explicit_session, **kwargs):
451-
# Check if we use the $out stage
452-
dollar_out = pipeline and '$out' in pipeline[-1]
453-
454-
# Specify cursor option if it has not been provided by the user.
455-
if "cursor" not in kwargs:
456-
kwargs["cursor"] = {}
457-
458-
# Ignore batchSize when the $out pipeline stage is used.
459-
# batchSize is meaningless in that case since the server
460-
# doesn't return results. This also avoids SERVER-23923.
461-
batch_size = kwargs.get("batchSize")
462-
if batch_size is not None and not dollar_out:
463-
kwargs["cursor"]["batchSize"] = batch_size
464-
465-
cmd = _DatabaseAggregationCommand(
466-
self, cursor_class, pipeline, kwargs, explicit_session,
467-
user_fields={'cursor': {'firstBatch': 1}})
468-
return self.client._retryable_read(
469-
cmd.get_cursor, self._read_preference_for(session), session,
470-
retryable=not dollar_out)
471-
472449
def aggregate(self, pipeline, session=None, **kwargs):
473450
"""Perform a database-level aggregation.
474451
@@ -500,11 +477,11 @@ def aggregate(self, pipeline, session=None, **kwargs):
500477
:class:`~pymongo.collation.Collation`.
501478
502479
The :meth:`aggregate` method obeys the :attr:`read_preference` of this
503-
:class:`Database`. Please note that using the ``$out`` pipeline stage
504-
requires a read preference of
480+
:class:`Database`. Please note that using the ``$out`` or ``$merge``
481+
pipeline stages requires a read preference of
505482
:attr:`~pymongo.read_preferences.ReadPreference.PRIMARY` (the default).
506-
The server will raise an error if the ``$out`` pipeline stage is used
507-
with any other read preference.
483+
The server will raise an error if the ``$out`` or ``$merge`` pipeline
484+
stages are used with any other read preference.
508485
509486
.. note:: This method does not support the 'explain' option. Please
510487
use :meth:`~pymongo.database.Database.command` instead.
@@ -531,11 +508,12 @@ def aggregate(self, pipeline, session=None, **kwargs):
531508
https://docs.mongodb.com/manual/reference/command/aggregate
532509
"""
533510
with self.client._tmp_session(session, close=False) as s:
534-
return self._aggregate(pipeline,
535-
CommandCursor,
536-
session=s,
537-
explicit_session=session is not None,
538-
**kwargs)
511+
cmd = _DatabaseAggregationCommand(
512+
self, CommandCursor, pipeline, kwargs, session is not None,
513+
user_fields={'cursor': {'firstBatch': 1}})
514+
return self.client._retryable_read(
515+
cmd.get_cursor, self._read_preference_for(s), s,
516+
retryable=not cmd._performs_write)
539517

540518
def watch(self, pipeline=None, full_document='default', resume_after=None,
541519
max_await_time_ms=None, batch_size=None, collation=None,

0 commit comments

Comments
 (0)