Skip to content

PHPLIB-537: Use whitelist to check if a change stream is resumable #729

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions src/ChangeStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,31 @@ class ChangeStream implements Iterator
*/
const CURSOR_NOT_FOUND = 43;

/** @var array */
private static $nonResumableErrorCodes = [
136, // CappedPositionLost
237, // CursorKilled
11601, // Interrupted
/** @var int[] */
private static $resumableErrorCodes = [
6, // HostUnreachable
7, // HostNotFound
89, // NetworkTimeout
91, // ShutdownInProgress
189, // PrimarySteppedDown
262, // ExceededTimeLimit
9001, // SocketException
10107, // NotMaster
11600, // InterruptedAtShutdown
11602, // InterruptedDueToReplStateChange
13435, // NotMasterNoSlaveOk
13436, // NotMasterOrSecondary
63, // StaleShardVersion
150, // StaleEpoch
13388, // StaleConfig
234, // RetryChangeStream
133, // FailedToSatisfyReadPreference
216, // ElectionInProgress
];

/** @var int */
private static $wireVersionForResumableChangeStreamError = 9;

/** @var callable */
private $resumeCallable;

Expand Down Expand Up @@ -180,15 +198,11 @@ private function isResumableError(RuntimeException $exception)
return false;
}

if ($exception->hasErrorLabel('NonResumableChangeStreamError')) {
return false;
}

if (in_array($exception->getCode(), self::$nonResumableErrorCodes)) {
return false;
if (server_supports_feature($this->iterator->getServer(), self::$wireVersionForResumableChangeStreamError)) {
return $exception->hasErrorLabel('ResumableChangeStreamError');
}

return true;
return in_array($exception->getCode(), self::$resumableErrorCodes);
}

/**
Expand Down
13 changes: 13 additions & 0 deletions src/Model/ChangeStreamIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
use MongoDB\Driver\Monitoring\CommandStartedEvent;
use MongoDB\Driver\Monitoring\CommandSubscriber;
use MongoDB\Driver\Monitoring\CommandSucceededEvent;
use MongoDB\Driver\Server;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\ResumeTokenException;
use MongoDB\Exception\UnexpectedValueException;
Expand Down Expand Up @@ -63,6 +64,9 @@ class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
/** @var array|object|null */
private $resumeToken;

/** @var Server */
private $server;

/**
* @internal
* @param Cursor $cursor
Expand Down Expand Up @@ -90,6 +94,7 @@ public function __construct(Cursor $cursor, $firstBatchSize, $initialResumeToken
$this->isRewindNop = ($firstBatchSize === 0);
$this->postBatchResumeToken = $postBatchResumeToken;
$this->resumeToken = $initialResumeToken;
$this->server = $cursor->getServer();
}

/** @internal */
Expand Down Expand Up @@ -152,6 +157,14 @@ public function getResumeToken()
return $this->resumeToken;
}

/**
* Returns the server the cursor is running on.
*/
public function getServer() : Server
{
return $this->server;
}

/**
* @see https://php.net/iteratoriterator.key
* @return mixed
Expand Down
131 changes: 25 additions & 106 deletions tests/Operation/WatchFunctionalTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\WriteConcern;
use MongoDB\Exception\ResumeTokenException;
use MongoDB\Operation\DatabaseCommand;
use MongoDB\Operation\InsertOne;
use MongoDB\Operation\Watch;
use MongoDB\Tests\CommandObserver;
Expand Down Expand Up @@ -155,49 +154,6 @@ function (array $event) use (&$lastEvent) {
$this->assertSameDocument($postBatchResumeToken, $changeStream->getResumeToken());
}

/**
* Prose test 10: "ChangeStream will resume after a killCursors command is
* issued for its child cursor."
*/
public function testNextResumesAfterCursorNotFound()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());

$changeStream->rewind();
$this->assertFalse($changeStream->valid());

$this->insertDocument(['_id' => 1, 'x' => 'foo']);

$this->advanceCursorUntilValid($changeStream);

$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 1, 'x' => 'foo'],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 1],
];

$this->assertMatchesDocument($expectedResult, $changeStream->current());

$this->killChangeStreamCursor($changeStream);

$this->insertDocument(['_id' => 2, 'x' => 'bar']);

$this->advanceCursorUntilValid($changeStream);

$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 2, 'x' => 'bar'],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 2],
];

$this->assertMatchesDocument($expectedResult, $changeStream->current());
}

public function testNextResumesAfterConnectionException()
{
/* In order to trigger a dropped connection, we'll use a new client with
Expand Down Expand Up @@ -267,7 +223,7 @@ function (array $event) use (&$events) {
$postBatchResumeToken = $this->getPostBatchResumeTokenFromReply($events[0]['succeeded']->getReply());

$this->assertFalse($changeStream->valid());
$this->killChangeStreamCursor($changeStream);
$this->forceChangeStreamResume();

$this->assertNoCommandExecuted(function () use ($changeStream) {
$changeStream->rewind();
Expand Down Expand Up @@ -346,7 +302,7 @@ function (array $event) use (&$events) {
$this->assertInstanceOf(TimestampInterface::class, $operationTime);

$this->assertFalse($changeStream->valid());
$this->killChangeStreamCursor($changeStream);
$this->forceChangeStreamResume();

$this->assertNoCommandExecuted(function () use ($changeStream) {
$changeStream->rewind();
Expand Down Expand Up @@ -497,7 +453,7 @@ public function testNoChangeAfterResumeBeforeInsert()

$this->assertMatchesDocument($expectedResult, $changeStream->current());

$this->killChangeStreamCursor($changeStream);
$this->forceChangeStreamResume();

$changeStream->next();
$this->assertFalse($changeStream->valid());
Expand All @@ -524,10 +480,10 @@ public function testResumeMultipleTimesInSuccession()
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());

/* Killing the cursor when there are no results will test that neither
/* Forcing a resume when there are no results will test that neither
* the initial rewind() nor a resume attempt via next() increment the
* key. */
$this->killChangeStreamCursor($changeStream);
$this->forceChangeStreamResume();

$this->assertNoCommandExecuted(function () use ($changeStream) {
$changeStream->rewind();
Expand All @@ -542,7 +498,7 @@ public function testResumeMultipleTimesInSuccession()
$this->assertNull($changeStream->current());

// A consecutive resume attempt should still not increment the key
$this->killChangeStreamCursor($changeStream);
$this->forceChangeStreamResume();

$changeStream->next();
$this->assertFalse($changeStream->valid());
Expand All @@ -568,10 +524,10 @@ public function testResumeMultipleTimesInSuccession()

$this->assertMatchesDocument($expectedResult, $changeStream->current());

/* Insert another document and kill the cursor. ChangeStream::next()
/* Insert another document and force a resume. ChangeStream::next()
* should resume and pick up the last insert. */
$this->insertDocument(['_id' => 2]);
$this->killChangeStreamCursor($changeStream);
$this->forceChangeStreamResume();

$changeStream->next();
$this->assertTrue($changeStream->valid());
Expand All @@ -595,7 +551,7 @@ public function testResumeMultipleTimesInSuccession()
*
* Note: PHPLIB-448 may require rewind() to throw an exception here. */
$this->insertDocument(['_id' => 3]);
$this->killChangeStreamCursor($changeStream);
$this->forceChangeStreamResume();

$this->assertNoCommandExecuted(function () use ($changeStream) {
$changeStream->rewind();
Expand All @@ -621,7 +577,7 @@ public function testResumeMultipleTimesInSuccession()

// Test one final, consecutive resume via ChangeStream::next()
$this->insertDocument(['_id' => 4]);
$this->killChangeStreamCursor($changeStream);
$this->forceChangeStreamResume();

$changeStream->next();
$this->assertTrue($changeStream->valid());
Expand Down Expand Up @@ -665,7 +621,7 @@ public function testKey()
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->key());

$this->killChangeStreamCursor($changeStream);
$this->forceChangeStreamResume();

$changeStream->next();
$this->assertFalse($changeStream->valid());
Expand Down Expand Up @@ -731,41 +687,6 @@ public function testInitialCursorIsNotClosed()
$this->assertFalse($cursor->isDead());
}

/**
* Prose test 5: "ChangeStream will not attempt to resume after encountering
* error code 11601 (Interrupted), 136 (CappedPositionLost), or 237
* (CursorKilled) while executing a getMore command."
*
* @dataProvider provideNonResumableErrorCodes
*/
public function testNonResumableErrorCodes($errorCode)
{
$this->configureFailPoint([
'configureFailPoint' => 'failCommand',
'mode' => ['times' => 1],
'data' => ['failCommands' => ['getMore'], 'errorCode' => $errorCode],
]);

$this->insertDocument(['x' => 1]);

$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), []);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();

$this->expectException(ServerException::class);
$this->expectExceptionCode($errorCode);
$changeStream->next();
}

public function provideNonResumableErrorCodes()
{
return [
'CappedPositionLost' => [136],
'CursorKilled' => [237],
'Interrupted' => [11601],
];
}

/**
* Prose test 2: "ChangeStream will throw an exception if the server
* response is missing the resume token (if wire version is < 8, this is a
Expand Down Expand Up @@ -979,7 +900,7 @@ public function testRewindExtractsResumeTokenAndNextResumes()
];
$this->assertMatchesDocument($expectedResult, $changeStream->current());

$this->killChangeStreamCursor($changeStream);
$this->forceChangeStreamResume();

$this->advanceCursorUntilValid($changeStream);
$this->assertSame(1, $changeStream->key());
Expand Down Expand Up @@ -1218,7 +1139,7 @@ function (array $event) use (&$originalSession) {
);

$changeStream->rewind();
$this->killChangeStreamCursor($changeStream);
$this->forceChangeStreamResume();

(new CommandObserver())->observe(
function () use (&$changeStream) {
Expand Down Expand Up @@ -1402,7 +1323,7 @@ public function testOriginalReadPreferenceIsPreservedOnResume()

$changeStream = $operation->execute($secondary);
$previousCursorId = $changeStream->getCursorId();
$this->killChangeStreamCursor($changeStream);
$this->forceChangeStreamResume();

$changeStream->next();
$this->assertNotSame($previousCursorId, $changeStream->getCursorId());
Expand Down Expand Up @@ -1543,7 +1464,7 @@ public function testResumingChangeStreamWithoutPreviousResultsIncludesStartAfter
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();
$this->killChangeStreamCursor($changeStream);
$this->forceChangeStreamResume();

$aggregateCommand = null;

Expand Down Expand Up @@ -1594,7 +1515,7 @@ public function testResumingChangeStreamWithPreviousResultsIncludesResumeAfterOp
$this->advanceCursorUntilValid($changeStream);
$this->assertTrue($changeStream->valid());

$this->killChangeStreamCursor($changeStream);
$this->forceChangeStreamResume();

$aggregateCommand = null;

Expand Down Expand Up @@ -1630,6 +1551,15 @@ function (array $event) use (&$commands) {
$this->assertEmpty($commands);
}

private function forceChangeStreamResume(array $commands = ['getMore'], int $errorCode = self::NOT_MASTER)
{
$this->configureFailPoint([
'configureFailPoint' => 'failCommand',
'mode' => ['times' => 1],
'data' => ['failCommands' => $commands, 'errorCode' => $errorCode],
]);
}

private function getPostBatchResumeTokenFromReply(stdClass $reply)
{
$this->assertObjectHasAttribute('cursor', $reply);
Expand Down Expand Up @@ -1662,17 +1592,6 @@ private function isStartAtOperationTimeSupported()
return server_supports_feature($this->getPrimaryServer(), self::$wireVersionForStartAtOperationTime);
}

private function killChangeStreamCursor(ChangeStream $changeStream)
{
$command = [
'killCursors' => $this->getCollectionName(),
'cursors' => [ $changeStream->getCursorId() ],
];

$operation = new DatabaseCommand($this->getDatabaseName(), $command);
$operation->execute($this->getPrimaryServer());
}

private function advanceCursorUntilValid(Iterator $iterator, $limitOnShardedClusters = 5)
{
if (! $this->isShardedCluster()) {
Expand Down
Loading