Skip to content

Commit c555fef

Browse files
committed
Improve TCP connection info logging.
After the recent changes to expose configuring TcpOperations, it no longer makes sense to automatically log the relayHost/Port since that's mutually exclusive with a custom TcpOperations. Instead we delegate to TcpOperations.toString(). Issue: SPR-16801
1 parent ab0b0b3 commit c555fef

File tree

4 files changed

+34
-15
lines changed

4 files changed

+34
-15
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,8 @@ public void shutdown() {
8989
this.tcpClient.shutdown();
9090
}
9191

92+
@Override
93+
public String toString() {
94+
return "ReactorNettyTcpStompClient[" + this.tcpClient + "]";
95+
}
9296
}

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
8282

8383
public static final String SYSTEM_SESSION_ID = "_system_";
8484

85-
// STOMP recommends error of margin for receiving heartbeats
85+
/** STOMP recommended error of margin for receiving heartbeats */
8686
private static final long HEARTBEAT_MULTIPLIER = 3;
8787

8888
/**
89-
* A heartbeat is setup once a CONNECTED frame is received which contains the heartbeat settings
90-
* we need. If we don't receive CONNECTED within a minute, the connection is closed proactively.
89+
* Heartbeat starts once CONNECTED frame with heartbeat settings is received.
90+
* If CONNECTED doesn't arrive within a minute, we'll close the connection.
9191
*/
9292
private static final int MAX_TIME_TO_CONNECTED_FRAME = 60 * 1000;
9393

@@ -403,7 +403,7 @@ protected void startInternal() {
403403
}
404404

405405
if (logger.isInfoEnabled()) {
406-
logger.info("Connecting \"system\" session to " + this.relayHost + ":" + this.relayPort);
406+
logger.info("Starting \"system\" session, " + toString());
407407
}
408408

409409
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.CONNECT);
@@ -552,7 +552,11 @@ else if (StompCommand.DISCONNECT.equals(command)) {
552552

553553
@Override
554554
public String toString() {
555-
return "StompBrokerRelay[" + this.relayHost + ":" + this.relayPort + "]";
555+
return "StompBrokerRelay[" + getTcpClientInfo() + "]";
556+
}
557+
558+
private String getTcpClientInfo() {
559+
return this.tcpClient != null ? this.tcpClient.toString() : this.relayHost + ":" + this.relayPort;
556560
}
557561

558562

@@ -987,7 +991,7 @@ public ListenableFuture<Void> forward(Message<?> message, StompHeaderAccessor ac
987991
private static class VoidCallable implements Callable<Void> {
988992

989993
@Override
990-
public Void call() throws Exception {
994+
public Void call() {
991995
return null;
992996
}
993997
}
@@ -1014,7 +1018,7 @@ public void incrementDisconnectCount() {
10141018
}
10151019

10161020
public String toString() {
1017-
return (connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort +
1021+
return (connectionHandlers.size() + " sessions, " + getTcpClientInfo() +
10181022
(isBrokerAvailable() ? " (available)" : " (not available)") +
10191023
", processed CONNECT(" + this.connect.get() + ")-CONNECTED(" +
10201024
this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")");

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,6 +30,8 @@
3030
import io.netty.channel.group.DefaultChannelGroup;
3131
import io.netty.handler.codec.ByteToMessageDecoder;
3232
import io.netty.util.concurrent.ImmediateEventExecutor;
33+
import org.apache.commons.logging.Log;
34+
import org.apache.commons.logging.LogFactory;
3335
import org.reactivestreams.Publisher;
3436
import reactor.core.publisher.DirectProcessor;
3537
import reactor.core.publisher.Flux;
@@ -65,6 +67,8 @@
6567
*/
6668
public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
6769

70+
private static Log logger = LogFactory.getLog(ReactorNettyTcpClient.class);
71+
6872
private static final int PUBLISH_ON_BUFFER_SIZE = 16;
6973

7074

@@ -201,7 +205,7 @@ public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, Reconnect
201205
.doOnNext(updateConnectMono(connectMono))
202206
.doOnError(updateConnectMono(connectMono))
203207
.doOnError(handler::afterConnectFailure) // report all connect failures to the handler
204-
.flatMap(NettyContext::onClose) // post-connect issues
208+
.flatMap(NettyContext::onClose) // post-connect issues
205209
.retryWhen(reconnectFunction(strategy))
206210
.repeatWhen(reconnectFunction(strategy))
207211
.subscribe();
@@ -281,6 +285,11 @@ private Mono<Void> stopScheduler() {
281285
});
282286
}
283287

288+
@Override
289+
public String toString() {
290+
return "ReactorNettyTcpClient[" + this.tcpClient + "]";
291+
}
292+
284293

285294
private class ReactorNettyHandler implements BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> {
286295

@@ -293,6 +302,9 @@ private class ReactorNettyHandler implements BiFunction<NettyInbound, NettyOutbo
293302
@Override
294303
@SuppressWarnings("unchecked")
295304
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
305+
if (logger.isDebugEnabled()) {
306+
logger.debug("Connected to " + inbound.remoteAddress());
307+
}
296308
DirectProcessor<Void> completion = DirectProcessor.create();
297309
TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion);
298310
scheduler.schedule(() -> connectionHandler.afterConnected(connection));
@@ -321,7 +333,7 @@ public StompMessageDecoder(ReactorNettyCodec<P> codec) {
321333
}
322334

323335
@Override
324-
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
336+
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
325337
Collection<Message<P>> messages = codec.decode(in);
326338
out.addAll(messages);
327339
}

spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -278,18 +278,17 @@ public void stompBrokerRelay() {
278278
assertEquals(5000, messageBroker.getSystemHeartbeatSendInterval());
279279
assertThat(messageBroker.getDestinationPrefixes(), Matchers.containsInAnyOrder("/topic","/queue"));
280280

281-
List<Class<? extends MessageHandler>> subscriberTypes =
282-
Arrays.<Class<? extends MessageHandler>>asList(SimpAnnotationMethodMessageHandler.class,
283-
UserDestinationMessageHandler.class, StompBrokerRelayMessageHandler.class);
281+
List<Class<? extends MessageHandler>> subscriberTypes = Arrays.asList(
282+
SimpAnnotationMethodMessageHandler.class, UserDestinationMessageHandler.class,
283+
StompBrokerRelayMessageHandler.class);
284284
testChannel("clientInboundChannel", subscriberTypes, 2);
285285
testExecutor("clientInboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60);
286286

287287
subscriberTypes = Collections.singletonList(SubProtocolWebSocketHandler.class);
288288
testChannel("clientOutboundChannel", subscriberTypes, 1);
289289
testExecutor("clientOutboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60);
290290

291-
subscriberTypes = Arrays.<Class<? extends MessageHandler>>asList(
292-
StompBrokerRelayMessageHandler.class, UserDestinationMessageHandler.class);
291+
subscriberTypes = Arrays.asList(StompBrokerRelayMessageHandler.class, UserDestinationMessageHandler.class);
293292
testChannel("brokerChannel", subscriberTypes, 1);
294293
try {
295294
this.appContext.getBean("brokerChannelExecutor", ThreadPoolTaskExecutor.class);

0 commit comments

Comments
 (0)