Skip to content

Commit 1c318a6

Browse files
authored
Ensure that resumeToken is included on resume attempts (#634)
JAVA-3871
1 parent 858d743 commit 1c318a6

File tree

6 files changed

+37
-15
lines changed

6 files changed

+37
-15
lines changed

driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,11 @@ public boolean hasNext() {
6464
return resumeableOperation(new Function<AggregateResponseBatchCursor<RawBsonDocument>, Boolean>() {
6565
@Override
6666
public Boolean apply(final AggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
67-
return queryBatchCursor.hasNext();
67+
try {
68+
return queryBatchCursor.hasNext();
69+
} finally {
70+
cachePostBatchResumeToken(queryBatchCursor);
71+
}
6872
}
6973
});
7074
}
@@ -74,9 +78,11 @@ public List<T> next() {
7478
return resumeableOperation(new Function<AggregateResponseBatchCursor<RawBsonDocument>, List<T>>() {
7579
@Override
7680
public List<T> apply(final AggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
77-
List<T> results = convertResults(queryBatchCursor.next());
78-
cachePostBatchResumeToken(queryBatchCursor);
79-
return results;
81+
try {
82+
return convertResults(queryBatchCursor.next());
83+
} finally {
84+
cachePostBatchResumeToken(queryBatchCursor);
85+
}
8086
}
8187
});
8288
}
@@ -86,9 +92,11 @@ public List<T> tryNext() {
8692
return resumeableOperation(new Function<AggregateResponseBatchCursor<RawBsonDocument>, List<T>>() {
8793
@Override
8894
public List<T> apply(final AggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
89-
List<T> results = convertResults(queryBatchCursor.tryNext());
90-
cachePostBatchResumeToken(queryBatchCursor);
91-
return results;
95+
try {
96+
return convertResults(queryBatchCursor.tryNext());
97+
} finally {
98+
cachePostBatchResumeToken(queryBatchCursor);
99+
}
92100
}
93101
});
94102
}

driver-core/src/main/com/mongodb/internal/operation/QueryBatchCursor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ class QueryBatchCursor<T> implements AggregateResponseBatchCursor<T> {
100100
this.decoder = notNull("decoder", decoder);
101101
if (result != null) {
102102
this.operationTime = result.getTimestamp(OPERATION_TIME, null);
103+
this.postBatchResumeToken = getPostBatchResumeTokenFromResponse(result);
103104
}
104105
if (firstQueryResult.getCursor() != null) {
105106
notNull("connectionSource", connectionSource);

driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,21 +157,32 @@ class OperationFunctionalSpecification extends Specification {
157157
}
158158

159159
def next(cursor, boolean async, int minimumCount) {
160+
next(cursor, async, false, minimumCount)
161+
}
162+
163+
def next(cursor, boolean async, boolean callHasNextBeforeNext, int minimumCount) {
160164
List<BsonDocument> retVal = []
161165

162166
while (retVal.size() < minimumCount) {
163-
retVal.addAll(next(cursor, async))
167+
retVal.addAll(doNext(cursor, async, callHasNextBeforeNext))
164168
}
165169

166170
retVal
167171
}
168172

169173
def next(cursor, boolean async) {
174+
doNext(cursor, async, false)
175+
}
176+
177+
def doNext(cursor, boolean async, boolean callHasNextBeforeNext) {
170178
if (async) {
171179
def futureResultCallback = new FutureResultCallback<List<BsonDocument>>()
172180
cursor.next(futureResultCallback)
173181
futureResultCallback.get(TIMEOUT, TimeUnit.SECONDS)
174182
} else {
183+
if (callHasNextBeforeNext) {
184+
cursor.hasNext()
185+
}
175186
cursor.next()
176187
}
177188
}

driver-core/src/test/functional/com/mongodb/internal/operation/ChangeStreamOperationProseTestSpecification.groovy

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ class ChangeStreamOperationProseTestSpecification extends OperationFunctionalSpe
9696
setFailPoint(failPointDocument)
9797

9898
then:
99-
def result = next(cursor, async, 2)
99+
def result = next(cursor, async, callHasNext, 2)
100100

101101
then:
102102
result.size() == 2
@@ -107,7 +107,10 @@ class ChangeStreamOperationProseTestSpecification extends OperationFunctionalSpe
107107
waitForLastRelease(async ? getAsyncCluster() : getCluster())
108108

109109
where:
110-
async << [true, false]
110+
async | callHasNext
111+
true | false
112+
false | false
113+
false | true
111114
}
112115

113116
//

driver-sync/src/test/functional/com/mongodb/client/ChangeStreamProseTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -370,8 +370,10 @@ public void testGetResumeTokenReturnsPostBatchResumeTokenAfterGetMore()
370370
// use reflection to access the postBatchResumeToken
371371
AggregateResponseBatchCursor<?> batchCursor = getBatchCursor(cursor);
372372

373-
// check equality in the case where the batch has not been iterated at all
374-
assertEquals(cursor.getResumeToken(), batchCursor.getPostBatchResumeToken());
373+
assertNotNull(batchCursor.getPostBatchResumeToken());
374+
375+
// resume token should be null before iteration
376+
assertNull(cursor.getResumeToken());
375377

376378
cursor.next();
377379
assertEquals(cursor.getResumeToken(), batchCursor.getPostBatchResumeToken());

driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestValidator.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import java.util.Collection;
3434
import java.util.List;
3535

36-
import static org.junit.Assume.assumeTrue;
3736
import static util.JsonPoweredTestHelper.getTestDocument;
3837
import static util.JsonPoweredTestHelper.getTestFiles;
3938

@@ -51,8 +50,6 @@ public UnifiedTestValidator(final String fileDescription, final String testDescr
5150

5251
@Before
5352
public void setUp() {
54-
// TODO: remove after https://jira.mongodb.org/browse/JAVA-3871 is fixed
55-
assumeTrue(!(fileDescription.equals("poc-change-streams") && testDescription.equals("Test consecutive resume")));
5653
super.setUp();
5754
}
5855

0 commit comments

Comments
 (0)