Skip to content

Commit b088091

Browse files
artembilangaryrussell
authored andcommitted
GH-4002: Deprecate ConsumerStopAction in MQTT (#4006)
Fixes #4002 Turns out the `ConsumerStopAction` was introduced in a point version to mitigate an unsubscription bug and let to preserve a previous behaviour. * Deprecate `ConsumerStopAction` in favor of just `cleanSession` flag in the `MqttConnectOptions` **Cherry-pick to `5.5.x`**
1 parent 070d186 commit b088091

File tree

5 files changed

+36
-50
lines changed

5 files changed

+36
-50
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ConsumerStopAction.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,12 +17,17 @@
1717
package org.springframework.integration.mqtt.core;
1818

1919
/**
20-
* Action to take regarding subscrptions when consumer stops.
20+
* Action to take regarding subscriptions when consumer stops.
2121
*
2222
* @author Gary Russell
23+
*
2324
* @since 4.2.3
2425
*
26+
* @deprecated since 5.5.17
27+
* in favor of standard {@link org.eclipse.paho.client.mqttv3.MqttConnectOptions#setCleanSession(boolean)}.
28+
* Will be removed in 6.1.0.
2529
*/
30+
@Deprecated
2631
public enum ConsumerStopAction {
2732

2833
/**

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/DefaultMqttPahoClientFactory.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,6 +41,7 @@ public class DefaultMqttPahoClientFactory implements MqttPahoClientFactory {
4141

4242
private MqttClientPersistence persistence;
4343

44+
@SuppressWarnings("deprecation")
4445
private ConsumerStopAction consumerStopAction = ConsumerStopAction.UNSUBSCRIBE_CLEAN;
4546

4647
/**
@@ -55,7 +56,10 @@ public void setPersistence(MqttClientPersistence persistence) {
5556
* Get the consumer stop action.
5657
* @return the consumer stop action.
5758
* @since 4.2.3
59+
* @deprecated since 5.5.17 in favor of standard {@link MqttConnectOptions#setCleanSession(boolean)}.
60+
* Will be removed in 6.1.0.
5861
*/
62+
@Deprecated
5963
@Override
6064
public ConsumerStopAction getConsumerStopAction() {
6165
return this.consumerStopAction;
@@ -66,7 +70,10 @@ public ConsumerStopAction getConsumerStopAction() {
6670
* Default: {@link ConsumerStopAction#UNSUBSCRIBE_CLEAN}.
6771
* @param consumerStopAction the consumer stop action.
6872
* @since 4.2.3.
73+
* @deprecated since 5.5.17 in favor of standard {@link MqttConnectOptions#setCleanSession(boolean)}.
74+
* Will be removed in 6.1.0.
6975
*/
76+
@Deprecated
7077
public void setConsumerStopAction(ConsumerStopAction consumerStopAction) {
7178
this.consumerStopAction = consumerStopAction;
7279
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/MqttPahoClientFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -60,7 +60,10 @@ public interface MqttPahoClientFactory {
6060
* Get the consumer stop action.
6161
* @return the consumer stop action.
6262
* @since 4.3
63+
* @deprecated since 5.5.17 in favor of standard {@link MqttConnectOptions#setCleanSession(boolean)}.
64+
* Will be removed in 6.1.0.
6365
*/
66+
@Deprecated
6467
ConsumerStopAction getConsumerStopAction();
6568

6669
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,7 +31,6 @@
3131
import org.springframework.context.ApplicationEventPublisher;
3232
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3333
import org.springframework.integration.acks.SimpleAcknowledgment;
34-
import org.springframework.integration.mqtt.core.ConsumerStopAction;
3534
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
3635
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
3736
import org.springframework.integration.mqtt.core.MqttPahoComponent;
@@ -83,7 +82,8 @@ public class MqttPahoMessageDrivenChannelAdapter extends AbstractMqttMessageDriv
8382

8483
private volatile boolean cleanSession;
8584

86-
private volatile ConsumerStopAction consumerStopAction;
85+
@SuppressWarnings("deprecation")
86+
private volatile org.springframework.integration.mqtt.core.ConsumerStopAction consumerStopAction;
8787

8888
/**
8989
* Use this constructor for a single url (although it may be overridden if the server
@@ -184,13 +184,16 @@ protected void doStart() {
184184
}
185185
}
186186

187+
@SuppressWarnings("deprecation")
187188
@Override
188189
protected synchronized void doStop() {
189190
cancelReconnect();
190191
if (this.client != null) {
191192
try {
192-
if (this.consumerStopAction.equals(ConsumerStopAction.UNSUBSCRIBE_ALWAYS)
193-
|| (this.consumerStopAction.equals(ConsumerStopAction.UNSUBSCRIBE_CLEAN)
193+
if (this.consumerStopAction
194+
.equals(org.springframework.integration.mqtt.core.ConsumerStopAction.UNSUBSCRIBE_ALWAYS)
195+
|| (this.consumerStopAction
196+
.equals(org.springframework.integration.mqtt.core.ConsumerStopAction.UNSUBSCRIBE_CLEAN)
194197
&& this.cleanSession)) {
195198

196199
this.client.unsubscribe(getTopic());
@@ -254,12 +257,13 @@ public void removeTopic(String... topic) {
254257
}
255258
}
256259

260+
@SuppressWarnings("deprecation")
257261
private synchronized void connectAndSubscribe() throws MqttException { // NOSONAR
258262
MqttConnectOptions connectionOptions = this.clientFactory.getConnectionOptions();
259263
this.cleanSession = connectionOptions.isCleanSession();
260264
this.consumerStopAction = this.clientFactory.getConsumerStopAction();
261265
if (this.consumerStopAction == null) {
262-
this.consumerStopAction = ConsumerStopAction.UNSUBSCRIBE_CLEAN;
266+
this.consumerStopAction = org.springframework.integration.mqtt.core.ConsumerStopAction.UNSUBSCRIBE_CLEAN;
263267
}
264268
Assert.state(getUrl() != null || connectionOptions.getServerURIs() != null,
265269
"If no 'url' provided, connectionOptions.getServerURIs() must not be null");

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java

Lines changed: 7 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import java.lang.reflect.InvocationTargetException;
2020
import java.lang.reflect.Method;
21-
import java.util.Date;
2221
import java.util.Properties;
2322
import java.util.concurrent.BlockingQueue;
2423
import java.util.concurrent.CountDownLatch;
@@ -62,7 +61,6 @@
6261
import org.springframework.integration.channel.NullChannel;
6362
import org.springframework.integration.channel.QueueChannel;
6463
import org.springframework.integration.handler.MessageProcessor;
65-
import org.springframework.integration.mqtt.core.ConsumerStopAction;
6664
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
6765
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
6866
import org.springframework.integration.mqtt.event.MqttIntegrationEvent;
@@ -124,7 +122,7 @@ public class MqttAdapterTests {
124122
public void testCloseOnBadConnectIn() throws Exception {
125123
final IMqttClient client = mock(IMqttClient.class);
126124
willThrow(new MqttException(0)).given(client).connect(any());
127-
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, null, ConsumerStopAction.UNSUBSCRIBE_NEVER);
125+
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, null);
128126
adapter.start();
129127
verify(client).close();
130128
adapter.stop();
@@ -370,7 +368,7 @@ public Message<?> toMessage(Object payload, MessageHeaders headers) {
370368
@Test
371369
public void testStopActionDefault() throws Exception {
372370
final IMqttClient client = mock(IMqttClient.class);
373-
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, null, null);
371+
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, null);
374372

375373
adapter.start();
376374
adapter.stop();
@@ -380,35 +378,7 @@ public void testStopActionDefault() throws Exception {
380378
@Test
381379
public void testStopActionDefaultNotClean() throws Exception {
382380
final IMqttClient client = mock(IMqttClient.class);
383-
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, false, null);
384-
385-
adapter.start();
386-
adapter.stop();
387-
verifyNotUnsubscribe(client);
388-
}
389-
390-
@Test
391-
public void testStopActionAlways() throws Exception {
392-
final IMqttClient client = mock(IMqttClient.class);
393-
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, false,
394-
ConsumerStopAction.UNSUBSCRIBE_ALWAYS);
395-
396-
adapter.start();
397-
adapter.stop();
398-
verifyUnsubscribe(client);
399-
400-
adapter.connectionLost(new RuntimeException("Intentional"));
401-
402-
TaskScheduler taskScheduler = TestUtils.getPropertyValue(adapter, "taskScheduler", TaskScheduler.class);
403-
404-
verify(taskScheduler, never())
405-
.schedule(any(Runnable.class), any(Date.class));
406-
}
407-
408-
@Test
409-
public void testStopActionNever() throws Exception {
410-
final IMqttClient client = mock(IMqttClient.class);
411-
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, null, ConsumerStopAction.UNSUBSCRIBE_NEVER);
381+
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, false);
412382

413383
adapter.start();
414384
adapter.stop();
@@ -439,7 +409,7 @@ public void testCustomExpressions() {
439409
@Test
440410
public void testReconnect() throws Exception {
441411
final IMqttClient client = mock(IMqttClient.class);
442-
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, null, ConsumerStopAction.UNSUBSCRIBE_NEVER);
412+
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, null);
443413
adapter.setRecoveryInterval(10);
444414
LogAccessor logger = spy(TestUtils.getPropertyValue(adapter, "logger", LogAccessor.class));
445415
new DirectFieldAccessor(adapter).setPropertyValue("logger", logger);
@@ -576,7 +546,7 @@ public void testDifferentQos() throws Exception {
576546
@Test
577547
public void testNoNPEOnReconnectAndStopRaceCondition() throws Exception {
578548
final IMqttClient client = mock(IMqttClient.class);
579-
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, null, ConsumerStopAction.UNSUBSCRIBE_NEVER);
549+
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, null);
580550
adapter.setRecoveryInterval(10);
581551

582552
MqttException mqttException = new MqttException(MqttException.REASON_CODE_SUBSCRIBE_FAILED);
@@ -615,8 +585,8 @@ public void testNoNPEOnReconnectAndStopRaceCondition() throws Exception {
615585
taskScheduler.destroy();
616586
}
617587

618-
private MqttPahoMessageDrivenChannelAdapter buildAdapterIn(final IMqttClient client, Boolean cleanSession,
619-
ConsumerStopAction action) {
588+
private MqttPahoMessageDrivenChannelAdapter buildAdapterIn(final IMqttClient client, Boolean cleanSession)
589+
throws MqttException {
620590

621591
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory() {
622592

@@ -631,9 +601,6 @@ public IMqttClient getClientInstance(String uri, String clientId) throws MqttExc
631601
if (cleanSession != null) {
632602
connectOptions.setCleanSession(cleanSession);
633603
}
634-
if (action != null) {
635-
factory.setConsumerStopAction(action);
636-
}
637604
factory.setConnectionOptions(connectOptions);
638605
given(client.isConnected()).willReturn(true);
639606
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("client", factory, "foo");

0 commit comments

Comments
 (0)