Skip to content

Commit 0b0ffc3

Browse files
committed
Ensure that resumeToken is included on resume attempts (#634)
JAVA-3871
1 parent 0e556f1 commit 0b0ffc3

File tree

4 files changed

+32
-10
lines changed

4 files changed

+32
-10
lines changed

driver-core/src/main/com/mongodb/operation/ChangeStreamBatchCursor.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,11 @@ public boolean hasNext() {
6161
return resumeableOperation(new Function<AggregateResponseBatchCursor<RawBsonDocument>, Boolean>() {
6262
@Override
6363
public Boolean apply(final AggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
64-
return queryBatchCursor.hasNext();
64+
try {
65+
return queryBatchCursor.hasNext();
66+
} finally {
67+
cachePostBatchResumeToken(queryBatchCursor);
68+
}
6569
}
6670
});
6771
}
@@ -71,9 +75,11 @@ public List<T> next() {
7175
return resumeableOperation(new Function<AggregateResponseBatchCursor<RawBsonDocument>, List<T>>() {
7276
@Override
7377
public List<T> apply(final AggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
74-
List<T> results = convertResults(queryBatchCursor.next());
75-
cachePostBatchResumeToken(queryBatchCursor);
76-
return results;
78+
try {
79+
return convertResults(queryBatchCursor.next());
80+
} finally {
81+
cachePostBatchResumeToken(queryBatchCursor);
82+
}
7783
}
7884
});
7985
}
@@ -83,9 +89,11 @@ public List<T> tryNext() {
8389
return resumeableOperation(new Function<AggregateResponseBatchCursor<RawBsonDocument>, List<T>>() {
8490
@Override
8591
public List<T> apply(final AggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
86-
List<T> results = convertResults(queryBatchCursor.tryNext());
87-
cachePostBatchResumeToken(queryBatchCursor);
88-
return results;
92+
try {
93+
return convertResults(queryBatchCursor.tryNext());
94+
} finally {
95+
cachePostBatchResumeToken(queryBatchCursor);
96+
}
8997
}
9098
});
9199
}

driver-core/src/main/com/mongodb/operation/QueryBatchCursor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ class QueryBatchCursor<T> implements AggregateResponseBatchCursor<T> {
9999
this.decoder = notNull("decoder", decoder);
100100
if (result != null) {
101101
this.operationTime = result.getTimestamp(OPERATION_TIME, null);
102+
this.postBatchResumeToken = getPostBatchResumeTokenFromResponse(result);
102103
}
103104
if (firstQueryResult.getCursor() != null) {
104105
notNull("connectionSource", connectionSource);

driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,21 +157,32 @@ class OperationFunctionalSpecification extends Specification {
157157
}
158158

159159
def next(cursor, boolean async, int minimumCount) {
160+
next(cursor, async, false, minimumCount)
161+
}
162+
163+
def next(cursor, boolean async, boolean callHasNextBeforeNext, int minimumCount) {
160164
List<BsonDocument> retVal = []
161165

162166
while (retVal.size() < minimumCount) {
163-
retVal.addAll(next(cursor, async))
167+
retVal.addAll(doNext(cursor, async, callHasNextBeforeNext))
164168
}
165169

166170
retVal
167171
}
168172

169173
def next(cursor, boolean async) {
174+
doNext(cursor, async, false)
175+
}
176+
177+
def doNext(cursor, boolean async, boolean callHasNextBeforeNext) {
170178
if (async) {
171179
def futureResultCallback = new FutureResultCallback<List<BsonDocument>>()
172180
cursor.next(futureResultCallback)
173181
futureResultCallback.get(TIMEOUT, TimeUnit.SECONDS)
174182
} else {
183+
if (callHasNextBeforeNext) {
184+
cursor.hasNext()
185+
}
175186
cursor.next()
176187
}
177188
}

driver-sync/src/test/functional/com/mongodb/client/ChangeStreamProseTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -394,8 +394,10 @@ public void testGetResumeTokenReturnsPostBatchResumeTokenAfterGetMore()
394394
// use reflection to access the postBatchResumeToken
395395
AggregateResponseBatchCursor<?> batchCursor = getBatchCursor(cursor);
396396

397-
// check equality in the case where the batch has not been iterated at all
398-
assertEquals(cursor.getResumeToken(), batchCursor.getPostBatchResumeToken());
397+
assertNotNull(batchCursor.getPostBatchResumeToken());
398+
399+
// resume token should be null before iteration
400+
assertNull(cursor.getResumeToken());
399401

400402
cursor.next();
401403
assertEquals(cursor.getResumeToken(), batchCursor.getPostBatchResumeToken());

0 commit comments

Comments
 (0)