diff --git a/src/main/java/org/springframework/data/couchbase/repository/query/AbstractCouchbaseQueryBase.java b/src/main/java/org/springframework/data/couchbase/repository/query/AbstractCouchbaseQueryBase.java index 38c05bf65..6a7a5efab 100644 --- a/src/main/java/org/springframework/data/couchbase/repository/query/AbstractCouchbaseQueryBase.java +++ b/src/main/java/org/springframework/data/couchbase/repository/query/AbstractCouchbaseQueryBase.java @@ -18,7 +18,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import org.reactivestreams.Publisher; import org.springframework.data.couchbase.core.CouchbaseOperations; import org.springframework.data.couchbase.core.ExecutableFindByQueryOperation.ExecutableFindByQuery; import org.springframework.data.couchbase.core.query.Query; @@ -112,14 +111,12 @@ public Object execute(Object[] parameters) { ReactiveCouchbaseParameterAccessor accessor = new ReactiveCouchbaseParameterAccessor(getQueryMethod(), parameters); - return accessor.resolveParameters().flatMapMany(this::executeDeferred); + Object result = accessor.resolveParameters().map(this::executeDeferred); + return ((Mono) result).block() ; } - private Publisher executeDeferred(ReactiveCouchbaseParameterAccessor parameterAccessor) { - if (getQueryMethod().isCollectionQuery()) { - return Flux.defer(() -> (Publisher) execute(parameterAccessor)); - } - return Mono.defer(() -> (Mono) execute(parameterAccessor)); + private Object executeDeferred(ReactiveCouchbaseParameterAccessor parameterAccessor) { + return execute(parameterAccessor); } private Object execute(ParametersParameterAccessor parameterAccessor) {