Skip to content

Commit 44000b9

Browse files
authored
GH-4002: Deprecate ConsumerStopAction in MQTT (#4006)
Fixes #4002 Turns out the `ConsumerStopAction` was introduced in a point version to mitigate an unsubscription bug and let to preserve a previous behaviour. * Deprecate `ConsumerStopAction` in favor of just `cleanSession` flag in the `MqttConnectOptions` **Cherry-pick to `5.5.x`**
1 parent d9d7c49 commit 44000b9

File tree

5 files changed

+34
-42
lines changed

5 files changed

+34
-42
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ConsumerStopAction.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,12 +17,17 @@
1717
package org.springframework.integration.mqtt.core;
1818

1919
/**
20-
* Action to take regarding subscrptions when consumer stops.
20+
* Action to take regarding subscriptions when consumer stops.
2121
*
2222
* @author Gary Russell
23+
*
2324
* @since 4.2.3
2425
*
26+
* @deprecated since 5.5.17
27+
* in favor of standard {@link org.eclipse.paho.client.mqttv3.MqttConnectOptions#setCleanSession(boolean)}.
28+
* Will be removed in 6.1.0.
2529
*/
30+
@Deprecated
2631
public enum ConsumerStopAction {
2732

2833
/**

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/DefaultMqttPahoClientFactory.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,6 +41,7 @@ public class DefaultMqttPahoClientFactory implements MqttPahoClientFactory {
4141

4242
private MqttClientPersistence persistence;
4343

44+
@SuppressWarnings("deprecation")
4445
private ConsumerStopAction consumerStopAction = ConsumerStopAction.UNSUBSCRIBE_CLEAN;
4546

4647
/**
@@ -55,7 +56,10 @@ public void setPersistence(MqttClientPersistence persistence) {
5556
* Get the consumer stop action.
5657
* @return the consumer stop action.
5758
* @since 4.2.3
59+
* @deprecated since 5.5.17 in favor of standard {@link MqttConnectOptions#setCleanSession(boolean)}.
60+
* Will be removed in 6.1.0.
5861
*/
62+
@Deprecated
5963
@Override
6064
public ConsumerStopAction getConsumerStopAction() {
6165
return this.consumerStopAction;
@@ -66,7 +70,10 @@ public ConsumerStopAction getConsumerStopAction() {
6670
* Default: {@link ConsumerStopAction#UNSUBSCRIBE_CLEAN}.
6771
* @param consumerStopAction the consumer stop action.
6872
* @since 4.2.3.
73+
* @deprecated since 5.5.17 in favor of standard {@link MqttConnectOptions#setCleanSession(boolean)}.
74+
* Will be removed in 6.1.0.
6975
*/
76+
@Deprecated
7077
public void setConsumerStopAction(ConsumerStopAction consumerStopAction) {
7178
this.consumerStopAction = consumerStopAction;
7279
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/MqttPahoClientFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -60,7 +60,10 @@ public interface MqttPahoClientFactory {
6060
* Get the consumer stop action.
6161
* @return the consumer stop action.
6262
* @since 4.3
63+
* @deprecated since 5.5.17 in favor of standard {@link MqttConnectOptions#setCleanSession(boolean)}.
64+
* Will be removed in 6.1.0.
6365
*/
66+
@Deprecated
6467
ConsumerStopAction getConsumerStopAction();
6568

6669
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -32,7 +32,6 @@
3232
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3333
import org.springframework.integration.acks.SimpleAcknowledgment;
3434
import org.springframework.integration.mqtt.core.ClientManager;
35-
import org.springframework.integration.mqtt.core.ConsumerStopAction;
3635
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
3736
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
3837
import org.springframework.integration.mqtt.core.MqttPahoComponent;
@@ -78,7 +77,8 @@ public class MqttPahoMessageDrivenChannelAdapter
7877

7978
private volatile boolean cleanSession;
8079

81-
private volatile ConsumerStopAction consumerStopAction;
80+
@SuppressWarnings("deprecation")
81+
private volatile org.springframework.integration.mqtt.core.ConsumerStopAction consumerStopAction;
8282

8383
/**
8484
* Use this constructor when you don't need additional {@link MqttConnectOptions}.
@@ -195,11 +195,14 @@ protected void doStart() {
195195
}
196196
}
197197

198+
@SuppressWarnings("deprecation")
198199
@Override
199200
protected synchronized void doStop() {
200201
try {
201-
if (this.consumerStopAction.equals(ConsumerStopAction.UNSUBSCRIBE_ALWAYS)
202-
|| (this.consumerStopAction.equals(ConsumerStopAction.UNSUBSCRIBE_CLEAN)
202+
if (this.consumerStopAction
203+
.equals(org.springframework.integration.mqtt.core.ConsumerStopAction.UNSUBSCRIBE_ALWAYS)
204+
|| (this.consumerStopAction
205+
.equals(org.springframework.integration.mqtt.core.ConsumerStopAction.UNSUBSCRIBE_CLEAN)
203206
&& this.cleanSession)) {
204207

205208
this.client.unsubscribe(getTopic());
@@ -270,12 +273,13 @@ public void removeTopic(String... topic) {
270273
}
271274
}
272275

276+
@SuppressWarnings("deprecation")
273277
private synchronized void connect() throws MqttException { // NOSONAR
274278
MqttConnectOptions connectionOptions = this.clientFactory.getConnectionOptions();
275279
this.cleanSession = connectionOptions.isCleanSession();
276280
this.consumerStopAction = this.clientFactory.getConsumerStopAction();
277281
if (this.consumerStopAction == null) {
278-
this.consumerStopAction = ConsumerStopAction.UNSUBSCRIBE_CLEAN;
282+
this.consumerStopAction = org.springframework.integration.mqtt.core.ConsumerStopAction.UNSUBSCRIBE_CLEAN;
279283
}
280284

281285
var clientManager = getClientManager();

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java

Lines changed: 5 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -53,7 +53,6 @@
5353
import org.springframework.integration.channel.NullChannel;
5454
import org.springframework.integration.channel.QueueChannel;
5555
import org.springframework.integration.handler.MessageProcessor;
56-
import org.springframework.integration.mqtt.core.ConsumerStopAction;
5756
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
5857
import org.springframework.integration.mqtt.core.Mqttv3ClientManager;
5958
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
@@ -373,7 +372,7 @@ public Message<?> toMessage(Object payload, MessageHeaders headers) {
373372
@Test
374373
public void testStopActionDefault() throws Exception {
375374
final IMqttAsyncClient client = mock(IMqttAsyncClient.class);
376-
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, null, null);
375+
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, null);
377376

378377
adapter.start();
379378
adapter.connectComplete(false, null);
@@ -384,30 +383,7 @@ public void testStopActionDefault() throws Exception {
384383
@Test
385384
public void testStopActionDefaultNotClean() throws Exception {
386385
final IMqttAsyncClient client = mock(IMqttAsyncClient.class);
387-
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, false, null);
388-
389-
adapter.start();
390-
adapter.connectComplete(false, null);
391-
adapter.stop();
392-
verifyNotUnsubscribe(client);
393-
}
394-
395-
@Test
396-
public void testStopActionAlways() throws Exception {
397-
final IMqttAsyncClient client = mock(IMqttAsyncClient.class);
398-
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, false,
399-
ConsumerStopAction.UNSUBSCRIBE_ALWAYS);
400-
401-
adapter.start();
402-
adapter.connectComplete(false, null);
403-
adapter.stop();
404-
verifyUnsubscribe(client);
405-
}
406-
407-
@Test
408-
public void testStopActionNever() throws Exception {
409-
final IMqttAsyncClient client = mock(IMqttAsyncClient.class);
410-
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, null, ConsumerStopAction.UNSUBSCRIBE_NEVER);
386+
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, false);
411387

412388
adapter.start();
413389
adapter.connectComplete(false, null);
@@ -539,8 +515,8 @@ public void testDifferentQos() throws Exception {
539515
verify(client).disconnectForcibly(5_000L);
540516
}
541517

542-
private MqttPahoMessageDrivenChannelAdapter buildAdapterIn(final IMqttAsyncClient client, Boolean cleanSession,
543-
ConsumerStopAction action) throws MqttException {
518+
private MqttPahoMessageDrivenChannelAdapter buildAdapterIn(final IMqttAsyncClient client, Boolean cleanSession)
519+
throws MqttException {
544520

545521
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory() {
546522

@@ -555,9 +531,6 @@ public IMqttAsyncClient getAsyncClientInstance(String uri, String clientId) {
555531
if (cleanSession != null) {
556532
connectOptions.setCleanSession(cleanSession);
557533
}
558-
if (action != null) {
559-
factory.setConsumerStopAction(action);
560-
}
561534
factory.setConnectionOptions(connectOptions);
562535
given(client.isConnected()).willReturn(true);
563536
IMqttToken token = mock(IMqttToken.class);

0 commit comments

Comments
 (0)