18
18
import reactor .core .publisher .Flux ;
19
19
import reactor .core .publisher .Mono ;
20
20
21
- import org .reactivestreams .Publisher ;
22
21
import org .springframework .data .couchbase .core .CouchbaseOperations ;
23
22
import org .springframework .data .couchbase .core .ExecutableFindByQueryOperation .ExecutableFindByQuery ;
24
23
import org .springframework .data .couchbase .core .query .Query ;
@@ -112,14 +111,14 @@ public Object execute(Object[] parameters) {
112
111
113
112
ReactiveCouchbaseParameterAccessor accessor = new ReactiveCouchbaseParameterAccessor (getQueryMethod (), parameters );
114
113
115
- return accessor .resolveParameters ().flatMapMany (this ::executeDeferred );
114
+ Object result = accessor .resolveParameters ()./* flatMapMany */ map (this ::executeDeferred );
115
+ result = result instanceof Flux ? ((Flux <Object >) result ).collectList ().block ()
116
+ : result instanceof Mono ? ((Mono <Object >) result ).block () : result ;
117
+ return result ;
116
118
}
117
119
118
- private Publisher <Object > executeDeferred (ReactiveCouchbaseParameterAccessor parameterAccessor ) {
119
- if (getQueryMethod ().isCollectionQuery ()) {
120
- return Flux .defer (() -> (Publisher <Object >) execute (parameterAccessor ));
121
- }
122
- return Mono .defer (() -> (Mono <Object >) execute (parameterAccessor ));
120
+ private Object executeDeferred (ReactiveCouchbaseParameterAccessor parameterAccessor ) {
121
+ return execute (parameterAccessor );
123
122
}
124
123
125
124
private Object execute (ParametersParameterAccessor parameterAccessor ) {
@@ -131,7 +130,9 @@ private Object execute(ParametersParameterAccessor parameterAccessor) {
131
130
if (typeToRead == null && returnType .getComponentType () != null ) {
132
131
typeToRead = returnType .getComponentType ().getType ();
133
132
}
134
- return doExecute (getQueryMethod (), processor , parameterAccessor , typeToRead );
133
+ Object result = doExecute (getQueryMethod (), processor , parameterAccessor , typeToRead );
134
+ System .err .println ("execute parameterAccessor result: " + result );
135
+ return result ;
135
136
}
136
137
137
138
/**
0 commit comments