Skip to content

Commit 78a23be

Browse files
committed
Polish Reactor2TcpStompClient
1 parent 09c5958 commit 78a23be

File tree

1 file changed

+11
-38
lines changed

1 file changed

+11
-38
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClient.java

Lines changed: 11 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,8 @@
1616

1717
package org.springframework.messaging.simp.stomp;
1818

19-
import java.util.Collections;
20-
import java.util.List;
21-
import java.util.Properties;
22-
2319
import io.netty.channel.EventLoopGroup;
2420
import reactor.Environment;
25-
import reactor.core.config.ConfigurationReader;
26-
import reactor.core.config.DispatcherConfiguration;
27-
import reactor.core.config.DispatcherType;
28-
import reactor.core.config.ReactorConfiguration;
2921
import reactor.io.net.NetStreams.TcpClientFactory;
3022
import reactor.io.net.Spec.TcpClientSpec;
3123
import reactor.io.net.impl.netty.NettyClientSocketOptions;
@@ -61,9 +53,7 @@ public Reactor2TcpStompClient() {
6153
}
6254

6355
/**
64-
* Create an instance with the given host and port.
65-
* @param host the host
66-
* @param port the port
56+
* Create an instance with the given host and port to connect to
6757
*/
6858
public Reactor2TcpStompClient(String host, int port) {
6959
this.eventLoopGroup = Reactor2TcpClient.initEventLoopGroup();
@@ -87,7 +77,6 @@ public Reactor2TcpStompClient(TcpOperations<byte[]> tcpClient) {
8777
public void start() {
8878
if (!isRunning()) {
8979
this.running = true;
90-
9180
}
9281
}
9382

@@ -108,7 +97,6 @@ public void stop() {
10897
logger.error("Failed to shutdown gracefully", ex);
10998
}
11099
}
111-
112100
}
113101
}
114102

@@ -149,52 +137,37 @@ public void shutdown() {
149137
}
150138

151139

152-
/**
153-
* A ConfigurationReader with a thread pool-based dispatcher.
154-
*/
155-
private static class StompClientDispatcherConfigReader implements ConfigurationReader {
156-
157-
@Override
158-
public ReactorConfiguration read() {
159-
String dispatcherName = "StompClient";
160-
DispatcherType dispatcherType = DispatcherType.DISPATCHER_GROUP;
161-
DispatcherConfiguration config = new DispatcherConfiguration(dispatcherName, dispatcherType, 128, 0);
162-
List<DispatcherConfiguration> configList = Collections.<DispatcherConfiguration>singletonList(config);
163-
return new ReactorConfiguration(configList, dispatcherName, new Properties());
164-
}
165-
}
166-
167-
168140
private static class StompTcpClientSpecFactory implements TcpClientFactory<Message<byte[]>, Message<byte[]>> {
169141

170142
private final String host;
171143

172144
private final int port;
173145

174-
private final EventLoopGroup eventLoopGroup;
146+
private final NettyClientSocketOptions socketOptions;
175147

176148
private final Environment environment;
177149

150+
private final Reactor2StompCodec codec;
151+
178152

179-
public StompTcpClientSpecFactory(String host, int port, EventLoopGroup group, Environment environment) {
153+
StompTcpClientSpecFactory(String host, int port, EventLoopGroup group, Environment environment) {
180154
this.host = host;
181155
this.port = port;
182-
this.eventLoopGroup = group;
156+
this.socketOptions = new NettyClientSocketOptions().eventLoopGroup(group);
183157
this.environment = environment;
158+
this.codec = new Reactor2StompCodec(new StompEncoder(), new StompDecoder());
184159
}
185160

186161
@Override
187162
public TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(
188-
TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
189-
190-
final Reactor2StompCodec codec = new Reactor2StompCodec(new StompEncoder(), new StompDecoder());
163+
TcpClientSpec<Message<byte[]>, Message<byte[]>> clientSpec) {
191164

192-
return tcpClientSpec
165+
return clientSpec
193166
.env(this.environment)
194167
.dispatcher(this.environment.getDispatcher(Environment.WORK_QUEUE))
195168
.connect(this.host, this.port)
196-
.codec(codec)
197-
.options(new NettyClientSocketOptions().eventLoopGroup(this.eventLoopGroup));
169+
.codec(this.codec)
170+
.options(this.socketOptions);
198171
}
199172
}
200173

0 commit comments

Comments
 (0)