Skip to content

Commit bc383d8

Browse files
committed
Add support for reactive CursorWindow query methods.
1 parent c9831f7 commit bc383d8

File tree

8 files changed

+84
-12
lines changed

8 files changed

+84
-12
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ExecutableFindOperation.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,9 @@ default Optional<T> first() {
128128
/**
129129
* Return a window from a {@link CursorRequest}.
130130
*
131+
* @param cursorRequest the cursor request.
131132
* @return a window from the resulting elements.
133+
* @since 4.1
132134
*/
133135
CursorWindow<T> window(CursorRequest cursorRequest);
134136

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -855,8 +855,6 @@ public <T> List<T> find(Query query, Class<T> entityClass, String collectionName
855855
@Override
856856
public <T> CursorWindow<T> findWindow(CursorRequest cursorRequest, Query query, Class<T> entityType) {
857857

858-
Assert.notNull(cursorRequest, "CursorRequest must not be null");
859-
Assert.notNull(query, "Query must not be null");
860858
Assert.notNull(entityType, "Entity type must not be null");
861859

862860
return findWindow(cursorRequest, query, entityType, getCollectionName(entityType));
@@ -867,7 +865,6 @@ public <T> CursorWindow<T> findWindow(CursorRequest cursorRequest, Query query,
867865
// CursorRequest is similar to Pageable in the sense of being required to define a sort order
868866
public <T> CursorWindow<T> findWindow(CursorRequest cursorRequest, Query query, Class<T> entityType,
869867
String collectionName) {
870-
871868
return doFindWindow(cursorRequest, query, entityType, entityType, collectionName);
872869
}
873870

@@ -881,6 +878,7 @@ <T> CursorWindow<T> doFindWindow(CursorRequest cursorRequest, Query query, Class
881878
Assert.notNull(targetClass, "Target type must not be null");
882879

883880
ReadDocumentCallback<T> callback = new ReadDocumentCallback<>(mongoConverter, targetClass, collectionName);
881+
884882
if (cursorRequest instanceof OffsetCursorRequest offset) {
885883

886884
int limit = offset.getSize() + 1;

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveFindOperation.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import reactor.core.publisher.Flux;
1919
import reactor.core.publisher.Mono;
2020

21+
import org.springframework.data.domain.CursorRequest;
22+
import org.springframework.data.domain.CursorWindow;
2123
import org.springframework.data.geo.GeoResult;
2224
import org.springframework.data.mongodb.core.query.CriteriaDefinition;
2325
import org.springframework.data.mongodb.core.query.NearQuery;
@@ -87,6 +89,15 @@ interface TerminatingFind<T> {
8789
*/
8890
Flux<T> all();
8991

92+
/**
93+
* Return a window from a {@link CursorRequest}.
94+
*
95+
* @param cursorRequest the cursor request.
96+
* @return a window from the resulting elements.
97+
* @since 4.1
98+
*/
99+
Mono<CursorWindow<T>> window(CursorRequest cursorRequest);
100+
90101
/**
91102
* Get all matching elements using a {@link com.mongodb.CursorType#TailableAwait tailable cursor}. The stream will
92103
* not be completed unless the {@link org.reactivestreams.Subscription} is

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveFindOperationSupport.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import org.bson.Document;
2222
import org.springframework.dao.IncorrectResultSizeDataAccessException;
23+
import org.springframework.data.domain.CursorRequest;
24+
import org.springframework.data.domain.CursorWindow;
2325
import org.springframework.data.mongodb.core.CollectionPreparerSupport.ReactiveCollectionPreparerDelegate;
2426
import org.springframework.data.mongodb.core.query.NearQuery;
2527
import org.springframework.data.mongodb.core.query.Query;
@@ -137,6 +139,11 @@ public Flux<T> all() {
137139
return doFind(null);
138140
}
139141

142+
@Override
143+
public Mono<CursorWindow<T>> window(CursorRequest cursorRequest) {
144+
return template.doFindWindow(cursorRequest, query, domainType, returnType, getCollectionName());
145+
}
146+
140147
@Override
141148
public Flux<T> tail() {
142149
return doFind(template.new TailingQueryFindPublisherPreparer(query, domainType));

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -834,8 +834,6 @@ public <T> Flux<T> find(@Nullable Query query, Class<T> entityClass, String coll
834834
@Override
835835
public <T> Mono<CursorWindow<T>> findWindow(CursorRequest cursorRequest, Query query, Class<T> entityType) {
836836

837-
Assert.notNull(cursorRequest, "CursorRequest must not be null");
838-
Assert.notNull(query, "Query must not be null");
839837
Assert.notNull(entityType, "Entity type must not be null");
840838

841839
return findWindow(cursorRequest, query, entityType, getCollectionName(entityType));
@@ -846,19 +844,25 @@ public <T> Mono<CursorWindow<T>> findWindow(CursorRequest cursorRequest, Query q
846844
// CursorRequest is similar to Pageable in the sense of being required to define a sort order
847845
public <T> Mono<CursorWindow<T>> findWindow(CursorRequest cursorRequest, Query query, Class<T> entityType,
848846
String collectionName) {
847+
return doFindWindow(cursorRequest, query, entityType, entityType, collectionName);
848+
}
849+
850+
<T> Mono<CursorWindow<T>> doFindWindow(CursorRequest cursorRequest, Query query, Class<?> sourceClass,
851+
Class<T> targetClass, String collectionName) {
849852

850853
Assert.notNull(cursorRequest, "CursorRequest must not be null");
851854
Assert.notNull(query, "Query must not be null");
852855
Assert.notNull(collectionName, "CollectionName must not be null");
853-
Assert.notNull(entityType, "Entity type must not be null");
856+
Assert.notNull(sourceClass, "Entity type must not be null");
857+
Assert.notNull(targetClass, "Target type must not be null");
854858

855859
if (cursorRequest instanceof OffsetCursorRequest offset) {
856860

857861
int limit = offset.getSize() + 1;
858862

859863
Mono<List<T>> result = doFind(collectionName, ReactiveCollectionPreparerDelegate.of(query),
860-
query.getQueryObject(), query.getFieldsObject(), entityType, new QueryFindPublisherPreparer(query,
861-
new Query().with(cursorRequest.getSort()).getSortObject(), limit, offset.getOffset(), entityType))
864+
query.getQueryObject(), query.getFieldsObject(), targetClass, new QueryFindPublisherPreparer(query,
865+
new Query().with(cursorRequest.getSort()).getSortObject(), limit, offset.getOffset(), sourceClass))
862866
.collectList();
863867

864868
return result.map(it -> CursorUtils.createWindow(offset, it));
@@ -869,11 +873,11 @@ public <T> Mono<CursorWindow<T>> findWindow(CursorRequest cursorRequest, Query q
869873
int limit = keyset.getSize() + 1;
870874

871875
KeySetCursorQuery keysetPaginationQuery = CursorUtils.createKeysetPaginationQuery(query, keyset,
872-
operations.getIdPropertyName(entityType));
876+
operations.getIdPropertyName(sourceClass));
873877

874878
Mono<List<T>> result = doFind(collectionName, ReactiveCollectionPreparerDelegate.of(query),
875-
keysetPaginationQuery.query(), keysetPaginationQuery.fields(), entityType,
876-
new QueryFindPublisherPreparer(query, keysetPaginationQuery.sort(), limit, 0, entityType)).collectList();
879+
keysetPaginationQuery.query(), keysetPaginationQuery.fields(), targetClass,
880+
new QueryFindPublisherPreparer(query, keysetPaginationQuery.sort(), limit, 0, sourceClass)).collectList();
877881

878882
return result.map(it -> CursorUtils.createWindow(keyset, it, operations));
879883
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractReactiveMongoQuery.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,8 @@ private ReactiveMongoQueryExecution getExecutionToWrap(MongoParameterAccessor ac
203203
return (q, t, c) -> operation.matching(q.with(accessor.getPageable())).tail();
204204
} else if (method.isCollectionQuery()) {
205205
return (q, t, c) -> operation.matching(q.with(accessor.getPageable())).all();
206+
} else if (method.isCursorWindowQuery()) {
207+
return (q, t, c) -> operation.matching(q).window(accessor.getCursorRequest());
206208
} else if (isCountQuery()) {
207209
return (q, t, c) -> operation.matching(q).count();
208210
} else if (isExistsQuery()) {

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactiveMongoRepositoryTests.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,21 @@
3434
import java.util.concurrent.BlockingQueue;
3535
import java.util.concurrent.LinkedBlockingDeque;
3636
import java.util.concurrent.TimeUnit;
37+
import java.util.function.Function;
3738

3839
import org.junit.jupiter.api.Test;
3940
import org.junit.jupiter.api.TestInstance;
4041
import org.junit.jupiter.api.extension.ExtendWith;
4142
import org.reactivestreams.Publisher;
42-
4343
import org.springframework.beans.factory.BeanFactory;
4444
import org.springframework.beans.factory.annotation.Autowired;
4545
import org.springframework.context.annotation.Bean;
4646
import org.springframework.context.annotation.Configuration;
4747
import org.springframework.dao.IncorrectResultSizeDataAccessException;
48+
import org.springframework.data.domain.CursorRequest;
49+
import org.springframework.data.domain.CursorWindow;
50+
import org.springframework.data.domain.KeysetCursorRequest;
51+
import org.springframework.data.domain.OffsetCursorRequest;
4852
import org.springframework.data.domain.PageRequest;
4953
import org.springframework.data.domain.Pageable;
5054
import org.springframework.data.domain.Sort;
@@ -285,6 +289,39 @@ void shouldUseTailableCursorWithDtoProjection() {
285289
cappedRepository.findDtoProjectionByKey("value").as(StepVerifier::create).expectNextCount(1).thenCancel().verify();
286290
}
287291

292+
@Test // GH-4308
293+
void appliesCursorWindowingCorrectly() {
294+
295+
CursorWindow<Person> cursorWindow = repository
296+
.findByLastnameLike("*", KeysetCursorRequest.ofSize(2, Sort.by("firstname", "lastname"))).block();
297+
298+
assertThat(cursorWindow).hasSize(2);
299+
assertThat(cursorWindow).containsSequence(alicia, boyd);
300+
assertThat(cursorWindow.isFirst()).isTrue();
301+
assertThat(cursorWindow.isLast()).isFalse();
302+
303+
CursorWindow<Person> nextCursorWindow = repository.findByLastnameLike("*", cursorWindow.nextCursorRequest())
304+
.block();
305+
306+
assertThat(nextCursorWindow).hasSize(2);
307+
assertThat(nextCursorWindow).containsSequence(carter, dave);
308+
assertThat(nextCursorWindow.isFirst()).isFalse();
309+
assertThat(nextCursorWindow.isLast()).isFalse();
310+
}
311+
312+
@Test // GH-4308
313+
void appliesCursorWindowingWithProjectionCorrectly() {
314+
315+
repository
316+
.findCursorProjectionByLastnameLike("*",
317+
OffsetCursorRequest.ofSize(2, Sort.by(Direction.ASC, "firstname", "lastname"))) //
318+
.flatMapIterable(Function.identity()) //
319+
.as(StepVerifier::create) //
320+
.expectNext(new PersonSummaryDto(alicia.getFirstname(), alicia.getLastname())) //
321+
.expectNextCount(1) //
322+
.verifyComplete();
323+
}
324+
288325
@Test // DATAMONGO-1444
289326
@DirtiesState
290327
void findsPeopleByLocationWithinCircle() {
@@ -712,6 +749,11 @@ interface ReactivePersonRepository
712749
@Query("{ lastname: { $in: ?0 }, age: { $gt : ?1 } }")
713750
Flux<Person> findStringQuery(Flux<String> lastname, Mono<Integer> age);
714751

752+
Mono<CursorWindow<Person>> findByLastnameLike(String lastname, CursorRequest cursorRequest);
753+
754+
Mono<CursorWindow<PersonSummaryDto>> findCursorProjectionByLastnameLike(String lastname,
755+
OffsetCursorRequest cursorRequest);
756+
715757
Flux<Person> findByLocationWithin(Circle circle);
716758

717759
Flux<Person> findByLocationWithin(Circle circle, Pageable pageable);

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/query/StubParameterAccessor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.Arrays;
1919
import java.util.Iterator;
2020

21+
import org.springframework.data.domain.CursorRequest;
2122
import org.springframework.data.domain.Pageable;
2223
import org.springframework.data.domain.Range;
2324
import org.springframework.data.domain.Range.Bound;
@@ -72,6 +73,11 @@ public StubParameterAccessor(Object... values) {
7273
}
7374
}
7475

76+
@Override
77+
public CursorRequest getCursorRequest() {
78+
return null;
79+
}
80+
7581
public Pageable getPageable() {
7682
return null;
7783
}

0 commit comments

Comments
 (0)