From 3930884c05eff3aae966e998fcb97cebdcef44d9 Mon Sep 17 00:00:00 2001 From: "sujl95(TheWing)" Date: Mon, 30 Sep 2024 01:30:27 +0900 Subject: [PATCH] Update remove method in RedisMessageListenerContainer to handle null listener. --- .../RedisMessageListenerContainer.java | 47 ++++++++++--------- ...edisMessageListenerContainerUnitTests.java | 36 ++++++++++++-- 2 files changed, 57 insertions(+), 26 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java index 63b111528b..5c9c31a460 100644 --- a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java @@ -101,6 +101,7 @@ * @author Thomas Darimont * @author Mark Paluch * @author John Blum + * @author SEONGJUN LEE * @see MessageListener * @see SubscriptionListener */ @@ -770,33 +771,35 @@ else if (isListening()) { } private void remove(@Nullable MessageListener listener, Topic topic, ByteArrayWrapper holder, - Map> mapping, List topicToRemove) { + Map> mapping, List topicToRemove) { Collection listeners = mapping.get(holder); - Collection listenersToRemove = null; - - if (listeners != null) { - // remove only one listener - listeners.remove(listener); - listenersToRemove = Collections.singletonList(listener); - - // start removing listeners - for (MessageListener messageListener : listenersToRemove) { - Set topics = listenerTopics.get(messageListener); - if (topics != null) { - topics.remove(topic); - } - if (CollectionUtils.isEmpty(topics)) { - listenerTopics.remove(messageListener); - } - } + if (listeners == null || listeners.isEmpty()) { + return; + } - // if we removed everything, remove the empty holder collection - if (listeners.isEmpty()) { - mapping.remove(holder); - topicToRemove.add(holder.getArray()); + Collection listenersToRemove = (listener == null) ? new ArrayList<>(listeners) + : Collections.singletonList(listener); + + // Remove the specified listener(s) from the original collection + listeners.removeAll(listenersToRemove); + + // Start removing listeners + for (MessageListener messageListener : listenersToRemove) { + Set topics = listenerTopics.get(messageListener); + if (topics != null) { + topics.remove(topic); + } + if (CollectionUtils.isEmpty(topics)) { + listenerTopics.remove(messageListener); } } + + // If all listeners were removed, clean up the mapping and the holder + if (listeners.isEmpty()) { + mapping.remove(holder); + topicToRemove.add(holder.getArray()); + } } private Subscriber createSubscriber(RedisConnectionFactory connectionFactory, Executor executor) { diff --git a/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java b/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java index 7d71909c71..d953353253 100644 --- a/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java +++ b/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java @@ -19,6 +19,7 @@ import static org.mockito.Mockito.*; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -30,10 +31,7 @@ import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.core.task.SyncTaskExecutor; import org.springframework.data.redis.RedisConnectionFailureException; -import org.springframework.data.redis.connection.RedisConnection; -import org.springframework.data.redis.connection.RedisConnectionFactory; -import org.springframework.data.redis.connection.Subscription; -import org.springframework.data.redis.connection.SubscriptionListener; +import org.springframework.data.redis.connection.*; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.listener.adapter.RedisListenerExecutionFailedException; @@ -221,4 +219,34 @@ void shouldRecoverFromConnectionFailure() throws Exception { void failsOnDuplicateInit() { assertThatIllegalStateException().isThrownBy(() -> container.afterPropertiesSet()); } + + @Test + void shouldRemoveSpecificListenerFromMappingAndListenerTopics() { + MessageListener listener1 = mock(MessageListener.class); + MessageListener listener2 = mock(MessageListener.class); + Topic topic = new ChannelTopic("topic1"); + + container.addMessageListener(listener1, Collections.singletonList(topic)); + container.addMessageListener(listener2, Collections.singletonList(topic)); + + container.removeMessageListener(listener1, Collections.singletonList(topic)); + + container.addMessageListener(listener2, Collections.singletonList(topic)); + verify(listener1, never()).onMessage(any(), any()); + } + + @Test + void shouldRemoveAllListenersWhenListenerIsNull() { + MessageListener listener1 = mock(MessageListener.class); + MessageListener listener2 = mock(MessageListener.class); + Topic topic = new ChannelTopic("topic1"); + + container.addMessageListener(listener1, Collections.singletonList(topic)); + container.addMessageListener(listener2, Collections.singletonList(topic)); + + container.removeMessageListener(null, Collections.singletonList(topic)); + + verify(listener1, never()).onMessage(any(), any()); + verify(listener2, never()).onMessage(any(), any()); + } }