Skip to content

Commit 4c2ed40

Browse files
committed
PHPLIB-109: Extract BulkWrite operation class
1 parent 1cdf495 commit 4c2ed40

File tree

10 files changed

+604
-167
lines changed

10 files changed

+604
-167
lines changed

src/Collection.php

Lines changed: 13 additions & 151 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
namespace MongoDB;
44

5-
use MongoDB\Driver\BulkWrite;
65
use MongoDB\Driver\Command;
76
use MongoDB\Driver\Cursor;
87
use MongoDB\Driver\Manager;
@@ -14,6 +13,7 @@
1413
use MongoDB\Model\IndexInfoIterator;
1514
use MongoDB\Model\IndexInput;
1615
use MongoDB\Operation\Aggregate;
16+
use MongoDB\Operation\BulkWrite;
1717
use MongoDB\Operation\CreateIndexes;
1818
use MongoDB\Operation\Count;
1919
use MongoDB\Operation\DeleteMany;
@@ -101,135 +101,23 @@ public function aggregate(array $pipeline, array $options = array())
101101
}
102102

103103
/**
104-
* Adds a full set of write operations into a bulk and executes it
105-
*
106-
* The syntax of the $bulk array is:
107-
* $bulk = [
108-
* [
109-
* 'METHOD' => [
110-
* $document,
111-
* $extraArgument1,
112-
* $extraArgument2,
113-
* ],
114-
* ],
115-
* [
116-
* 'METHOD' => [
117-
* $document,
118-
* $extraArgument1,
119-
* $extraArgument2,
120-
* ],
121-
* ],
122-
* ]
123-
*
124-
*
125-
* Where METHOD is one of
126-
* - 'insertOne'
127-
* Supports no $extraArgument
128-
* - 'updateMany'
129-
* Requires $extraArgument1, same as $update for Collection::updateMany()
130-
* Optional $extraArgument2, same as $options for Collection::updateMany()
131-
* - 'updateOne'
132-
* Requires $extraArgument1, same as $update for Collection::updateOne()
133-
* Optional $extraArgument2, same as $options for Collection::updateOne()
134-
* - 'replaceOne'
135-
* Requires $extraArgument1, same as $update for Collection::replaceOne()
136-
* Optional $extraArgument2, same as $options for Collection::replaceOne()
137-
* - 'deleteOne'
138-
* Supports no $extraArgument
139-
* - 'deleteMany'
140-
* Supports no $extraArgument
141-
*
142-
* @example Collection-bulkWrite.php Using Collection::bulkWrite()
143-
*
144-
* @see Collection::getBulkOptions() for supported $options
145-
*
146-
* @param array $ops Array of operations
147-
* @param array $options Additional options
148-
* @return WriteResult
104+
* Executes multiple write operations.
105+
*
106+
* @see BulkWrite::__construct() for supported options
107+
* @param array[] $operations List of write operations
108+
* @param array $options Command options
109+
* @return BulkWriteResult
149110
*/
150-
public function bulkWrite(array $ops, array $options = array())
111+
public function bulkWrite(array $operations, array $options = array())
151112
{
152-
$options = array_merge($this->getBulkOptions(), $options);
153-
154-
$bulk = new BulkWrite($options["ordered"]);
155-
$insertedIds = array();
156-
157-
foreach ($ops as $n => $op) {
158-
foreach ($op as $opname => $args) {
159-
if (!isset($args[0])) {
160-
throw new InvalidArgumentException(sprintf("Missing argument#1 for '%s' (operation#%d)", $opname, $n));
161-
}
162-
163-
switch ($opname) {
164-
case "insertOne":
165-
$insertedId = $bulk->insert($args[0]);
166-
167-
if ($insertedId !== null) {
168-
$insertedIds[$n] = $insertedId;
169-
} else {
170-
$insertedIds[$n] = is_array($args[0]) ? $args[0]['_id'] : $args[0]->_id;
171-
}
172-
173-
break;
174-
175-
case "updateMany":
176-
if (!isset($args[1])) {
177-
throw new InvalidArgumentException(sprintf("Missing argument#2 for '%s' (operation#%d)", $opname, $n));
178-
}
179-
$options = array_merge($this->getWriteOptions(), isset($args[2]) ? $args[2] : array(), array("multi" => true));
180-
$firstKey = key($args[1]);
181-
if (!isset($firstKey[0]) || $firstKey[0] != '$') {
182-
throw new InvalidArgumentException("First key in \$update must be a \$operator");
183-
}
184-
185-
$bulk->update($args[0], $args[1], $options);
186-
break;
187-
188-
case "updateOne":
189-
if (!isset($args[1])) {
190-
throw new InvalidArgumentException(sprintf("Missing argument#2 for '%s' (operation#%d)", $opname, $n));
191-
}
192-
$options = array_merge($this->getWriteOptions(), isset($args[2]) ? $args[2] : array(), array("multi" => false));
193-
$firstKey = key($args[1]);
194-
if (!isset($firstKey[0]) || $firstKey[0] != '$') {
195-
throw new InvalidArgumentException("First key in \$update must be a \$operator");
196-
}
197-
198-
$bulk->update($args[0], $args[1], $options);
199-
break;
200-
201-
case "replaceOne":
202-
if (!isset($args[1])) {
203-
throw new InvalidArgumentException(sprintf("Missing argument#2 for '%s' (operation#%d)", $opname, $n));
204-
}
205-
$options = array_merge($this->getWriteOptions(), isset($args[2]) ? $args[2] : array(), array("multi" => false));
206-
$firstKey = key($args[1]);
207-
if (isset($firstKey[0]) && $firstKey[0] == '$') {
208-
throw new InvalidArgumentException("First key in \$update must NOT be a \$operator");
209-
}
210-
211-
$bulk->update($args[0], $args[1], $options);
212-
break;
213-
214-
case "deleteOne":
215-
$options = array_merge($this->getWriteOptions(), isset($args[1]) ? $args[1] : array(), array("limit" => 1));
216-
$bulk->delete($args[0], $options);
217-
break;
218-
219-
case "deleteMany":
220-
$options = array_merge($this->getWriteOptions(), isset($args[1]) ? $args[1] : array(), array("limit" => 0));
221-
$bulk->delete($args[0], $options);
222-
break;
223-
224-
default:
225-
throw new InvalidArgumentException(sprintf("Unknown operation type called '%s' (operation#%d)", $opname, $n));
226-
}
227-
}
113+
if ( ! isset($options['writeConcern']) && isset($this->wc)) {
114+
$options['writeConcern'] = $this->wc;
228115
}
229116

230-
$writeResult = $this->manager->executeBulkWrite($this->ns, $bulk, $this->wc);
117+
$operation = new BulkWrite($this->dbname, $this->collname, $operations, $options);
118+
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
231119

232-
return new BulkWriteResult($writeResult, $insertedIds);
120+
return $operation->execute($server);
233121
}
234122

235123
/**
@@ -498,18 +386,6 @@ public function findOneAndUpdate($filter, $update, array $options = array())
498386
return $operation->execute($server);
499387
}
500388

501-
/**
502-
* Retrieves all Bulk Write options with their default values.
503-
*
504-
* @return array of available Bulk Write options
505-
*/
506-
public function getBulkOptions()
507-
{
508-
return array(
509-
"ordered" => false,
510-
);
511-
}
512-
513389
/**
514390
* Return the collection name.
515391
*
@@ -541,20 +417,6 @@ public function getNamespace()
541417
return $this->ns;
542418
}
543419

544-
/**
545-
* Retrieves all Write options with their default values.
546-
*
547-
* @return array of available Write options
548-
*/
549-
public function getWriteOptions()
550-
{
551-
return array(
552-
"ordered" => false,
553-
"upsert" => false,
554-
"limit" => 1,
555-
);
556-
}
557-
558420
/**
559421
* Inserts multiple documents.
560422
*

0 commit comments

Comments
 (0)