Skip to content

Correctly handle null listener in RedisMessageListenerContainer.remove(…) #3009

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
* @author Thomas Darimont
* @author Mark Paluch
* @author John Blum
* @author SEONGJUN LEE
* @see MessageListener
* @see SubscriptionListener
*/
Expand Down Expand Up @@ -770,33 +771,35 @@ else if (isListening()) {
}

private void remove(@Nullable MessageListener listener, Topic topic, ByteArrayWrapper holder,
Map<ByteArrayWrapper, Collection<MessageListener>> mapping, List<byte[]> topicToRemove) {
Map<ByteArrayWrapper, Collection<MessageListener>> mapping, List<byte[]> topicToRemove) {

Collection<MessageListener> listeners = mapping.get(holder);
Collection<MessageListener> listenersToRemove = null;

if (listeners != null) {
// remove only one listener
listeners.remove(listener);
listenersToRemove = Collections.singletonList(listener);

// start removing listeners
for (MessageListener messageListener : listenersToRemove) {
Set<Topic> topics = listenerTopics.get(messageListener);
if (topics != null) {
topics.remove(topic);
}
if (CollectionUtils.isEmpty(topics)) {
listenerTopics.remove(messageListener);
}
}
if (listeners == null || listeners.isEmpty()) {
return;
}

// if we removed everything, remove the empty holder collection
if (listeners.isEmpty()) {
mapping.remove(holder);
topicToRemove.add(holder.getArray());
Collection<MessageListener> listenersToRemove = (listener == null) ? new ArrayList<>(listeners)
: Collections.singletonList(listener);

// Remove the specified listener(s) from the original collection
listeners.removeAll(listenersToRemove);

// Start removing listeners
for (MessageListener messageListener : listenersToRemove) {
Set<Topic> topics = listenerTopics.get(messageListener);
if (topics != null) {
topics.remove(topic);
}
if (CollectionUtils.isEmpty(topics)) {
listenerTopics.remove(messageListener);
}
}

// If all listeners were removed, clean up the mapping and the holder
if (listeners.isEmpty()) {
mapping.remove(holder);
topicToRemove.add(holder.getArray());
}
}

private Subscriber createSubscriber(RedisConnectionFactory connectionFactory, Executor executor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.mockito.Mockito.*;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -30,10 +31,7 @@
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.Subscription;
import org.springframework.data.redis.connection.SubscriptionListener;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.listener.adapter.RedisListenerExecutionFailedException;
Expand Down Expand Up @@ -221,4 +219,34 @@ void shouldRecoverFromConnectionFailure() throws Exception {
void failsOnDuplicateInit() {
assertThatIllegalStateException().isThrownBy(() -> container.afterPropertiesSet());
}

@Test
void shouldRemoveSpecificListenerFromMappingAndListenerTopics() {
MessageListener listener1 = mock(MessageListener.class);
MessageListener listener2 = mock(MessageListener.class);
Topic topic = new ChannelTopic("topic1");

container.addMessageListener(listener1, Collections.singletonList(topic));
container.addMessageListener(listener2, Collections.singletonList(topic));

container.removeMessageListener(listener1, Collections.singletonList(topic));

container.addMessageListener(listener2, Collections.singletonList(topic));
verify(listener1, never()).onMessage(any(), any());
}

@Test
void shouldRemoveAllListenersWhenListenerIsNull() {
MessageListener listener1 = mock(MessageListener.class);
MessageListener listener2 = mock(MessageListener.class);
Topic topic = new ChannelTopic("topic1");

container.addMessageListener(listener1, Collections.singletonList(topic));
container.addMessageListener(listener2, Collections.singletonList(topic));

container.removeMessageListener(null, Collections.singletonList(topic));

verify(listener1, never()).onMessage(any(), any());
verify(listener2, never()).onMessage(any(), any());
}
}