Skip to content

Commit 509d993

Browse files
committed
Extract Collection::aggregate() to an operation class
1 parent 6614f4d commit 509d993

File tree

2 files changed

+173
-76
lines changed

2 files changed

+173
-76
lines changed

src/Collection.php

Lines changed: 8 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
use MongoDB\Model\IndexInfoIterator;
1616
use MongoDB\Model\IndexInfoIteratorIterator;
1717
use MongoDB\Model\IndexInput;
18+
use MongoDB\Operation\Aggregate;
19+
use Traversable;
1820

1921
class Collection
2022
{
@@ -80,79 +82,25 @@ public function __toString()
8082
}
8183

8284
/**
83-
* Runs an aggregation framework pipeline
85+
* Executes an aggregation framework pipeline on the collection.
8486
*
8587
* Note: this method's return value depends on the MongoDB server version
8688
* and the "useCursor" option. If "useCursor" is true, a Cursor will be
8789
* returned; otherwise, an ArrayIterator is returned, which wraps the
8890
* "result" array from the command response document.
8991
*
9092
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
91-
*
92-
* @param array $pipeline The pipeline to execute
93-
* @param array $options Additional options
94-
* @return Iterator
93+
* @param array $pipeline List of pipeline operations
94+
* @param array $options Command options
95+
* @return Traversable
9596
*/
9697
public function aggregate(array $pipeline, array $options = array())
9798
{
9899
$readPreference = new ReadPreference(ReadPreference::RP_PRIMARY);
99100
$server = $this->manager->selectServer($readPreference);
101+
$operation = new Aggregate($this->dbname, $this->collname, $pipeline, $options);
100102

101-
if (FeatureDetection::isSupported($server, FeatureDetection::API_AGGREGATE_CURSOR)) {
102-
$options = array_merge(
103-
array(
104-
/**
105-
* Enables writing to temporary files. When set to true, aggregation stages
106-
* can write data to the _tmp subdirectory in the dbPath directory. The
107-
* default is false.
108-
*
109-
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
110-
*/
111-
'allowDiskUse' => false,
112-
/**
113-
* The number of documents to return per batch.
114-
*
115-
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
116-
*/
117-
'batchSize' => 0,
118-
/**
119-
* The maximum amount of time to allow the query to run.
120-
*
121-
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
122-
*/
123-
'maxTimeMS' => 0,
124-
/**
125-
* Indicates if the results should be provided as a cursor.
126-
*
127-
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
128-
*/
129-
'useCursor' => true,
130-
),
131-
$options
132-
);
133-
}
134-
135-
$options = $this->_massageAggregateOptions($options);
136-
$command = new Command(array(
137-
'aggregate' => $this->collname,
138-
'pipeline' => $pipeline,
139-
) + $options);
140-
$cursor = $server->executeCommand($this->dbname, $command);
141-
142-
if ( ! empty($options["cursor"])) {
143-
return $cursor;
144-
}
145-
146-
$doc = current($cursor->toArray());
147-
148-
if ($doc["ok"]) {
149-
return new \ArrayIterator(array_map(
150-
function (\stdClass $document) { return (array) $document; },
151-
$doc["result"]
152-
));
153-
}
154-
155-
throw $this->_generateCommandException($doc);
103+
return $operation->execute($server);
156104
}
157105

158106
/**
@@ -1170,22 +1118,6 @@ final protected function _generateCommandException($doc)
11701118
return new RuntimeException("FIXME: Unknown error");
11711119
}
11721120

1173-
/**
1174-
* Internal helper for massaging aggregate options
1175-
* @internal
1176-
*/
1177-
protected function _massageAggregateOptions($options)
1178-
{
1179-
if ( ! empty($options["useCursor"])) {
1180-
$options["cursor"] = isset($options["batchSize"])
1181-
? array("batchSize" => (integer) $options["batchSize"])
1182-
: new stdClass;
1183-
}
1184-
unset($options["useCursor"], $options["batchSize"]);
1185-
1186-
return $options;
1187-
}
1188-
11891121
/**
11901122
* Internal helper for massaging findandmodify options
11911123
* @internal

src/Operation/Aggregate.php

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
<?php
2+
3+
namespace MongoDB\Operation;
4+
5+
use MongoDB\FeatureDetection;
6+
use MongoDB\Driver\Command;
7+
use MongoDB\Driver\Server;
8+
use MongoDB\Exception\InvalidArgumentException;
9+
use MongoDB\Exception\InvalidArgumentTypeException;
10+
use MongoDB\Exception\RuntimeException;
11+
use ArrayIterator;
12+
use stdClass;
13+
use Traversable;
14+
15+
/**
16+
* Operation for the aggregate command.
17+
*
18+
* @api
19+
* @see MongoDB\Collection::aggregate()
20+
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
21+
*/
22+
class Aggregate implements Executable
23+
{
24+
const API_AGGREGATE_CURSOR = 2;
25+
26+
private $databaseName;
27+
private $collectionName;
28+
private $pipeline;
29+
private $options;
30+
31+
/**
32+
* Constructs an aggregate command.
33+
*
34+
* Supported options:
35+
*
36+
* * allowDiskUse (boolean): Enables writing to temporary files. When set
37+
* to true, aggregation stages can write data to the _tmp sub-directory
38+
* in the dbPath directory. The default is false.
39+
*
40+
* * batchSize (integer): The number of documents to return per batch.
41+
*
42+
* * maxTimeMS (integer): The maximum amount of time to allow the query to
43+
* run.
44+
*
45+
* * useCursor (boolean): Indicates whether the command will request that
46+
* the server provide results using a cursor. The default is true.
47+
*
48+
* For servers < 2.6, this option is ignored as aggregation cursors are
49+
* not available.
50+
*
51+
* For servers >= 2.6, this option allows users to turn off cursors if
52+
* necessary to aid in mongod/mongos upgrades.
53+
*
54+
* @param string $databaseName Database name
55+
* @param string $collectionName Collection name
56+
* @param array $pipeline List of pipeline operations
57+
* @param array $options Command options
58+
* @throws InvalidArgumentException
59+
*/
60+
public function __construct($databaseName, $collectionName, array $pipeline, array $options = array())
61+
{
62+
$options += array(
63+
'allowDiskUse' => false,
64+
'useCursor' => true,
65+
);
66+
67+
if ($options['allowDiskUse'] && ! is_bool($options['allowDiskUse'])) {
68+
throw new InvalidArgumentTypeException('allowDiskUse option', $options['allowDiskUse'], 'boolean');
69+
}
70+
71+
if (isset($options['batchSize']) && ! is_integer($options['batchSize'])) {
72+
throw new InvalidArgumentTypeException('batchSize option', $options['batchSize'], 'integer');
73+
}
74+
75+
if (isset($options['maxTimeMS']) && ! is_integer($options['maxTimeMS'])) {
76+
throw new InvalidArgumentTypeException('maxTimeMS option', $options['maxTimeMS'], 'integer');
77+
}
78+
79+
if ( ! is_bool($options['useCursor'])) {
80+
throw new InvalidArgumentTypeException('useCursor option', $options['useCursor'], 'boolean');
81+
}
82+
83+
if (isset($options['batchSize']) && ! $options['useCursor']) {
84+
throw new InvalidArgumentException('batchSize option should not be used if useCursor is false');
85+
}
86+
87+
$expectedIndex = 0;
88+
89+
foreach ($pipeline as $i => $op) {
90+
if ($i !== $expectedIndex) {
91+
throw new InvalidArgumentException(sprintf('$pipeline is not a list (unexpected index: "%s")', $i));
92+
}
93+
94+
if ( ! is_array($op) && ! is_object($op)) {
95+
throw new InvalidArgumentTypeException(sprintf('$pipeline[%d]', $i), $op, 'array or object');
96+
}
97+
98+
$expectedIndex += 1;
99+
}
100+
101+
$this->databaseName = (string) $databaseName;
102+
$this->collectionName = (string) $collectionName;
103+
$this->pipeline = $pipeline;
104+
$this->options = $options;
105+
}
106+
107+
/**
108+
* Execute the operation.
109+
*
110+
* @see Executable::execute()
111+
* @param Server $server
112+
* @return Traversable
113+
*/
114+
public function execute(Server $server)
115+
{
116+
$command = $this->createCommand($server);
117+
$cursor = $server->executeCommand($this->databaseName, $command);
118+
119+
if ($this->options['useCursor']) {
120+
return $cursor;
121+
}
122+
123+
$result = current($cursor->toArray());
124+
125+
if (empty($result['ok'])) {
126+
throw new RuntimeException(isset($result['errmsg']) ? $result['errmsg'] : 'Unknown error');
127+
}
128+
129+
return new ArrayIterator(array_map(
130+
function (stdClass $document) { return (array) $document; },
131+
$result['result']
132+
));
133+
}
134+
135+
/**
136+
* Create the aggregate command.
137+
*
138+
* @param Server $server
139+
* @return Command
140+
*/
141+
private function createCommand(Server $server)
142+
{
143+
$cmd = array(
144+
'aggregate' => $this->collectionName,
145+
'pipeline' => $this->pipeline,
146+
);
147+
148+
// Servers < 2.6 do not support any command options
149+
if ( ! FeatureDetection::isSupported($server, self::API_AGGREGATE_CURSOR)) {
150+
return new Command($cmd);
151+
}
152+
153+
$cmd += $this->options;
154+
155+
if ($cmd['useCursor']) {
156+
$cmd['cursor'] = isset($cmd["batchSize"])
157+
? array('batchSize' => $cmd["batchSize"])
158+
: new stdClass;
159+
}
160+
161+
unset($cmd['useCursor'], $cmd['batchSize']);
162+
163+
return new Command($cmd);
164+
}
165+
}

0 commit comments

Comments
 (0)