From f63ce74439e04b01065181cf8de758f0f25547b3 Mon Sep 17 00:00:00 2001 From: abilan Date: Tue, 14 Feb 2023 16:17:47 -0500 Subject: [PATCH] GH-8550: MQTT: Always re-subscribe on re-connect Fixes https://github.com/spring-projects/spring-integration/issues/8550 Turns out the Paho MQTT client does not re-subscribe when connection re-established on automatic reconnection * Fix `Mqttv5PahoMessageDrivenChannelAdapter` to always subscribe to its topics in the `connectComplete()` independently of the `reconnect` status * Verify behavior with `MOSQUITTO_CONTAINER` image restart in Docker --- ...Mqttv5PahoMessageDrivenChannelAdapter.java | 42 +++--- ...subscribeAfterAutomaticReconnectTests.java | 126 ++++++++++++++++++ 2 files changed, 146 insertions(+), 22 deletions(-) create mode 100644 spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ResubscribeAfterAutomaticReconnectTests.java diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java index 62d37815e4b..5470e40b640 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 the original author or authors. + * Copyright 2021-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -91,7 +91,7 @@ public class Mqttv5PahoMessageDrivenChannelAdapter extends AbstractMqttMessageDr public Mqttv5PahoMessageDrivenChannelAdapter(String url, String clientId, String... topic) { super(url, clientId, topic); this.connectionOptions = new MqttConnectionOptions(); - this.connectionOptions.setServerURIs(new String[]{ url }); + this.connectionOptions.setServerURIs(new String[] {url}); this.connectionOptions.setAutomaticReconnect(true); } @@ -312,30 +312,28 @@ public void deliveryComplete(IMqttToken token) { @Override public void connectComplete(boolean reconnect, String serverURI) { - if (!reconnect) { - ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); - String[] topics = getTopic(); - this.topicLock.lock(); - try { - if (topics.length > 0) { - int[] requestedQos = getQos(); - this.mqttClient.subscribe(topics, requestedQos).waitForCompletion(getCompletionTimeout()); - String message = "Connected and subscribed to " + Arrays.toString(topics); - logger.debug(message); - if (applicationEventPublisher != null) { - applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message)); - } - } - } - catch (MqttException ex) { + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); + String[] topics = getTopic(); + this.topicLock.lock(); + try { + if (topics.length > 0) { + int[] requestedQos = getQos(); + this.mqttClient.subscribe(topics, requestedQos).waitForCompletion(getCompletionTimeout()); + String message = "Connected and subscribed to " + Arrays.toString(topics); + logger.debug(message); if (applicationEventPublisher != null) { - applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex)); + applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message)); } - logger.error(ex, () -> "Error subscribing to " + Arrays.toString(topics)); } - finally { - this.topicLock.unlock(); + } + catch (MqttException ex) { + if (applicationEventPublisher != null) { + applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex)); } + logger.error(ex, () -> "Error subscribing to " + Arrays.toString(topics)); + } + finally { + this.topicLock.unlock(); } } diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ResubscribeAfterAutomaticReconnectTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ResubscribeAfterAutomaticReconnectTests.java new file mode 100644 index 00000000000..2cb4e666229 --- /dev/null +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ResubscribeAfterAutomaticReconnectTests.java @@ -0,0 +1,126 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mqtt; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; +import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.event.EventListener; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.mqtt.event.MqttSubscribedEvent; +import org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.PollableChannel; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Artem Bilan + * + * @since 5.5.17 + */ +@SpringJUnitConfig +@DirtiesContext +public class ResubscribeAfterAutomaticReconnectTests implements MosquittoContainerTest { + + @Autowired + private MessageChannel mqttOutInput; + + @Autowired + private PollableChannel fromMqttChannel; + + @Autowired + private MqttConnectionOptions connectionOptions; + + @Autowired + Config config; + + @Test + void messageReceivedAfterResubscriptionOnLostConnection() throws InterruptedException { + GenericMessage testMessage = new GenericMessage<>("test"); + this.mqttOutInput.send(testMessage); + assertThat(this.fromMqttChannel.receive(10_000)).isNotNull(); + + MOSQUITTO_CONTAINER.stop(); + MOSQUITTO_CONTAINER.start(); + connectionOptions.setServerURIs(new String[] {MosquittoContainerTest.mqttUrl()}); + + assertThat(this.config.subscribeLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + this.mqttOutInput.send(testMessage); + assertThat(this.fromMqttChannel.receive(10_000)).isNotNull(); + } + + @Configuration + @EnableIntegration + public static class Config { + + CountDownLatch subscribeLatch = new CountDownLatch(2); + + @Bean + public MqttConnectionOptions mqttConnectOptions() { + return new MqttConnectionOptionsBuilder() + .serverURI(MosquittoContainerTest.mqttUrl()) + .automaticReconnect(true) + .build(); + } + + @Bean + @ServiceActivator(inputChannel = "mqttOutInput") + public Mqttv5PahoMessageHandler mqttOutHandler(MqttConnectionOptions mqttConnectOptions) { + Mqttv5PahoMessageHandler messageHandler = + new Mqttv5PahoMessageHandler(mqttConnectOptions, "mqttv5SIout"); + messageHandler.setDefaultTopic("siTest"); + return messageHandler; + } + + @Bean + QueueChannel fromMqttChannel() { + return new QueueChannel(); + } + + @Bean + public Mqttv5PahoMessageDrivenChannelAdapter mqttInChannelAdapter( + MqttConnectionOptions mqttConnectOptions, QueueChannel fromMqttChannel) { + + Mqttv5PahoMessageDrivenChannelAdapter messageProducer = + new Mqttv5PahoMessageDrivenChannelAdapter(mqttConnectOptions, "mqttInClient", "siTest"); + messageProducer.setOutputChannel(fromMqttChannel); + return messageProducer; + } + + @EventListener(MqttSubscribedEvent.class) + public void mqttEvents() { + this.subscribeLatch.countDown(); + } + + } + +}