Skip to content

Commit f9e7627

Browse files
committed
wip Extract Collection::aggregate() to an operation class
1 parent 975db3d commit f9e7627

File tree

3 files changed

+177
-71
lines changed

3 files changed

+177
-71
lines changed

src/Collection.php

Lines changed: 3 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use MongoDB\Model\IndexInfoIterator;
1616
use MongoDB\Model\IndexInfoIteratorIterator;
1717
use MongoDB\Model\IndexInput;
18+
use MongoDB\Operation\Aggregate;
1819

1920
class Collection
2021
{
@@ -97,62 +98,9 @@ public function aggregate(array $pipeline, array $options = array())
9798
{
9899
$readPreference = new ReadPreference(ReadPreference::RP_PRIMARY);
99100
$server = $this->manager->selectServer($readPreference);
101+
$operation = new Aggregate($this->collname, $pipeline, $options);
100102

101-
if (FeatureDetection::isSupported($server, FeatureDetection::API_AGGREGATE_CURSOR)) {
102-
$options = array_merge(
103-
array(
104-
/**
105-
* Enables writing to temporary files. When set to true, aggregation stages
106-
* can write data to the _tmp subdirectory in the dbPath directory. The
107-
* default is false.
108-
*
109-
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
110-
*/
111-
'allowDiskUse' => false,
112-
/**
113-
* The number of documents to return per batch.
114-
*
115-
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
116-
*/
117-
'batchSize' => 0,
118-
/**
119-
* The maximum amount of time to allow the query to run.
120-
*
121-
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
122-
*/
123-
'maxTimeMS' => 0,
124-
/**
125-
* Indicates if the results should be provided as a cursor.
126-
*
127-
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
128-
*/
129-
'useCursor' => true,
130-
),
131-
$options
132-
);
133-
}
134-
135-
$options = $this->_massageAggregateOptions($options);
136-
$command = new Command(array(
137-
'aggregate' => $this->collname,
138-
'pipeline' => $pipeline,
139-
) + $options);
140-
$cursor = $server->executeCommand($this->dbname, $command);
141-
142-
if ( ! empty($options["cursor"])) {
143-
return $cursor;
144-
}
145-
146-
$doc = current($cursor->toArray());
147-
148-
if ($doc["ok"]) {
149-
return new \ArrayIterator(array_map(
150-
function (\stdClass $document) { return (array) $document; },
151-
$doc["result"]
152-
));
153-
}
154-
155-
throw $this->_generateCommandException($doc);
103+
return $operation->execute($server, $this->dbname);
156104
}
157105

158106
/**
@@ -1170,22 +1118,6 @@ final protected function _generateCommandException($doc)
11701118
return new RuntimeException("FIXME: Unknown error");
11711119
}
11721120

1173-
/**
1174-
* Internal helper for massaging aggregate options
1175-
* @internal
1176-
*/
1177-
protected function _massageAggregateOptions($options)
1178-
{
1179-
if ( ! empty($options["useCursor"])) {
1180-
$options["cursor"] = isset($options["batchSize"])
1181-
? array("batchSize" => (integer) $options["batchSize"])
1182-
: new stdClass;
1183-
}
1184-
unset($options["useCursor"], $options["batchSize"]);
1185-
1186-
return $options;
1187-
}
1188-
11891121
/**
11901122
* Internal helper for massaging findandmodify options
11911123
* @internal

src/Operation/Aggregate.php

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
<?php
2+
3+
namespace MongoDB\Operation;
4+
5+
use BSON\Serializable;
6+
use MongoDB\FeatureDetection;
7+
use MongoDB\Driver\Command;
8+
use MongoDB\Driver\Server;
9+
use MongoDB\Exception\InvalidArgumentException;
10+
use MongoDB\Exception\InvalidArgumentTypeException;
11+
use MongoDB\Exception\RuntimeException;
12+
use ArrayIterator;
13+
use stdClass;
14+
15+
/**
16+
* Operation for the aggregate command.
17+
*
18+
* @api
19+
* @see MongoDB\Collection::aggregate()
20+
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
21+
*/
22+
class Aggregate implements Executable
23+
{
24+
const API_AGGREGATE_CURSOR = 2;
25+
26+
private $collectionName;
27+
private $pipeline;
28+
private $options;
29+
30+
/**
31+
* Constructs an aggregate command.
32+
*
33+
* Supported options:
34+
*
35+
* * allowDiskUse (boolean): Enables writing to temporary files. When set
36+
* to true, aggregation stages can write data to the _tmp sub-directory
37+
* in the dbPath directory. The default is false.
38+
*
39+
* * batchSize (integer): The number of documents to return per batch.
40+
*
41+
* * maxTimeMS (integer): The maximum amount of time to allow the query to
42+
* run.
43+
*
44+
* * useCursor (boolean): Indicates whether the command will request that
45+
* the server provide results using a cursor. The default is true.
46+
*
47+
* For servers < 2.6, this option is ignored as aggregation cursors are
48+
* not available.
49+
*
50+
* For servers >= 2.6, this option allows users to turn off cursors if
51+
* necessary to aid in mongod/mongos upgrades.
52+
*
53+
* @param string $collectionName
54+
* @param array $pipeline List of pipeline operations
55+
* @param array $options Command options
56+
* @throws InvalidArgumentException
57+
*/
58+
public function __construct($collectionName, array $pipeline, array $options = array())
59+
{
60+
$options += array(
61+
'allowDiskUse' => false,
62+
'useCursor' => true,
63+
);
64+
65+
if ($options['allowDiskUse'] && ! is_bool($options['allowDiskUse'])) {
66+
throw new InvalidArgumentTypeException('allowDiskUse option', $options['allowDiskUse'], 'boolean');
67+
}
68+
69+
if (isset($options['batchSize']) && ! is_integer($options['batchSize'])) {
70+
throw new InvalidArgumentTypeException('batchSize option', $options['batchSize'], 'integer');
71+
}
72+
73+
if (isset($options['maxTimeMS']) && ! is_integer($options['maxTimeMS'])) {
74+
throw new InvalidArgumentTypeException('maxTimeMS option', $options['maxTimeMS'], 'integer');
75+
}
76+
77+
if ( ! is_bool($options['useCursor'])) {
78+
throw new InvalidArgumentTypeException('useCursor option', $options['useCursor'], 'boolean');
79+
}
80+
81+
if (isset($options['batchSize']) && ! $options['useCursor']) {
82+
throw new InvalidArgumentException('batchSize option should not be used if useCursor is false');
83+
}
84+
85+
$expectedIndex = 0;
86+
87+
foreach ($pipeline as $i => $op) {
88+
if ($i !== $expectedIndex) {
89+
throw new InvalidArgumentException(sprintf('$pipeline is not a list (unexpected index: "%s")', $i));
90+
}
91+
92+
if ( ! is_array($op) && ! is_object($op)) {
93+
throw new InvalidArgumentTypeException(sprintf('$pipeline[%d]', $i), $op, 'array or object');
94+
}
95+
96+
$expectedIndex += 1;
97+
}
98+
99+
$this->collectionName = $collectionName;
100+
$this->pipeline = $pipeline;
101+
$this->options = $options;
102+
}
103+
104+
/**
105+
* Execute the operation.
106+
*
107+
* @see MongoDB\Collection::createIndexes()
108+
* @see http://php.net/bson-serializable.bsonserialize
109+
*/
110+
public function execute(Server $server, $databaseName)
111+
{
112+
$command = $this->createCommand($server);
113+
$cursor = $server->executeCommand($databaseName, $command);
114+
115+
if ($this->options['useCursor']) {
116+
return $cursor;
117+
}
118+
119+
$result = current($cursor->toArray());
120+
121+
if (empty($result['ok'])) {
122+
throw new RuntimeException(isset($result['errmsg']) ? $result['errmsg'] : 'Unknown error');
123+
}
124+
125+
return new ArrayIterator(array_map(
126+
function (\stdClass $document) { return (array) $document; },
127+
$result['result']
128+
));
129+
}
130+
131+
/**
132+
* @param boolean $isCursorSupported
133+
* @return Command
134+
*/
135+
private createCommand(Server $server)
136+
{
137+
$cmd = array(
138+
'aggregate' => $this->collectionName,
139+
'pipeline' => $this->pipeline,
140+
);
141+
142+
// Servers < 2.6 do not support any command options
143+
if ( ! FeatureDetection::isSupported($server, self::API_AGGREGATE_CURSOR)) {
144+
return new Command($cmd);
145+
}
146+
147+
$cmd += $this->options;
148+
149+
if ($cmd['useCursor']) {
150+
$cmd['cursor'] = isset($cmd["batchSize"])
151+
? array('batchSize' => $cmd["batchSize"])
152+
: new stdClass;
153+
}
154+
155+
unset($cmd['useCursor'], $cmd['batchSize']);
156+
157+
return new Command($cmd);
158+
}
159+
}

src/Operation/Executable.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?php
2+
3+
namespace MongoDB\Operation;
4+
5+
use MongoDB\Driver\Server;
6+
7+
/**
8+
* Executable interface for command classes.
9+
*
10+
* @api
11+
*/
12+
interface Executable
13+
{
14+
public function execute(Server $server, $databaseName);
15+
}

0 commit comments

Comments
 (0)