diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ConsumerStopAction.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ConsumerStopAction.java index 639b61744c8..1bffae3f944 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ConsumerStopAction.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ConsumerStopAction.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2019 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,12 +17,17 @@ package org.springframework.integration.mqtt.core; /** - * Action to take regarding subscrptions when consumer stops. + * Action to take regarding subscriptions when consumer stops. * * @author Gary Russell + * * @since 4.2.3 * + * @deprecated since 5.5.17 + * in favor of standard {@link org.eclipse.paho.client.mqttv3.MqttConnectOptions#setCleanSession(boolean)}. + * Will be removed in 6.1.0. */ +@Deprecated public enum ConsumerStopAction { /** diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/DefaultMqttPahoClientFactory.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/DefaultMqttPahoClientFactory.java index c17eed727e1..6472173c297 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/DefaultMqttPahoClientFactory.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/DefaultMqttPahoClientFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,6 +41,7 @@ public class DefaultMqttPahoClientFactory implements MqttPahoClientFactory { private MqttClientPersistence persistence; + @SuppressWarnings("deprecation") private ConsumerStopAction consumerStopAction = ConsumerStopAction.UNSUBSCRIBE_CLEAN; /** @@ -55,7 +56,10 @@ public void setPersistence(MqttClientPersistence persistence) { * Get the consumer stop action. * @return the consumer stop action. * @since 4.2.3 + * @deprecated since 5.5.17 in favor of standard {@link MqttConnectOptions#setCleanSession(boolean)}. + * Will be removed in 6.1.0. */ + @Deprecated @Override public ConsumerStopAction getConsumerStopAction() { return this.consumerStopAction; @@ -66,7 +70,10 @@ public ConsumerStopAction getConsumerStopAction() { * Default: {@link ConsumerStopAction#UNSUBSCRIBE_CLEAN}. * @param consumerStopAction the consumer stop action. * @since 4.2.3. + * @deprecated since 5.5.17 in favor of standard {@link MqttConnectOptions#setCleanSession(boolean)}. + * Will be removed in 6.1.0. */ + @Deprecated public void setConsumerStopAction(ConsumerStopAction consumerStopAction) { this.consumerStopAction = consumerStopAction; } diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/MqttPahoClientFactory.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/MqttPahoClientFactory.java index 1a760b39a96..f15e0208ece 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/MqttPahoClientFactory.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/MqttPahoClientFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -60,7 +60,10 @@ public interface MqttPahoClientFactory { * Get the consumer stop action. * @return the consumer stop action. * @since 4.3 + * @deprecated since 5.5.17 in favor of standard {@link MqttConnectOptions#setCleanSession(boolean)}. + * Will be removed in 6.1.0. */ + @Deprecated ConsumerStopAction getConsumerStopAction(); } diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java index c41b84aacba..cd4f54cc014 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,7 +32,6 @@ import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.acks.SimpleAcknowledgment; import org.springframework.integration.mqtt.core.ClientManager; -import org.springframework.integration.mqtt.core.ConsumerStopAction; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoComponent; @@ -78,7 +77,8 @@ public class MqttPahoMessageDrivenChannelAdapter private volatile boolean cleanSession; - private volatile ConsumerStopAction consumerStopAction; + @SuppressWarnings("deprecation") + private volatile org.springframework.integration.mqtt.core.ConsumerStopAction consumerStopAction; /** * Use this constructor when you don't need additional {@link MqttConnectOptions}. @@ -195,11 +195,14 @@ protected void doStart() { } } + @SuppressWarnings("deprecation") @Override protected synchronized void doStop() { try { - if (this.consumerStopAction.equals(ConsumerStopAction.UNSUBSCRIBE_ALWAYS) - || (this.consumerStopAction.equals(ConsumerStopAction.UNSUBSCRIBE_CLEAN) + if (this.consumerStopAction + .equals(org.springframework.integration.mqtt.core.ConsumerStopAction.UNSUBSCRIBE_ALWAYS) + || (this.consumerStopAction + .equals(org.springframework.integration.mqtt.core.ConsumerStopAction.UNSUBSCRIBE_CLEAN) && this.cleanSession)) { this.client.unsubscribe(getTopic()); @@ -270,12 +273,13 @@ public void removeTopic(String... topic) { } } + @SuppressWarnings("deprecation") private synchronized void connect() throws MqttException { // NOSONAR MqttConnectOptions connectionOptions = this.clientFactory.getConnectionOptions(); this.cleanSession = connectionOptions.isCleanSession(); this.consumerStopAction = this.clientFactory.getConsumerStopAction(); if (this.consumerStopAction == null) { - this.consumerStopAction = ConsumerStopAction.UNSUBSCRIBE_CLEAN; + this.consumerStopAction = org.springframework.integration.mqtt.core.ConsumerStopAction.UNSUBSCRIBE_CLEAN; } var clientManager = getClientManager(); diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java index 724a352bba5..ed82b6f6e43 100644 --- a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,7 +53,6 @@ import org.springframework.integration.channel.NullChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.handler.MessageProcessor; -import org.springframework.integration.mqtt.core.ConsumerStopAction; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.Mqttv3ClientManager; import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent; @@ -373,7 +372,7 @@ public Message toMessage(Object payload, MessageHeaders headers) { @Test public void testStopActionDefault() throws Exception { final IMqttAsyncClient client = mock(IMqttAsyncClient.class); - MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, null, null); + MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, null); adapter.start(); adapter.connectComplete(false, null); @@ -384,30 +383,7 @@ public void testStopActionDefault() throws Exception { @Test public void testStopActionDefaultNotClean() throws Exception { final IMqttAsyncClient client = mock(IMqttAsyncClient.class); - MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, false, null); - - adapter.start(); - adapter.connectComplete(false, null); - adapter.stop(); - verifyNotUnsubscribe(client); - } - - @Test - public void testStopActionAlways() throws Exception { - final IMqttAsyncClient client = mock(IMqttAsyncClient.class); - MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, false, - ConsumerStopAction.UNSUBSCRIBE_ALWAYS); - - adapter.start(); - adapter.connectComplete(false, null); - adapter.stop(); - verifyUnsubscribe(client); - } - - @Test - public void testStopActionNever() throws Exception { - final IMqttAsyncClient client = mock(IMqttAsyncClient.class); - MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, null, ConsumerStopAction.UNSUBSCRIBE_NEVER); + MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, false); adapter.start(); adapter.connectComplete(false, null); @@ -539,8 +515,8 @@ public void testDifferentQos() throws Exception { verify(client).disconnectForcibly(5_000L); } - private MqttPahoMessageDrivenChannelAdapter buildAdapterIn(final IMqttAsyncClient client, Boolean cleanSession, - ConsumerStopAction action) throws MqttException { + private MqttPahoMessageDrivenChannelAdapter buildAdapterIn(final IMqttAsyncClient client, Boolean cleanSession) + throws MqttException { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory() { @@ -555,9 +531,6 @@ public IMqttAsyncClient getAsyncClientInstance(String uri, String clientId) { if (cleanSession != null) { connectOptions.setCleanSession(cleanSession); } - if (action != null) { - factory.setConsumerStopAction(action); - } factory.setConnectionOptions(connectOptions); given(client.isConnected()).willReturn(true); IMqttToken token = mock(IMqttToken.class);