Skip to content

Commit 10b913f

Browse files
committed
PHPLIB-537: Use whitelist to check if a change stream is resumable
1 parent 8109cb7 commit 10b913f

File tree

8 files changed

+3590
-117
lines changed

8 files changed

+3590
-117
lines changed

src/ChangeStream.php

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,31 @@ class ChangeStream implements Iterator
4242
*/
4343
const CURSOR_NOT_FOUND = 43;
4444

45-
/** @var array */
46-
private static $nonResumableErrorCodes = [
47-
136, // CappedPositionLost
48-
237, // CursorKilled
49-
11601, // Interrupted
45+
/** @var int[] */
46+
private static $resumableErrorCodes = [
47+
6, // HostUnreachable
48+
7, // HostNotFound
49+
89, // NetworkTimeout
50+
91, // ShutdownInProgress
51+
189, // PrimarySteppedDown
52+
262, // ExceededTimeLimit
53+
9001, // SocketException
54+
10107, // NotMaster
55+
11600, // InterruptedAtShutdown
56+
11602, // InterruptedDueToReplStateChange
57+
13435, // NotMasterNoSlaveOk
58+
13436, // NotMasterOrSecondary
59+
63, // StaleShardVersion
60+
150, // StaleEpoch
61+
13388, // StaleConfig
62+
234, // RetryChangeStream
63+
133, // FailedToSatisfyReadPreference
64+
216, // ElectionInProgress
5065
];
5166

67+
/** @var int */
68+
private static $wireVersionForResumableChangeStreamError = 9;
69+
5270
/** @var callable */
5371
private $resumeCallable;
5472

@@ -180,15 +198,15 @@ private function isResumableError(RuntimeException $exception)
180198
return false;
181199
}
182200

183-
if ($exception->hasErrorLabel('NonResumableChangeStreamError')) {
184-
return false;
201+
if (server_supports_feature($this->iterator->getServer(), self::$wireVersionForResumableChangeStreamError)) {
202+
return $exception->hasErrorLabel('ResumableChangeStreamError');
185203
}
186204

187-
if (in_array($exception->getCode(), self::$nonResumableErrorCodes)) {
205+
if ($exception->hasErrorLabel('NonResumableChangeStreamError')) {
188206
return false;
189207
}
190208

191-
return true;
209+
return in_array($exception->getCode(), self::$resumableErrorCodes);
192210
}
193211

194212
/**

src/Model/ChangeStreamIterator.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
use MongoDB\Driver\Monitoring\CommandStartedEvent;
2525
use MongoDB\Driver\Monitoring\CommandSubscriber;
2626
use MongoDB\Driver\Monitoring\CommandSucceededEvent;
27+
use MongoDB\Driver\Server;
2728
use MongoDB\Exception\InvalidArgumentException;
2829
use MongoDB\Exception\ResumeTokenException;
2930
use MongoDB\Exception\UnexpectedValueException;
@@ -63,6 +64,9 @@ class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
6364
/** @var array|object|null */
6465
private $resumeToken;
6566

67+
/** @var Server */
68+
private $server;
69+
6670
/**
6771
* @internal
6872
* @param Cursor $cursor
@@ -90,6 +94,7 @@ public function __construct(Cursor $cursor, $firstBatchSize, $initialResumeToken
9094
$this->isRewindNop = ($firstBatchSize === 0);
9195
$this->postBatchResumeToken = $postBatchResumeToken;
9296
$this->resumeToken = $initialResumeToken;
97+
$this->server = $cursor->getServer();
9398
}
9499

95100
/** @internal */
@@ -152,6 +157,14 @@ public function getResumeToken()
152157
return $this->resumeToken;
153158
}
154159

160+
/**
161+
* Returns the server the cursor is running on.
162+
*/
163+
public function getServer() : Server
164+
{
165+
return $this->server;
166+
}
167+
155168
/**
156169
* @see https://php.net/iteratoriterator.key
157170
* @return mixed

tests/Operation/WatchFunctionalTest.php

Lines changed: 0 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -155,49 +155,6 @@ function (array $event) use (&$lastEvent) {
155155
$this->assertSameDocument($postBatchResumeToken, $changeStream->getResumeToken());
156156
}
157157

158-
/**
159-
* Prose test 10: "ChangeStream will resume after a killCursors command is
160-
* issued for its child cursor."
161-
*/
162-
public function testNextResumesAfterCursorNotFound()
163-
{
164-
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
165-
$changeStream = $operation->execute($this->getPrimaryServer());
166-
167-
$changeStream->rewind();
168-
$this->assertFalse($changeStream->valid());
169-
170-
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
171-
172-
$this->advanceCursorUntilValid($changeStream);
173-
174-
$expectedResult = [
175-
'_id' => $changeStream->current()->_id,
176-
'operationType' => 'insert',
177-
'fullDocument' => ['_id' => 1, 'x' => 'foo'],
178-
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
179-
'documentKey' => ['_id' => 1],
180-
];
181-
182-
$this->assertMatchesDocument($expectedResult, $changeStream->current());
183-
184-
$this->killChangeStreamCursor($changeStream);
185-
186-
$this->insertDocument(['_id' => 2, 'x' => 'bar']);
187-
188-
$this->advanceCursorUntilValid($changeStream);
189-
190-
$expectedResult = [
191-
'_id' => $changeStream->current()->_id,
192-
'operationType' => 'insert',
193-
'fullDocument' => ['_id' => 2, 'x' => 'bar'],
194-
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
195-
'documentKey' => ['_id' => 2],
196-
];
197-
198-
$this->assertMatchesDocument($expectedResult, $changeStream->current());
199-
}
200-
201158
public function testNextResumesAfterConnectionException()
202159
{
203160
/* In order to trigger a dropped connection, we'll use a new client with
@@ -731,41 +688,6 @@ public function testInitialCursorIsNotClosed()
731688
$this->assertFalse($cursor->isDead());
732689
}
733690

734-
/**
735-
* Prose test 5: "ChangeStream will not attempt to resume after encountering
736-
* error code 11601 (Interrupted), 136 (CappedPositionLost), or 237
737-
* (CursorKilled) while executing a getMore command."
738-
*
739-
* @dataProvider provideNonResumableErrorCodes
740-
*/
741-
public function testNonResumableErrorCodes($errorCode)
742-
{
743-
$this->configureFailPoint([
744-
'configureFailPoint' => 'failCommand',
745-
'mode' => ['times' => 1],
746-
'data' => ['failCommands' => ['getMore'], 'errorCode' => $errorCode],
747-
]);
748-
749-
$this->insertDocument(['x' => 1]);
750-
751-
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), []);
752-
$changeStream = $operation->execute($this->getPrimaryServer());
753-
$changeStream->rewind();
754-
755-
$this->expectException(ServerException::class);
756-
$this->expectExceptionCode($errorCode);
757-
$changeStream->next();
758-
}
759-
760-
public function provideNonResumableErrorCodes()
761-
{
762-
return [
763-
'CappedPositionLost' => [136],
764-
'CursorKilled' => [237],
765-
'Interrupted' => [11601],
766-
];
767-
}
768-
769691
/**
770692
* Prose test 2: "ChangeStream will throw an exception if the server
771693
* response is missing the resume token (if wire version is < 8, this is a

tests/SpecTests/ChangeStreamsSpecTest.php

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use ArrayIterator;
66
use LogicException;
7+
use MongoDB\BSON\Int64;
78
use MongoDB\ChangeStream;
89
use MongoDB\Driver\Exception\Exception;
910
use MongoDB\Model\BSONDocument;
@@ -27,11 +28,22 @@ class ChangeStreamsSpecTest extends FunctionalTestCase
2728
/**
2829
* Assert that the expected and actual command documents match.
2930
*
31+
* Note: this method may modify the $expected object.
32+
*
3033
* @param stdClass $expected Expected command document
3134
* @param stdClass $actual Actual command document
3235
*/
3336
public static function assertCommandMatches(stdClass $expected, stdClass $actual)
3437
{
38+
if (isset($expected->getMore) && $expected->getMore == 42) {
39+
static::assertObjectHasAttribute('getMore', $actual);
40+
static::assertThat($actual->getMore, static::logicalOr(
41+
static::isInstanceOf(Int64::class),
42+
static::isType('integer')
43+
));
44+
unset($expected->getMore);
45+
}
46+
3547
static::assertDocumentsMatch($expected, $actual);
3648
}
3749

@@ -76,15 +88,18 @@ public function testChangeStreams(stdClass $test, $databaseName = null, $collect
7688

7789
$this->checkServerRequirements($this->createRunOn($test));
7890

79-
if (! isset($databaseName, $collectionName, $database2Name, $collection2Name)) {
91+
if (! isset($databaseName, $collectionName)) {
8092
$this->fail('Required database and collection names are unset');
8193
}
8294

8395
$context = Context::fromChangeStreams($test, $databaseName, $collectionName);
8496
$this->setContext($context);
8597

8698
$this->dropDatabasesAndCreateCollection($databaseName, $collectionName);
87-
$this->dropDatabasesAndCreateCollection($database2Name, $collection2Name);
99+
100+
if (isset($database2Name, $collection2Name)) {
101+
$this->dropDatabasesAndCreateCollection($database2Name, $collection2Name);
102+
}
88103

89104
if (isset($test->failPoint)) {
90105
$this->configureFailPoint($test->failPoint);

tests/SpecTests/change-streams/change-streams-errors.json

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,7 @@
5454
"cursor": {},
5555
"pipeline": [
5656
{
57-
"$changeStream": {
58-
"fullDocument": "default"
59-
}
57+
"$changeStream": {}
6058
},
6159
{
6260
"$unsupported": "foo"
@@ -110,6 +108,53 @@
110108
]
111109
}
112110
}
111+
},
112+
{
113+
"description": "change stream errors on MaxTimeMSExpired",
114+
"minServerVersion": "4.2",
115+
"failPoint": {
116+
"configureFailPoint": "failCommand",
117+
"mode": {
118+
"times": 1
119+
},
120+
"data": {
121+
"failCommands": [
122+
"getMore"
123+
],
124+
"errorCode": 50,
125+
"closeConnection": false
126+
}
127+
},
128+
"target": "collection",
129+
"topology": [
130+
"replicaset",
131+
"sharded"
132+
],
133+
"changeStreamPipeline": [
134+
{
135+
"$project": {
136+
"_id": 0
137+
}
138+
}
139+
],
140+
"changeStreamOptions": {},
141+
"operations": [
142+
{
143+
"database": "change-stream-tests",
144+
"collection": "test",
145+
"name": "insertOne",
146+
"arguments": {
147+
"document": {
148+
"z": 3
149+
}
150+
}
151+
}
152+
],
153+
"result": {
154+
"error": {
155+
"code": 50
156+
}
157+
}
113158
}
114159
]
115160
}

0 commit comments

Comments
 (0)