Skip to content

Commit b482b00

Browse files
authored
GH-4014: MQTT ClientManager: completion timeouts (#8552)
Fixes #4014 The `ClientManager` implementations uses by mistake a `connectionTimeout` for operations with completion wait * Introduce `completionTimeout` and `disconnectCompletionTimeout` for `ClientManager` abstraction to realign the logic with existing channel adapters and Paho Client by itself. * Deprecate `DEFAULT_COMPLETION_TIMEOUT` and `DISCONNECT_COMPLETION_TIMEOUT` constants in the `AbstractMqttMessageDrivenChannelAdapter` in favor of respective replacement in the `ClientManager` * Pull `disconnectCompletionTimeout` property from the `MqttPahoMessageDrivenChannelAdapter` to its superclass * Use new `disconnectCompletionTimeout` in the `Mqttv5PahoMessageDrivenChannelAdapter` for similar `disconnectForcibly()` call * Fix Lifecycle race condition when `ClientManager` is started by the outbound channel adapter (`Integer.MIN_VALUE` phase and auto-startup - see `DefaultLifecycleProcessor.doStart()` and logic around `dependencies`) which is much earlier than `MessageProducerSupport` (`Integer.MAX_VALUE / 2` phase) and there a `connectComplete()` callback might be called before the `MessageProducerSupport.start()`. For that purpose check for an `isRunning()` in the `connectComplete()` before subscribing and set `readyToSubscribeOnStart` flag to `subscribe()` in a `doStart()` of this `MqttMessageDrivenChannelAdapter` * Remove redundant `MqttPahoMessageDrivenChannelAdapter.cleanSession` property in favor of `this.clientFactory.getConnectionOptions().isCleanSession()` call GH-8550: MQTT: Always re-subscribe on re-connect Fixes #8550 Turns out the Paho MQTT client does not re-subscribe when connection re-established on automatic reconnection * Fix `AbstractMqttMessageDrivenChannelAdapter` to always subscribe to their topics in the `connectComplete()` independently of the `reconnect` status * Verify behavior with `MOSQUITTO_CONTAINER` image restart in Docker
1 parent 3d24527 commit b482b00

File tree

8 files changed

+280
-89
lines changed

8 files changed

+280
-89
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/AbstractMqttClientManager.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -53,6 +53,10 @@ public abstract class AbstractMqttClientManager<T, C> implements ClientManager<T
5353

5454
private int phase = DEFAULT_MANAGER_PHASE;
5555

56+
private long completionTimeout = ClientManager.DEFAULT_COMPLETION_TIMEOUT;
57+
58+
private long disconnectCompletionTimeout = ClientManager.DISCONNECT_COMPLETION_TIMEOUT;
59+
5660
private boolean manualAcks;
5761

5862
private ApplicationEventPublisher applicationEventPublisher;
@@ -96,6 +100,34 @@ protected Set<ConnectCallback> getCallbacks() {
96100
return this.connectCallbacks;
97101
}
98102

103+
/**
104+
* Set the completion timeout for operations.
105+
* Default {@value #DEFAULT_COMPLETION_TIMEOUT} milliseconds.
106+
* @param completionTimeout The timeout.
107+
* @since 6.0.3
108+
*/
109+
public void setCompletionTimeout(long completionTimeout) {
110+
this.completionTimeout = completionTimeout;
111+
}
112+
113+
protected long getCompletionTimeout() {
114+
return this.completionTimeout;
115+
}
116+
117+
/**
118+
* Set the completion timeout when disconnecting.
119+
* Default {@value #DISCONNECT_COMPLETION_TIMEOUT} milliseconds.
120+
* @param completionTimeout The timeout.
121+
* @since 6.0.3
122+
*/
123+
public synchronized void setDisconnectCompletionTimeout(long completionTimeout) {
124+
this.disconnectCompletionTimeout = completionTimeout;
125+
}
126+
127+
protected long getDisconnectCompletionTimeout() {
128+
return this.disconnectCompletionTimeout;
129+
}
130+
99131
@Override
100132
public boolean isManualAcks() {
101133
return this.manualAcks;
@@ -123,7 +155,7 @@ public String getBeanName() {
123155
}
124156

125157
/**
126-
* The phase of component autostart in {@link SmartLifecycle}.
158+
* The phase of component auto-start in {@link SmartLifecycle}.
127159
* If the custom one is required, note that for the correct behavior it should be less than phase of
128160
* {@link AbstractMqttMessageDrivenChannelAdapter} implementations.
129161
* The default phase is {@link #DEFAULT_MANAGER_PHASE}.

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ClientManager.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,6 +33,16 @@
3333
*/
3434
public interface ClientManager<T, C> extends SmartLifecycle, MqttComponent<C> {
3535

36+
/**
37+
* The default completion timeout in milliseconds.
38+
*/
39+
long DEFAULT_COMPLETION_TIMEOUT = 30_000L;
40+
41+
/**
42+
* The default disconnect completion timeout in milliseconds.
43+
*/
44+
long DISCONNECT_COMPLETION_TIMEOUT = 5_000L;
45+
3646
/**
3747
* Return the managed client.
3848
* @return the managed client.

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,6 +35,8 @@
3535
* {@link MqttConnectionFailedEvent} and reconnect the MQTT client manually.
3636
*
3737
* @author Artem Vozhdayenko
38+
* @author Artem Bilan
39+
*
3840
* @since 6.0
3941
*/
4042
public class Mqttv3ClientManager
@@ -97,10 +99,9 @@ public synchronized void start() {
9799
}
98100
setClient(client);
99101
try {
100-
client.connect(this.connectionOptions)
101-
.waitForCompletion(this.connectionOptions.getConnectionTimeout());
102+
client.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout());
102103
}
103-
catch (MqttException e) {
104+
catch (MqttException ex) {
104105
// See GH-3822
105106
if (this.connectionOptions.isAutomaticReconnect()) {
106107
try {
@@ -113,10 +114,10 @@ public synchronized void start() {
113114
else {
114115
var applicationEventPublisher = getApplicationEventPublisher();
115116
if (applicationEventPublisher != null) {
116-
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, e));
117+
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
117118
}
118119
else {
119-
logger.error("Could not start client manager, client_id=" + getClientId(), e);
120+
logger.error("Could not start client manager, client_id=" + getClientId(), ex);
120121
}
121122
}
122123
}
@@ -138,7 +139,7 @@ public synchronized void stop() {
138139
return;
139140
}
140141
try {
141-
client.disconnectForcibly(this.connectionOptions.getConnectionTimeout());
142+
client.disconnectForcibly(getDisconnectCompletionTimeout());
142143
}
143144
catch (MqttException e) {
144145
logger.error("Could not disconnect from the client", e);

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -101,8 +101,7 @@ public synchronized void start() {
101101
}
102102
setClient(client);
103103
try {
104-
client.connect(this.connectionOptions)
105-
.waitForCompletion(this.connectionOptions.getConnectionTimeout());
104+
client.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout());
106105
}
107106
catch (MqttException ex) {
108107
if (this.connectionOptions.isAutomaticReconnect()) {
@@ -142,7 +141,7 @@ public synchronized void stop() {
142141
}
143142

144143
try {
145-
client.disconnectForcibly(this.connectionOptions.getConnectionTimeout());
144+
client.disconnectForcibly(getDisconnectCompletionTimeout());
146145
}
147146
catch (MqttException e) {
148147
logger.error("Could not disconnect from the client", e);

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -61,7 +61,14 @@ public abstract class AbstractMqttMessageDrivenChannelAdapter<T, C> extends Mess
6161
/**
6262
* The default completion timeout in milliseconds.
6363
*/
64-
public static final long DEFAULT_COMPLETION_TIMEOUT = 30_000L;
64+
@Deprecated(since = "6.0.3", forRemoval = true)
65+
public static final long DEFAULT_COMPLETION_TIMEOUT = ClientManager.DEFAULT_COMPLETION_TIMEOUT;
66+
67+
/**
68+
* The default disconnect completion timeout in milliseconds.
69+
*/
70+
@Deprecated(since = "6.0.3", forRemoval = true)
71+
public static final long DISCONNECT_COMPLETION_TIMEOUT = ClientManager.DISCONNECT_COMPLETION_TIMEOUT;
6572

6673
protected final Lock topicLock = new ReentrantLock(); // NOSONAR
6774

@@ -73,7 +80,9 @@ public abstract class AbstractMqttMessageDrivenChannelAdapter<T, C> extends Mess
7380

7481
private final ClientManager<T, C> clientManager;
7582

76-
private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
83+
private long completionTimeout = ClientManager.DEFAULT_COMPLETION_TIMEOUT;
84+
85+
private long disconnectCompletionTimeout = ClientManager.DISCONNECT_COMPLETION_TIMEOUT;
7786

7887
private boolean manualAcks;
7988

@@ -179,6 +188,20 @@ public String[] getTopic() {
179188
}
180189
}
181190

191+
/**
192+
* Set the completion timeout when disconnecting.
193+
* Default {@value #DISCONNECT_COMPLETION_TIMEOUT} milliseconds.
194+
* @param completionTimeout The timeout.
195+
* @since 5.1.10
196+
*/
197+
public synchronized void setDisconnectCompletionTimeout(long completionTimeout) {
198+
this.disconnectCompletionTimeout = completionTimeout;
199+
}
200+
201+
protected long getDisconnectCompletionTimeout() {
202+
return this.disconnectCompletionTimeout;
203+
}
204+
182205
@Override
183206
protected void onInit() {
184207
super.onInit();
@@ -223,8 +246,8 @@ protected boolean isManualAcks() {
223246
}
224247

225248
/**
226-
* Set the completion timeout for operations. Not settable using the namespace.
227-
* Default {@value #DEFAULT_COMPLETION_TIMEOUT} milliseconds.
249+
* Set the completion timeout for operations.
250+
* Default {@value ClientManager#DEFAULT_COMPLETION_TIMEOUT} milliseconds.
228251
* @param completionTimeout The timeout.
229252
* @since 4.1
230253
*/

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

Lines changed: 34 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -64,22 +64,15 @@ public class MqttPahoMessageDrivenChannelAdapter
6464
extends AbstractMqttMessageDrivenChannelAdapter<IMqttAsyncClient, MqttConnectOptions>
6565
implements MqttCallbackExtended, MqttPahoComponent {
6666

67-
/**
68-
* The default disconnect completion timeout in milliseconds.
69-
*/
70-
public static final long DISCONNECT_COMPLETION_TIMEOUT = 5_000L;
71-
7267
private final MqttPahoClientFactory clientFactory;
7368

74-
private long disconnectCompletionTimeout = DISCONNECT_COMPLETION_TIMEOUT;
75-
7669
private volatile IMqttAsyncClient client;
7770

78-
private volatile boolean cleanSession;
79-
8071
@SuppressWarnings("deprecation")
8172
private volatile org.springframework.integration.mqtt.core.ConsumerStopAction consumerStopAction;
8273

74+
private volatile boolean readyToSubscribeOnStart;
75+
8376
/**
8477
* Use this constructor when you don't need additional {@link MqttConnectOptions}.
8578
* @param url The URL.
@@ -138,16 +131,6 @@ public MqttPahoMessageDrivenChannelAdapter(ClientManager<IMqttAsyncClient, MqttC
138131
this.clientFactory = factory;
139132
}
140133

141-
/**
142-
* Set the completion timeout when disconnecting. Not settable using the namespace.
143-
* Default {@value #DISCONNECT_COMPLETION_TIMEOUT} milliseconds.
144-
* @param completionTimeout The timeout.
145-
* @since 5.1.10
146-
*/
147-
public synchronized void setDisconnectCompletionTimeout(long completionTimeout) {
148-
this.disconnectCompletionTimeout = completionTimeout;
149-
}
150-
151134
@Override
152135
public MqttConnectOptions getConnectionInfo() {
153136
MqttConnectOptions options = this.clientFactory.getConnectionOptions();
@@ -175,6 +158,9 @@ protected void onInit() {
175158
protected void doStart() {
176159
try {
177160
connect();
161+
if (this.readyToSubscribeOnStart) {
162+
subscribe();
163+
}
178164
}
179165
catch (Exception ex) {
180166
if (getConnectionInfo().isAutomaticReconnect()) {
@@ -195,15 +181,38 @@ protected void doStart() {
195181
}
196182
}
197183

184+
@SuppressWarnings("deprecation")
185+
private synchronized void connect() throws MqttException {
186+
MqttConnectOptions connectionOptions = this.clientFactory.getConnectionOptions();
187+
this.consumerStopAction = this.clientFactory.getConsumerStopAction();
188+
if (this.consumerStopAction == null) {
189+
this.consumerStopAction = org.springframework.integration.mqtt.core.ConsumerStopAction.UNSUBSCRIBE_CLEAN;
190+
}
191+
192+
var clientManager = getClientManager();
193+
if (clientManager == null) {
194+
Assert.state(getUrl() != null || connectionOptions.getServerURIs() != null,
195+
"If no 'url' provided, connectionOptions.getServerURIs() must not be null");
196+
this.client = this.clientFactory.getAsyncClientInstance(getUrl(), getClientId());
197+
this.client.setCallback(this);
198+
this.client.connect(connectionOptions).waitForCompletion(getCompletionTimeout());
199+
this.client.setManualAcks(isManualAcks());
200+
}
201+
else {
202+
this.client = clientManager.getClient();
203+
}
204+
}
205+
198206
@SuppressWarnings("deprecation")
199207
@Override
200208
protected synchronized void doStop() {
209+
this.readyToSubscribeOnStart = false;
201210
try {
202211
if (this.consumerStopAction
203212
.equals(org.springframework.integration.mqtt.core.ConsumerStopAction.UNSUBSCRIBE_ALWAYS)
204213
|| (this.consumerStopAction
205214
.equals(org.springframework.integration.mqtt.core.ConsumerStopAction.UNSUBSCRIBE_CLEAN)
206-
&& this.cleanSession)) {
215+
&& this.clientFactory.getConnectionOptions().isCleanSession())) {
207216

208217
this.client.unsubscribe(getTopic());
209218
}
@@ -217,7 +226,7 @@ protected synchronized void doStop() {
217226
}
218227

219228
try {
220-
this.client.disconnectForcibly(this.disconnectCompletionTimeout);
229+
this.client.disconnectForcibly(getDisconnectCompletionTimeout());
221230
}
222231
catch (MqttException ex) {
223232
logger.error(ex, "Exception while disconnecting");
@@ -273,29 +282,6 @@ public void removeTopic(String... topic) {
273282
}
274283
}
275284

276-
@SuppressWarnings("deprecation")
277-
private synchronized void connect() throws MqttException { // NOSONAR
278-
MqttConnectOptions connectionOptions = this.clientFactory.getConnectionOptions();
279-
this.cleanSession = connectionOptions.isCleanSession();
280-
this.consumerStopAction = this.clientFactory.getConsumerStopAction();
281-
if (this.consumerStopAction == null) {
282-
this.consumerStopAction = org.springframework.integration.mqtt.core.ConsumerStopAction.UNSUBSCRIBE_CLEAN;
283-
}
284-
285-
var clientManager = getClientManager();
286-
if (clientManager == null) {
287-
Assert.state(getUrl() != null || connectionOptions.getServerURIs() != null,
288-
"If no 'url' provided, connectionOptions.getServerURIs() must not be null");
289-
this.client = this.clientFactory.getAsyncClientInstance(getUrl(), getClientId());
290-
this.client.setCallback(this);
291-
this.client.connect(connectionOptions).waitForCompletion(getCompletionTimeout());
292-
this.client.setManualAcks(isManualAcks());
293-
}
294-
else {
295-
this.client = clientManager.getClient();
296-
}
297-
}
298-
299285
private void subscribe() {
300286
this.topicLock.lock();
301287
String[] topics = getTopic();
@@ -418,9 +404,12 @@ public void connectComplete(boolean isReconnect) {
418404

419405
@Override
420406
public void connectComplete(boolean reconnect, String serverURI) {
421-
if (!reconnect) {
407+
if (isRunning()) {
422408
subscribe();
423409
}
410+
else {
411+
this.readyToSubscribeOnStart = true;
412+
}
424413
}
425414

426415
/**

0 commit comments

Comments
 (0)