Skip to content

Commit 7b084d3

Browse files
committed
Merge pull request #729
* phplib-537: PHPLIB-537: Skip change stream tests on single node clusters PHPLIB-537: Allow more iterations for change stream spec tests on sharded clusters PHPLIB-537: "Cursor not found" error is no longer resumable PHPLIB-537: Use whitelist to check if a change stream is resumable
2 parents 9fa37e7 + b26a7a3 commit 7b084d3

10 files changed

+3437
-773
lines changed

src/ChangeStream.php

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,31 @@ class ChangeStream implements Iterator
4242
*/
4343
const CURSOR_NOT_FOUND = 43;
4444

45-
/** @var array */
46-
private static $nonResumableErrorCodes = [
47-
136, // CappedPositionLost
48-
237, // CursorKilled
49-
11601, // Interrupted
45+
/** @var int[] */
46+
private static $resumableErrorCodes = [
47+
6, // HostUnreachable
48+
7, // HostNotFound
49+
89, // NetworkTimeout
50+
91, // ShutdownInProgress
51+
189, // PrimarySteppedDown
52+
262, // ExceededTimeLimit
53+
9001, // SocketException
54+
10107, // NotMaster
55+
11600, // InterruptedAtShutdown
56+
11602, // InterruptedDueToReplStateChange
57+
13435, // NotMasterNoSlaveOk
58+
13436, // NotMasterOrSecondary
59+
63, // StaleShardVersion
60+
150, // StaleEpoch
61+
13388, // StaleConfig
62+
234, // RetryChangeStream
63+
133, // FailedToSatisfyReadPreference
64+
216, // ElectionInProgress
5065
];
5166

67+
/** @var int */
68+
private static $wireVersionForResumableChangeStreamError = 9;
69+
5270
/** @var callable */
5371
private $resumeCallable;
5472

@@ -180,15 +198,11 @@ private function isResumableError(RuntimeException $exception)
180198
return false;
181199
}
182200

183-
if ($exception->hasErrorLabel('NonResumableChangeStreamError')) {
184-
return false;
185-
}
186-
187-
if (in_array($exception->getCode(), self::$nonResumableErrorCodes)) {
188-
return false;
201+
if (server_supports_feature($this->iterator->getServer(), self::$wireVersionForResumableChangeStreamError)) {
202+
return $exception->hasErrorLabel('ResumableChangeStreamError');
189203
}
190204

191-
return true;
205+
return in_array($exception->getCode(), self::$resumableErrorCodes);
192206
}
193207

194208
/**

src/Model/ChangeStreamIterator.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
use MongoDB\Driver\Monitoring\CommandStartedEvent;
2525
use MongoDB\Driver\Monitoring\CommandSubscriber;
2626
use MongoDB\Driver\Monitoring\CommandSucceededEvent;
27+
use MongoDB\Driver\Server;
2728
use MongoDB\Exception\InvalidArgumentException;
2829
use MongoDB\Exception\ResumeTokenException;
2930
use MongoDB\Exception\UnexpectedValueException;
@@ -63,6 +64,9 @@ class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
6364
/** @var array|object|null */
6465
private $resumeToken;
6566

67+
/** @var Server */
68+
private $server;
69+
6670
/**
6771
* @internal
6872
* @param Cursor $cursor
@@ -90,6 +94,7 @@ public function __construct(Cursor $cursor, $firstBatchSize, $initialResumeToken
9094
$this->isRewindNop = ($firstBatchSize === 0);
9195
$this->postBatchResumeToken = $postBatchResumeToken;
9296
$this->resumeToken = $initialResumeToken;
97+
$this->server = $cursor->getServer();
9398
}
9499

95100
/** @internal */
@@ -152,6 +157,14 @@ public function getResumeToken()
152157
return $this->resumeToken;
153158
}
154159

160+
/**
161+
* Returns the server the cursor is running on.
162+
*/
163+
public function getServer() : Server
164+
{
165+
return $this->server;
166+
}
167+
155168
/**
156169
* @see https://php.net/iteratoriterator.key
157170
* @return mixed

tests/Operation/WatchFunctionalTest.php

Lines changed: 25 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
use MongoDB\Driver\ReadPreference;
1717
use MongoDB\Driver\WriteConcern;
1818
use MongoDB\Exception\ResumeTokenException;
19-
use MongoDB\Operation\DatabaseCommand;
2019
use MongoDB\Operation\InsertOne;
2120
use MongoDB\Operation\Watch;
2221
use MongoDB\Tests\CommandObserver;
@@ -155,49 +154,6 @@ function (array $event) use (&$lastEvent) {
155154
$this->assertSameDocument($postBatchResumeToken, $changeStream->getResumeToken());
156155
}
157156

158-
/**
159-
* Prose test 10: "ChangeStream will resume after a killCursors command is
160-
* issued for its child cursor."
161-
*/
162-
public function testNextResumesAfterCursorNotFound()
163-
{
164-
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
165-
$changeStream = $operation->execute($this->getPrimaryServer());
166-
167-
$changeStream->rewind();
168-
$this->assertFalse($changeStream->valid());
169-
170-
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
171-
172-
$this->advanceCursorUntilValid($changeStream);
173-
174-
$expectedResult = [
175-
'_id' => $changeStream->current()->_id,
176-
'operationType' => 'insert',
177-
'fullDocument' => ['_id' => 1, 'x' => 'foo'],
178-
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
179-
'documentKey' => ['_id' => 1],
180-
];
181-
182-
$this->assertMatchesDocument($expectedResult, $changeStream->current());
183-
184-
$this->killChangeStreamCursor($changeStream);
185-
186-
$this->insertDocument(['_id' => 2, 'x' => 'bar']);
187-
188-
$this->advanceCursorUntilValid($changeStream);
189-
190-
$expectedResult = [
191-
'_id' => $changeStream->current()->_id,
192-
'operationType' => 'insert',
193-
'fullDocument' => ['_id' => 2, 'x' => 'bar'],
194-
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
195-
'documentKey' => ['_id' => 2],
196-
];
197-
198-
$this->assertMatchesDocument($expectedResult, $changeStream->current());
199-
}
200-
201157
public function testNextResumesAfterConnectionException()
202158
{
203159
/* In order to trigger a dropped connection, we'll use a new client with
@@ -267,7 +223,7 @@ function (array $event) use (&$events) {
267223
$postBatchResumeToken = $this->getPostBatchResumeTokenFromReply($events[0]['succeeded']->getReply());
268224

269225
$this->assertFalse($changeStream->valid());
270-
$this->killChangeStreamCursor($changeStream);
226+
$this->forceChangeStreamResume();
271227

272228
$this->assertNoCommandExecuted(function () use ($changeStream) {
273229
$changeStream->rewind();
@@ -346,7 +302,7 @@ function (array $event) use (&$events) {
346302
$this->assertInstanceOf(TimestampInterface::class, $operationTime);
347303

348304
$this->assertFalse($changeStream->valid());
349-
$this->killChangeStreamCursor($changeStream);
305+
$this->forceChangeStreamResume();
350306

351307
$this->assertNoCommandExecuted(function () use ($changeStream) {
352308
$changeStream->rewind();
@@ -497,7 +453,7 @@ public function testNoChangeAfterResumeBeforeInsert()
497453

498454
$this->assertMatchesDocument($expectedResult, $changeStream->current());
499455

500-
$this->killChangeStreamCursor($changeStream);
456+
$this->forceChangeStreamResume();
501457

502458
$changeStream->next();
503459
$this->assertFalse($changeStream->valid());
@@ -524,10 +480,10 @@ public function testResumeMultipleTimesInSuccession()
524480
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
525481
$changeStream = $operation->execute($this->getPrimaryServer());
526482

527-
/* Killing the cursor when there are no results will test that neither
483+
/* Forcing a resume when there are no results will test that neither
528484
* the initial rewind() nor a resume attempt via next() increment the
529485
* key. */
530-
$this->killChangeStreamCursor($changeStream);
486+
$this->forceChangeStreamResume();
531487

532488
$this->assertNoCommandExecuted(function () use ($changeStream) {
533489
$changeStream->rewind();
@@ -542,7 +498,7 @@ public function testResumeMultipleTimesInSuccession()
542498
$this->assertNull($changeStream->current());
543499

544500
// A consecutive resume attempt should still not increment the key
545-
$this->killChangeStreamCursor($changeStream);
501+
$this->forceChangeStreamResume();
546502

547503
$changeStream->next();
548504
$this->assertFalse($changeStream->valid());
@@ -568,10 +524,10 @@ public function testResumeMultipleTimesInSuccession()
568524

569525
$this->assertMatchesDocument($expectedResult, $changeStream->current());
570526

571-
/* Insert another document and kill the cursor. ChangeStream::next()
527+
/* Insert another document and force a resume. ChangeStream::next()
572528
* should resume and pick up the last insert. */
573529
$this->insertDocument(['_id' => 2]);
574-
$this->killChangeStreamCursor($changeStream);
530+
$this->forceChangeStreamResume();
575531

576532
$changeStream->next();
577533
$this->assertTrue($changeStream->valid());
@@ -595,7 +551,7 @@ public function testResumeMultipleTimesInSuccession()
595551
*
596552
* Note: PHPLIB-448 may require rewind() to throw an exception here. */
597553
$this->insertDocument(['_id' => 3]);
598-
$this->killChangeStreamCursor($changeStream);
554+
$this->forceChangeStreamResume();
599555

600556
$this->assertNoCommandExecuted(function () use ($changeStream) {
601557
$changeStream->rewind();
@@ -621,7 +577,7 @@ public function testResumeMultipleTimesInSuccession()
621577

622578
// Test one final, consecutive resume via ChangeStream::next()
623579
$this->insertDocument(['_id' => 4]);
624-
$this->killChangeStreamCursor($changeStream);
580+
$this->forceChangeStreamResume();
625581

626582
$changeStream->next();
627583
$this->assertTrue($changeStream->valid());
@@ -665,7 +621,7 @@ public function testKey()
665621
$this->assertFalse($changeStream->valid());
666622
$this->assertNull($changeStream->key());
667623

668-
$this->killChangeStreamCursor($changeStream);
624+
$this->forceChangeStreamResume();
669625

670626
$changeStream->next();
671627
$this->assertFalse($changeStream->valid());
@@ -731,41 +687,6 @@ public function testInitialCursorIsNotClosed()
731687
$this->assertFalse($cursor->isDead());
732688
}
733689

734-
/**
735-
* Prose test 5: "ChangeStream will not attempt to resume after encountering
736-
* error code 11601 (Interrupted), 136 (CappedPositionLost), or 237
737-
* (CursorKilled) while executing a getMore command."
738-
*
739-
* @dataProvider provideNonResumableErrorCodes
740-
*/
741-
public function testNonResumableErrorCodes($errorCode)
742-
{
743-
$this->configureFailPoint([
744-
'configureFailPoint' => 'failCommand',
745-
'mode' => ['times' => 1],
746-
'data' => ['failCommands' => ['getMore'], 'errorCode' => $errorCode],
747-
]);
748-
749-
$this->insertDocument(['x' => 1]);
750-
751-
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), []);
752-
$changeStream = $operation->execute($this->getPrimaryServer());
753-
$changeStream->rewind();
754-
755-
$this->expectException(ServerException::class);
756-
$this->expectExceptionCode($errorCode);
757-
$changeStream->next();
758-
}
759-
760-
public function provideNonResumableErrorCodes()
761-
{
762-
return [
763-
'CappedPositionLost' => [136],
764-
'CursorKilled' => [237],
765-
'Interrupted' => [11601],
766-
];
767-
}
768-
769690
/**
770691
* Prose test 2: "ChangeStream will throw an exception if the server
771692
* response is missing the resume token (if wire version is < 8, this is a
@@ -979,7 +900,7 @@ public function testRewindExtractsResumeTokenAndNextResumes()
979900
];
980901
$this->assertMatchesDocument($expectedResult, $changeStream->current());
981902

982-
$this->killChangeStreamCursor($changeStream);
903+
$this->forceChangeStreamResume();
983904

984905
$this->advanceCursorUntilValid($changeStream);
985906
$this->assertSame(1, $changeStream->key());
@@ -1218,7 +1139,7 @@ function (array $event) use (&$originalSession) {
12181139
);
12191140

12201141
$changeStream->rewind();
1221-
$this->killChangeStreamCursor($changeStream);
1142+
$this->forceChangeStreamResume();
12221143

12231144
(new CommandObserver())->observe(
12241145
function () use (&$changeStream) {
@@ -1402,7 +1323,7 @@ public function testOriginalReadPreferenceIsPreservedOnResume()
14021323

14031324
$changeStream = $operation->execute($secondary);
14041325
$previousCursorId = $changeStream->getCursorId();
1405-
$this->killChangeStreamCursor($changeStream);
1326+
$this->forceChangeStreamResume();
14061327

14071328
$changeStream->next();
14081329
$this->assertNotSame($previousCursorId, $changeStream->getCursorId());
@@ -1543,7 +1464,7 @@ public function testResumingChangeStreamWithoutPreviousResultsIncludesStartAfter
15431464
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
15441465
$changeStream = $operation->execute($this->getPrimaryServer());
15451466
$changeStream->rewind();
1546-
$this->killChangeStreamCursor($changeStream);
1467+
$this->forceChangeStreamResume();
15471468

15481469
$aggregateCommand = null;
15491470

@@ -1594,7 +1515,7 @@ public function testResumingChangeStreamWithPreviousResultsIncludesResumeAfterOp
15941515
$this->advanceCursorUntilValid($changeStream);
15951516
$this->assertTrue($changeStream->valid());
15961517

1597-
$this->killChangeStreamCursor($changeStream);
1518+
$this->forceChangeStreamResume();
15981519

15991520
$aggregateCommand = null;
16001521

@@ -1630,6 +1551,15 @@ function (array $event) use (&$commands) {
16301551
$this->assertEmpty($commands);
16311552
}
16321553

1554+
private function forceChangeStreamResume(array $commands = ['getMore'], int $errorCode = self::NOT_MASTER)
1555+
{
1556+
$this->configureFailPoint([
1557+
'configureFailPoint' => 'failCommand',
1558+
'mode' => ['times' => 1],
1559+
'data' => ['failCommands' => $commands, 'errorCode' => $errorCode],
1560+
]);
1561+
}
1562+
16331563
private function getPostBatchResumeTokenFromReply(stdClass $reply)
16341564
{
16351565
$this->assertObjectHasAttribute('cursor', $reply);
@@ -1662,17 +1592,6 @@ private function isStartAtOperationTimeSupported()
16621592
return server_supports_feature($this->getPrimaryServer(), self::$wireVersionForStartAtOperationTime);
16631593
}
16641594

1665-
private function killChangeStreamCursor(ChangeStream $changeStream)
1666-
{
1667-
$command = [
1668-
'killCursors' => $this->getCollectionName(),
1669-
'cursors' => [ $changeStream->getCursorId() ],
1670-
];
1671-
1672-
$operation = new DatabaseCommand($this->getDatabaseName(), $command);
1673-
$operation->execute($this->getPrimaryServer());
1674-
}
1675-
16761595
private function advanceCursorUntilValid(Iterator $iterator, $limitOnShardedClusters = 5)
16771596
{
16781597
if (! $this->isShardedCluster()) {

0 commit comments

Comments
 (0)