Skip to content

GH-4014: MQTT ClientManager: completion timeouts #8552

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2022 the original author or authors.
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,6 +53,10 @@ public abstract class AbstractMqttClientManager<T, C> implements ClientManager<T

private int phase = DEFAULT_MANAGER_PHASE;

private long completionTimeout = ClientManager.DEFAULT_COMPLETION_TIMEOUT;

private long disconnectCompletionTimeout = ClientManager.DISCONNECT_COMPLETION_TIMEOUT;

private boolean manualAcks;

private ApplicationEventPublisher applicationEventPublisher;
Expand Down Expand Up @@ -96,6 +100,34 @@ protected Set<ConnectCallback> getCallbacks() {
return this.connectCallbacks;
}

/**
* Set the completion timeout for operations.
* Default {@value #DEFAULT_COMPLETION_TIMEOUT} milliseconds.
* @param completionTimeout The timeout.
* @since 6.0.3
*/
public void setCompletionTimeout(long completionTimeout) {
this.completionTimeout = completionTimeout;
}

protected long getCompletionTimeout() {
return this.completionTimeout;
}

/**
* Set the completion timeout when disconnecting.
* Default {@value #DISCONNECT_COMPLETION_TIMEOUT} milliseconds.
* @param completionTimeout The timeout.
* @since 6.0.3
*/
public synchronized void setDisconnectCompletionTimeout(long completionTimeout) {
this.disconnectCompletionTimeout = completionTimeout;
}

protected long getDisconnectCompletionTimeout() {
return this.disconnectCompletionTimeout;
}

@Override
public boolean isManualAcks() {
return this.manualAcks;
Expand Down Expand Up @@ -123,7 +155,7 @@ public String getBeanName() {
}

/**
* The phase of component autostart in {@link SmartLifecycle}.
* The phase of component auto-start in {@link SmartLifecycle}.
* If the custom one is required, note that for the correct behavior it should be less than phase of
* {@link AbstractMqttMessageDrivenChannelAdapter} implementations.
* The default phase is {@link #DEFAULT_MANAGER_PHASE}.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2022 the original author or authors.
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,6 +33,16 @@
*/
public interface ClientManager<T, C> extends SmartLifecycle, MqttComponent<C> {

/**
* The default completion timeout in milliseconds.
*/
long DEFAULT_COMPLETION_TIMEOUT = 30_000L;

/**
* The default disconnect completion timeout in milliseconds.
*/
long DISCONNECT_COMPLETION_TIMEOUT = 5_000L;

/**
* Return the managed client.
* @return the managed client.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2022 the original author or authors.
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,6 +35,8 @@
* {@link MqttConnectionFailedEvent} and reconnect the MQTT client manually.
*
* @author Artem Vozhdayenko
* @author Artem Bilan
*
* @since 6.0
*/
public class Mqttv3ClientManager
Expand Down Expand Up @@ -97,10 +99,9 @@ public synchronized void start() {
}
setClient(client);
try {
client.connect(this.connectionOptions)
.waitForCompletion(this.connectionOptions.getConnectionTimeout());
client.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout());
}
catch (MqttException e) {
catch (MqttException ex) {
// See GH-3822
if (this.connectionOptions.isAutomaticReconnect()) {
try {
Expand All @@ -113,10 +114,10 @@ public synchronized void start() {
else {
var applicationEventPublisher = getApplicationEventPublisher();
if (applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, e));
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
}
else {
logger.error("Could not start client manager, client_id=" + getClientId(), e);
logger.error("Could not start client manager, client_id=" + getClientId(), ex);
}
}
}
Expand All @@ -138,7 +139,7 @@ public synchronized void stop() {
return;
}
try {
client.disconnectForcibly(this.connectionOptions.getConnectionTimeout());
client.disconnectForcibly(getDisconnectCompletionTimeout());
}
catch (MqttException e) {
logger.error("Could not disconnect from the client", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2022 the original author or authors.
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -101,8 +101,7 @@ public synchronized void start() {
}
setClient(client);
try {
client.connect(this.connectionOptions)
.waitForCompletion(this.connectionOptions.getConnectionTimeout());
client.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout());
}
catch (MqttException ex) {
if (this.connectionOptions.isAutomaticReconnect()) {
Expand Down Expand Up @@ -142,7 +141,7 @@ public synchronized void stop() {
}

try {
client.disconnectForcibly(this.connectionOptions.getConnectionTimeout());
client.disconnectForcibly(getDisconnectCompletionTimeout());
}
catch (MqttException e) {
logger.error("Could not disconnect from the client", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,7 +61,14 @@ public abstract class AbstractMqttMessageDrivenChannelAdapter<T, C> extends Mess
/**
* The default completion timeout in milliseconds.
*/
public static final long DEFAULT_COMPLETION_TIMEOUT = 30_000L;
@Deprecated(since = "6.0.3", forRemoval = true)
public static final long DEFAULT_COMPLETION_TIMEOUT = ClientManager.DEFAULT_COMPLETION_TIMEOUT;

/**
* The default disconnect completion timeout in milliseconds.
*/
@Deprecated(since = "6.0.3", forRemoval = true)
public static final long DISCONNECT_COMPLETION_TIMEOUT = ClientManager.DISCONNECT_COMPLETION_TIMEOUT;

protected final Lock topicLock = new ReentrantLock(); // NOSONAR

Expand All @@ -73,7 +80,9 @@ public abstract class AbstractMqttMessageDrivenChannelAdapter<T, C> extends Mess

private final ClientManager<T, C> clientManager;

private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
private long completionTimeout = ClientManager.DEFAULT_COMPLETION_TIMEOUT;

private long disconnectCompletionTimeout = ClientManager.DISCONNECT_COMPLETION_TIMEOUT;

private boolean manualAcks;

Expand Down Expand Up @@ -179,6 +188,20 @@ public String[] getTopic() {
}
}

/**
* Set the completion timeout when disconnecting.
* Default {@value #DISCONNECT_COMPLETION_TIMEOUT} milliseconds.
* @param completionTimeout The timeout.
* @since 5.1.10
*/
public synchronized void setDisconnectCompletionTimeout(long completionTimeout) {
this.disconnectCompletionTimeout = completionTimeout;
}

protected long getDisconnectCompletionTimeout() {
return this.disconnectCompletionTimeout;
}

@Override
protected void onInit() {
super.onInit();
Expand Down Expand Up @@ -223,8 +246,8 @@ protected boolean isManualAcks() {
}

/**
* Set the completion timeout for operations. Not settable using the namespace.
* Default {@value #DEFAULT_COMPLETION_TIMEOUT} milliseconds.
* Set the completion timeout for operations.
* Default {@value ClientManager#DEFAULT_COMPLETION_TIMEOUT} milliseconds.
* @param completionTimeout The timeout.
* @since 4.1
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,15 @@ public class MqttPahoMessageDrivenChannelAdapter
extends AbstractMqttMessageDrivenChannelAdapter<IMqttAsyncClient, MqttConnectOptions>
implements MqttCallbackExtended, MqttPahoComponent {

/**
* The default disconnect completion timeout in milliseconds.
*/
public static final long DISCONNECT_COMPLETION_TIMEOUT = 5_000L;

private final MqttPahoClientFactory clientFactory;

private long disconnectCompletionTimeout = DISCONNECT_COMPLETION_TIMEOUT;

private volatile IMqttAsyncClient client;

private volatile boolean cleanSession;

@SuppressWarnings("deprecation")
private volatile org.springframework.integration.mqtt.core.ConsumerStopAction consumerStopAction;

private volatile boolean readyToSubscribeOnStart;

/**
* Use this constructor when you don't need additional {@link MqttConnectOptions}.
* @param url The URL.
Expand Down Expand Up @@ -138,16 +131,6 @@ public MqttPahoMessageDrivenChannelAdapter(ClientManager<IMqttAsyncClient, MqttC
this.clientFactory = factory;
}

/**
* Set the completion timeout when disconnecting. Not settable using the namespace.
* Default {@value #DISCONNECT_COMPLETION_TIMEOUT} milliseconds.
* @param completionTimeout The timeout.
* @since 5.1.10
*/
public synchronized void setDisconnectCompletionTimeout(long completionTimeout) {
this.disconnectCompletionTimeout = completionTimeout;
}

@Override
public MqttConnectOptions getConnectionInfo() {
MqttConnectOptions options = this.clientFactory.getConnectionOptions();
Expand Down Expand Up @@ -175,6 +158,9 @@ protected void onInit() {
protected void doStart() {
try {
connect();
if (this.readyToSubscribeOnStart) {
subscribe();
}
}
catch (Exception ex) {
if (getConnectionInfo().isAutomaticReconnect()) {
Expand All @@ -195,15 +181,38 @@ protected void doStart() {
}
}

@SuppressWarnings("deprecation")
private synchronized void connect() throws MqttException {
MqttConnectOptions connectionOptions = this.clientFactory.getConnectionOptions();
this.consumerStopAction = this.clientFactory.getConsumerStopAction();
if (this.consumerStopAction == null) {
this.consumerStopAction = org.springframework.integration.mqtt.core.ConsumerStopAction.UNSUBSCRIBE_CLEAN;
}

var clientManager = getClientManager();
if (clientManager == null) {
Assert.state(getUrl() != null || connectionOptions.getServerURIs() != null,
"If no 'url' provided, connectionOptions.getServerURIs() must not be null");
this.client = this.clientFactory.getAsyncClientInstance(getUrl(), getClientId());
this.client.setCallback(this);
this.client.connect(connectionOptions).waitForCompletion(getCompletionTimeout());
this.client.setManualAcks(isManualAcks());
}
else {
this.client = clientManager.getClient();
}
}

@SuppressWarnings("deprecation")
@Override
protected synchronized void doStop() {
this.readyToSubscribeOnStart = false;
try {
if (this.consumerStopAction
.equals(org.springframework.integration.mqtt.core.ConsumerStopAction.UNSUBSCRIBE_ALWAYS)
|| (this.consumerStopAction
.equals(org.springframework.integration.mqtt.core.ConsumerStopAction.UNSUBSCRIBE_CLEAN)
&& this.cleanSession)) {
&& this.clientFactory.getConnectionOptions().isCleanSession())) {

this.client.unsubscribe(getTopic());
}
Expand All @@ -217,7 +226,7 @@ protected synchronized void doStop() {
}

try {
this.client.disconnectForcibly(this.disconnectCompletionTimeout);
this.client.disconnectForcibly(getDisconnectCompletionTimeout());
}
catch (MqttException ex) {
logger.error(ex, "Exception while disconnecting");
Expand Down Expand Up @@ -273,29 +282,6 @@ public void removeTopic(String... topic) {
}
}

@SuppressWarnings("deprecation")
private synchronized void connect() throws MqttException { // NOSONAR
MqttConnectOptions connectionOptions = this.clientFactory.getConnectionOptions();
this.cleanSession = connectionOptions.isCleanSession();
this.consumerStopAction = this.clientFactory.getConsumerStopAction();
if (this.consumerStopAction == null) {
this.consumerStopAction = org.springframework.integration.mqtt.core.ConsumerStopAction.UNSUBSCRIBE_CLEAN;
}

var clientManager = getClientManager();
if (clientManager == null) {
Assert.state(getUrl() != null || connectionOptions.getServerURIs() != null,
"If no 'url' provided, connectionOptions.getServerURIs() must not be null");
this.client = this.clientFactory.getAsyncClientInstance(getUrl(), getClientId());
this.client.setCallback(this);
this.client.connect(connectionOptions).waitForCompletion(getCompletionTimeout());
this.client.setManualAcks(isManualAcks());
}
else {
this.client = clientManager.getClient();
}
}

private void subscribe() {
this.topicLock.lock();
String[] topics = getTopic();
Expand Down Expand Up @@ -418,9 +404,12 @@ public void connectComplete(boolean isReconnect) {

@Override
public void connectComplete(boolean reconnect, String serverURI) {
if (!reconnect) {
if (isRunning()) {
subscribe();
}
else {
this.readyToSubscribeOnStart = true;
}
}

/**
Expand Down
Loading