Skip to content

Commit 5c333d6

Browse files
committed
DATAMONGO-2258 - Polishing.
Consider undefined state in ChangeStreamOptions.isResumeAfter()/isStartAfter() when resume token is not set. Original pull request: #739.
1 parent 7f4d3f2 commit 5c333d6

File tree

3 files changed

+76
-11
lines changed

3 files changed

+76
-11
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.bson.BsonTimestamp;
2626
import org.bson.BsonValue;
2727
import org.bson.Document;
28+
2829
import org.springframework.data.mongodb.core.aggregation.Aggregation;
2930
import org.springframework.data.mongodb.core.query.Collation;
3031
import org.springframework.lang.Nullable;
@@ -52,7 +53,7 @@ public class ChangeStreamOptions {
5253
private @Nullable FullDocument fullDocumentLookup;
5354
private @Nullable Collation collation;
5455
private @Nullable Object resumeTimestamp;
55-
private Resume resume = Resume.RESUME_AFTER;
56+
private Resume resume = Resume.UNDEFINED;
5657

5758
protected ChangeStreamOptions() {}
5859

@@ -161,6 +162,8 @@ private static <T> Object doGetTimestamp(Object timestamp, Class<T> targetType)
161162
*/
162163
enum Resume {
163164

165+
UNDEFINED,
166+
164167
/**
165168
* @see com.mongodb.client.ChangeStreamIterable#startAfter(BsonDocument)
166169
*/
@@ -185,7 +188,7 @@ public static class ChangeStreamOptionsBuilder {
185188
private @Nullable FullDocument fullDocumentLookup;
186189
private @Nullable Collation collation;
187190
private @Nullable Object resumeTimestamp;
188-
private Resume resume = Resume.RESUME_AFTER;
191+
private Resume resume = Resume.UNDEFINED;
189192

190193
private ChangeStreamOptionsBuilder() {}
191194

@@ -253,6 +256,11 @@ public ChangeStreamOptionsBuilder resumeToken(BsonValue resumeToken) {
253256
Assert.notNull(resumeToken, "ResumeToken must not be null!");
254257

255258
this.resumeToken = resumeToken;
259+
260+
if (this.resume == Resume.UNDEFINED) {
261+
this.resume = Resume.RESUME_AFTER;
262+
}
263+
256264
return this;
257265
}
258266

@@ -319,7 +327,7 @@ public ChangeStreamOptionsBuilder resumeAt(BsonTimestamp resumeTimestamp) {
319327
public ChangeStreamOptionsBuilder resumeAfter(BsonValue resumeToken) {
320328

321329
resumeToken(resumeToken);
322-
resume = Resume.RESUME_AFTER;
330+
this.resume = Resume.RESUME_AFTER;
323331

324332
return this;
325333
}
@@ -334,7 +342,7 @@ public ChangeStreamOptionsBuilder resumeAfter(BsonValue resumeToken) {
334342
public ChangeStreamOptionsBuilder startAfter(BsonValue resumeToken) {
335343

336344
resumeToken(resumeToken);
337-
resume = Resume.START_AFTER;
345+
this.resume = Resume.START_AFTER;
338346

339347
return this;
340348
}
@@ -346,12 +354,12 @@ public ChangeStreamOptions build() {
346354

347355
ChangeStreamOptions options = new ChangeStreamOptions();
348356

349-
options.filter = filter;
350-
options.resumeToken = resumeToken;
351-
options.fullDocumentLookup = fullDocumentLookup;
352-
options.collation = collation;
353-
options.resumeTimestamp = resumeTimestamp;
354-
options.resume = resume;
357+
options.filter = this.filter;
358+
options.resumeToken = this.resumeToken;
359+
options.fullDocumentLookup = this.fullDocumentLookup;
360+
options.collation = this.collation;
361+
options.resumeTimestamp = this.resumeTimestamp;
362+
options.resume = this.resume;
355363

356364
return options;
357365
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb.core;
17+
18+
import static org.assertj.core.api.Assertions.*;
19+
20+
import org.bson.BsonDocument;
21+
import org.junit.Test;
22+
23+
/**
24+
* Unit tests for {@link ChangeStreamOptions}.
25+
*
26+
* @author Mark Paluch
27+
*/
28+
public class ChangeStreamOptionsUnitTests {
29+
30+
@Test // DATAMONGO-2258
31+
public void shouldReportResumeAfter() {
32+
33+
ChangeStreamOptions options = ChangeStreamOptions.builder().resumeAfter(new BsonDocument()).build();
34+
35+
assertThat(options.isResumeAfter()).isTrue();
36+
assertThat(options.isStartAfter()).isFalse();
37+
}
38+
39+
@Test // DATAMONGO-2258
40+
public void shouldReportStartAfter() {
41+
42+
ChangeStreamOptions options = ChangeStreamOptions.builder().startAfter(new BsonDocument()).build();
43+
44+
assertThat(options.isResumeAfter()).isFalse();
45+
assertThat(options.isStartAfter()).isTrue();
46+
}
47+
48+
@Test // DATAMONGO-2258
49+
public void shouldNotReportResumeStartAfter() {
50+
51+
ChangeStreamOptions options = ChangeStreamOptions.empty();
52+
53+
assertThat(options.isResumeAfter()).isFalse();
54+
assertThat(options.isStartAfter()).isFalse();
55+
}
56+
}

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.junit.runner.RunWith;
2828
import org.mockito.Mock;
2929
import org.mockito.junit.MockitoJUnitRunner;
30+
3031
import org.springframework.data.mongodb.core.MongoTemplate;
3132
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
3233
import org.springframework.data.mongodb.core.convert.MongoConverter;
@@ -71,7 +72,7 @@ public void setUp() {
7172
}
7273

7374
@Test // DATAMONGO-2258
74-
public void shouldBe2DotOneComplient() {
75+
public void shouldNotBreakLovelaceBehavior() {
7576

7677
BsonDocument resumeToken = new BsonDocument("token", new BsonString(UUID.randomUUID().toString()));
7778

0 commit comments

Comments
 (0)