From 422556af626904548e6c250e558bbe24c7c0ef0d Mon Sep 17 00:00:00 2001 From: John Blum Date: Fri, 4 Aug 2023 18:05:53 -0700 Subject: [PATCH 1/2] Prepare issue branch for 2662. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 2a4c209133..e5af72ffbc 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-redis - 3.2.0-SNAPSHOT + 3.2.0-GH-2662-SNAPSHOT Spring Data Redis Spring Data module for Redis From 5c1c158ddcd01fc31d810459dfba8f8c8fc792d1 Mon Sep 17 00:00:00 2001 From: John Blum Date: Fri, 4 Aug 2023 18:07:27 -0700 Subject: [PATCH 2/2] Simplify logic in RedisMessageListenerContainer and supporting classes Closes #2662 --- .../data/redis/listener/AbstractTopic.java | 71 ++++ .../data/redis/listener/ChannelTopic.java | 63 +--- .../data/redis/listener/PatternTopic.java | 58 +-- .../RedisMessageListenerContainer.java | 348 +++++++++--------- 4 files changed, 274 insertions(+), 266 deletions(-) create mode 100644 src/main/java/org/springframework/data/redis/listener/AbstractTopic.java diff --git a/src/main/java/org/springframework/data/redis/listener/AbstractTopic.java b/src/main/java/org/springframework/data/redis/listener/AbstractTopic.java new file mode 100644 index 0000000000..3820051093 --- /dev/null +++ b/src/main/java/org/springframework/data/redis/listener/AbstractTopic.java @@ -0,0 +1,71 @@ +/* + * Copyright 2017-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.listener; + +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.ObjectUtils; + +/** + * Abstract base class for defining {@link Topic Topics}. + * + * @author John Blum + * @see org.springframework.data.redis.listener.Topic + * @since 3.2.0 + */ +abstract class AbstractTopic implements Topic { + + private final String name; + + AbstractTopic(String label, String name) { + Assert.notNull(name,() -> label + " must not be null"); + this.name = name; + } + + @Override + public String getTopic() { + return this.name; + } + + @Override + public boolean equals(@Nullable Object obj) { + + if (this == obj) { + return true; + } + + if (!(obj instanceof AbstractTopic that)) { + return false; + } + + // Must be exact Topic type + if (this.getClass() != that.getClass()) { + return false; + } + + return ObjectUtils.nullSafeEquals(this.getTopic(), that.getTopic()); + } + + @Override + public int hashCode() { + return ObjectUtils.nullSafeHashCode(getTopic()); + } + + @Override + public String toString() { + return getTopic(); + } +} diff --git a/src/main/java/org/springframework/data/redis/listener/ChannelTopic.java b/src/main/java/org/springframework/data/redis/listener/ChannelTopic.java index 6eab92ea72..a0335ecc1c 100644 --- a/src/main/java/org/springframework/data/redis/listener/ChannelTopic.java +++ b/src/main/java/org/springframework/data/redis/listener/ChannelTopic.java @@ -15,71 +15,32 @@ */ package org.springframework.data.redis.listener; -import org.springframework.lang.Nullable; -import org.springframework.util.Assert; -import org.springframework.util.ObjectUtils; - /** - * Channel topic implementation (maps to a Redis channel). + * {@link Topic Channel Topic} implementation mapping to a Redis channel. * * @author Costin Leau * @author Mark Paluch + * @author John Blum */ -public class ChannelTopic implements Topic { - - private final String channelName; - - /** - * Constructs a new {@link ChannelTopic} instance. - * - * @param name must not be {@literal null}. - */ - public ChannelTopic(String name) { - - Assert.notNull(name, "Topic name must not be null"); - - this.channelName = name; - } +public class ChannelTopic extends AbstractTopic { /** * Create a new {@link ChannelTopic} for channel subscriptions. * - * @param name the channel name, must not be {@literal null} or empty. - * @return the {@link ChannelTopic} for {@code channelName}. + * @param channelName {@link String name} of the Redis channel; must not be {@literal null} or {@literal empty}. + * @return the {@link ChannelTopic} for the given {@code channelName}. * @since 2.1 */ - public static ChannelTopic of(String name) { - return new ChannelTopic(name); + public static ChannelTopic of(String channelName) { + return new ChannelTopic(channelName); } /** - * @return topic name. + * Constructs a new {@link ChannelTopic} instance. + * + * @param channelName must not be {@literal null}. */ - @Override - public String getTopic() { - return channelName; - } - - @Override - public String toString() { - return channelName; - } - - @Override - public boolean equals(@Nullable Object o) { - - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - - ChannelTopic that = (ChannelTopic) o; - - return ObjectUtils.nullSafeEquals(channelName, that.channelName); - } - - @Override - public int hashCode() { - return ObjectUtils.nullSafeHashCode(channelName); + public ChannelTopic(String channelName) { + super("Topic name", channelName); } } diff --git a/src/main/java/org/springframework/data/redis/listener/PatternTopic.java b/src/main/java/org/springframework/data/redis/listener/PatternTopic.java index de5e05d612..e1b5977cc2 100644 --- a/src/main/java/org/springframework/data/redis/listener/PatternTopic.java +++ b/src/main/java/org/springframework/data/redis/listener/PatternTopic.java @@ -15,38 +15,20 @@ */ package org.springframework.data.redis.listener; -import org.springframework.lang.Nullable; -import org.springframework.util.Assert; -import org.springframework.util.ObjectUtils; - /** - * Pattern topic (matching multiple channels). + * {@link Topic} {@link String pattern} matching multiple Redis channels. * * @author Costin Leau * @author Mark Paluch * @author Christoph Strobl */ -public class PatternTopic implements Topic { - - private final String channelPattern; - - /** - * Constructs a new {@link PatternTopic} instance. - * - * @param pattern must not be {@literal null}. - */ - public PatternTopic(String pattern) { - - Assert.notNull(pattern, "Pattern must not be null"); - - this.channelPattern = pattern; - } +public class PatternTopic extends AbstractTopic { /** * Create a new {@link PatternTopic} for channel subscriptions based on a {@code pattern}. * - * @param pattern the channel pattern, must not be {@literal null} or empty. - * @return the {@link PatternTopic} for {@code pattern}. + * @param pattern {@link String pattern} used to match channels; must not be {@literal null} or {@literal empty}. + * @return the {@link PatternTopic} for the given {@code pattern}. * @since 2.1 */ public static PatternTopic of(String pattern) { @@ -54,33 +36,11 @@ public static PatternTopic of(String pattern) { } /** - * @return channel pattern. + * Constructs a new {@link PatternTopic} instance. + * + * @param channelPattern must not be {@literal null}. */ - @Override - public String getTopic() { - return channelPattern; - } - - @Override - public String toString() { - return channelPattern; - } - - @Override - public boolean equals(@Nullable Object o) { - - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - - PatternTopic that = (PatternTopic) o; - - return ObjectUtils.nullSafeEquals(channelPattern, that.channelPattern); - } - - @Override - public int hashCode() { - return ObjectUtils.nullSafeHashCode(channelPattern); + public PatternTopic(String channelPattern) { + super("Pattern", channelPattern); } } diff --git a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java index b0f2042e00..20b351236b 100644 --- a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.commons.logging.Log; @@ -98,20 +99,12 @@ * @author Way Joke * @author Thomas Darimont * @author Mark Paluch + * @author John Blum * @see MessageListener * @see SubscriptionListener */ public class RedisMessageListenerContainer implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle { - /** Logger available to subclasses */ - protected final Log logger = LogFactory.getLog(getClass()); - - /** - * Default thread name prefix: "RedisListeningContainer-". - */ - public static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(RedisMessageListenerContainer.class) - + "-"; - /** * The default recovery interval: 5000 ms = 5 seconds. */ @@ -122,6 +115,17 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab */ public static final long DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME = 2000L; + /** + * Default thread name prefix: "RedisListeningContainer-". + */ + public static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(RedisMessageListenerContainer.class) + + "-"; + + /** Logger available to subclasses */ + protected final Log logger = LogFactory.getLog(getClass()); + + private @Nullable ErrorHandler errorHandler; + private @Nullable Executor subscriptionExecutor; private @Nullable Executor taskExecutor; @@ -130,8 +134,6 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab private @Nullable String beanName; - private @Nullable ErrorHandler errorHandler; - private @Nullable Subscriber subscriber; private final AtomicBoolean started = new AtomicBoolean(); @@ -142,50 +144,46 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab // whether the container has been initialized via afterPropertiesSet private boolean afterPropertiesSet = false; + // whether the TaskExecutor was created by the container private boolean manageExecutor = false; - // lookup maps - // to avoid creation of hashes for each message, the maps use raw byte arrays (wrapped to respect the equals/hashcode - // contract) + private long maxSubscriptionRegistrationWaitingTime = DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME; + + private BackOff backOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS); + + private volatile CompletableFuture listenFuture = new CompletableFuture<>(); + + private volatile CompletableFuture unsubscribeFuture = new CompletableFuture<>(); + + // Lookup maps; to avoid creation of hashes for each message, the maps use raw byte arrays (wrapped to respect + // the equals/hashcode contract) - // lookup map between patterns and listeners - private final Map> patternMapping = new ConcurrentHashMap<>(); // lookup map between channels and listeners private final Map> channelMapping = new ConcurrentHashMap<>(); + // lookup map between patterns and listeners + private final Map> patternMapping = new ConcurrentHashMap<>(); // lookup map between listeners and channels private final Map> listenerTopics = new ConcurrentHashMap<>(); private volatile RedisSerializer serializer = RedisSerializer.string(); - private BackOff backOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS); - - private long maxSubscriptionRegistrationWaitingTime = DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME; - - private volatile CompletableFuture listenFuture = new CompletableFuture<>(); - - private volatile CompletableFuture unsubscribeFuture = new CompletableFuture<>(); - @Override public void afterPropertiesSet() { - Assert.state(!afterPropertiesSet, "Container already initialized"); - - if (this.connectionFactory == null) { - throw new IllegalArgumentException("RedisConnectionFactory is not set"); - } + Assert.state(!this.afterPropertiesSet, "Container already initialized"); + Assert.notNull(this.connectionFactory, "RedisConnectionFactory is not set"); - if (taskExecutor == null) { - manageExecutor = true; - taskExecutor = createDefaultTaskExecutor(); + if (this.taskExecutor == null) { + this.manageExecutor = true; + this.taskExecutor = createDefaultTaskExecutor(); } - if (subscriptionExecutor == null) { - subscriptionExecutor = taskExecutor; + if (this.subscriptionExecutor == null) { + this.subscriptionExecutor = this.taskExecutor; } this.subscriber = createSubscriber(connectionFactory, this.subscriptionExecutor); - - afterPropertiesSet = true; + this.afterPropertiesSet = true; } /** @@ -197,7 +195,7 @@ public void afterPropertiesSet() { * @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String) */ protected TaskExecutor createDefaultTaskExecutor() { - String threadNamePrefix = (beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX); + String threadNamePrefix = this.beanName != null ? this.beanName + "-" : DEFAULT_THREAD_NAME_PREFIX; return new SimpleAsyncTaskExecutor(threadNamePrefix); } @@ -208,17 +206,15 @@ protected TaskExecutor createDefaultTaskExecutor() { */ @Override public void destroy() throws Exception { - afterPropertiesSet = false; - stop(); + this.afterPropertiesSet = false; - if (manageExecutor) { - if (taskExecutor instanceof DisposableBean) { - ((DisposableBean) taskExecutor).destroy(); + stop(); - if (logger.isDebugEnabled()) { - logger.debug("Stopped internally-managed task executor"); - } + if (this.manageExecutor) { + if (this.taskExecutor instanceof DisposableBean bean) { + bean.destroy(); + logDebug(() -> "Stopped internally-managed task executor"); } } } @@ -241,11 +237,7 @@ public void destroy() throws Exception { public void start() { if (started.compareAndSet(false, true)) { - - if (logger.isDebugEnabled()) { - logger.debug("Starting RedisMessageListenerContainer..."); - } - + logDebug(() -> "Starting RedisMessageListenerContainer..."); lazyListen(); } } @@ -258,26 +250,22 @@ private void lazyListen() { CompletableFuture containerListenFuture = this.listenFuture; State state = this.state.get(); - CompletableFuture futureToAwait; - if (state.isPrepareListening()) { - futureToAwait = containerListenFuture; - } else { - futureToAwait = lazyListen(backOff.start()); - } + CompletableFuture futureToAwait = state.isPrepareListening() ? containerListenFuture + : lazyListen(this.backOff.start()); try { futureToAwait.get(getMaxSubscriptionRegistrationWaitingTime(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { + } catch (InterruptedException cause) { Thread.currentThread().interrupt(); - } catch (ExecutionException e) { + } catch (ExecutionException cause) { - if (e.getCause() instanceof DataAccessException) { - throw new RedisListenerExecutionFailedException(e.getMessage(), e.getCause()); + if (cause.getCause() instanceof DataAccessException) { + throw new RedisListenerExecutionFailedException(cause.getMessage(), cause.getCause()); } - throw new CompletionException(e.getCause()); - } catch (TimeoutException e) { - throw new IllegalStateException("Subscription registration timeout exceeded", e); + throw new CompletionException(cause.getCause()); + } catch (TimeoutException cause) { + throw new IllegalStateException("Subscription registration timeout exceeded", cause); } } @@ -292,6 +280,7 @@ private CompletableFuture lazyListen(BackOffExecution backOffExecution) { } CompletableFuture containerListenFuture = this.listenFuture; + while (!doSubscribe(backOffExecution)) { // busy-loop, allow for synchronization against doUnsubscribe therefore we want to retry. containerListenFuture = this.listenFuture; @@ -304,6 +293,7 @@ private boolean doSubscribe(BackOffExecution backOffExecution) { CompletableFuture containerListenFuture = this.listenFuture; CompletableFuture containerUnsubscribeFuture = this.unsubscribeFuture; + State state = this.state.get(); // someone has called stop while we were in here. @@ -393,6 +383,7 @@ private boolean doUnsubscribe() { CompletableFuture listenFuture = this.listenFuture; State state = this.state.get(); + if (!state.isListenerActivated()) { return true; } @@ -421,12 +412,12 @@ private boolean doUnsubscribe() { } private void awaitRegistrationTime(CompletableFuture future) { + try { future.get(getMaxSubscriptionRegistrationWaitingTime(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { + } catch (InterruptedException cause) { Thread.currentThread().interrupt(); - } catch (ExecutionException | TimeoutException e) { - // ignore + } catch (ExecutionException | TimeoutException ignore) { } } @@ -443,7 +434,7 @@ public boolean isListening() { * Return whether this container is currently active, that is, whether it has been set up but not shut down yet. */ public final boolean isActive() { - return afterPropertiesSet; + return this.afterPropertiesSet; } /** @@ -453,7 +444,7 @@ public final boolean isActive() { */ @Nullable public RedisConnectionFactory getConnectionFactory() { - return connectionFactory; + return this.connectionFactory; } /** @@ -462,6 +453,7 @@ public RedisConnectionFactory getConnectionFactory() { public void setConnectionFactory(RedisConnectionFactory connectionFactory) { Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); + this.connectionFactory = connectionFactory; } @@ -471,14 +463,24 @@ public void setBeanName(String name) { } /** - * Sets the task executor used for running the message listeners when messages are received. If no task executor is - * set, an instance of {@link SimpleAsyncTaskExecutor} will be used by default. The task executor can be adjusted - * depending on the work done by the listeners and the number of messages coming in. + * Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default + * there will be no ErrorHandler so that error-level logging is the only result. + */ + public void setErrorHandler(ErrorHandler errorHandler) { + this.errorHandler = errorHandler; + } + + /** + * Attaches the given listeners (and their topics) to the container. + *

+ * Note: it's possible to call this method while the container is running forcing a reinitialization of the container. + * Note however that this might cause some messages to be lost (while the container reinitializes) - hence calling + * this method at runtime is considered advanced usage. * - * @param taskExecutor The taskExecutor to set. + * @param listeners map of message listeners and their associated topics */ - public void setTaskExecutor(Executor taskExecutor) { - this.taskExecutor = taskExecutor; + public void setMessageListeners(Map> listeners) { + initMapping(listeners); } /** @@ -496,34 +498,24 @@ public void setSubscriptionExecutor(Executor subscriptionExecutor) { } /** - * Sets the serializer for converting the {@link Topic}s into low-level channels and patterns. By default, - * {@link StringRedisSerializer} is used. + * Sets the task executor used for running the message listeners when messages are received. If no task executor is + * set, an instance of {@link SimpleAsyncTaskExecutor} will be used by default. The task executor can be adjusted + * depending on the work done by the listeners and the number of messages coming in. * - * @param serializer The serializer to set. - */ - public void setTopicSerializer(RedisSerializer serializer) { - this.serializer = serializer; - } - - /** - * Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default - * there will be no ErrorHandler so that error-level logging is the only result. + * @param taskExecutor The taskExecutor to set. */ - public void setErrorHandler(ErrorHandler errorHandler) { - this.errorHandler = errorHandler; + public void setTaskExecutor(Executor taskExecutor) { + this.taskExecutor = taskExecutor; } /** - * Attaches the given listeners (and their topics) to the container. - *

- * Note: it's possible to call this method while the container is running forcing a reinitialization of the container. - * Note however that this might cause some messages to be lost (while the container reinitializes) - hence calling - * this method at runtime is considered advanced usage. + * Sets the serializer for converting the {@link Topic}s into low-level channels and patterns. By default, + * {@link StringRedisSerializer} is used. * - * @param listeners map of message listeners and their associated topics + * @param serializer The serializer to set. */ - public void setMessageListeners(Map> listeners) { - initMapping(listeners); + public void setTopicSerializer(RedisSerializer serializer) { + this.serializer = serializer; } /** @@ -585,10 +577,12 @@ public void removeMessageListener(@Nullable MessageListener listener, Topic topi public void removeMessageListener(MessageListener listener) { Assert.notNull(listener, "MessageListener must not be null"); + removeMessageListener(listener, Collections.emptySet()); } private void initMapping(Map> listeners) { + // stop the listener if currently running if (isRunning()) { stop(); @@ -605,7 +599,7 @@ private void initMapping(Map t List channels = new ArrayList<>(topics.size()); List patterns = new ArrayList<>(topics.size()); - boolean trace = logger.isTraceEnabled(); - // add listener mapping Set set = listenerTopics.get(listener); + if (set == null) { set = new CopyOnWriteArraySet<>(); listenerTopics.put(listener, set); } + set.addAll(topics); for (Topic topic : topics) { - ByteArrayWrapper holder = new ByteArrayWrapper(serialize(topic)); + ByteArrayWrapper serializedTopic = new ByteArrayWrapper(serialize(topic)); if (topic instanceof ChannelTopic) { - Collection collection = channelMapping.get(holder); - if (collection == null) { - collection = new CopyOnWriteArraySet<>(); - channelMapping.put(holder, collection); - } + Collection collection = resolveMessageListeners(this.channelMapping, serializedTopic); collection.add(listener); - channels.add(holder.getArray()); - - if (trace) - logger.trace("Adding listener '" + listener + "' on channel '" + topic.getTopic() + "'"); + channels.add(serializedTopic.getArray()); + logTrace(() -> "Adding listener '" + listener + "' on channel '" + topic.getTopic() + "'"); } - else if (topic instanceof PatternTopic) { - Collection collection = patternMapping.get(holder); - if (collection == null) { - collection = new CopyOnWriteArraySet<>(); - patternMapping.put(holder, collection); - } + Collection collection = resolveMessageListeners(this.patternMapping, serializedTopic); collection.add(listener); - patterns.add(holder.getArray()); - - if (trace) - logger.trace("Adding listener '" + listener + "' for pattern '" + topic.getTopic() + "'"); + patterns.add(serializedTopic.getArray()); + logTrace(() -> "Adding listener '" + listener + "' for pattern '" + topic.getTopic() + "'"); } - else { throw new IllegalArgumentException("Unknown topic type '" + topic.getClass() + "'"); } @@ -665,31 +645,66 @@ else if (topic instanceof PatternTopic) { boolean wasListening = isListening(); if (isRunning()) { + lazyListen(); // check the current listening state if (wasListening) { + CompletableFuture future = new CompletableFuture<>(); - getRequiredSubscriber().addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronizion(patterns, - channels, () -> future.complete(null))); + getRequiredSubscriber().addSynchronization(newSubscriptionSynchronization(patterns, channels, + () -> future.complete(null))); getRequiredSubscriber().subscribeChannel(channels.toArray(new byte[channels.size()][])); getRequiredSubscriber().subscribePattern(patterns.toArray(new byte[patterns.size()][])); try { future.join(); - } catch (CompletionException e) { + } catch (CompletionException cause) { - if (e.getCause() instanceof DataAccessException) { - throw new RedisListenerExecutionFailedException(e.getMessage(), e.getCause()); + if (cause.getCause() instanceof DataAccessException) { + throw new RedisListenerExecutionFailedException(cause.getMessage(), cause.getCause()); } - throw e; + throw cause; } } } } + private void logDebug(Supplier message) { + + if (this.logger.isDebugEnabled()) { + this.logger.debug(message.get()); + } + } + + private void logTrace(Supplier message) { + + if (this.logger.isTraceEnabled()) { + this.logger.trace(message.get()); + } + } + + private SynchronizingMessageListener.SubscriptionSynchronizion newSubscriptionSynchronization( + Collection patterns, Collection channels, Runnable doneCallback) { + + return new SynchronizingMessageListener.SubscriptionSynchronizion(patterns, channels, doneCallback); + } + + private Collection resolveMessageListeners( + Map> mapping, ByteArrayWrapper topic) { + + Collection messageListeners = mapping.get(topic); + + if (messageListeners == null) { + messageListeners = new CopyOnWriteArraySet<>(); + mapping.put(topic, messageListeners); + } + + return messageListeners; + } + private void removeListener(@Nullable MessageListener listener, Collection topics) { Assert.notNull(topics, "Topics must not be null"); @@ -725,14 +740,14 @@ private void removeListener(@Nullable MessageListener listener, Collection * The default implementation logs the exception at error level. This can be overridden in subclasses. * - * @param ex the exception to handle + * @param cause the exception to handle */ - protected void handleListenerException(Throwable ex) { + protected void handleListenerException(Throwable cause) { + if (isActive()) { // Regular case: failed while active. // Invoke ErrorHandler if available. - invokeErrorHandler(ex); + invokeErrorHandler(cause); } else { // Rare case: listener thread failed after container shutdown. // Log at debug level, to avoid spamming the shutdown logger. - logger.debug("Listener exception after container shutdown", ex); + logger.debug("Listener exception after container shutdown", cause); } } /** * Invoke the registered ErrorHandler, if any. Log at error level otherwise. * - * @param ex the uncaught error that arose during message processing. + * @param cause the uncaught error that arose during message processing. * @see #setErrorHandler */ - protected void invokeErrorHandler(Throwable ex) { + protected void invokeErrorHandler(Throwable cause) { + if (this.errorHandler != null) { - this.errorHandler.handleError(ex); + this.errorHandler.handleError(cause); } else if (logger.isWarnEnabled()) { - logger.warn("Execution of message listener failed, and no ErrorHandler has been set", ex); + logger.warn("Execution of message listener failed, and no ErrorHandler has been set", cause); } } @@ -924,6 +943,7 @@ protected void handleSubscriptionException(CompletableFuture future, BackO if (isRunning()) { // log only if the container is still running to prevent close errors from logging logger.error("SubscriptionTask aborted with exception:", ex); } + future.completeExceptionally(ex); } @@ -939,7 +959,6 @@ private boolean potentiallyRecover(BackOffExecution backOffExecution, Runnable r } try { - if (subscriptionExecutor instanceof ScheduledExecutorService) { ((ScheduledExecutorService) subscriptionExecutor).schedule(retryRunnable, recoveryInterval, TimeUnit.MILLISECONDS); @@ -961,6 +980,7 @@ private boolean potentiallyRecover(BackOffExecution backOffExecution, Runnable r } private void propagate(@Nullable T value, @Nullable Throwable throwable, CompletableFuture target) { + if (throwable != null) { target.completeExceptionally(throwable); } else { @@ -972,8 +992,8 @@ private void dispatchSubscriptionNotification(Collection listen SubscriptionConsumer listenerConsumer) { if (!CollectionUtils.isEmpty(listeners)) { - byte[] source = pattern.clone(); + byte[] source = pattern.clone(); Executor executor = getRequiredTaskExecutor(); for (MessageListener messageListener : listeners) { @@ -987,34 +1007,30 @@ private void dispatchSubscriptionNotification(Collection listen private void dispatchMessage(Collection listeners, Message message, @Nullable byte[] pattern) { byte[] source = (pattern != null ? pattern.clone() : message.getChannel()); - Executor executor = getRequiredTaskExecutor(); + for (MessageListener messageListener : listeners) { executor.execute(() -> processMessage(messageListener, message, source)); } } private boolean hasTopics() { - return !channelMapping.isEmpty() || !patternMapping.isEmpty(); + return !this.channelMapping.isEmpty() || !this.patternMapping.isEmpty(); } private Subscriber getRequiredSubscriber() { - if (this.subscriber == null) { - throw new IllegalStateException( - "Subscriber not created; Configure RedisConnectionFactory to create a Subscriber"); - } + Assert.state(this.subscriber != null, + "Subscriber not created; Configure RedisConnectionFactory to create a Subscriber"); - return subscriber; + return this.subscriber; } private Executor getRequiredTaskExecutor() { - if (this.taskExecutor == null) { - throw new IllegalStateException("No executor configured"); - } + Assert.state(this.taskExecutor != null, "No executor configured"); - return taskExecutor; + return this.taskExecutor; } @SuppressWarnings("ConstantConditions") @@ -1212,7 +1228,7 @@ public CompletableFuture initialize(BackOffExecution backOffExecution, Col void eventuallyPerformSubscription(RedisConnection connection, BackOffExecution backOffExecution, CompletableFuture subscriptionDone, Collection patterns, Collection channels) { - addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronizion(patterns, channels, + addSynchronization(newSubscriptionSynchronization(patterns, channels, () -> subscriptionDone.complete(null))); doSubscribe(connection, patterns, channels); @@ -1408,23 +1424,24 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff CompletableFuture subscriptionDone, Collection patterns, Collection channels) { Collection initiallySubscribeToChannels; + if (!patterns.isEmpty() && !channels.isEmpty()) { initiallySubscribeToChannels = Collections.emptySet(); + // perform channel subscription later as the first call to (p)subscribe blocks the client - addSynchronization( - new SynchronizingMessageListener.SubscriptionSynchronizion(patterns, Collections.emptySet(), () -> { - try { - subscribeChannel(channels.toArray(new byte[0][])); - } catch (Exception e) { - handleSubscriptionException(subscriptionDone, backOffExecution, e); - } - })); + addSynchronization(newSubscriptionSynchronization(patterns, Collections.emptySet(), () -> { + try { + subscribeChannel(channels.toArray(new byte[0][])); + } catch (Exception cause) { + handleSubscriptionException(subscriptionDone, backOffExecution, cause); + } + })); } else { initiallySubscribeToChannels = channels; } - addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronizion(patterns, channels, + addSynchronization(newSubscriptionSynchronization(patterns, channels, () -> subscriptionDone.complete(null))); executor.execute(() -> { @@ -1433,11 +1450,10 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff doSubscribe(connection, patterns, initiallySubscribeToChannels); closeConnection(); unsubscribeFuture.complete(null); - } catch (Throwable t) { - handleSubscriptionException(subscriptionDone, backOffExecution, t); + } catch (Throwable cause) { + handleSubscriptionException(subscriptionDone, backOffExecution, cause); } }); } } - }