Skip to content

Commit e768e8e

Browse files
committed
DATACMNS-947 - Adopt ReactiveTypeDescriptor and ReactiveAdapterRegistry changes.
Remove our own ReactiveWrappers.Descriptor type in favor of ReactiveTypeDescriptor. Adopt changes in ReactiveAdapterRegistry.
1 parent cb6d2f7 commit e768e8e

File tree

3 files changed

+60
-105
lines changed

3 files changed

+60
-105
lines changed

src/main/java/org/springframework/data/repository/util/ReactiveWrapperConverters.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ private static ConversionService registerConvertersIn(ConfigurableConversionServ
169169
*/
170170
public static boolean supports(Class<?> type) {
171171
return RegistryHolder.REACTIVE_ADAPTER_REGISTRY != null
172-
&& RegistryHolder.REACTIVE_ADAPTER_REGISTRY.getAdapterFrom(type) != null;
172+
&& RegistryHolder.REACTIVE_ADAPTER_REGISTRY.getAdapter(type) != null;
173173
}
174174

175175
/**
@@ -476,7 +476,7 @@ private enum PublisherToRxJava1SingleConverter implements Converter<Publisher<?>
476476

477477
@Override
478478
public Single<?> convert(Publisher<?> source) {
479-
return (Single<?>) REACTIVE_ADAPTER_REGISTRY.getAdapterTo(Single.class).fromPublisher(Mono.from(source));
479+
return (Single<?>) REACTIVE_ADAPTER_REGISTRY.getAdapter(Single.class).fromPublisher(Mono.from(source));
480480
}
481481
}
482482

@@ -492,7 +492,7 @@ private enum PublisherToRxJava1CompletableConverter implements Converter<Publish
492492

493493
@Override
494494
public Completable convert(Publisher<?> source) {
495-
return (Completable) REACTIVE_ADAPTER_REGISTRY.getAdapterTo(Completable.class).fromPublisher(source);
495+
return (Completable) REACTIVE_ADAPTER_REGISTRY.getAdapter(Completable.class).fromPublisher(source);
496496
}
497497
}
498498

@@ -508,7 +508,7 @@ private enum PublisherToRxJava1ObservableConverter implements Converter<Publishe
508508

509509
@Override
510510
public Observable<?> convert(Publisher<?> source) {
511-
return (Observable<?>) REACTIVE_ADAPTER_REGISTRY.getAdapterTo(Observable.class).fromPublisher(Flux.from(source));
511+
return (Observable<?>) REACTIVE_ADAPTER_REGISTRY.getAdapter(Observable.class).fromPublisher(Flux.from(source));
512512
}
513513
}
514514

@@ -524,7 +524,7 @@ private enum RxJava1SingleToPublisherConverter implements Converter<Single<?>, P
524524

525525
@Override
526526
public Publisher<?> convert(Single<?> source) {
527-
return Flux.defer(() -> REACTIVE_ADAPTER_REGISTRY.getAdapterFrom(Single.class).toPublisher(source));
527+
return Flux.defer(() -> REACTIVE_ADAPTER_REGISTRY.getAdapter(Single.class).toPublisher(source));
528528
}
529529
}
530530

@@ -540,7 +540,7 @@ private enum RxJava1SingleToMonoConverter implements Converter<Single<?>, Mono<?
540540

541541
@Override
542542
public Mono<?> convert(Single<?> source) {
543-
return Mono.defer(() -> REACTIVE_ADAPTER_REGISTRY.getAdapterFrom(Single.class).toMono(source));
543+
return Mono.defer(() -> Mono.from(REACTIVE_ADAPTER_REGISTRY.getAdapter(Single.class).toPublisher(source)));
544544
}
545545
}
546546

@@ -556,7 +556,7 @@ private enum RxJava1SingleToFluxConverter implements Converter<Single<?>, Flux<?
556556

557557
@Override
558558
public Flux<?> convert(Single<?> source) {
559-
return Flux.defer(() -> REACTIVE_ADAPTER_REGISTRY.getAdapterFrom(Single.class).toFlux(source));
559+
return Flux.defer(() -> REACTIVE_ADAPTER_REGISTRY.getAdapter(Single.class).toPublisher(source));
560560
}
561561
}
562562

@@ -572,7 +572,7 @@ private enum RxJava1CompletableToPublisherConverter implements Converter<Complet
572572

573573
@Override
574574
public Publisher<?> convert(Completable source) {
575-
return Flux.defer(() -> REACTIVE_ADAPTER_REGISTRY.getAdapterFrom(Completable.class).toFlux(source));
575+
return Flux.defer(() -> REACTIVE_ADAPTER_REGISTRY.getAdapter(Completable.class).toPublisher(source));
576576
}
577577
}
578578

@@ -604,7 +604,7 @@ private enum RxJava1ObservableToPublisherConverter implements Converter<Observab
604604

605605
@Override
606606
public Publisher<?> convert(Observable<?> source) {
607-
return Flux.defer(() -> REACTIVE_ADAPTER_REGISTRY.getAdapterFrom(Observable.class).toFlux(source));
607+
return Flux.defer(() -> REACTIVE_ADAPTER_REGISTRY.getAdapter(Observable.class).toPublisher(source));
608608
}
609609
}
610610

@@ -620,7 +620,7 @@ private enum RxJava1ObservableToMonoConverter implements Converter<Observable<?>
620620

621621
@Override
622622
public Mono<?> convert(Observable<?> source) {
623-
return Mono.defer(() -> REACTIVE_ADAPTER_REGISTRY.getAdapterFrom(Observable.class).toMono(source));
623+
return Mono.defer(() -> Mono.from(REACTIVE_ADAPTER_REGISTRY.getAdapter(Observable.class).toPublisher(source)));
624624
}
625625
}
626626

@@ -636,7 +636,7 @@ private enum RxJava1ObservableToFluxConverter implements Converter<Observable<?>
636636

637637
@Override
638638
public Flux<?> convert(Observable<?> source) {
639-
return Flux.defer(() -> REACTIVE_ADAPTER_REGISTRY.getAdapterFrom(Observable.class).toFlux(source));
639+
return Flux.defer(() -> REACTIVE_ADAPTER_REGISTRY.getAdapter(Observable.class).toPublisher(source));
640640
}
641641
}
642642

@@ -688,7 +688,7 @@ private enum PublisherToRxJava2SingleConverter implements Converter<Publisher<?>
688688

689689
@Override
690690
public io.reactivex.Single<?> convert(Publisher<?> source) {
691-
return (io.reactivex.Single<?>) REACTIVE_ADAPTER_REGISTRY.getAdapterTo(io.reactivex.Single.class)
691+
return (io.reactivex.Single<?>) REACTIVE_ADAPTER_REGISTRY.getAdapter(io.reactivex.Single.class)
692692
.fromPublisher(source);
693693
}
694694
}
@@ -705,7 +705,7 @@ private enum PublisherToRxJava2CompletableConverter implements Converter<Publish
705705

706706
@Override
707707
public io.reactivex.Completable convert(Publisher<?> source) {
708-
return (io.reactivex.Completable) REACTIVE_ADAPTER_REGISTRY.getAdapterTo(io.reactivex.Completable.class)
708+
return (io.reactivex.Completable) REACTIVE_ADAPTER_REGISTRY.getAdapter(io.reactivex.Completable.class)
709709
.fromPublisher(source);
710710
}
711711
}
@@ -722,7 +722,7 @@ private enum PublisherToRxJava2ObservableConverter implements Converter<Publishe
722722

723723
@Override
724724
public io.reactivex.Observable<?> convert(Publisher<?> source) {
725-
return (io.reactivex.Observable<?>) REACTIVE_ADAPTER_REGISTRY.getAdapterTo(io.reactivex.Single.class)
725+
return (io.reactivex.Observable<?>) REACTIVE_ADAPTER_REGISTRY.getAdapter(io.reactivex.Single.class)
726726
.fromPublisher(source);
727727
}
728728
}
@@ -739,7 +739,7 @@ private enum RxJava2SingleToPublisherConverter implements Converter<io.reactivex
739739

740740
@Override
741741
public Publisher<?> convert(io.reactivex.Single<?> source) {
742-
return REACTIVE_ADAPTER_REGISTRY.getAdapterFrom(io.reactivex.Single.class).toMono(source);
742+
return REACTIVE_ADAPTER_REGISTRY.getAdapter(io.reactivex.Single.class).toPublisher(source);
743743
}
744744
}
745745

@@ -755,7 +755,7 @@ private enum RxJava2SingleToMonoConverter implements Converter<io.reactivex.Sing
755755

756756
@Override
757757
public Mono<?> convert(io.reactivex.Single<?> source) {
758-
return REACTIVE_ADAPTER_REGISTRY.getAdapterFrom(io.reactivex.Single.class).toMono(source);
758+
return Mono.from(REACTIVE_ADAPTER_REGISTRY.getAdapter(io.reactivex.Single.class).toPublisher(source));
759759
}
760760
}
761761

@@ -771,7 +771,7 @@ private enum RxJava2SingleToFluxConverter implements Converter<io.reactivex.Sing
771771

772772
@Override
773773
public Flux<?> convert(io.reactivex.Single<?> source) {
774-
return REACTIVE_ADAPTER_REGISTRY.getAdapterFrom(io.reactivex.Single.class).toFlux(source);
774+
return Flux.from(REACTIVE_ADAPTER_REGISTRY.getAdapter(io.reactivex.Single.class).toPublisher(source));
775775
}
776776
}
777777

@@ -787,7 +787,7 @@ private enum RxJava2CompletableToPublisherConverter implements Converter<io.reac
787787

788788
@Override
789789
public Publisher<?> convert(io.reactivex.Completable source) {
790-
return REACTIVE_ADAPTER_REGISTRY.getAdapterFrom(io.reactivex.Completable.class).toFlux(source);
790+
return REACTIVE_ADAPTER_REGISTRY.getAdapter(io.reactivex.Completable.class).toPublisher(source);
791791
}
792792
}
793793

@@ -899,7 +899,7 @@ private enum PublisherToRxJava2MaybeConverter implements Converter<Publisher<?>,
899899

900900
@Override
901901
public io.reactivex.Maybe<?> convert(Publisher<?> source) {
902-
return (io.reactivex.Maybe<?>) REACTIVE_ADAPTER_REGISTRY.getAdapterTo(Maybe.class).fromPublisher(source);
902+
return (io.reactivex.Maybe<?>) REACTIVE_ADAPTER_REGISTRY.getAdapter(Maybe.class).fromPublisher(source);
903903
}
904904
}
905905

src/main/java/org/springframework/data/repository/util/ReactiveWrappers.java

Lines changed: 39 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,23 @@
1515
*/
1616
package org.springframework.data.repository.util;
1717

18-
import lombok.AccessLevel;
19-
import lombok.RequiredArgsConstructor;
20-
import lombok.Value;
2118
import lombok.experimental.UtilityClass;
2219
import reactor.core.publisher.Flux;
2320
import reactor.core.publisher.Mono;
2421
import rx.Completable;
2522
import rx.Observable;
2623
import rx.Single;
2724

25+
import java.util.ArrayList;
2826
import java.util.Arrays;
2927
import java.util.Collection;
3028
import java.util.Collections;
31-
import java.util.LinkedHashMap;
32-
import java.util.Map;
33-
import java.util.Map.Entry;
3429
import java.util.Optional;
3530
import java.util.stream.Collectors;
3631

3732
import org.reactivestreams.Publisher;
38-
import org.springframework.core.ReactiveAdapter;
33+
34+
import org.springframework.core.ReactiveTypeDescriptor;
3935
import org.springframework.data.util.ReflectionUtils;
4036
import org.springframework.util.Assert;
4137
import org.springframework.util.ClassUtils;
@@ -75,7 +71,7 @@ public class ReactiveWrappers {
7571
private static final boolean RXJAVA2_PRESENT = ClassUtils.isPresent("io.reactivex.Flowable",
7672
ReactiveWrappers.class.getClassLoader());
7773

78-
private static final Map<Class<?>, Descriptor> REACTIVE_WRAPPERS;
74+
private static final Collection<ReactiveTypeDescriptor> REACTIVE_WRAPPERS;
7975

8076
/**
8177
* Enumeration of supported reactive libraries.
@@ -88,32 +84,36 @@ static enum ReactiveLibrary {
8884

8985
static {
9086

91-
Map<Class<?>, Descriptor> reactiveWrappers = new LinkedHashMap<>(5);
87+
Collection<ReactiveTypeDescriptor> reactiveWrappers = new ArrayList<>(5);
9288

9389
if (RXJAVA1_PRESENT) {
9490

95-
reactiveWrappers.put(Single.class, Descriptor.forSingleValue().forValue());
96-
reactiveWrappers.put(Completable.class, Descriptor.forSingleValue().forNoValue());
97-
reactiveWrappers.put(Observable.class, Descriptor.forMultiValue().forValue());
91+
reactiveWrappers.add(ReactiveTypeDescriptor.singleRequiredValue(Single.class));
92+
reactiveWrappers.add(ReactiveTypeDescriptor.noValue(Completable.class, Completable::complete));
93+
reactiveWrappers.add(ReactiveTypeDescriptor.multiValue(Observable.class, Observable::empty));
9894
}
9995

10096
if (RXJAVA2_PRESENT) {
10197

102-
reactiveWrappers.put(io.reactivex.Single.class, Descriptor.forSingleValue().forValue());
103-
reactiveWrappers.put(io.reactivex.Maybe.class, Descriptor.forSingleValue().forValue());
104-
reactiveWrappers.put(io.reactivex.Completable.class, Descriptor.forSingleValue().forNoValue());
105-
reactiveWrappers.put(io.reactivex.Flowable.class, Descriptor.forMultiValue().forValue());
106-
reactiveWrappers.put(io.reactivex.Observable.class, Descriptor.forMultiValue().forValue());
98+
reactiveWrappers.add(ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.Single.class));
99+
reactiveWrappers
100+
.add(ReactiveTypeDescriptor.singleOptionalValue(io.reactivex.Maybe.class, io.reactivex.Maybe::empty));
101+
reactiveWrappers
102+
.add(ReactiveTypeDescriptor.noValue(io.reactivex.Completable.class, io.reactivex.Completable::complete));
103+
reactiveWrappers
104+
.add(ReactiveTypeDescriptor.multiValue(io.reactivex.Flowable.class, io.reactivex.Flowable::empty));
105+
reactiveWrappers
106+
.add(ReactiveTypeDescriptor.multiValue(io.reactivex.Observable.class, io.reactivex.Observable::empty));
107107
}
108108

109109
if (PROJECT_REACTOR_PRESENT) {
110110

111-
reactiveWrappers.put(Mono.class, Descriptor.forSingleValue().forValue());
112-
reactiveWrappers.put(Flux.class, Descriptor.forMultiValue().forNoValue());
113-
reactiveWrappers.put(Publisher.class, Descriptor.forMultiValue().forNoValue());
111+
reactiveWrappers.add(ReactiveTypeDescriptor.singleOptionalValue(Mono.class, Mono::empty));
112+
reactiveWrappers.add(ReactiveTypeDescriptor.multiValue(Flux.class, Flux::empty));
113+
reactiveWrappers.add(ReactiveTypeDescriptor.multiValue(Publisher.class, Flux::empty));
114114
}
115115

116-
REACTIVE_WRAPPERS = Collections.unmodifiableMap(reactiveWrappers);
116+
REACTIVE_WRAPPERS = Collections.unmodifiableCollection(reactiveWrappers);
117117
}
118118

119119
/**
@@ -183,7 +183,7 @@ public static boolean isNoValueType(Class<?> type) {
183183

184184
Assert.notNull(type, "Candidate type must not be null!");
185185

186-
return findDescriptor(type).map(Descriptor::isNoValue).orElse(false);
186+
return findDescriptor(type).map(ReactiveTypeDescriptor::isNoValue).orElse(false);
187187
}
188188

189189
/**
@@ -213,7 +213,8 @@ public static boolean isMultiValueType(Class<?> type) {
213213

214214
// Prevent single-types with a multi-hierarchy supertype to be reported as multi type
215215
// See Mono implements Publisher
216-
return isSingleValueType(type) ? false : findDescriptor(type).map(Descriptor::isMultiValue).orElse(false);
216+
return isSingleValueType(type) ? false
217+
: findDescriptor(type).map(ReactiveTypeDescriptor::isMultiValue).orElse(false);
217218
}
218219

219220
/**
@@ -223,9 +224,10 @@ public static boolean isMultiValueType(Class<?> type) {
223224
*/
224225
public static Collection<Class<?>> getNoValueTypes() {
225226

226-
return REACTIVE_WRAPPERS.entrySet().stream()//
227-
.filter(entry -> entry.getValue().isNoValue())//
228-
.map(Entry::getKey).collect(Collectors.toList());
227+
return REACTIVE_WRAPPERS.stream()//
228+
.filter(ReactiveTypeDescriptor::isNoValue)//
229+
.map(ReactiveTypeDescriptor::getReactiveType)//
230+
.collect(Collectors.toList());
229231
}
230232

231233
/**
@@ -235,9 +237,9 @@ public static Collection<Class<?>> getNoValueTypes() {
235237
*/
236238
public static Collection<Class<?>> getSingleValueTypes() {
237239

238-
return REACTIVE_WRAPPERS.entrySet().stream()//
239-
.filter(entry -> !entry.getValue().isMultiValue())//
240-
.map(Entry::getKey).collect(Collectors.toList());
240+
return REACTIVE_WRAPPERS.stream()//
241+
.filter(entry -> !entry.isMultiValue())//
242+
.map(ReactiveTypeDescriptor::getReactiveType).collect(Collectors.toList());
241243
}
242244

243245
/**
@@ -247,9 +249,9 @@ public static Collection<Class<?>> getSingleValueTypes() {
247249
*/
248250
public static Collection<Class<?>> getMultiValueTypes() {
249251

250-
return REACTIVE_WRAPPERS.entrySet().stream()//
251-
.filter(entry -> entry.getValue().isMultiValue())//
252-
.map(Entry::getKey)//
252+
return REACTIVE_WRAPPERS.stream()//
253+
.filter(ReactiveTypeDescriptor::isMultiValue)//
254+
.map(ReactiveTypeDescriptor::getReactiveType)//
253255
.collect(Collectors.toList());
254256
}
255257

@@ -267,64 +269,17 @@ private static boolean isWrapper(Class<?> type) {
267269
}
268270

269271
/**
270-
* Looks up a {@link Descriptor} for the given wrapper type.
272+
* Looks up a {@link ReactiveTypeDescriptor} for the given wrapper type.
271273
*
272274
* @param type must not be {@literal null}.
273275
* @return
274276
*/
275-
private static Optional<Descriptor> findDescriptor(Class<?> type) {
277+
private static Optional<ReactiveTypeDescriptor> findDescriptor(Class<?> type) {
276278

277279
Assert.notNull(type, "Wrapper type must not be null!");
278280

279-
return REACTIVE_WRAPPERS.entrySet().stream()//
280-
.filter(it -> ClassUtils.isAssignable(it.getKey(), type))//
281-
.findFirst().map(it -> it.getValue());
282-
}
283-
284-
/**
285-
* Basically a copy of Spring's {@link ReactiveAdapter.Descriptor} but without introducing the strong dependency to
286-
* Reactor so that we can safely use the class in non-reactive environments.
287-
*
288-
* @author Oliver Gierke
289-
* @since 2.0
290-
*/
291-
@Value
292-
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
293-
private static class Descriptor {
294-
295-
/**
296-
* Return {@code true} if the adaptee implies 0..N values can be produced and is therefore a good fit to adapt to
297-
* {@link Flux}. A {@code false} return value implies the adaptee will produce 1 value at most and is therefore a
298-
* good fit for {@link Mono}.
299-
*/
300-
private final boolean multiValue;
301-
302-
/**
303-
* Return {@code true} if the adaptee implies no values will be produced, i.e. providing only completion or error
304-
* signal.
305-
*/
306-
private final boolean noValue;
307-
308-
public static DescriptorBuilder forSingleValue() {
309-
return new DescriptorBuilder(false);
310-
}
311-
312-
public static DescriptorBuilder forMultiValue() {
313-
return new DescriptorBuilder(true);
314-
}
315-
316-
@RequiredArgsConstructor
317-
static class DescriptorBuilder {
318-
319-
private final boolean multi;
320-
321-
public Descriptor forValue() {
322-
return new Descriptor(multi, false);
323-
}
324-
325-
public Descriptor forNoValue() {
326-
return new Descriptor(multi, true);
327-
}
328-
}
281+
return REACTIVE_WRAPPERS.stream()//
282+
.filter(it -> ClassUtils.isAssignable(it.getReactiveType(), type))//
283+
.findFirst();
329284
}
330285
}

0 commit comments

Comments
 (0)