Skip to content

Commit d512cca

Browse files
committed
Reactor2TcpClient constructor with address supplier
Issue: SPR-12452
1 parent 4bc3e0c commit d512cca

File tree

4 files changed

+106
-16
lines changed

4 files changed

+106
-16
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2013 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,8 @@
1919
import org.springframework.messaging.MessageChannel;
2020
import org.springframework.messaging.SubscribableChannel;
2121
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
22+
import org.springframework.messaging.tcp.TcpOperations;
23+
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
2224
import org.springframework.util.Assert;
2325

2426
/**
@@ -47,6 +49,8 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
4749

4850
private String virtualHost;
4951

52+
private TcpOperations<byte[]> tcpClient;
53+
5054
private boolean autoStartup = true;
5155

5256
private String userDestinationBroadcast;
@@ -160,6 +164,18 @@ public StompBrokerRelayRegistration setVirtualHost(String virtualHost) {
160164
return this;
161165
}
162166

167+
/**
168+
* Configure a TCP client for managing TCP connections to the STOMP broker.
169+
* By default {@link Reactor2TcpClient} is used.
170+
* <p><strong>Note:</strong> when this property is used, any
171+
* {@link #setRelayHost(String) host} or {@link #setRelayPort(int) port}
172+
* specified are effectively ignored.
173+
* @since 4.3.15
174+
*/
175+
public void setTcpClient(TcpOperations<byte[]> tcpClient) {
176+
this.tcpClient = tcpClient;
177+
}
178+
163179
/**
164180
* Configure whether the {@link StompBrokerRelayMessageHandler} should start
165181
* automatically when the Spring ApplicationContext is refreshed.
@@ -231,6 +247,9 @@ protected StompBrokerRelayMessageHandler getMessageHandler(SubscribableChannel b
231247
if (this.virtualHost != null) {
232248
handler.setVirtualHost(this.virtualHost);
233249
}
250+
if (this.tcpClient != null) {
251+
handler.setTcpClient(this.tcpClient);
252+
}
234253

235254
handler.setAutoStartup(this.autoStartup);
236255

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,9 @@ public String getVirtualHost() {
334334
/**
335335
* Configure a TCP client for managing TCP connections to the STOMP broker.
336336
* By default {@link Reactor2TcpClient} is used.
337+
* <p><strong>Note:</strong> when this property is used, any
338+
* {@link #setRelayHost(String) host} or {@link #setRelayPort(int) port}
339+
* specified are effectively ignored.
337340
*/
338341
public void setTcpClient(TcpOperations<byte[]> tcpClient) {
339342
this.tcpClient = tcpClient;

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,6 +35,7 @@
3535
import reactor.core.support.NamedDaemonThreadFactory;
3636
import reactor.fn.Consumer;
3737
import reactor.fn.Function;
38+
import reactor.fn.Supplier;
3839
import reactor.fn.tuple.Tuple;
3940
import reactor.fn.tuple.Tuple2;
4041
import reactor.io.buffer.Buffer;
@@ -65,10 +66,10 @@
6566

6667
/**
6768
* An implementation of {@link org.springframework.messaging.tcp.TcpOperations}
68-
* based on the TCP client support of the Reactor project.
69+
* based on the TCP client support of project Reactor.
6970
*
70-
* <p>This implementation wraps N (Reactor) clients for N {@link #connect} calls,
71-
* i.e. a separate (Reactor) client instance for each connection.
71+
* <p>This implementation wraps N Reactor {@code TcpClient} instances created
72+
* for N {@link #connect} calls, i.e. once instance per connection.
7273
*
7374
* @author Rossen Stoyanchev
7475
* @author Stephane Maldini
@@ -100,13 +101,28 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
100101
* relying on Netty threads. The number of Netty threads can be tweaked with
101102
* the {@code reactor.tcp.ioThreadCount} System property. The network I/O
102103
* threads will be shared amongst the active clients.
103-
* <p>Also see the constructor accepting a ready Reactor
104-
* {@link TcpClientSpec} {@link Function} factory.
104+
*
105105
* @param host the host to connect to
106106
* @param port the port to connect to
107107
* @param codec the codec to use for encoding and decoding the TCP stream
108108
*/
109109
public Reactor2TcpClient(final String host, final int port, final Codec<Buffer, Message<P>, Message<P>> codec) {
110+
this(new FixedAddressSupplier(host, port), codec);
111+
}
112+
113+
/**
114+
* A variant of {@link #Reactor2TcpClient(String, int, Codec)} that takes a
115+
* supplier of any number of addresses instead of just one host and port.
116+
* This can be used to {@link #connect(TcpConnectionHandler, ReconnectStrategy)
117+
* reconnect} to a different address after the current host becomes unavailable.
118+
*
119+
* @param addressSupplier supplier of addresses to use for connecting
120+
* @param codec the codec to use for encoding and decoding the TCP stream
121+
* @since 4.3.15
122+
*/
123+
public Reactor2TcpClient(final Supplier<InetSocketAddress> addressSupplier,
124+
final Codec<Buffer, Message<P>, Message<P>> codec) {
125+
110126
// Reactor 2.0.5 requires NioEventLoopGroup vs 2.0.6+ requires EventLoopGroup
111127
final NioEventLoopGroup nioEventLoopGroup = initEventLoopGroup();
112128
this.eventLoopGroup = nioEventLoopGroup;
@@ -118,7 +134,7 @@ public TcpClientSpec<Message<P>, Message<P>> apply(TcpClientSpec<Message<P>, Mes
118134
return spec
119135
.env(environment)
120136
.codec(codec)
121-
.connect(host, port)
137+
.connect(addressSupplier)
122138
.options(createClientSocketOptions());
123139
}
124140

@@ -133,10 +149,13 @@ private ClientSocketOptions createClientSocketOptions() {
133149
* A constructor with a pre-configured {@link TcpClientSpec} {@link Function}
134150
* factory. This might be used to add SSL or specific network parameters to
135151
* the generated client configuration.
152+
*
136153
* <p><strong>NOTE:</strong> if the client is configured with a thread-creating
137-
* dispatcher, you are responsible for cleaning them, e.g. using
154+
* dispatcher, you are responsible for cleaning them, e.g. via
138155
* {@link reactor.core.Dispatcher#shutdown}.
139-
* @param tcpClientSpecFactory the TcpClientSpec {@link Function} to use for each client creation
156+
*
157+
* @param tcpClientSpecFactory the TcpClientSpec {@link Function} to use for
158+
* each client creation
140159
*/
141160
public Reactor2TcpClient(TcpClientFactory<Message<P>, Message<P>> tcpClientSpecFactory) {
142161
Assert.notNull(tcpClientSpecFactory, "'tcpClientClientFactory' must not be null");
@@ -295,6 +314,21 @@ private static Method initEventLoopGroupMethod() {
295314
}
296315

297316

317+
private static class FixedAddressSupplier implements Supplier<InetSocketAddress> {
318+
319+
private final InetSocketAddress address;
320+
321+
FixedAddressSupplier(String host, int port) {
322+
this.address = new InetSocketAddress(host, port);
323+
}
324+
325+
@Override
326+
public InetSocketAddress get() {
327+
return this.address;
328+
}
329+
}
330+
331+
298332
private static class SynchronousDispatcherConfigReader implements ConfigurationReader {
299333

300334
@Override

src/asciidoc/web-websocket.adoc

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1615,8 +1615,8 @@ values ``guest``/``guest``.
16151615
====
16161616
The STOMP broker relay always sets the `login` and `passcode` headers on every `CONNECT`
16171617
frame that it forwards to the broker on behalf of clients. Therefore WebSocket clients
1618-
need not set those headers; they will be ignored. As the following section explains,
1619-
instead WebSocket clients should rely on HTTP authentication to protect the WebSocket
1618+
need not set those headers; they will be ignored. As the <<websocket-stomp-authentication>>
1619+
explains, instead WebSocket clients should rely on HTTP authentication to protect the WebSocket
16201620
endpoint and establish the client identity.
16211621
====
16221622

@@ -1626,13 +1626,47 @@ and receiving heartbeats (10 seconds each by default). If connectivity to the br
16261626
is lost, the broker relay will continue to try to reconnect, every 5 seconds,
16271627
until it succeeds.
16281628

1629-
[NOTE]
1630-
====
1631-
A Spring bean can implement `ApplicationListener<BrokerAvailabilityEvent>` in order
1629+
Any Spring bean can implement `ApplicationListener<BrokerAvailabilityEvent>` in order
16321630
to receive notifications when the "system" connection to the broker is lost and
16331631
re-established. For example a Stock Quote service broadcasting stock quotes can
16341632
stop trying to send messages when there is no active "system" connection.
1635-
====
1633+
1634+
By default, the STOMP broker relay always connects, and reconnects as needed if
1635+
connectivity is lost, to the same host and port. If you wish to supply multiple addresses,
1636+
on each attempt to connect, you can configure a supplier of addresses, instead of a
1637+
fixed host and port. For example:
1638+
1639+
[source,java,indent=0]
1640+
[subs="verbatim,quotes"]
1641+
----
1642+
@Configuration
1643+
@EnableWebSocketMessageBroker
1644+
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
1645+
1646+
// ...
1647+
1648+
@Override
1649+
public void configureMessageBroker(MessageBrokerRegistry registry) {
1650+
registry.enableStompBrokerRelay("/queue/", "/topic/").setTcpClient(createTcpClient());
1651+
registry.setApplicationDestinationPrefixes("/app");
1652+
}
1653+
1654+
private Reactor2TcpClient<byte[]> createTcpClient() {
1655+
1656+
Supplier<InetSocketAddress> addressSupplier = new Supplier<InetSocketAddress>() {
1657+
@Override
1658+
public InetSocketAddress get() {
1659+
// Select address to connect to ...
1660+
}
1661+
};
1662+
1663+
StompDecoder decoder = new StompDecoder();
1664+
Reactor2StompCodec codec = new Reactor2StompCodec(new StompEncoder(), decoder);
1665+
return new Reactor2TcpClient<>(addressSupplier, codec);
1666+
}
1667+
1668+
}
1669+
----
16361670

16371671
The STOMP broker relay can also be configured with a `virtualHost` property.
16381672
The value of this property will be set as the `host` header of every `CONNECT` frame

0 commit comments

Comments
 (0)