Skip to content

Commit ef2702d

Browse files
committed
Merge #4015 into 3.7.5
2 parents 9e7150c + 4de9509 commit ef2702d

File tree

46 files changed

+251
-126
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+251
-126
lines changed

reactor-core-micrometer/src/main/java/reactor/core/observability/micrometer/TimedScheduler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2024 VMware Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2022-2025 VMware Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -130,6 +130,7 @@ public void dispose() {
130130
delegate.dispose();
131131
}
132132

133+
@SuppressWarnings("deprecation")
133134
@Override
134135
public void start() {
135136
delegate.start();

reactor-core/src/jcstress/java/reactor/core/publisher/FluxBufferTimeoutStressTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2023-2024 VMware Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2023-2025 VMware Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -468,6 +468,7 @@ private static void fail(FastLogger fastLogger, String msg) {
468468
throw new IllegalStateException(msg + "\n" + fastLogger);
469469
}
470470

471+
@SafeVarargs
471472
private static boolean allValuesHandled(FastLogger logger, int range, List<Object> discarded, List<List<Long>>... delivered) {
472473
if (delivered.length == 0) {
473474
return false;

reactor-core/src/jcstress/java/reactor/core/publisher/FluxSwitchMapStressTest.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2021-2024 VMware Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2021-2025 VMware Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,6 +29,8 @@
2929
import org.openjdk.jcstress.infra.results.JI_Result;
3030
import org.openjdk.jcstress.infra.results.JJJJJJJ_Result;
3131
import org.reactivestreams.Publisher;
32+
import org.reactivestreams.Subscriber;
33+
import org.reactivestreams.Subscription;
3234
import reactor.core.CoreSubscriber;
3335
import reactor.core.Exceptions;
3436
import reactor.core.publisher.FluxSwitchMapNoPrefetch.SwitchMapMain;
@@ -37,13 +39,16 @@
3739

3840
import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE;
3941

42+
@SuppressWarnings({"rawtypes", "unchecked"})
4043
public abstract class FluxSwitchMapStressTest {
4144

45+
private static final CoreSubscriber<Object> DUMMY_SUBSCRIBER = Operators.emptySubscriber();
46+
4247
final FastLogger fastLogger = new FastLogger(this.getClass().getSimpleName());
4348
final StateLogger logger = new StateLogger(fastLogger);
4449

4550
final StressSubscriber<Object> stressSubscriber = new StressSubscriber<>(0);
46-
final StressSubscription stressSubscription = new StressSubscription(null);
51+
final StressSubscription stressSubscription = new StressSubscription(DUMMY_SUBSCRIBER);
4752

4853
final SwitchMapMain<Object, Object> switchMapMain =
4954
new SwitchMapMain<>(stressSubscriber, this::handle, logger);
@@ -149,7 +154,7 @@ public static class CancelInnerErrorStressTest extends FluxSwitchMapStressTest {
149154
Publisher<Object> handle(Object value) {
150155
return s -> {
151156
final StressSubscription subscription =
152-
new StressSubscription<>((CoreSubscriber) s);
157+
new StressSubscription<>((CoreSubscriber<?>) s);
153158
this.subscription = subscription;
154159
s.onSubscribe(subscription);
155160
};

reactor-core/src/jcstress/java/reactor/core/publisher/MonoDelayElementStressTest.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2021-2023 VMware Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2021-2025 VMware Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -51,7 +51,13 @@ public static class OnNextAndRunStressTest {
5151
monoDelay = new MonoDelayElement<>(Mono.never(), 0L,
5252
TimeUnit.MILLISECONDS,
5353
virtualTimeScheduler);
54-
monoDelay.doOnSubscribe(s -> subscription = ((MonoDelayElement.DelayElementSubscriber) s)).subscribe(subscriber);
54+
55+
monoDelay.doOnSubscribe(s -> {
56+
@SuppressWarnings("unchecked")
57+
MonoDelayElement.DelayElementSubscriber<Object> elementSubscriber =
58+
(MonoDelayElement.DelayElementSubscriber<Object>) s;
59+
subscription = elementSubscriber;
60+
}).subscribe(subscriber);
5561
}
5662

5763
@Actor

reactor-core/src/jcstress/java/reactor/core/publisher/StressSubscription.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,6 +28,8 @@ public class StressSubscription<T> implements Subscription {
2828
final CoreSubscriber<? super T> actual;
2929

3030
volatile long requested;
31+
32+
@SuppressWarnings("rawtypes")
3133
static final AtomicLongFieldUpdater<StressSubscription> REQUESTED =
3234
AtomicLongFieldUpdater.newUpdater(StressSubscription.class, "requested");
3335

reactor-core/src/jcstress/java/reactor/core/scheduler/BasicSchedulersStressTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2023 VMware Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2022-2025 VMware Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -32,6 +32,7 @@
3232

3333
public abstract class BasicSchedulersStressTest {
3434

35+
@SuppressWarnings("deprecation")
3536
private static void restart(Scheduler scheduler) {
3637
scheduler.disposeGracefully().block(Duration.ofMillis(500));
3738
// TODO: in 3.6.x: remove restart capability and this validation

reactor-core/src/main/java/reactor/core/publisher/ContextPropagation.java

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2024 VMware Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2022-2025 VMware Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,7 +31,6 @@
3131

3232
import io.micrometer.context.ContextSnapshotFactory;
3333
import io.micrometer.context.ThreadLocalAccessor;
34-
import reactor.core.Fuseable;
3534
import reactor.core.observability.SignalListener;
3635
import reactor.util.annotation.Nullable;
3736
import reactor.util.context.Context;
@@ -100,7 +99,7 @@ static <C> ContextSnapshot.Scope setThreadLocals(Object context) {
10099
}
101100
}
102101

103-
@SuppressWarnings("unchecked")
102+
@SuppressWarnings({"unchecked", "deprecation"})
104103
private static <V> Map<Object, Object> setThreadLocal(Object key, @Nullable V value,
105104
ThreadLocalAccessor<?> accessor, @Nullable Map<Object, Object> previousValues) {
106105

@@ -115,6 +114,7 @@ private static <V> Map<Object, Object> setThreadLocal(Object key, @Nullable V va
115114
return previousValues;
116115
}
117116

117+
@SuppressWarnings("deprecation")
118118
static ContextSnapshot captureThreadLocals() {
119119
if (ContextPropagationSupport.isContextPropagation103OnClasspath) {
120120
return globalContextSnapshotFactory.captureAll();
@@ -163,6 +163,7 @@ static Context contextCaptureToEmpty() {
163163
* @param <T> type of handled values
164164
* @param <R> the transformed type
165165
*/
166+
@SuppressWarnings("try")
166167
static <T, R> BiConsumer<T, SynchronousSink<R>> contextRestoreForHandle(BiConsumer<T, SynchronousSink<R>> handler, Supplier<Context> contextSupplier) {
167168
if (ContextPropagationSupport.shouldRestoreThreadLocalsInSomeOperators()) {
168169
final Context ctx = contextSupplier.get();
@@ -172,14 +173,16 @@ static <T, R> BiConsumer<T, SynchronousSink<R>> contextRestoreForHandle(BiConsum
172173

173174
if (ContextPropagationSupport.isContextPropagation103OnClasspath) {
174175
return (v, sink) -> {
175-
try (ContextSnapshot.Scope ignored = globalContextSnapshotFactory.setThreadLocalsFrom(ctx)) {
176+
try (ContextSnapshot.Scope ignored =
177+
globalContextSnapshotFactory.setThreadLocalsFrom(ctx)) {
176178
handler.accept(v, sink);
177179
}
178180
};
179181
}
180182
else {
181183
return (v, sink) -> {
182-
try (ContextSnapshot.Scope ignored = ContextSnapshot.setAllThreadLocalsFrom(ctx)) {
184+
try (@SuppressWarnings("deprecation") ContextSnapshot.Scope ignored =
185+
ContextSnapshot.setAllThreadLocalsFrom(ctx)) {
183186
handler.accept(v, sink);
184187
}
185188
};
@@ -234,116 +237,133 @@ public ContextRestoreSignalListener(SignalListener<T> original,
234237
this.context = context;
235238
}
236239

240+
@SuppressWarnings("deprecation")
237241
ContextSnapshot.Scope restoreThreadLocals() {
238242
return ContextSnapshot.setAllThreadLocalsFrom(this.context);
239243
}
240244

241245
@Override
246+
@SuppressWarnings("try")
242247
public final void doFirst() throws Throwable {
243248
try (ContextSnapshot.Scope ignored = restoreThreadLocals()) {
244249
original.doFirst();
245250
}
246251
}
247252

248253
@Override
254+
@SuppressWarnings("try")
249255
public final void doFinally(SignalType terminationType) throws Throwable {
250256
try (ContextSnapshot.Scope ignored = restoreThreadLocals()) {
251257
original.doFinally(terminationType);
252258
}
253259
}
254260

255261
@Override
262+
@SuppressWarnings("try")
256263
public final void doOnSubscription() throws Throwable {
257264
try (ContextSnapshot.Scope ignored = restoreThreadLocals()) {
258265
original.doOnSubscription();
259266
}
260267
}
261268

262269
@Override
270+
@SuppressWarnings("try")
263271
public final void doOnFusion(int negotiatedFusion) throws Throwable {
264272
try (ContextSnapshot.Scope ignored = restoreThreadLocals()) {
265273
original.doOnFusion(negotiatedFusion);
266274
}
267275
}
268276

269277
@Override
278+
@SuppressWarnings("try")
270279
public final void doOnRequest(long requested) throws Throwable {
271280
try (ContextSnapshot.Scope ignored = restoreThreadLocals()) {
272281
original.doOnRequest(requested);
273282
}
274283
}
275284

276285
@Override
286+
@SuppressWarnings("try")
277287
public final void doOnCancel() throws Throwable {
278288
try (ContextSnapshot.Scope ignored = restoreThreadLocals()) {
279289
original.doOnCancel();
280290
}
281291
}
282292

283293
@Override
294+
@SuppressWarnings("try")
284295
public final void doOnNext(T value) throws Throwable {
285296
try (ContextSnapshot.Scope ignored = restoreThreadLocals()) {
286297
original.doOnNext(value);
287298
}
288299
}
289300

290301
@Override
302+
@SuppressWarnings("try")
291303
public final void doOnComplete() throws Throwable {
292304
try (ContextSnapshot.Scope ignored = restoreThreadLocals()) {
293305
original.doOnComplete();
294306
}
295307
}
296308

297309
@Override
310+
@SuppressWarnings("try")
298311
public final void doOnError(Throwable error) throws Throwable {
299312
try (ContextSnapshot.Scope ignored = restoreThreadLocals()) {
300313
original.doOnError(error);
301314
}
302315
}
303316

304317
@Override
318+
@SuppressWarnings("try")
305319
public final void doAfterComplete() throws Throwable {
306320
try (ContextSnapshot.Scope ignored = restoreThreadLocals()) {
307321
original.doAfterComplete();
308322
}
309323
}
310324

311325
@Override
326+
@SuppressWarnings("try")
312327
public final void doAfterError(Throwable error) throws Throwable {
313328
try (ContextSnapshot.Scope ignored = restoreThreadLocals()) {
314329
original.doAfterError(error);
315330
}
316331
}
317332

318333
@Override
334+
@SuppressWarnings("try")
319335
public final void doOnMalformedOnNext(T value) throws Throwable {
320336
try (ContextSnapshot.Scope ignored = restoreThreadLocals()) {
321337
original.doOnMalformedOnNext(value);
322338
}
323339
}
324340

325341
@Override
342+
@SuppressWarnings("try")
326343
public final void doOnMalformedOnError(Throwable error) throws Throwable {
327344
try (ContextSnapshot.Scope ignored = restoreThreadLocals()) {
328345
original.doOnMalformedOnError(error);
329346
}
330347
}
331348

332349
@Override
350+
@SuppressWarnings("try")
333351
public final void doOnMalformedOnComplete() throws Throwable {
334352
try (ContextSnapshot.Scope ignored = restoreThreadLocals()) {
335353
original.doOnMalformedOnComplete();
336354
}
337355
}
338356

339357
@Override
358+
@SuppressWarnings("try")
340359
public final void handleListenerError(Throwable listenerError) {
341360
try (ContextSnapshot.Scope ignored = restoreThreadLocals()) {
342361
original.handleListenerError(listenerError);
343362
}
344363
}
345364

346365
@Override
366+
@SuppressWarnings("try")
347367
public final Context addToContext(Context originalContext) {
348368
try (ContextSnapshot.Scope ignored = restoreThreadLocals()) {
349369
return original.addToContext(originalContext);
@@ -492,10 +512,12 @@ public void close() {
492512
}
493513
}
494514

495-
@SuppressWarnings("unchecked")
515+
@SuppressWarnings("deprecation")
496516
private <V> void resetThreadLocalValue(ThreadLocalAccessor<?> accessor, @Nullable V previousValue) {
497517
if (previousValue != null) {
498-
((ThreadLocalAccessor<V>) accessor).restore(previousValue);
518+
@SuppressWarnings("unchecked")
519+
ThreadLocalAccessor<V> typedAccessor = (ThreadLocalAccessor<V>) accessor;
520+
typedAccessor.restore(previousValue);
499521
}
500522
else {
501523
accessor.reset();
@@ -530,10 +552,12 @@ public void close() {
530552
}
531553
}
532554

533-
@SuppressWarnings("unchecked")
555+
@SuppressWarnings("deprecation")
534556
private <V> void resetThreadLocalValue(ThreadLocalAccessor<?> accessor, @Nullable V previousValue) {
535557
if (previousValue != null) {
536-
((ThreadLocalAccessor<V>) accessor).setValue(previousValue);
558+
@SuppressWarnings("unchecked")
559+
ThreadLocalAccessor<V> typedAccessor = (ThreadLocalAccessor<V>) accessor;
560+
typedAccessor.setValue(previousValue);
537561
}
538562
else {
539563
accessor.reset();

reactor-core/src/main/java/reactor/core/publisher/Flux.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -10842,6 +10842,7 @@ public final <T2> Flux<Tuple2<T, T2>> zipWith(Publisher<? extends T2> source2) {
1084210842
public final <T2, V> Flux<V> zipWith(Publisher<? extends T2> source2,
1084310843
final BiFunction<? super T, ? super T2, ? extends V> combinator) {
1084410844
if (this instanceof FluxZip) {
10845+
@SuppressWarnings("unchecked")
1084510846
FluxZip<T, V> o = (FluxZip<T, V>) this;
1084610847
Flux<V> result = o.zipAdditionalSource(source2, combinator);
1084710848
if (result != null) {

reactor-core/src/main/java/reactor/core/publisher/FluxTapFuseable.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2023 VMware Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2022-2025 VMware Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -147,8 +147,11 @@ public void onSubscribe(Subscription s) {
147147
return;
148148
}
149149
this.s = s;
150-
//noinspection unchecked
151-
this.qs = (QueueSubscription<T>) s;
150+
151+
@SuppressWarnings("unchecked")
152+
QueueSubscription<T> qs = (QueueSubscription<T>) s;
153+
154+
this.qs = qs;
152155

153156
try {
154157
listener.doOnSubscription();

0 commit comments

Comments
 (0)