Skip to content

Commit ee7ad14

Browse files
artembilantzolov
authored andcommitted
GH-8720: Check MQTT topics if not empty strings
Fixes #8720 Validate MQTT topics for empty strings in the channel adapters configuration Use plural names for varargs params **Cherry-pick to `6.1.x` & `6.0.x`**
1 parent ba82efd commit ee7ad14

File tree

3 files changed

+62
-32
lines changed

3 files changed

+62
-32
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -106,14 +106,22 @@ public AbstractMqttMessageDrivenChannelAdapter(ClientManager<T, C> clientManager
106106
this.clientId = null;
107107
}
108108

109-
private static Map<String, Integer> initTopics(String[] topic) {
110-
Assert.notNull(topic, "'topics' cannot be null");
111-
Assert.noNullElements(topic, "'topics' cannot have null elements");
109+
private static Map<String, Integer> initTopics(String[] topics) {
110+
validateTopics(topics);
112111

113-
return Arrays.stream(topic)
112+
return Arrays.stream(topics)
114113
.collect(Collectors.toMap(Function.identity(), (key) -> 1, (x, y) -> y, LinkedHashMap::new));
115114
}
116115

116+
private static void validateTopics(String[] topics) {
117+
Assert.notNull(topics, "'topics' cannot be null");
118+
Assert.noNullElements(topics, "'topics' cannot have null elements");
119+
120+
for (String topic : topics) {
121+
Assert.hasText(topic, "The topic to subscribe cannot be empty string");
122+
}
123+
}
124+
117125
public void setConverter(MqttMessageConverter converter) {
118126
Assert.notNull(converter, "'converter' cannot be null");
119127
this.converter = converter;
@@ -190,7 +198,7 @@ public String[] getTopic() {
190198

191199
/**
192200
* Set the completion timeout when disconnecting.
193-
* Default {@value #DISCONNECT_COMPLETION_TIMEOUT} milliseconds.
201+
* Default {@value ClientManager#DISCONNECT_COMPLETION_TIMEOUT} milliseconds.
194202
* @param completionTimeout The timeout.
195203
* @since 5.1.10
196204
*/
@@ -268,6 +276,7 @@ protected long getCompletionTimeout() {
268276
*/
269277
@ManagedOperation
270278
public void addTopic(String topic, int qos) {
279+
validateTopics(new String[] {topic});
271280
this.topicLock.lock();
272281
try {
273282
if (this.topics.containsKey(topic)) {
@@ -283,16 +292,16 @@ public void addTopic(String topic, int qos) {
283292

284293
/**
285294
* Add a topic (or topics) to the subscribed list (qos=1).
286-
* @param topic The topics.
287-
* @throws MessagingException if the topic is already in the list.
295+
* @param topics The topics.
296+
* @throws MessagingException if the topics is already in the list.
288297
* @since 4.1
289298
*/
290299
@ManagedOperation
291-
public void addTopic(String... topic) {
292-
Assert.notNull(topic, "'topic' cannot be null");
300+
public void addTopic(String... topics) {
301+
validateTopics(topics);
293302
this.topicLock.lock();
294303
try {
295-
for (String t : topic) {
304+
for (String t : topics) {
296305
addTopic(t, 1);
297306
}
298307
}
@@ -303,25 +312,24 @@ public void addTopic(String... topic) {
303312

304313
/**
305314
* Add topics to the subscribed list.
306-
* @param topic The topics.
315+
* @param topics The topics.
307316
* @param qos The qos for each topic.
308-
* @throws MessagingException if a topic is already in the list.
317+
* @throws MessagingException if a topics is already in the list.
309318
* @since 4.1
310319
*/
311320
@ManagedOperation
312-
public void addTopics(String[] topic, int[] qos) {
313-
Assert.notNull(topic, "'topic' cannot be null.");
314-
Assert.noNullElements(topic, "'topic' cannot contain any null elements.");
315-
Assert.isTrue(topic.length == qos.length, "topic and qos arrays must the be the same length.");
321+
public void addTopics(String[] topics, int[] qos) {
322+
validateTopics(topics);
323+
Assert.isTrue(topics.length == qos.length, "topics and qos arrays must the be the same length.");
316324
this.topicLock.lock();
317325
try {
318-
for (String newTopic : topic) {
326+
for (String newTopic : topics) {
319327
if (this.topics.containsKey(newTopic)) {
320328
throw new MessagingException("Topic '" + newTopic + "' is already subscribed.");
321329
}
322330
}
323-
for (int i = 0; i < topic.length; i++) {
324-
addTopic(topic[i], qos[i]);
331+
for (int i = 0; i < topics.length; i++) {
332+
addTopic(topics[i], qos[i]);
325333
}
326334
}
327335
finally {

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/AbstractMqttMessageHandler.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import java.util.concurrent.atomic.AtomicBoolean;
2020

21+
import org.springframework.beans.factory.BeanFactory;
2122
import org.springframework.beans.factory.BeanFactoryAware;
2223
import org.springframework.context.ApplicationEventPublisher;
2324
import org.springframework.context.ApplicationEventPublisherAware;
@@ -122,6 +123,7 @@ protected ApplicationEventPublisher getApplicationEventPublisher() {
122123
* @param defaultTopic the default topic.
123124
*/
124125
public void setDefaultTopic(String defaultTopic) {
126+
Assert.hasText(defaultTopic, "'defaultTopic' must not be empty");
125127
this.defaultTopic = defaultTopic;
126128
}
127129

@@ -316,14 +318,17 @@ protected ClientManager<T, C> getClientManager() {
316318
@Override
317319
protected void onInit() {
318320
super.onInit();
319-
if (this.topicProcessor instanceof BeanFactoryAware && getBeanFactory() != null) {
320-
((BeanFactoryAware) this.topicProcessor).setBeanFactory(getBeanFactory());
321-
}
322-
if (this.qosProcessor instanceof BeanFactoryAware && getBeanFactory() != null) {
323-
((BeanFactoryAware) this.qosProcessor).setBeanFactory(getBeanFactory());
324-
}
325-
if (this.retainedProcessor instanceof BeanFactoryAware && getBeanFactory() != null) {
326-
((BeanFactoryAware) this.retainedProcessor).setBeanFactory(getBeanFactory());
321+
BeanFactory beanFactory = getBeanFactory();
322+
if (beanFactory != null) {
323+
if (this.topicProcessor instanceof BeanFactoryAware beanFactoryAware) {
324+
beanFactoryAware.setBeanFactory(beanFactory);
325+
}
326+
if (this.qosProcessor instanceof BeanFactoryAware beanFactoryAware) {
327+
beanFactoryAware.setBeanFactory(beanFactory);
328+
}
329+
if (this.retainedProcessor instanceof BeanFactoryAware beanFactoryAware) {
330+
beanFactoryAware.setBeanFactory(beanFactory);
331+
}
327332
}
328333
}
329334

@@ -354,11 +359,13 @@ public boolean isRunning() {
354359
protected void handleMessageInternal(Message<?> message) {
355360
Object mqttMessage = this.converter.fromMessage(message, Object.class);
356361
String topic = this.topicProcessor.processMessage(message);
357-
if (topic == null && this.defaultTopic == null) {
358-
throw new IllegalStateException(
359-
"No topic could be determined from the message and no default topic defined");
362+
if (topic == null) {
363+
topic = this.defaultTopic;
360364
}
361-
publish(topic == null ? this.defaultTopic : topic, mqttMessage, message);
365+
366+
Assert.state(topic != null, "No topic could be determined from the message and no default topic defined");
367+
368+
publish(topic, mqttMessage, message);
362369
}
363370

364371
protected abstract void publish(String topic, Object mqttMessage, Message<?> message);

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.springframework.integration.channel.QueueChannel;
5555
import org.springframework.integration.handler.MessageProcessor;
5656
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
57+
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
5758
import org.springframework.integration.mqtt.core.Mqttv3ClientManager;
5859
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
5960
import org.springframework.integration.mqtt.event.MqttIntegrationEvent;
@@ -73,6 +74,7 @@
7374
import org.springframework.util.ReflectionUtils;
7475

7576
import static org.assertj.core.api.Assertions.assertThat;
77+
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
7678
import static org.assertj.core.api.Assertions.fail;
7779
import static org.mockito.ArgumentMatchers.any;
7880
import static org.mockito.ArgumentMatchers.anyLong;
@@ -515,6 +517,19 @@ public void testDifferentQos() throws Exception {
515517
verify(client).disconnectForcibly(5_000L);
516518
}
517519

520+
@Test
521+
public void emptyTopicNotAllowed() {
522+
assertThatIllegalArgumentException()
523+
.isThrownBy(() ->
524+
new MqttPahoMessageDrivenChannelAdapter("client_id", mock(MqttPahoClientFactory.class), ""))
525+
.withMessage("The topic to subscribe cannot be empty string");
526+
527+
var adapter = new MqttPahoMessageDrivenChannelAdapter("client_id", mock(MqttPahoClientFactory.class), "topic1");
528+
assertThatIllegalArgumentException()
529+
.isThrownBy(() -> adapter.addTopic(""))
530+
.withMessage("The topic to subscribe cannot be empty string");
531+
}
532+
518533
private MqttPahoMessageDrivenChannelAdapter buildAdapterIn(final IMqttAsyncClient client, Boolean cleanSession)
519534
throws MqttException {
520535

0 commit comments

Comments
 (0)