@@ -71,37 +71,75 @@ public function __construct(Manager $manager, $namespace, WriteConcern $writeCon
71
71
72
72
/**
73
73
* Runs an aggregation framework pipeline
74
- * NOTE: The return value of this method depends on your MongoDB server version
75
- * and possibly options.
76
- * MongoDB 2.6 (and later) will return a Cursor by default
77
- * MongoDB pre 2.6 will return an ArrayIterator
74
+ *
75
+ * Note: this method's return value depends on the MongoDB server version
76
+ * and the "useCursor" option. If "useCursor" is true, a Cursor will be
77
+ * returned; otherwise, an ArrayIterator is returned, which wraps the
78
+ * "result" array from the command response document.
78
79
*
79
80
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
80
- * @see Collection::getAggregateOptions() for supported $options
81
81
*
82
82
* @param array $pipeline The pipeline to execute
83
83
* @param array $options Additional options
84
84
* @return Iterator
85
85
*/
86
86
public function aggregate (array $ pipeline , array $ options = array ())
87
87
{
88
- $ options = array_merge ($ this ->getAggregateOptions (), $ options );
89
- $ options = $ this ->_massageAggregateOptions ($ options );
90
- $ cmd = array (
91
- "aggregate " => $ this ->collname ,
92
- "pipeline " => $ pipeline ,
93
- ) + $ options ;
88
+ $ readPreference = new ReadPreference (ReadPreference::RP_PRIMARY );
89
+ $ server = $ this ->manager ->selectServer ($ readPreference );
90
+
91
+ if (FeatureDetection::isSupported ($ server , FeatureDetection::API_AGGREGATE_CURSOR )) {
92
+ $ options = array_merge (
93
+ array (
94
+ /**
95
+ * Enables writing to temporary files. When set to true, aggregation stages
96
+ * can write data to the _tmp subdirectory in the dbPath directory. The
97
+ * default is false.
98
+ *
99
+ * @see http://docs.mongodb.org/manual/reference/command/aggregate/
100
+ */
101
+ 'allowDiskUse ' => false ,
102
+ /**
103
+ * The number of documents to return per batch.
104
+ *
105
+ * @see http://docs.mongodb.org/manual/reference/command/aggregate/
106
+ */
107
+ 'batchSize ' => 0 ,
108
+ /**
109
+ * The maximum amount of time to allow the query to run.
110
+ *
111
+ * @see http://docs.mongodb.org/manual/reference/command/aggregate/
112
+ */
113
+ 'maxTimeMS ' => 0 ,
114
+ /**
115
+ * Indicates if the results should be provided as a cursor.
116
+ *
117
+ * @see http://docs.mongodb.org/manual/reference/command/aggregate/
118
+ */
119
+ 'useCursor ' => true ,
120
+ ),
121
+ $ options
122
+ );
123
+ }
94
124
95
- $ cursor = $ this ->_runCommand ($ this ->dbname , $ cmd );
125
+ $ options = $ this ->_massageAggregateOptions ($ options );
126
+ $ command = new Command (array (
127
+ 'aggregate ' => $ this ->collname ,
128
+ 'pipeline ' => $ pipeline ,
129
+ ) + $ options );
130
+ $ cursor = $ server ->executeCommand ($ this ->dbname , $ command );
96
131
97
- if (isset ( $ cmd [ " cursor " ]) && $ cmd ["cursor " ]) {
132
+ if ( ! empty ( $ options ["cursor " ]) ) {
98
133
return $ cursor ;
99
134
}
100
135
101
136
$ doc = current ($ cursor ->toArray ());
102
137
103
138
if ($ doc ["ok " ]) {
104
- return new \ArrayIterator ($ doc ["result " ]);
139
+ return new \ArrayIterator (array_map (
140
+ function (\stdClass $ document ) { return (array ) $ document ; },
141
+ $ doc ["result " ]
142
+ ));
105
143
}
106
144
107
145
throw $ this ->_generateCommandException ($ doc );
@@ -239,12 +277,12 @@ public function count(array $filter = array(), array $options = array())
239
277
{
240
278
$ cmd = array (
241
279
"count " => $ this ->collname ,
242
- "query " => $ filter ,
280
+ "query " => ( object ) $ filter ,
243
281
) + $ options ;
244
282
245
283
$ doc = current ($ this ->_runCommand ($ this ->dbname , $ cmd )->toArray ());
246
284
if ($ doc ["ok " ]) {
247
- return $ doc ["n " ];
285
+ return ( integer ) $ doc ["n " ];
248
286
}
249
287
throw $ this ->_generateCommandException ($ doc );
250
288
}
@@ -363,7 +401,7 @@ public function distinct($fieldName, array $filter = array(), array $options = a
363
401
$ cmd = array (
364
402
"distinct " => $ this ->collname ,
365
403
"key " => $ fieldName ,
366
- "query " => $ filter ,
404
+ "query " => ( object ) $ filter ,
367
405
) + $ options ;
368
406
369
407
$ doc = current ($ this ->_runCommand ($ this ->dbname , $ cmd )->toArray ());
@@ -497,7 +535,7 @@ public function findOneAndDelete(array $filter, array $options = array())
497
535
498
536
$ doc = current ($ this ->_runCommand ($ this ->dbname , $ cmd )->toArray ());
499
537
if ($ doc ["ok " ]) {
500
- return $ doc ["value " ];
538
+ return is_object ( $ doc [ " value " ]) ? ( array ) $ doc [ " value " ] : $ doc ["value " ];
501
539
}
502
540
503
541
throw $ this ->_generateCommandException ($ doc );
@@ -534,7 +572,7 @@ public function findOneAndReplace(array $filter, array $replacement, array $opti
534
572
535
573
$ doc = current ($ this ->_runCommand ($ this ->dbname , $ cmd )->toArray ());
536
574
if ($ doc ["ok " ]) {
537
- return $ doc[ " value " ] ;
575
+ return $ this -> _massageFindAndModifyResult ( $ doc, $ options ) ;
538
576
}
539
577
540
578
throw $ this ->_generateCommandException ($ doc );
@@ -572,61 +610,12 @@ public function findOneAndUpdate(array $filter, array $update, array $options =
572
610
573
611
$ doc = current ($ this ->_runCommand ($ this ->dbname , $ cmd )->toArray ());
574
612
if ($ doc ["ok " ]) {
575
- return $ doc[ " value " ] ;
613
+ return $ this -> _massageFindAndModifyResult ( $ doc, $ options ) ;
576
614
}
577
615
578
616
throw $ this ->_generateCommandException ($ doc );
579
617
}
580
618
581
- /**
582
- * Retrieves all aggregate options with their default values.
583
- *
584
- * @return array of Collection::aggregate() options
585
- */
586
- public function getAggregateOptions ()
587
- {
588
- $ opts = array (
589
- /**
590
- * Enables writing to temporary files. When set to true, aggregation stages
591
- * can write data to the _tmp subdirectory in the dbPath directory. The
592
- * default is false.
593
- *
594
- * @see http://docs.mongodb.org/manual/reference/command/aggregate/
595
- */
596
- "allowDiskUse " => false ,
597
-
598
- /**
599
- * The number of documents to return per batch.
600
- *
601
- * @see http://docs.mongodb.org/manual/reference/command/aggregate/
602
- */
603
- "batchSize " => 0 ,
604
-
605
- /**
606
- * The maximum amount of time to allow the query to run.
607
- *
608
- * @see http://docs.mongodb.org/manual/reference/command/aggregate/
609
- */
610
- "maxTimeMS " => 0 ,
611
-
612
- /**
613
- * Indicates if the results should be provided as a cursor.
614
- *
615
- * The default for this value depends on the version of the server.
616
- * - Servers >= 2.6 will use a default of true.
617
- * - Servers < 2.6 will use a default of false.
618
- *
619
- * As with any other property, this value can be changed.
620
- *
621
- * @see http://docs.mongodb.org/manual/reference/command/aggregate/
622
- */
623
- "useCursor " => true ,
624
- );
625
-
626
- /* FIXME: Add a version check for useCursor */
627
- return $ opts ;
628
- }
629
-
630
619
/**
631
620
* Retrieves all Bulk Write options with their default values.
632
621
*
@@ -961,7 +950,7 @@ public function getWriteOptions()
961
950
*
962
951
* @see http://docs.mongodb.org/manual/reference/command/insert/
963
952
*
964
- * @param array $documents The documents to insert
953
+ * @param array[]|object[] $documents The documents to insert
965
954
* @return InsertManyResult
966
955
*/
967
956
public function insertMany (array $ documents )
@@ -976,6 +965,8 @@ public function insertMany(array $documents)
976
965
977
966
if ($ insertedId !== null ) {
978
967
$ insertedIds [$ i ] = $ insertedId ;
968
+ } else {
969
+ $ insertedIds [$ i ] = is_array ($ document ) ? $ document ['_id ' ] : $ document ->_id ;
979
970
}
980
971
}
981
972
@@ -989,18 +980,21 @@ public function insertMany(array $documents)
989
980
*
990
981
* @see http://docs.mongodb.org/manual/reference/command/insert/
991
982
*
992
- * @param array $document The document to insert
993
- * @param array $options Additional options
983
+ * @param array|object $document The document to insert
994
984
* @return InsertOneResult
995
985
*/
996
- public function insertOne (array $ document )
986
+ public function insertOne ($ document )
997
987
{
998
988
$ options = array_merge ($ this ->getWriteOptions ());
999
989
1000
990
$ bulk = new BulkWrite ($ options ["ordered " ]);
1001
991
$ id = $ bulk ->insert ($ document );
1002
992
$ wr = $ this ->manager ->executeBulkWrite ($ this ->ns , $ bulk , $ this ->wc );
1003
993
994
+ if ($ id === null ) {
995
+ $ id = is_array ($ document ) ? $ document ['_id ' ] : $ document ->_id ;
996
+ }
997
+
1004
998
return new InsertOneResult ($ wr , $ id );
1005
999
}
1006
1000
@@ -1038,7 +1032,7 @@ public function replaceOne(array $filter, array $update, array $options = array(
1038
1032
if (isset ($ firstKey [0 ]) && $ firstKey [0 ] == '$ ' ) {
1039
1033
throw new InvalidArgumentException ("First key in \$update must NOT be a \$operator " );
1040
1034
}
1041
- $ wr = $ this ->_update ($ filter , $ update , $ options );
1035
+ $ wr = $ this ->_update ($ filter , $ update , $ options + array ( " multi " => false ) );
1042
1036
1043
1037
return new UpdateResult ($ wr );
1044
1038
}
@@ -1057,7 +1051,7 @@ public function replaceOne(array $filter, array $update, array $options = array(
1057
1051
*/
1058
1052
public function updateMany (array $ filter , $ update , array $ options = array ())
1059
1053
{
1060
- $ wr = $ this ->_update ($ filter , $ update , $ options + array ("limit " => 0 ));
1054
+ $ wr = $ this ->_update ($ filter , $ update , $ options + array ("multi " => true ));
1061
1055
1062
1056
return new UpdateResult ($ wr );
1063
1057
}
@@ -1080,7 +1074,7 @@ public function updateOne(array $filter, array $update, array $options = array()
1080
1074
if (!isset ($ firstKey [0 ]) || $ firstKey [0 ] != '$ ' ) {
1081
1075
throw new InvalidArgumentException ("First key in \$update must be a \$operator " );
1082
1076
}
1083
- $ wr = $ this ->_update ($ filter , $ update , $ options );
1077
+ $ wr = $ this ->_update ($ filter , $ update , $ options + array ( " multi " => false ) );
1084
1078
1085
1079
return new UpdateResult ($ wr );
1086
1080
}
@@ -1146,14 +1140,66 @@ final protected function _generateCommandException($doc)
1146
1140
*/
1147
1141
protected function _massageAggregateOptions ($ options )
1148
1142
{
1149
- if ($ options ["useCursor " ]) {
1150
- $ options ["cursor " ] = array ("batchSize " => $ options ["batchSize " ]);
1143
+ if ( ! empty ($ options ["useCursor " ])) {
1144
+ $ options ["cursor " ] = isset ($ options ["batchSize " ])
1145
+ ? array ("batchSize " => (integer ) $ options ["batchSize " ])
1146
+ : new stdClass ;
1151
1147
}
1152
1148
unset($ options ["useCursor " ], $ options ["batchSize " ]);
1153
1149
1154
1150
return $ options ;
1155
1151
}
1156
1152
1153
+ /**
1154
+ * Internal helper for massaging findandmodify options
1155
+ * @internal
1156
+ */
1157
+ final protected function _massageFindAndModifyOptions ($ options , $ update = array ())
1158
+ {
1159
+ $ ret = array (
1160
+ "sort " => $ options ["sort " ],
1161
+ "new " => isset ($ options ["returnDocument " ]) ? $ options ["returnDocument " ] == self ::FIND_ONE_AND_RETURN_AFTER : false ,
1162
+ "fields " => $ options ["projection " ],
1163
+ "upsert " => isset ($ options ["upsert " ]) ? $ options ["upsert " ] : false ,
1164
+ );
1165
+ if ($ update ) {
1166
+ $ ret ["update " ] = $ update ;
1167
+ } else {
1168
+ $ ret ["remove " ] = true ;
1169
+ }
1170
+ return $ ret ;
1171
+ }
1172
+
1173
+ /**
1174
+ * Internal helper for massaging the findAndModify result.
1175
+ *
1176
+ * @internal
1177
+ * @param array $result
1178
+ * @param array $options
1179
+ * @return array|null
1180
+ */
1181
+ final protected function _massageFindAndModifyResult (array $ result , array $ options )
1182
+ {
1183
+ if ($ result ['value ' ] === null ) {
1184
+ return null ;
1185
+ }
1186
+
1187
+ /* Prior to 3.0, findAndModify returns an empty document instead of null
1188
+ * when an upsert is performed and the pre-modified document was
1189
+ * requested.
1190
+ */
1191
+ if ($ options ['upsert ' ] && ! $ options ['new ' ] &&
1192
+ isset ($ result ['lastErrorObject ' ]->updatedExisting ) &&
1193
+ ! $ result ['lastErrorObject ' ]->updatedExisting ) {
1194
+
1195
+ return null ;
1196
+ }
1197
+
1198
+ return is_object ($ result ["value " ])
1199
+ ? (array ) $ result ['value ' ]
1200
+ : $ result ['value ' ];
1201
+ }
1202
+
1157
1203
/**
1158
1204
* Constructs the Query Wire Protocol field 'flags' based on $options
1159
1205
* provided to other helpers
0 commit comments