diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java index 62d37815e4b..5470e40b640 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 the original author or authors. + * Copyright 2021-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -91,7 +91,7 @@ public class Mqttv5PahoMessageDrivenChannelAdapter extends AbstractMqttMessageDr public Mqttv5PahoMessageDrivenChannelAdapter(String url, String clientId, String... topic) { super(url, clientId, topic); this.connectionOptions = new MqttConnectionOptions(); - this.connectionOptions.setServerURIs(new String[]{ url }); + this.connectionOptions.setServerURIs(new String[] {url}); this.connectionOptions.setAutomaticReconnect(true); } @@ -312,30 +312,28 @@ public void deliveryComplete(IMqttToken token) { @Override public void connectComplete(boolean reconnect, String serverURI) { - if (!reconnect) { - ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); - String[] topics = getTopic(); - this.topicLock.lock(); - try { - if (topics.length > 0) { - int[] requestedQos = getQos(); - this.mqttClient.subscribe(topics, requestedQos).waitForCompletion(getCompletionTimeout()); - String message = "Connected and subscribed to " + Arrays.toString(topics); - logger.debug(message); - if (applicationEventPublisher != null) { - applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message)); - } - } - } - catch (MqttException ex) { + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); + String[] topics = getTopic(); + this.topicLock.lock(); + try { + if (topics.length > 0) { + int[] requestedQos = getQos(); + this.mqttClient.subscribe(topics, requestedQos).waitForCompletion(getCompletionTimeout()); + String message = "Connected and subscribed to " + Arrays.toString(topics); + logger.debug(message); if (applicationEventPublisher != null) { - applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex)); + applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message)); } - logger.error(ex, () -> "Error subscribing to " + Arrays.toString(topics)); } - finally { - this.topicLock.unlock(); + } + catch (MqttException ex) { + if (applicationEventPublisher != null) { + applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex)); } + logger.error(ex, () -> "Error subscribing to " + Arrays.toString(topics)); + } + finally { + this.topicLock.unlock(); } } diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ResubscribeAfterAutomaticReconnectTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ResubscribeAfterAutomaticReconnectTests.java new file mode 100644 index 00000000000..2cb4e666229 --- /dev/null +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ResubscribeAfterAutomaticReconnectTests.java @@ -0,0 +1,126 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mqtt; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; +import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.event.EventListener; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.mqtt.event.MqttSubscribedEvent; +import org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.PollableChannel; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Artem Bilan + * + * @since 5.5.17 + */ +@SpringJUnitConfig +@DirtiesContext +public class ResubscribeAfterAutomaticReconnectTests implements MosquittoContainerTest { + + @Autowired + private MessageChannel mqttOutInput; + + @Autowired + private PollableChannel fromMqttChannel; + + @Autowired + private MqttConnectionOptions connectionOptions; + + @Autowired + Config config; + + @Test + void messageReceivedAfterResubscriptionOnLostConnection() throws InterruptedException { + GenericMessage testMessage = new GenericMessage<>("test"); + this.mqttOutInput.send(testMessage); + assertThat(this.fromMqttChannel.receive(10_000)).isNotNull(); + + MOSQUITTO_CONTAINER.stop(); + MOSQUITTO_CONTAINER.start(); + connectionOptions.setServerURIs(new String[] {MosquittoContainerTest.mqttUrl()}); + + assertThat(this.config.subscribeLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + this.mqttOutInput.send(testMessage); + assertThat(this.fromMqttChannel.receive(10_000)).isNotNull(); + } + + @Configuration + @EnableIntegration + public static class Config { + + CountDownLatch subscribeLatch = new CountDownLatch(2); + + @Bean + public MqttConnectionOptions mqttConnectOptions() { + return new MqttConnectionOptionsBuilder() + .serverURI(MosquittoContainerTest.mqttUrl()) + .automaticReconnect(true) + .build(); + } + + @Bean + @ServiceActivator(inputChannel = "mqttOutInput") + public Mqttv5PahoMessageHandler mqttOutHandler(MqttConnectionOptions mqttConnectOptions) { + Mqttv5PahoMessageHandler messageHandler = + new Mqttv5PahoMessageHandler(mqttConnectOptions, "mqttv5SIout"); + messageHandler.setDefaultTopic("siTest"); + return messageHandler; + } + + @Bean + QueueChannel fromMqttChannel() { + return new QueueChannel(); + } + + @Bean + public Mqttv5PahoMessageDrivenChannelAdapter mqttInChannelAdapter( + MqttConnectionOptions mqttConnectOptions, QueueChannel fromMqttChannel) { + + Mqttv5PahoMessageDrivenChannelAdapter messageProducer = + new Mqttv5PahoMessageDrivenChannelAdapter(mqttConnectOptions, "mqttInClient", "siTest"); + messageProducer.setOutputChannel(fromMqttChannel); + return messageProducer; + } + + @EventListener(MqttSubscribedEvent.class) + public void mqttEvents() { + this.subscribeLatch.countDown(); + } + + } + +}