Skip to content

Commit 6d6269f

Browse files
committed
Switch to Reactor 2020.0.0 snapshots
A switch to RSocket 1.0.1 snapshots is also required to pick up a for froward compatibility with Reactor Netty 1.0. See gh-25085
1 parent 42ff01b commit 6d6269f

File tree

13 files changed

+84
-45
lines changed

13 files changed

+84
-45
lines changed

build.gradle

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ configure(allprojects) { project ->
2525
imports {
2626
mavenBom "com.fasterxml.jackson:jackson-bom:2.11.0"
2727
mavenBom "io.netty:netty-bom:4.1.50.Final"
28-
mavenBom "io.projectreactor:reactor-bom:Dysprosium-SR7"
29-
mavenBom "io.rsocket:rsocket-bom:1.0.0"
28+
mavenBom "io.projectreactor:reactor-bom:2020.0.0-SNAPSHOT"
29+
mavenBom "io.rsocket:rsocket-bom:1.0.1-SNAPSHOT"
3030
mavenBom "org.eclipse.jetty:jetty-bom:9.4.29.v20200521"
3131
mavenBom "org.jetbrains.kotlin:kotlin-bom:1.3.72"
3232
mavenBom "org.jetbrains.kotlinx:kotlinx-coroutines-bom:1.3.5"
@@ -281,6 +281,8 @@ configure(allprojects) { project ->
281281
repositories {
282282
mavenCentral()
283283
maven { url "https://repo.spring.io/libs-spring-framework-build" }
284+
maven { url "https://repo.spring.io/snapshot" } // Reactor
285+
maven { url "https://oss.jfrog.org/artifactory/oss-snapshot-local" } // RSocket
284286
}
285287
}
286288
configurations.all {

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.time.Duration;
2020
import java.util.Collection;
2121
import java.util.List;
22-
import java.util.Optional;
2322
import java.util.function.BiFunction;
2423
import java.util.function.Consumer;
2524
import java.util.function.Function;
@@ -34,7 +33,6 @@
3433
import org.apache.commons.logging.LogFactory;
3534
import org.reactivestreams.Publisher;
3635
import reactor.core.publisher.DirectProcessor;
37-
import reactor.core.publisher.Flux;
3836
import reactor.core.publisher.Mono;
3937
import reactor.core.publisher.MonoProcessor;
4038
import reactor.core.scheduler.Scheduler;
@@ -46,6 +44,7 @@
4644
import reactor.netty.resources.ConnectionProvider;
4745
import reactor.netty.resources.LoopResources;
4846
import reactor.netty.tcp.TcpClient;
47+
import reactor.util.retry.Retry;
4948

5049
import org.springframework.lang.Nullable;
5150
import org.springframework.messaging.Message;
@@ -103,14 +102,13 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
103102
* @param codec for encoding and decoding the input/output byte streams
104103
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
105104
*/
106-
@SuppressWarnings("deprecation")
107105
public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec<P> codec) {
108106
Assert.notNull(host, "host is required");
109107
Assert.notNull(codec, "ReactorNettyCodec is required");
110108

111109
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
112110
this.loopResources = LoopResources.create("tcp-client-loop");
113-
this.poolResources = ConnectionProvider.fixed("tcp-client-pool", 10000);
111+
this.poolResources = ConnectionProvider.create("tcp-client-pool", 10000);
114112
this.codec = codec;
115113

116114
this.tcpClient = TcpClient.create(this.poolResources)
@@ -129,13 +127,12 @@ public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec<P> codec)
129127
* @since 5.1.3
130128
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
131129
*/
132-
@SuppressWarnings("deprecation")
133130
public ReactorNettyTcpClient(Function<TcpClient, TcpClient> clientConfigurer, ReactorNettyCodec<P> codec) {
134131
Assert.notNull(codec, "ReactorNettyCodec is required");
135132

136133
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
137134
this.loopResources = LoopResources.create("tcp-client-loop");
138-
this.poolResources = ConnectionProvider.fixed("tcp-client-pool", 10000);
135+
this.poolResources = ConnectionProvider.create("tcp-client-pool", 10000);
139136
this.codec = codec;
140137

141138
this.tcpClient = clientConfigurer.apply(TcpClient
@@ -199,7 +196,6 @@ public ListenableFuture<Void> connect(final TcpConnectionHandler<P> handler) {
199196
}
200197

201198
@Override
202-
@SuppressWarnings("deprecation")
203199
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
204200
Assert.notNull(handler, "TcpConnectionHandler is required");
205201
Assert.notNull(strategy, "ReconnectStrategy is required");
@@ -218,8 +214,12 @@ public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, Reconnect
218214
.doOnError(updateConnectMono(connectMono))
219215
.doOnError(handler::afterConnectFailure) // report all connect failures to the handler
220216
.flatMap(Connection::onDispose) // post-connect issues
221-
.retryWhen(reconnectFunction(strategy))
222-
.repeatWhen(reconnectFunction(strategy))
217+
.retryWhen(Retry.from(signals -> signals
218+
.map(retrySignal -> (int) retrySignal.totalRetriesInARow())
219+
.flatMap(attempt -> reconnect(attempt, strategy))))
220+
.repeatWhen(flux -> flux
221+
.scan(1, (count, element) -> count++)
222+
.flatMap(attempt -> reconnect(attempt, strategy)))
223223
.subscribe();
224224

225225
return new MonoToListenableFutureAdapter<>(connectMono);
@@ -244,12 +244,9 @@ private <T> Consumer<T> updateConnectMono(MonoProcessor<Void> connectMono) {
244244
};
245245
}
246246

247-
private <T> Function<Flux<T>, Publisher<?>> reconnectFunction(ReconnectStrategy reconnectStrategy) {
248-
return flux -> flux
249-
.scan(1, (count, element) -> count++)
250-
.flatMap(attempt -> Optional.ofNullable(reconnectStrategy.getTimeToNextAttempt(attempt))
251-
.map(time -> Mono.delay(Duration.ofMillis(time), this.scheduler))
252-
.orElse(Mono.empty()));
247+
private Publisher<? extends Long> reconnect(Integer attempt, ReconnectStrategy reconnectStrategy) {
248+
Long time = reconnectStrategy.getTimeToNextAttempt(attempt);
249+
return (time != null ? Mono.delay(Duration.ofMillis(time), this.scheduler) : Mono.empty());
253250
}
254251

255252
@Override
@@ -342,7 +339,7 @@ private static class StompMessageDecoder<P> extends ByteToMessageDecoder {
342339

343340
private final ReactorNettyCodec<P> codec;
344341

345-
public StompMessageDecoder(ReactorNettyCodec<P> codec) {
342+
StompMessageDecoder(ReactorNettyCodec<P> codec) {
346343
this.codec = codec;
347344
}
348345

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -64,13 +64,11 @@ public ListenableFuture<Void> send(Message<P> message) {
6464
}
6565

6666
@Override
67-
@SuppressWarnings("deprecation")
6867
public void onReadInactivity(Runnable runnable, long inactivityDuration) {
6968
this.inbound.withConnection(conn -> conn.onReadIdle(inactivityDuration, runnable));
7069
}
7170

7271
@Override
73-
@SuppressWarnings("deprecation")
7472
public void onWriteInactivity(Runnable runnable, long inactivityDuration) {
7573
this.inbound.withConnection(conn -> conn.onWriteIdle(inactivityDuration, runnable));
7674
}

spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252

5353
import static org.assertj.core.api.Assertions.assertThat;
5454
import static org.mockito.ArgumentMatchers.any;
55-
import static org.mockito.ArgumentMatchers.anyInt;
5655
import static org.mockito.BDDMockito.given;
5756
import static org.mockito.Mockito.mock;
5857
import static org.mockito.Mockito.verify;
@@ -75,7 +74,7 @@ public class DefaultRSocketRequesterBuilderTests {
7574
@BeforeEach
7675
public void setup() {
7776
this.transport = mock(ClientTransport.class);
78-
given(this.transport.connect(anyInt())).willReturn(Mono.just(this.connection));
77+
given(this.transport.connect()).willReturn(Mono.just(this.connection));
7978
}
8079

8180

@@ -106,7 +105,7 @@ public void rsocketConnectorConfigurer() {
106105

107106
// RSocketStrategies and RSocketConnector configurers should have been called
108107

109-
verify(this.transport).connect(anyInt());
108+
verify(this.transport).connect();
110109
verify(strategiesConfigurer).accept(any(RSocketStrategies.Builder.class));
111110
verify(factoryConfigurer).configure(any(io.rsocket.RSocketFactory.ClientRSocketFactory.class));
112111
assertThat(this.connectorConfigurer.connector()).isNotNull();

spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ private static HttpClient initHttpClient(ReactorResourceFactory resourceFactory)
8282
LoopResources resources = resourceFactory.getLoopResources();
8383
Assert.notNull(provider, "No ConnectionProvider: is ReactorResourceFactory not initialized yet?");
8484
Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?");
85-
return HttpClient.create(provider).tcpConfiguration(tcpClient -> tcpClient.runOn(resources));
85+
return HttpClient.create(provider).runOn(resources);
8686
}
8787

8888
/**

spring-web/src/main/java/org/springframework/http/client/reactive/ReactorResourceFactory.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,7 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean
4747
@Nullable
4848
private Consumer<HttpResources> globalResourcesConsumer;
4949

50-
@SuppressWarnings("deprecation")
51-
private Supplier<ConnectionProvider> connectionProviderSupplier = () -> ConnectionProvider.fixed("webflux", 500);
50+
private Supplier<ConnectionProvider> connectionProviderSupplier = () -> ConnectionProvider.create("webflux", 500);
5251

5352
@Nullable
5453
private ConnectionProvider connectionProvider;

spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/ReactorHttpServer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.web.testfixture.http.server.reactive.bootstrap;
1818

19+
import java.net.InetSocketAddress;
1920
import java.util.concurrent.atomic.AtomicReference;
2021

2122
import reactor.netty.DisposableServer;
@@ -38,8 +39,7 @@ public class ReactorHttpServer extends AbstractHttpServer {
3839
protected void initServer() {
3940
this.reactorHandler = createHttpHandlerAdapter();
4041
this.reactorServer = reactor.netty.http.server.HttpServer.create()
41-
.tcpConfiguration(server -> server.host(getHost()))
42-
.port(getPort());
42+
.host(getHost()).port(getPort());
4343
}
4444

4545
private ReactorHttpHandlerAdapter createHttpHandlerAdapter() {
@@ -49,7 +49,7 @@ private ReactorHttpHandlerAdapter createHttpHandlerAdapter() {
4949
@Override
5050
protected void startInternal() {
5151
DisposableServer server = this.reactorServer.handle(this.reactorHandler).bind().block();
52-
setPort(server.address().getPort());
52+
setPort(((InetSocketAddress) server.address()).getPort());
5353
this.serverRef.set(server);
5454
}
5555

spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/ReactorHttpsServer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.web.testfixture.http.server.reactive.bootstrap;
1818

19+
import java.net.InetSocketAddress;
1920
import java.util.concurrent.atomic.AtomicReference;
2021

2122
import io.netty.handler.ssl.SslContextBuilder;
@@ -57,7 +58,7 @@ private ReactorHttpHandlerAdapter createHttpHandlerAdapter() {
5758
@Override
5859
protected void startInternal() {
5960
DisposableServer server = this.reactorServer.handle(this.reactorHandler).bind().block();
60-
setPort(server.address().getPort());
61+
setPort(((InetSocketAddress) server.address()).getPort());
6162
this.serverRef.set(server);
6263
}
6364

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,8 +228,8 @@ public WebClient.Builder exchangeStrategies(ExchangeStrategies strategies) {
228228
return this;
229229
}
230230

231-
@SuppressWarnings("deprecation")
232231
@Override
232+
@SuppressWarnings("deprecation")
233233
public WebClient.Builder exchangeStrategies(Consumer<ExchangeStrategies.Builder> configurer) {
234234
if (this.strategiesConfigurers == null) {
235235
this.strategiesConfigurers = new ArrayList<>(4);

spring-webflux/src/main/java/org/springframework/web/reactive/resource/CssLinkResourceTransformer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ public CssLinkResourceTransformer() {
7070
}
7171

7272

73-
@SuppressWarnings("deprecation")
7473
@Override
74+
@SuppressWarnings("deprecation")
7575
public Mono<Resource> transform(ServerWebExchange exchange, Resource inputResource,
7676
ResourceTransformerChain transformerChain) {
7777

spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,18 @@
1717
package org.springframework.web.reactive.socket.client;
1818

1919
import java.net.URI;
20+
import java.util.function.Supplier;
2021

2122
import org.apache.commons.logging.Log;
2223
import org.apache.commons.logging.LogFactory;
2324
import reactor.core.publisher.Mono;
2425
import reactor.netty.http.client.HttpClient;
26+
import reactor.netty.http.client.WebsocketClientSpec;
2527
import reactor.netty.http.websocket.WebsocketInbound;
2628

2729
import org.springframework.core.io.buffer.NettyDataBufferFactory;
2830
import org.springframework.http.HttpHeaders;
31+
import org.springframework.lang.Nullable;
2932
import org.springframework.util.Assert;
3033
import org.springframework.util.StringUtils;
3134
import org.springframework.web.reactive.socket.HandshakeInfo;
@@ -47,9 +50,13 @@ public class ReactorNettyWebSocketClient implements WebSocketClient {
4750

4851
private final HttpClient httpClient;
4952

50-
private int maxFramePayloadLength = NettyWebSocketSessionSupport.DEFAULT_FRAME_MAX_SIZE;
53+
private final Supplier<WebsocketClientSpec.Builder> specBuilderSupplier;
5154

52-
private boolean handlePing;
55+
@Nullable
56+
private Integer maxFramePayloadLength = NettyWebSocketSessionSupport.DEFAULT_FRAME_MAX_SIZE;
57+
58+
@Nullable
59+
private Boolean handlePing;
5360

5461

5562
/**
@@ -60,12 +67,25 @@ public ReactorNettyWebSocketClient() {
6067
}
6168

6269
/**
63-
* Constructor that accepts an existing {@link HttpClient} builder.
70+
* Constructor that accepts an existing {@link HttpClient}.
6471
* @since 5.1
6572
*/
6673
public ReactorNettyWebSocketClient(HttpClient httpClient) {
74+
this(httpClient, WebsocketClientSpec.builder());
75+
}
76+
77+
/**
78+
* Constructor with an {@link HttpClient} and a supplier for the
79+
* {@link WebsocketClientSpec.Builder} to use.
80+
* @since 5.3
81+
*/
82+
public ReactorNettyWebSocketClient(
83+
HttpClient httpClient, Supplier<WebsocketClientSpec.Builder> builderSupplier) {
84+
6785
Assert.notNull(httpClient, "HttpClient is required");
86+
Assert.notNull(builderSupplier, "WebsocketClientSpec.Builder is required");
6887
this.httpClient = httpClient;
88+
this.specBuilderSupplier = builderSupplier;
6989
}
7090

7191

@@ -76,6 +96,31 @@ public HttpClient getHttpClient() {
7696
return this.httpClient;
7797
}
7898

99+
/**
100+
* Build an instance of {@code WebsocketClientSpec} that reflects the current
101+
* configuration. This can be used to check the configured parameters except
102+
* for sub-protocols which depend on the {@link WebSocketHandler} that is used
103+
* for a given upgrade.
104+
* @since 5.3
105+
*/
106+
public WebsocketClientSpec getWebsocketClientSpec() {
107+
return buildSpec(null);
108+
}
109+
110+
private WebsocketClientSpec buildSpec(@Nullable String protocols) {
111+
WebsocketClientSpec.Builder builder = this.specBuilderSupplier.get();
112+
if (StringUtils.hasText(protocols)) {
113+
builder.protocols(protocols);
114+
}
115+
if (this.maxFramePayloadLength != null) {
116+
builder.maxFramePayloadLength(this.maxFramePayloadLength);
117+
}
118+
if (this.handlePing != null) {
119+
builder.handlePing(this.handlePing);
120+
}
121+
return builder.build();
122+
}
123+
79124
/**
80125
* Configure the maximum allowable frame payload length. Setting this value
81126
* to your application's requirement may reduce denial of service attacks
@@ -96,7 +141,7 @@ public void setMaxFramePayloadLength(int maxFramePayloadLength) {
96141
* @since 5.2
97142
*/
98143
public int getMaxFramePayloadLength() {
99-
return this.maxFramePayloadLength;
144+
return getWebsocketClientSpec().maxFramePayloadLength();
100145
}
101146

102147
/**
@@ -119,7 +164,7 @@ public void setHandlePing(boolean handlePing) {
119164
* @since 5.2.4
120165
*/
121166
public boolean getHandlePing() {
122-
return this.handlePing;
167+
return getWebsocketClientSpec().handlePing();
123168
}
124169

125170
@Override
@@ -128,12 +173,11 @@ public Mono<Void> execute(URI url, WebSocketHandler handler) {
128173
}
129174

130175
@Override
131-
@SuppressWarnings("deprecation")
132176
public Mono<Void> execute(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
133177
String protocols = StringUtils.collectionToCommaDelimitedString(handler.getSubProtocols());
134178
return getHttpClient()
135179
.headers(nettyHeaders -> setNettyHeaders(requestHeaders, nettyHeaders))
136-
.websocket(protocols, getMaxFramePayloadLength(), this.handlePing)
180+
.websocket(buildSpec(protocols))
137181
.uri(url.toString())
138182
.handle((inbound, outbound) -> {
139183
HttpHeaders responseHeaders = toHttpHeaders(inbound);

0 commit comments

Comments
 (0)