Skip to content

Commit cc606b9

Browse files
committed
Ensure that resumeToken is included on resume attempts
JAVA-3871
1 parent f9bf950 commit cc606b9

File tree

6 files changed

+36
-15
lines changed

6 files changed

+36
-15
lines changed

driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,12 @@ public boolean hasNext() {
6464
return resumeableOperation(new Function<AggregateResponseBatchCursor<RawBsonDocument>, Boolean>() {
6565
@Override
6666
public Boolean apply(final AggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
67-
return queryBatchCursor.hasNext();
67+
try {
68+
return queryBatchCursor.hasNext();
69+
} catch (RuntimeException e) {
70+
cachePostBatchResumeToken(queryBatchCursor);
71+
throw e;
72+
}
6873
}
6974
});
7075
}
@@ -74,9 +79,11 @@ public List<T> next() {
7479
return resumeableOperation(new Function<AggregateResponseBatchCursor<RawBsonDocument>, List<T>>() {
7580
@Override
7681
public List<T> apply(final AggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
77-
List<T> results = convertResults(queryBatchCursor.next());
78-
cachePostBatchResumeToken(queryBatchCursor);
79-
return results;
82+
try {
83+
return convertResults(queryBatchCursor.next());
84+
} finally {
85+
cachePostBatchResumeToken(queryBatchCursor);
86+
}
8087
}
8188
});
8289
}
@@ -86,9 +93,11 @@ public List<T> tryNext() {
8693
return resumeableOperation(new Function<AggregateResponseBatchCursor<RawBsonDocument>, List<T>>() {
8794
@Override
8895
public List<T> apply(final AggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
89-
List<T> results = convertResults(queryBatchCursor.tryNext());
90-
cachePostBatchResumeToken(queryBatchCursor);
91-
return results;
96+
try {
97+
return convertResults(queryBatchCursor.tryNext());
98+
} finally {
99+
cachePostBatchResumeToken(queryBatchCursor);
100+
}
92101
}
93102
});
94103
}

driver-core/src/main/com/mongodb/internal/operation/QueryBatchCursor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ class QueryBatchCursor<T> implements AggregateResponseBatchCursor<T> {
100100
this.decoder = notNull("decoder", decoder);
101101
if (result != null) {
102102
this.operationTime = result.getTimestamp(OPERATION_TIME, null);
103+
this.postBatchResumeToken = getPostBatchResumeTokenFromResponse(result);
103104
}
104105
if (firstQueryResult.getCursor() != null) {
105106
notNull("connectionSource", connectionSource);

driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,21 +157,32 @@ class OperationFunctionalSpecification extends Specification {
157157
}
158158

159159
def next(cursor, boolean async, int minimumCount) {
160+
next(cursor, async, false, minimumCount)
161+
}
162+
163+
def next(cursor, boolean async, boolean callHasNextBeforeNext, int minimumCount) {
160164
List<BsonDocument> retVal = []
161165

162166
while (retVal.size() < minimumCount) {
163-
retVal.addAll(next(cursor, async))
167+
retVal.addAll(doNext(cursor, async, callHasNextBeforeNext))
164168
}
165169

166170
retVal
167171
}
168172

169173
def next(cursor, boolean async) {
174+
doNext(cursor, async, false)
175+
}
176+
177+
def doNext(cursor, boolean async, boolean callHasNextBeforeNext) {
170178
if (async) {
171179
def futureResultCallback = new FutureResultCallback<List<BsonDocument>>()
172180
cursor.next(futureResultCallback)
173181
futureResultCallback.get(TIMEOUT, TimeUnit.SECONDS)
174182
} else {
183+
if (callHasNextBeforeNext) {
184+
cursor.hasNext()
185+
}
175186
cursor.next()
176187
}
177188
}

driver-core/src/test/functional/com/mongodb/internal/operation/ChangeStreamOperationProseTestSpecification.groovy

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ class ChangeStreamOperationProseTestSpecification extends OperationFunctionalSpe
9696
setFailPoint(failPointDocument)
9797

9898
then:
99-
def result = next(cursor, async, 2)
99+
def result = next(cursor, async, callHasNext, 2)
100100

101101
then:
102102
result.size() == 2
@@ -107,7 +107,10 @@ class ChangeStreamOperationProseTestSpecification extends OperationFunctionalSpe
107107
waitForLastRelease(async ? getAsyncCluster() : getCluster())
108108

109109
where:
110-
async << [true, false]
110+
async | callHasNext
111+
true | false
112+
false | false
113+
false | true
111114
}
112115

113116
//

driver-sync/src/test/functional/com/mongodb/client/ChangeStreamProseTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -370,8 +370,8 @@ public void testGetResumeTokenReturnsPostBatchResumeTokenAfterGetMore()
370370
// use reflection to access the postBatchResumeToken
371371
AggregateResponseBatchCursor<?> batchCursor = getBatchCursor(cursor);
372372

373-
// check equality in the case where the batch has not been iterated at all
374-
assertEquals(cursor.getResumeToken(), batchCursor.getPostBatchResumeToken());
373+
// resume token should be null before iteration
374+
assertNull(cursor.getResumeToken());
375375

376376
cursor.next();
377377
assertEquals(cursor.getResumeToken(), batchCursor.getPostBatchResumeToken());

driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestValidator.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import java.util.Collection;
3434
import java.util.List;
3535

36-
import static org.junit.Assume.assumeTrue;
3736
import static util.JsonPoweredTestHelper.getTestDocument;
3837
import static util.JsonPoweredTestHelper.getTestFiles;
3938

@@ -51,8 +50,6 @@ public UnifiedTestValidator(final String fileDescription, final String testDescr
5150

5251
@Before
5352
public void setUp() {
54-
// TODO: remove after https://jira.mongodb.org/browse/JAVA-3871 is fixed
55-
assumeTrue(!(fileDescription.equals("poc-change-streams") && testDescription.equals("Test consecutive resume")));
5653
super.setUp();
5754
}
5855

0 commit comments

Comments
 (0)