From 5fdb0b63090eec5e24e2f6c5d1c1a19d22009bcc Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Fri, 16 Sep 2022 11:15:10 +0200 Subject: [PATCH 1/2] Prepare issue branch. --- pom.xml | 2 +- spring-data-mongodb-benchmarks/pom.xml | 2 +- spring-data-mongodb-distribution/pom.xml | 2 +- spring-data-mongodb/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index a401249d79..24f6bafc9f 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-mongodb-parent - 4.0.0-SNAPSHOT + 4.0.x-GH-4167-SNAPSHOT pom Spring Data MongoDB diff --git a/spring-data-mongodb-benchmarks/pom.xml b/spring-data-mongodb-benchmarks/pom.xml index c28a240d2c..64d7d599ea 100644 --- a/spring-data-mongodb-benchmarks/pom.xml +++ b/spring-data-mongodb-benchmarks/pom.xml @@ -7,7 +7,7 @@ org.springframework.data spring-data-mongodb-parent - 4.0.0-SNAPSHOT + 4.0.x-GH-4167-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml index 0412911f82..007a78ba81 100644 --- a/spring-data-mongodb-distribution/pom.xml +++ b/spring-data-mongodb-distribution/pom.xml @@ -15,7 +15,7 @@ org.springframework.data spring-data-mongodb-parent - 4.0.0-SNAPSHOT + 4.0.x-GH-4167-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml index cb8c76ade4..0bbc20fa63 100644 --- a/spring-data-mongodb/pom.xml +++ b/spring-data-mongodb/pom.xml @@ -13,7 +13,7 @@ org.springframework.data spring-data-mongodb-parent - 4.0.0-SNAPSHOT + 4.0.x-GH-4167-SNAPSHOT ../pom.xml From 74c8acd8a862654857b2478b376ca202bca7fd70 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Fri, 16 Sep 2022 11:16:28 +0200 Subject: [PATCH 2/2] Fix usage of change stream option startAfter. We now make sure to apply the token to startAfter method of the driver. Before this change it had been incorrectly applied to resumeAfter. --- .../mongodb/core/ReactiveMongoTemplate.java | 6 +++++- .../core/ReactiveMongoTemplateUnitTests.java | 20 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index e8fad963c7..9af3b5518f 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -1894,7 +1894,11 @@ public Flux> changeStream(@Nullable String database, @N publisher = filter.isEmpty() ? db.watch(Document.class) : db.watch(filter, Document.class); } - publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher); + if(options.isResumeAfter()) { + publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher); + } else if (options.isStartAfter()) { + publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::startAfter).orElse(publisher); + } publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation) .orElse(publisher); publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java index 0d9bca468c..b6fedd86e3 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java @@ -39,6 +39,8 @@ import java.util.stream.Collectors; import org.assertj.core.api.Assertions; +import org.bson.BsonDocument; +import org.bson.BsonString; import org.bson.Document; import org.bson.conversions.Bson; import org.bson.types.ObjectId; @@ -111,6 +113,7 @@ import com.mongodb.client.result.InsertOneResult; import com.mongodb.client.result.UpdateResult; import com.mongodb.reactivestreams.client.AggregatePublisher; +import com.mongodb.reactivestreams.client.ChangeStreamPublisher; import com.mongodb.reactivestreams.client.DistinctPublisher; import com.mongodb.reactivestreams.client.FindPublisher; import com.mongodb.reactivestreams.client.MapReducePublisher; @@ -146,6 +149,7 @@ public class ReactiveMongoTemplateUnitTests { @Mock DistinctPublisher distinctPublisher; @Mock Publisher deletePublisher; @Mock MapReducePublisher mapReducePublisher; + @Mock ChangeStreamPublisher changeStreamPublisher; private MongoExceptionTranslator exceptionTranslator = new MongoExceptionTranslator(); private MappingMongoConverter converter; @@ -1485,6 +1489,22 @@ void createCollectionShouldSetUpTimeSeries() { .granularity(TimeSeriesGranularity.HOURS).toString()); } + @Test // GH-4167 + void changeStreamOptionStartAftershouldApplied() { + + when(factory.getMongoDatabase(anyString())).thenReturn(Mono.just(db)); + + when(collection.watch(any(Class.class))).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.batchSize(anyInt())).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.startAfter(any())).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.fullDocument(any())).thenReturn(changeStreamPublisher); + + BsonDocument token = new BsonDocument("token", new BsonString("id")); + template.changeStream("database", "collection", ChangeStreamOptions.builder().startAfter(token).build(), Object.class).subscribe(); + + verify(changeStreamPublisher).startAfter(eq(token)); + } + private void stubFindSubscribe(Document document) { Publisher realPublisher = Flux.just(document);