55
55
* connection to the broker is opened and used exclusively for all messages from the
56
56
* client that originated the CONNECT message. Messages from the same client are
57
57
* identified through the session id message header. Reversely, when the STOMP broker
58
- * sends messages back on the TCP connection, those messages are enriched with the session
59
- * id of the client and sent back downstream through the {@link MessageChannel} provided
60
- * to the constructor.
58
+ * sends messages back on the TCP connection, those messages are enriched with the
59
+ * session id of the client and sent back downstream through the {@link MessageChannel}
60
+ * provided to the constructor.
61
61
*
62
- * <p>This class also automatically opens a default "system" TCP connection to the message
63
- * broker that is used for sending messages that originate from the server application (as
64
- * opposed to from a client). Such messages are not associated with any client and
65
- * therefore do not have a session id header. The "system" connection is effectively
66
- * shared and cannot be used to receive messages. Several properties are provided to
67
- * configure the "system" connection including:
62
+ * <p>This class also automatically opens a default "system" TCP connection to the
63
+ * message broker that is used for sending messages that originate from the server
64
+ * application (as opposed to from a client). Such messages are not associated with
65
+ * any client and therefore do not have a session id header. The "system" connection
66
+ * is effectively shared and cannot be used to receive messages. Several properties
67
+ * are provided to configure the "system" connection including:
68
68
* <ul>
69
- * <li>{@link #setSystemLogin(String) }</li>
70
- * <li>{@link #setSystemPasscode(String) }</li>
71
- * <li>{@link #setSystemHeartbeatSendInterval(long) }</li>
72
- * <li>{@link #setSystemHeartbeatReceiveInterval(long) }</li>
69
+ * <li>{@link #setSystemLogin}</li>
70
+ * <li>{@link #setSystemPasscode}</li>
71
+ * <li>{@link #setSystemHeartbeatSendInterval}</li>
72
+ * <li>{@link #setSystemHeartbeatReceiveInterval}</li>
73
73
* </ul>
74
74
*
75
75
* @author Rossen Stoyanchev
@@ -80,22 +80,20 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
80
80
81
81
public static final String SYSTEM_SESSION_ID = "_system_" ;
82
82
83
- private static final byte [] EMPTY_PAYLOAD = new byte [0 ];
84
-
85
- private static final ListenableFutureTask <Void > EMPTY_TASK = new ListenableFutureTask <>(new VoidCallable ());
86
-
87
83
// STOMP recommends error of margin for receiving heartbeats
88
84
private static final long HEARTBEAT_MULTIPLIER = 3 ;
89
85
90
- private static final Message <byte []> HEARTBEAT_MESSAGE ;
91
-
92
86
/**
93
- * A heartbeat is setup once a CONNECTED frame is received which contains
94
- * the heartbeat settings we need. If we don't receive CONNECTED within
95
- * a minute, the connection is closed proactively.
87
+ * A heartbeat is setup once a CONNECTED frame is received which contains the heartbeat settings
88
+ * we need. If we don't receive CONNECTED within a minute, the connection is closed proactively.
96
89
*/
97
90
private static final int MAX_TIME_TO_CONNECTED_FRAME = 60 * 1000 ;
98
91
92
+ private static final byte [] EMPTY_PAYLOAD = new byte [0 ];
93
+
94
+ private static final ListenableFutureTask <Void > EMPTY_TASK = new ListenableFutureTask <>(new VoidCallable ());
95
+
96
+ private static final Message <byte []> HEARTBEAT_MESSAGE ;
99
97
100
98
101
99
static {
@@ -121,19 +119,18 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
121
119
122
120
private long systemHeartbeatReceiveInterval = 10000 ;
123
121
124
- private String virtualHost ;
125
-
126
122
private final Map <String , MessageHandler > systemSubscriptions = new HashMap <>(4 );
127
123
124
+ private String virtualHost ;
125
+
128
126
private TcpOperations <byte []> tcpClient ;
129
127
130
128
private MessageHeaderInitializer headerInitializer ;
131
129
132
- private final Map <String , StompConnectionHandler > connectionHandlers =
133
- new ConcurrentHashMap <>();
134
-
135
130
private final Stats stats = new Stats ();
136
131
132
+ private final Map <String , StompConnectionHandler > connectionHandlers = new ConcurrentHashMap <>();
133
+
137
134
138
135
/**
139
136
* Create a StompBrokerRelayMessageHandler instance with the given message channels
@@ -179,46 +176,6 @@ public void setRelayPort(int relayPort) {
179
176
public int getRelayPort () {
180
177
return this .relayPort ;
181
178
}
182
-
183
- /**
184
- * Set the interval, in milliseconds, at which the "system" connection will, in the
185
- * absence of any other data being sent, send a heartbeat to the STOMP broker. A value
186
- * of zero will prevent heartbeats from being sent to the broker.
187
- * <p>The default value is 10000.
188
- * <p>See class-level documentation for more information on the "system" connection.
189
- */
190
- public void setSystemHeartbeatSendInterval (long systemHeartbeatSendInterval ) {
191
- this .systemHeartbeatSendInterval = systemHeartbeatSendInterval ;
192
- }
193
-
194
- /**
195
- * Return the interval, in milliseconds, at which the "system" connection will
196
- * send heartbeats to the STOMP broker.
197
- */
198
- public long getSystemHeartbeatSendInterval () {
199
- return this .systemHeartbeatSendInterval ;
200
- }
201
-
202
- /**
203
- * Set the maximum interval, in milliseconds, at which the "system" connection
204
- * expects, in the absence of any other data, to receive a heartbeat from the STOMP
205
- * broker. A value of zero will configure the connection to expect not to receive
206
- * heartbeats from the broker.
207
- * <p>The default value is 10000.
208
- * <p>See class-level documentation for more information on the "system" connection.
209
- */
210
- public void setSystemHeartbeatReceiveInterval (long heartbeatReceiveInterval ) {
211
- this .systemHeartbeatReceiveInterval = heartbeatReceiveInterval ;
212
- }
213
-
214
- /**
215
- * Return the interval, in milliseconds, at which the "system" connection expects
216
- * to receive heartbeats from the STOMP broker.
217
- */
218
- public long getSystemHeartbeatReceiveInterval () {
219
- return this .systemHeartbeatReceiveInterval ;
220
- }
221
-
222
179
/**
223
180
* Set the login to use when creating connections to the STOMP broker on
224
181
* behalf of connected clients.
@@ -294,6 +251,46 @@ public String getSystemPasscode() {
294
251
return this .systemPasscode ;
295
252
}
296
253
254
+
255
+ /**
256
+ * Set the interval, in milliseconds, at which the "system" connection will, in the
257
+ * absence of any other data being sent, send a heartbeat to the STOMP broker. A value
258
+ * of zero will prevent heartbeats from being sent to the broker.
259
+ * <p>The default value is 10000.
260
+ * <p>See class-level documentation for more information on the "system" connection.
261
+ */
262
+ public void setSystemHeartbeatSendInterval (long systemHeartbeatSendInterval ) {
263
+ this .systemHeartbeatSendInterval = systemHeartbeatSendInterval ;
264
+ }
265
+
266
+ /**
267
+ * Return the interval, in milliseconds, at which the "system" connection will
268
+ * send heartbeats to the STOMP broker.
269
+ */
270
+ public long getSystemHeartbeatSendInterval () {
271
+ return this .systemHeartbeatSendInterval ;
272
+ }
273
+
274
+ /**
275
+ * Set the maximum interval, in milliseconds, at which the "system" connection
276
+ * expects, in the absence of any other data, to receive a heartbeat from the STOMP
277
+ * broker. A value of zero will configure the connection to expect not to receive
278
+ * heartbeats from the broker.
279
+ * <p>The default value is 10000.
280
+ * <p>See class-level documentation for more information on the "system" connection.
281
+ */
282
+ public void setSystemHeartbeatReceiveInterval (long heartbeatReceiveInterval ) {
283
+ this .systemHeartbeatReceiveInterval = heartbeatReceiveInterval ;
284
+ }
285
+
286
+ /**
287
+ * Return the interval, in milliseconds, at which the "system" connection expects
288
+ * to receive heartbeats from the STOMP broker.
289
+ */
290
+ public long getSystemHeartbeatReceiveInterval () {
291
+ return this .systemHeartbeatReceiveInterval ;
292
+ }
293
+
297
294
/**
298
295
* Configure one more destinations to subscribe to on the shared "system"
299
296
* connection along with MessageHandler's to handle received messages.
@@ -336,28 +333,21 @@ public String getVirtualHost() {
336
333
337
334
/**
338
335
* Configure a TCP client for managing TCP connections to the STOMP broker.
339
- * By default {@link ReactorNettyTcpClient} is used.
336
+ * <p> By default {@link ReactorNettyTcpClient} is used.
340
337
*/
341
338
public void setTcpClient (TcpOperations <byte []> tcpClient ) {
342
339
this .tcpClient = tcpClient ;
343
340
}
344
341
345
342
/**
346
- * Get the configured TCP client. Never {@code null} unless not configured
343
+ * Get the configured TCP client (never {@code null} unless not configured
347
344
* invoked and this method is invoked before the handler is started and
348
- * hence a default implementation initialized.
345
+ * hence a default implementation initialized) .
349
346
*/
350
347
public TcpOperations <byte []> getTcpClient () {
351
348
return this .tcpClient ;
352
349
}
353
350
354
- /**
355
- * Return the current count of TCP connection to the broker.
356
- */
357
- public int getConnectionCount () {
358
- return this .connectionHandlers .size ();
359
- }
360
-
361
351
/**
362
352
* Configure a {@link MessageHeaderInitializer} to apply to the headers of all
363
353
* messages created through the {@code StompBrokerRelayMessageHandler} that
@@ -382,6 +372,13 @@ public String getStatsInfo() {
382
372
return this .stats .toString ();
383
373
}
384
374
375
+ /**
376
+ * Return the current count of TCP connection to the broker.
377
+ */
378
+ public int getConnectionCount () {
379
+ return this .connectionHandlers .size ();
380
+ }
381
+
385
382
386
383
@ Override
387
384
protected void startInternal () {
@@ -872,6 +869,7 @@ public String toString() {
872
869
}
873
870
}
874
871
872
+
875
873
private class SystemStompConnectionHandler extends StompConnectionHandler {
876
874
877
875
public SystemStompConnectionHandler (StompHeaderAccessor connectHeaders ) {
@@ -971,10 +969,11 @@ public ListenableFuture<Void> forward(Message<?> message, StompHeaderAccessor ac
971
969
}
972
970
}
973
971
972
+
974
973
private static class VoidCallable implements Callable <Void > {
975
974
976
975
@ Override
977
- public Void call () throws Exception {
976
+ public Void call () {
978
977
return null ;
979
978
}
980
979
}
@@ -1001,10 +1000,10 @@ public void incrementDisconnectCount() {
1001
1000
}
1002
1001
1003
1002
public String toString () {
1004
- return connectionHandlers .size () + " sessions, " + relayHost + ":" + relayPort +
1003
+ return ( connectionHandlers .size () + " sessions, " + relayHost + ":" + relayPort +
1005
1004
(isBrokerAvailable () ? " (available)" : " (not available)" ) +
1006
1005
", processed CONNECT(" + this .connect .get () + ")-CONNECTED(" +
1007
- this .connected .get () + ")-DISCONNECT(" + this .disconnect .get () + ")" ;
1006
+ this .connected .get () + ")-DISCONNECT(" + this .disconnect .get () + ")" ) ;
1008
1007
}
1009
1008
}
1010
1009
0 commit comments