diff --git a/pom.xml b/pom.xml
index 4c38669587..13a727c484 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
org.springframework.dataspring-data-redis
- 2.7.0-SNAPSHOT
+ 2.7.0-GH-964-SNAPSHOTSpring Data Redis
diff --git a/src/main/asciidoc/reference/redis-messaging.adoc b/src/main/asciidoc/reference/redis-messaging.adoc
index 86da25dfa3..917bbbee22 100644
--- a/src/main/asciidoc/reference/redis-messaging.adoc
+++ b/src/main/asciidoc/reference/redis-messaging.adoc
@@ -47,7 +47,7 @@ Due to its blocking nature, low-level subscription is not attractive, as it requ
`RedisMessageListenerContainer` acts as a message listener container. It is used to receive messages from a Redis channel and drive the `MessageListener` instances that are injected into it. The listener container is responsible for all threading of message reception and dispatches into the listener for processing. A message listener container is the intermediary between an MDP and a messaging provider and takes care of registering to receive messages, resource acquisition and release, exception conversion, and the like. This lets you as an application developer write the (possibly complex) business logic associated with receiving a message (and reacting to it) and delegates boilerplate Redis infrastructure concerns to the framework.
- A `MessageListener` can additionally implement `SubscriptionListener` to receive notifications upon subscription/unsubscribe confirmation. Listening to subscription notifications can be useful when synchronizing invocations.
+A `MessageListener` can additionally implement `SubscriptionListener` to receive notifications upon subscription/unsubscribe confirmation. Listening to subscription notifications can be useful when synchronizing invocations.
Furthermore, to minimize the application footprint, `RedisMessageListenerContainer` lets one connection and one thread be shared by multiple listeners even though they do not share a subscription. Thus, no matter how many listeners or channels an application tracks, the runtime cost remains the same throughout its lifetime. Moreover, the container allows runtime configuration changes so that you can add or remove listeners while an application is running without the need for a restart. Additionally, the container uses a lazy subscription approach, using a `RedisConnection` only when needed. If all the listeners are unsubscribed, cleanup is automatically performed, and the thread is released.
diff --git a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java
index f8eca5e292..4b511d62ea 100644
--- a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java
+++ b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2011-2021 the original author or authors.
+ * Copyright 2011-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,10 +21,19 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -47,25 +56,40 @@
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.lang.Nullable;
-import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ErrorHandler;
+import org.springframework.util.ObjectUtils;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.BackOffExecution;
+import org.springframework.util.backoff.FixedBackOff;
/**
* Container providing asynchronous behaviour for Redis message listeners. Handles the low level details of listening,
* converting and message dispatching.
*
- * As oppose to the low level Redis (one connection per subscription), the container uses only one connection that is
- * 'multiplexed' for all registered listeners, the message dispatch being done through the task executor.
+ * As opposed to the low level Redis (one connection per subscription), the container uses only one connection that is
+ * 'multiplexed' for all registered listeners, the message dispatch being done through the
+ * {@link #setTaskExecutor(Executor) task executor}. It is recommended to configure the task executor (and subscription
+ * executor when using a blocking Redis connector) instead of using the default {@link SimpleAsyncTaskExecutor} for
+ * reuse of thread pools.
*
- * Note the container uses the connection in a lazy fashion (the connection is used only if at least one listener is
- * configured).
+ * The container uses a single Redis connection in a lazy fashion (the connection is used only if at least one listener
+ * is configured). Listeners can be registered eagerly before {@link #start() starting} the container to subscribe to
+ * all registered topics upon startup. Listeners are guaranteed to be subscribed after the {@link #start()} method
+ * returns.
*
- * Adding and removing listeners at the same time has undefined results. It is strongly recommended to synchronize/order
- * these methods accordingly. {@link MessageListener Listeners} that wish to receive subscription/unsubscription
- * callbacks in response to subscribe/unsubscribe commands can implement {@link SubscriptionListener}.
+ * Subscriptions are retried gracefully using {@link BackOff} that can be configured through
+ * {@link #setRecoveryInterval(long)} until reaching the maximum number of attempts. Listener errors are handled through
+ * a {@link ErrorHandler} if configured.
+ *
+ * This class can be used concurrently after initializing the container with {@link #afterPropertiesSet()} and
+ * {@link #start()} allowing concurrent calls to {@link #addMessageListener} and {@link #removeMessageListener} without
+ * external synchronization.
+ *
+ * {@link MessageListener Listeners} that wish to receive subscription/unsubscription callbacks in response to
+ * subscribe/unsubscribe commands can implement {@link SubscriptionListener}.
*
* @author Costin Leau
* @author Jennifer Hickey
@@ -89,15 +113,13 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
/**
* The default recovery interval: 5000 ms = 5 seconds.
*/
- public static final long DEFAULT_RECOVERY_INTERVAL = 5000;
+ public static final long DEFAULT_RECOVERY_INTERVAL = FixedBackOff.DEFAULT_INTERVAL;
/**
* The default subscription wait time: 2000 ms = 2 seconds.
*/
public static final long DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME = 2000L;
- private long initWait = TimeUnit.SECONDS.toMillis(5);
-
private @Nullable Executor subscriptionExecutor;
private @Nullable Executor taskExecutor;
@@ -108,16 +130,17 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
private @Nullable ErrorHandler errorHandler;
- private final Object monitor = new Object();
+ private @Nullable Subscriber subscriber;
+
+ private final AtomicBoolean started = new AtomicBoolean();
+
// whether the container is running (or not)
- private volatile boolean running = false;
- // whether the container has been initialized
- private volatile boolean initialized = false;
- // whether the container uses a connection or not
- // (as the container might be running but w/o listeners, it won't use any resources)
- private volatile boolean listening = false;
+ private final AtomicReference state = new AtomicReference<>(State.notListening());
+
+ // whether the container has been initialized via afterPropertiesSet
+ private boolean afterPropertiesSet = false;
- private volatile boolean manageExecutor = false;
+ private boolean manageExecutor = false;
// lookup maps
// to avoid creation of hashes for each message, the maps use raw byte arrays (wrapped to respect the equals/hashcode
@@ -130,16 +153,78 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
// lookup map between listeners and channels
private final Map> listenerTopics = new ConcurrentHashMap<>();
- private final SubscriptionTask subscriptionTask = new SubscriptionTask();
-
private volatile RedisSerializer serializer = RedisSerializer.string();
- private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
+ private BackOff backOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS);
private long maxSubscriptionRegistrationWaitingTime = DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME;
+ private volatile CompletableFuture listenFuture = new CompletableFuture<>();
+
+ private volatile CompletableFuture unsubscribeFuture = new CompletableFuture<>();
+
+ /**
+ * Container listening state.
+ */
+ static class State {
+
+ private final boolean prepareListening;
+ private final boolean listening;
+
+ private State(boolean prepareListening, boolean listening) {
+ this.prepareListening = prepareListening;
+ this.listening = listening;
+ }
+
+ /**
+ * Initial state. Next state is {@link #prepareListening()}.
+ */
+ public static State notListening() {
+ return new State(false, false);
+ }
+
+ /**
+ * Prepare listening after {@link #notListening()}. Next states are either {@link #notListening()} upon failure or
+ * {@link #listening()}.
+ */
+ public static State prepareListening() {
+ return new State(true, false);
+ }
+
+ /**
+ * Active listening state after {@link #prepareListening()}. Next is {@link #prepareUnsubscribe()}.
+ */
+ public static State listening() {
+ return new State(true, true);
+ }
+
+ /**
+ * Prepare unsubscribe after {@link #listening()}. Next state is {@link #notListening()}.
+ */
+ public static State prepareUnsubscribe() {
+ return new State(false, true);
+ }
+
+ private boolean isListenerActivated() {
+ return isListening() || isPrepareListening();
+ }
+
+ public boolean isListening() {
+ return listening;
+ }
+
+ public boolean isPrepareListening() {
+ return prepareListening;
+ }
+ }
+
@Override
public void afterPropertiesSet() {
+
+ if (this.connectionFactory == null) {
+ throw new IllegalArgumentException("RedisConnectionFactory is not set");
+ }
+
if (taskExecutor == null) {
manageExecutor = true;
taskExecutor = createDefaultTaskExecutor();
@@ -149,7 +234,9 @@ public void afterPropertiesSet() {
subscriptionExecutor = taskExecutor;
}
- initialized = true;
+ this.subscriber = createSubscriber(connectionFactory, this.subscriptionExecutor);
+
+ afterPropertiesSet = true;
}
/**
@@ -165,9 +252,14 @@ protected TaskExecutor createDefaultTaskExecutor() {
return new SimpleAsyncTaskExecutor(threadNamePrefix);
}
+ /**
+ * Destroy the container and stop it.
+ *
+ * @throws Exception
+ */
@Override
public void destroy() throws Exception {
- initialized = false;
+ afterPropertiesSet = false;
stop();
@@ -182,128 +274,214 @@ public void destroy() throws Exception {
}
}
- @Override
- public boolean isAutoStartup() {
- return true;
- }
-
- @Override
- public void stop(Runnable callback) {
- stop();
- callback.run();
- }
-
- @Override
- public int getPhase() {
- // start the latest
- return Integer.MAX_VALUE;
- }
-
- @Override
- public boolean isRunning() {
- return running;
- }
-
+ /**
+ * Startup the container and subscribe to topics if {@link MessageListener listeners} were registered prior to
+ * starting the container.
+ *
+ * This method is a potentially blocking method that blocks until a previous {@link #stop()} is finished and until all
+ * previously registered listeners are successfully subscribed.
+ *
+ * Multiple calls to this method are ignored if the container is already running. Concurrent calls to this method are
+ * synchronized until the container is started up.
+ *
+ * @see #setRecoveryInterval(long)
+ * @see #setMaxSubscriptionRegistrationWaitingTime(long)
+ * @see #stop()
+ */
@Override
public void start() {
- if (!running) {
- running = true;
- // wait for the subscription to start before returning
- // technically speaking we can only be notified right before the subscription starts
- synchronized (monitor) {
- lazyListen();
- if (listening) {
- try {
- // wait up to 5 seconds for Subscription thread
- monitor.wait(initWait);
- } catch (InterruptedException e) {
- // stop waiting
- Thread.currentThread().interrupt();
- running = false;
- return;
- }
- }
- }
+
+ if (started.compareAndSet(false, true)) {
if (logger.isDebugEnabled()) {
- logger.debug("Started RedisMessageListenerContainer");
+ logger.debug("Starting RedisMessageListenerContainer...");
}
+
+ lazyListen();
}
}
- @Override
- public void stop() {
- if (isRunning()) {
- running = false;
- subscriptionTask.cancel();
+ /**
+ * Lazily initiate subscriptions if the container has listeners.
+ */
+ private void lazyListen() {
+
+ CompletableFuture containerListenFuture = this.listenFuture;
+ State state = this.state.get();
+
+ CompletableFuture futureToAwait;
+ if (state.isPrepareListening()) {
+ futureToAwait = containerListenFuture;
+ } else {
+ futureToAwait = lazyListen(backOff.start());
}
- if (logger.isDebugEnabled()) {
- logger.debug("Stopped RedisMessageListenerContainer");
+ try {
+ futureToAwait.get(getMaxSubscriptionRegistrationWaitingTime(), TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ throw new CompletionException(e.getCause());
+ } catch (TimeoutException e) {
+ throw new IllegalStateException("Subscription registration timeout exceeded", e);
}
}
/**
- * Process a message received from the provider.
- *
- * @param message
- * @param pattern
+ * Method inspecting whether listening for messages (and thus using a thread) is actually needed and triggering it.
*/
- protected void processMessage(MessageListener listener, Message message, byte[] pattern) {
- executeListener(listener, message, pattern);
+ private CompletableFuture lazyListen(BackOffExecution backOffExecution) {
+
+ if (!hasTopics()) {
+ logger.debug("Postpone listening for Redis messages until actual listeners are added");
+ return CompletableFuture.completedFuture(null);
+ }
+
+ CompletableFuture containerListenFuture = this.listenFuture;
+ while (!doSubscribe(backOffExecution)) {
+ // busy-loop, allow for synchronization against doUnsubscribe therefore we want to retry.
+ containerListenFuture = this.listenFuture;
+ }
+
+ return containerListenFuture;
}
- /**
- * Execute the specified listener.
- *
- * @see #handleListenerException
- */
- protected void executeListener(MessageListener listener, Message message, byte[] pattern) {
- try {
- listener.onMessage(message, pattern);
- } catch (Throwable ex) {
- handleListenerException(ex);
+ private boolean doSubscribe(BackOffExecution backOffExecution) {
+
+ CompletableFuture containerListenFuture = this.listenFuture;
+ CompletableFuture containerUnsubscribeFuture = this.unsubscribeFuture;
+ State state = this.state.get();
+
+ // someone has called stop while we were in here.
+ if (!state.isPrepareListening() && state.isListening()) {
+ containerUnsubscribeFuture.join();
+ }
+
+ if (!this.state.compareAndSet(state, State.prepareListening())) {
+ return false;
}
+
+ CompletableFuture listenFuture = getRequiredSubscriber().initialize(backOffExecution,
+ patternMapping.keySet().stream().map(ByteArrayWrapper::getArray).collect(Collectors.toList()),
+ channelMapping.keySet().stream().map(ByteArrayWrapper::getArray).collect(Collectors.toList()));
+ listenFuture.whenComplete((unused, throwable) -> {
+
+ if (throwable == null) {
+ logger.debug("RedisMessageListenerContainer listeners registered successfully");
+ this.state.set(State.listening());
+ } else {
+ logger.debug("Failed to start RedisMessageListenerContainer listeners", throwable);
+ this.state.set(State.notListening());
+ }
+
+ propagate(unused, throwable, containerListenFuture);
+
+ // re-arm listen future for a later lazy-listen attempt
+ if (throwable != null) {
+ this.listenFuture = new CompletableFuture<>();
+ }
+ });
+
+ logger.debug("Subscribing to topics for RedisMessageListenerContainer");
+
+ return true;
}
/**
- * Return whether this container is currently active, that is, whether it has been set up but not shut down yet.
+ * Stop the message listener container and cancel any subscriptions if the container is {@link #isListening()
+ * listening}. Stopping releases any allocated connections.
+ *
+ * This method is a potentially blocking method that blocks until a previous {@link #start()} is finished and until
+ * the connection is closed if the container was listening.
+ *
+ * Multiple calls to this method are ignored if the container was already stopped. Concurrent calls to this method are
+ * synchronized until the container is stopped.
*/
- public final boolean isActive() {
- return initialized;
+ @Override
+ public void stop() {
+ stop(() -> {});
}
/**
- * Handle the given exception that arose during listener execution.
+ * Stop the message listener container and cancel any subscriptions if the container is {@link #isListening()
+ * listening}. Stopping releases any allocated connections.
*
- * The default implementation logs the exception at error level. This can be overridden in subclasses.
+ * This method is a potentially blocking method that blocks until a previous {@link #start()} is finished and until
+ * the connection is closed if the container was listening.
+ *
+ * Multiple calls to this method are ignored if the container was already stopped. Concurrent calls to this method are
+ * synchronized until the container is stopped.
*
- * @param ex the exception to handle
+ * @param callback callback to notify when the container actually stops.
*/
- protected void handleListenerException(Throwable ex) {
- if (isActive()) {
- // Regular case: failed while active.
- // Invoke ErrorHandler if available.
- invokeErrorHandler(ex);
+ @Override
+ public void stop(Runnable callback) {
+
+ if (this.started.compareAndSet(true, false)) {
+
+ stopListening();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Stopped RedisMessageListenerContainer");
+ }
+
+ callback.run();
+ }
+ }
+
+ private void stopListening() {
+ while (!doUnsubscribe()) {
+ // busy-loop, allow for synchronization against doSubscribe therefore we want to retry.
+ }
+ }
+
+ private boolean doUnsubscribe() {
+
+ CompletableFuture listenFuture = this.listenFuture;
+ State state = this.state.get();
+ if (!state.isListenerActivated()) {
+ return true;
+ }
+
+ try {
+ listenFuture.join();
+ } catch (Exception e) {
+ // ignore, just await completion here.
+ }
+
+ if (this.state.compareAndSet(state, State.prepareUnsubscribe())) {
+
+ getRequiredSubscriber().cancel();
+
+ this.state.set(State.notListening());
+
+ this.listenFuture = new CompletableFuture<>();
+ this.unsubscribeFuture = new CompletableFuture<>();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Stopped RedisMessageListenerContainer");
+ }
+
+ return true;
} else {
- // Rare case: listener thread failed after container shutdown.
- // Log at debug level, to avoid spamming the shutdown logger.
- logger.debug("Listener exception after container shutdown", ex);
+ return false;
}
}
+ @Override
+ public boolean isRunning() {
+ return this.started.get();
+ }
+
+ public boolean isListening() {
+ return this.state.get().isListening();
+ }
+
/**
- * Invoke the registered ErrorHandler, if any. Log at error level otherwise.
- *
- * @param ex the uncaught error that arose during message processing.
- * @see #setErrorHandler
+ * Return whether this container is currently active, that is, whether it has been set up but not shut down yet.
*/
- protected void invokeErrorHandler(Throwable ex) {
- if (this.errorHandler != null) {
- this.errorHandler.handleError(ex);
- } else if (logger.isWarnEnabled()) {
- logger.warn("Execution of message listener failed, and no ErrorHandler has been set.", ex);
- }
+ public final boolean isActive() {
+ return afterPropertiesSet;
}
/**
@@ -395,7 +573,6 @@ public void setMessageListeners(Map extends MessageListener, Collection exte
*/
public void addMessageListener(MessageListener listener, Collection extends Topic> topics) {
addListener(listener, topics);
- lazyListen();
}
/**
@@ -414,44 +591,45 @@ public void addMessageListener(MessageListener listener, Topic topic) {
* (matching) messages as soon as possible.
*
* Note that this method obeys the Redis (p)unsubscribe semantics - meaning an empty/null collection will remove
- * listener from all channels. Similarly a null listener will unsubscribe all listeners from the given topic.
+ * listener from all channels.
*
* @param listener message listener
* @param topics message listener topics
*/
- public void removeMessageListener(MessageListener listener, Collection extends Topic> topics) {
+ public void removeMessageListener(@Nullable MessageListener listener, Collection extends Topic> topics) {
removeListener(listener, topics);
}
/**
- * Removes a message listener from the from the given topic. If the container is running, the listener stops receiving
+ * Removes a message listener from the given topic. If the container is running, the listener stops receiving
* (matching) messages as soon as possible.
*
* Note that this method obeys the Redis (p)unsubscribe semantics - meaning an empty/null collection will remove
- * listener from all channels. Similarly a null listener will unsubscribe all listeners from the given topic.
+ * listener from all channels.
*
* @param listener message listener
* @param topic message topic
*/
- public void removeMessageListener(MessageListener listener, Topic topic) {
+ public void removeMessageListener(@Nullable MessageListener listener, Topic topic) {
removeMessageListener(listener, Collections.singleton(topic));
}
/**
* Removes the given message listener completely (from all topics). If the container is running, the listener stops
- * receiving (matching) messages as soon as possible. Similarly a null listener will unsubscribe all listeners from
- * the given topic.
+ * receiving (matching) messages as soon as possible.
*
* @param listener message listener
*/
public void removeMessageListener(MessageListener listener) {
- removeMessageListener(listener, Collections. emptySet());
+
+ Assert.notNull(listener, "MessageListener must not be null");
+ removeMessageListener(listener, Collections.emptySet());
}
private void initMapping(Map extends MessageListener, Collection extends Topic>> listeners) {
// stop the listener if currently running
if (isRunning()) {
- subscriptionTask.cancel();
+ stop();
}
patternMapping.clear();
@@ -465,41 +643,13 @@ private void initMapping(Map extends MessageListener, Collection extends Top
}
// resume activity
- if (initialized) {
+ if (afterPropertiesSet) {
start();
}
}
- /**
- * Method inspecting whether listening for messages (and thus using a thread) is actually needed and triggering it.
- */
- private void lazyListen() {
- boolean debug = logger.isDebugEnabled();
- boolean started = false;
-
- if (isRunning()) {
- if (!listening) {
- synchronized (monitor) {
- if (!listening) {
- if (channelMapping.size() > 0 || patternMapping.size() > 0) {
- subscriptionExecutor.execute(subscriptionTask);
- listening = true;
- started = true;
- }
- }
- }
- if (debug) {
- if (started) {
- logger.debug("Started listening for Redis messages");
- } else {
- logger.debug("Postpone listening for Redis messages until actual listeners are added");
- }
- }
- }
- }
- }
-
private void addListener(MessageListener listener, Collection extends Topic> topics) {
+
Assert.notNull(listener, "a valid listener is required");
Assert.notEmpty(topics, "at least one topic is required");
@@ -518,7 +668,7 @@ private void addListener(MessageListener listener, Collection extends Topic> t
for (Topic topic : topics) {
- ByteArrayWrapper holder = new ByteArrayWrapper(serializer.serialize(topic.getTopic()));
+ ByteArrayWrapper holder = new ByteArrayWrapper(serialize(topic));
if (topic instanceof ChannelTopic) {
Collection collection = channelMapping.get(holder);
@@ -550,20 +700,43 @@ else if (topic instanceof PatternTopic) {
throw new IllegalArgumentException("Unknown topic type '" + topic.getClass() + "'");
}
}
+ boolean wasListening = isListening();
- // check the current listening state
- if (listening) {
- subscriptionTask.subscribeChannel(channels.toArray(new byte[channels.size()][]));
- subscriptionTask.subscribePattern(patterns.toArray(new byte[patterns.size()][]));
+ if (isRunning()) {
+ lazyListen();
+
+ // check the current listening state
+ if (wasListening) {
+ CompletableFuture future = new CompletableFuture<>();
+
+ getRequiredSubscriber().addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronizion(patterns,
+ channels, () -> future.complete(null)));
+ getRequiredSubscriber().subscribeChannel(channels.toArray(new byte[channels.size()][]));
+ getRequiredSubscriber().subscribePattern(patterns.toArray(new byte[patterns.size()][]));
+
+ future.join();
+ }
}
}
- private void removeListener(MessageListener listener, Collection extends Topic> topics) {
+ private void removeListener(@Nullable MessageListener listener, Collection extends Topic> topics) {
+
+ Assert.notNull(topics, "Topics must not be null");
+
+ if (listener != null && listenerTopics.get(listener) == null) {
+ // Listener not subscribed
+ return;
+ }
+
+ if (topics.isEmpty()) {
+ topics = listenerTopics.get(listener);
+ }
+
boolean trace = logger.isTraceEnabled();
// check stop listening case
- if (listener == null && CollectionUtils.isEmpty(topics)) {
- subscriptionTask.cancel();
+ if (CollectionUtils.isEmpty(topics)) {
+ stopListening();
return;
}
@@ -581,13 +754,13 @@ private void removeListener(MessageListener listener, Collection extends Topic
}
for (Topic topic : topics) {
- ByteArrayWrapper holder = new ByteArrayWrapper(serializer.serialize(topic.getTopic()));
+ ByteArrayWrapper holder = new ByteArrayWrapper(serialize(topic));
if (topic instanceof ChannelTopic) {
remove(listener, topic, holder, channelMapping, channelsToRemove);
if (trace) {
- String msg = (listener != null ? "listener '" + listener + "'" : "all listeners");
+ String msg = "listener '" + listener + "'";
logger.trace("Removing " + msg + " from channel '" + topic.getTopic() + "'");
}
}
@@ -596,7 +769,7 @@ else if (topic instanceof PatternTopic) {
remove(listener, topic, holder, patternMapping, patternsToRemove);
if (trace) {
- String msg = (listener != null ? "listener '" + listener + "'" : "all listeners");
+ String msg = "listener '" + listener + "'";
logger.trace("Removing " + msg + " from pattern '" + topic.getTopic() + "'");
}
}
@@ -605,13 +778,12 @@ else if (topic instanceof PatternTopic) {
// double check whether there are still subscriptions available otherwise cancel the connection
// as most drivers forfeit the connection on unsubscribe
if (listenerTopics.isEmpty()) {
- subscriptionTask.cancel();
+ stopListening();
}
-
// check the current listening state
- else if (listening) {
- subscriptionTask.unsubscribeChannel(channelsToRemove.toArray(new byte[channelsToRemove.size()][]));
- subscriptionTask.unsubscribePattern(patternsToRemove.toArray(new byte[patternsToRemove.size()][]));
+ else if (isListening()) {
+ getRequiredSubscriber().unsubscribeChannel(channelsToRemove.toArray(new byte[channelsToRemove.size()][]));
+ getRequiredSubscriber().unsubscribePattern(patternsToRemove.toArray(new byte[patternsToRemove.size()][]));
}
}
@@ -623,15 +795,8 @@ private void remove(MessageListener listener, Topic topic, ByteArrayWrapper hold
if (listeners != null) {
// remove only one listener
- if (listener != null) {
- listeners.remove(listener);
- listenersToRemove = Collections.singletonList(listener);
- }
-
- // no listener given - remove all of them
- else {
- listenersToRemove = listeners;
- }
+ listeners.remove(listener);
+ listenersToRemove = Collections.singletonList(listener);
// start removing listeners
for (MessageListener messageListener : listenersToRemove) {
@@ -644,7 +809,7 @@ private void remove(MessageListener listener, Topic topic, ByteArrayWrapper hold
}
}
// if we removed everything, remove the empty holder collection
- if (listener == null || listeners.isEmpty()) {
+ if (listeners.isEmpty()) {
mapping.remove(holder);
topicToRemove.add(holder.getArray());
}
@@ -652,319 +817,244 @@ private void remove(MessageListener listener, Topic topic, ByteArrayWrapper hold
}
/**
- * Handle subscription task exception. Will attempt to restart the subscription if the Exception is a connection
- * failure (for example, Redis was restarted).
+ * Specify the interval between recovery attempts, in milliseconds. The default is 5000 ms, that is, 5 seconds.
*
- * @param ex Throwable exception
+ * @see #handleSubscriptionException
+ * @see #setRecoveryBackoff(BackOff)
*/
- protected void handleSubscriptionException(Throwable ex) {
- listening = false;
- subscriptionTask.closeConnection();
- if (ex instanceof RedisConnectionFailureException) {
- if (isRunning()) {
- logger.error("Connection failure occurred. Restarting subscription task after " + recoveryInterval + " ms");
- sleepBeforeRecoveryAttempt();
- lazyListen();
- }
- } else {
- logger.error("SubscriptionTask aborted with exception:", ex);
- }
+ public void setRecoveryInterval(long recoveryInterval) {
+ setRecoveryBackoff(new FixedBackOff(recoveryInterval, FixedBackOff.UNLIMITED_ATTEMPTS));
}
/**
- * Sleep according to the specified recovery interval. Called between recovery attempts.
+ * Specify the interval {@link BackOff} recovery attempts.
+ *
+ * @see #handleSubscriptionException
+ * @see #setRecoveryInterval(long)
+ * @since 3.0
*/
- protected void sleepBeforeRecoveryAttempt() {
- if (this.recoveryInterval > 0) {
- try {
- Thread.sleep(this.recoveryInterval);
- } catch (InterruptedException interEx) {
- logger.debug("Thread interrupted while sleeping the recovery interval");
- Thread.currentThread().interrupt();
- }
- }
+ public void setRecoveryBackoff(BackOff recoveryInterval) {
+
+ Assert.notNull(recoveryInterval, "Recovery interval must not be null");
+ this.backOff = recoveryInterval;
+ }
+
+ public long getMaxSubscriptionRegistrationWaitingTime() {
+ return maxSubscriptionRegistrationWaitingTime;
}
/**
- * Runnable used for Redis subscription. Implemented as a dedicated class to provide as many hints as possible to the
- * underlying thread pool.
+ * Specify the max time to wait for subscription registrations, in milliseconds The default is
+ * {@code 2000ms}, that is, 2 second. The timeout applies for awaiting the subscription registration. Note that
+ * subscriptions can be created asynchronously and an expired timeout does not cancel the timeout.
*
- * @author Costin Leau
+ * @param maxSubscriptionRegistrationWaitingTime the maximum subscription registration wait time
+ * @see #DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME
+ * @see #start()
*/
- private class SubscriptionTask implements SchedulingAwareRunnable {
-
- /**
- * Runnable used, on a parallel thread, to do the initial pSubscribe. This is required since, during initialization,
- * both subscribe and pSubscribe might be needed but since the first call is blocking, the second call needs to
- * executed in parallel.
- *
- * @author Costin Leau
- */
- private class PatternSubscriptionTask implements SchedulingAwareRunnable {
+ public void setMaxSubscriptionRegistrationWaitingTime(long maxSubscriptionRegistrationWaitingTime) {
+ this.maxSubscriptionRegistrationWaitingTime = maxSubscriptionRegistrationWaitingTime;
+ }
- private long WAIT = 500;
- private long ROUNDS = 3;
+ private Subscriber createSubscriber(RedisConnectionFactory connectionFactory, Executor executor) {
+ return ConnectionUtils.isAsync(connectionFactory) ? new Subscriber(connectionFactory)
+ : new BlockingSubscriber(connectionFactory, executor);
+ }
- public boolean isLongLived() {
- return false;
- }
+ /**
+ * Process a message received from the provider.
+ *
+ * @param listener the message listener to notify.
+ * @param message the received message.
+ * @param source the source, either the channel or pattern.
+ * @see #handleListenerException
+ */
+ protected void processMessage(MessageListener listener, Message message, byte[] source) {
+ try {
+ listener.onMessage(message, source);
+ } catch (Throwable ex) {
+ handleListenerException(ex);
+ }
+ }
- public void run() {
- // wait for subscription to be initialized
- boolean done = false;
- // wait 3 rounds for subscription to be initialized
- for (int i = 0; i < ROUNDS && !done; i++) {
- if (connection != null) {
- synchronized (localMonitor) {
- if (connection.isSubscribed()) {
- done = true;
- connection.getSubscription().pSubscribe(unwrap(patternMapping.keySet()));
- } else {
- try {
- Thread.sleep(WAIT);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- return;
- }
- }
- }
- }
- }
- }
+ /**
+ * Handle the given exception that arose during listener execution.
+ *
+ * The default implementation logs the exception at error level. This can be overridden in subclasses.
+ *
+ * @param ex the exception to handle
+ */
+ protected void handleListenerException(Throwable ex) {
+ if (isActive()) {
+ // Regular case: failed while active.
+ // Invoke ErrorHandler if available.
+ invokeErrorHandler(ex);
+ } else {
+ // Rare case: listener thread failed after container shutdown.
+ // Log at debug level, to avoid spamming the shutdown logger.
+ logger.debug("Listener exception after container shutdown", ex);
}
+ }
- private volatile @Nullable RedisConnection connection;
- private boolean subscriptionTaskRunning = false;
- private final Object localMonitor = new Object();
- private long subscriptionWait = TimeUnit.SECONDS.toMillis(5);
-
- public boolean isLongLived() {
- return true;
+ /**
+ * Invoke the registered ErrorHandler, if any. Log at error level otherwise.
+ *
+ * @param ex the uncaught error that arose during message processing.
+ * @see #setErrorHandler
+ */
+ protected void invokeErrorHandler(Throwable ex) {
+ if (this.errorHandler != null) {
+ this.errorHandler.handleError(ex);
+ } else if (logger.isWarnEnabled()) {
+ logger.warn("Execution of message listener failed, and no ErrorHandler has been set.", ex);
}
+ }
- public void run() {
-
- synchronized (localMonitor) {
- subscriptionTaskRunning = true;
- }
-
- try {
- connection = connectionFactory.getConnection();
- if (connection.isSubscribed()) {
- throw new IllegalStateException("Retrieved connection is already subscribed; aborting listening");
- }
+ /**
+ * Handle subscription task exception. Will attempt to restart the subscription if the Exception is a connection
+ * failure (for example, Redis was restarted).
+ *
+ * @param ex Throwable exception
+ */
+ protected void handleSubscriptionException(CompletableFuture future, BackOffExecution backOffExecution,
+ Throwable ex) {
- boolean asyncConnection = ConnectionUtils.isAsync(connectionFactory);
+ getRequiredSubscriber().closeConnection();
- // NB: sync drivers' Xsubscribe calls block, so we notify the RDMLC before performing the actual subscription.
- if (!asyncConnection) {
- synchronized (monitor) {
- monitor.notify();
- }
- }
+ if (ex instanceof RedisConnectionFailureException && isRunning()) {
- SubscriptionPresentCondition subscriptionPresent = eventuallyPerformSubscription();
+ BackOffExecution loggingBackOffExecution = () -> {
- if (asyncConnection) {
- SpinBarrier.waitFor(subscriptionPresent, getMaxSubscriptionRegistrationWaitingTime());
+ long recoveryInterval = backOffExecution.nextBackOff();
- synchronized (monitor) {
- monitor.notify();
- }
+ if (recoveryInterval != BackOffExecution.STOP) {
+ logger.error(
+ "Connection failure occurred:" + ex + ". Restarting subscription task after " + recoveryInterval + " ms");
}
- } catch (Throwable t) {
- handleSubscriptionException(t);
- } finally {
- // this block is executed once the subscription thread has ended, this may or may not mean
- // the connection has been unsubscribed, depending on driver
- synchronized (localMonitor) {
- subscriptionTaskRunning = false;
- localMonitor.notify();
- }
- }
- }
-
- /**
- * Performs a potentially asynchronous registration of a subscription.
- *
- * @return #SubscriptionPresentCondition that can serve as a handle to check whether the subscription is ready.
- */
- private SubscriptionPresentCondition eventuallyPerformSubscription() {
- SubscriptionPresentCondition condition = null;
+ return recoveryInterval;
+ };
- if (channelMapping.isEmpty()) {
+ Runnable recoveryFunction = () -> {
- condition = new PatternSubscriptionPresentCondition();
- connection.pSubscribe(new DispatchMessageListener(), unwrap(patternMapping.keySet()));
- } else {
-
- if (patternMapping.isEmpty()) {
- condition = new SubscriptionPresentCondition();
- } else {
- // schedule the rest of the subscription
- subscriptionExecutor.execute(new PatternSubscriptionTask());
- condition = new PatternSubscriptionPresentCondition();
- }
+ CompletableFuture lazyListen = lazyListen(backOffExecution);
+ lazyListen.whenComplete(propagate(future));
+ };
- connection.subscribe(new DispatchMessageListener(), unwrap(channelMapping.keySet()));
+ if (potentiallyRecover(loggingBackOffExecution, recoveryFunction)) {
+ return;
}
- return condition;
+ logger.error("SubscriptionTask aborted with exception:", ex);
+ future.completeExceptionally(new IllegalStateException("Subscription attempts exceeded", ex));
+ return;
}
- /**
- * Checks whether the current connection has an associated subscription.
- *
- * @author Thomas Darimont
- */
- private class SubscriptionPresentCondition implements Condition {
+ logger.error("SubscriptionTask aborted with exception:", ex);
+ future.completeExceptionally(ex);
+ }
- public boolean passes() {
- return connection.isSubscribed();
- }
- }
+ /**
+ * Sleep according to the specified recovery interval. Called between recovery attempts.
+ */
+ private boolean potentiallyRecover(BackOffExecution backOffExecution, Runnable retryRunnable) {
- /**
- * Checks whether the current connection has an associated pattern subscription.
- *
- * @author Thomas Darimont
- * @see org.springframework.data.redis.listener.RedisMessageListenerContainer.SubscriptionTask.SubscriptionPresentTestCondition
- */
- private class PatternSubscriptionPresentCondition extends SubscriptionPresentCondition {
+ long recoveryInterval = backOffExecution.nextBackOff();
- @Override
- public boolean passes() {
- return super.passes() && !CollectionUtils.isEmpty(connection.getSubscription().getPatterns());
- }
+ if (recoveryInterval == BackOffExecution.STOP) {
+ return false;
}
- private byte[][] unwrap(Collection holders) {
- if (CollectionUtils.isEmpty(holders)) {
- return new byte[0][];
- }
-
- byte[][] unwrapped = new byte[holders.size()][];
+ try {
- int index = 0;
- for (ByteArrayWrapper arrayHolder : holders) {
- unwrapped[index++] = arrayHolder.getArray();
+ if (subscriptionExecutor instanceof ScheduledExecutorService) {
+ ((ScheduledExecutorService) subscriptionExecutor).schedule(retryRunnable, recoveryInterval,
+ TimeUnit.MILLISECONDS);
+ } else {
+ Thread.sleep(recoveryInterval);
+ retryRunnable.run();
}
- return unwrapped;
+ return true;
+ } catch (InterruptedException interEx) {
+ logger.debug("Thread interrupted while sleeping the recovery interval");
+ Thread.currentThread().interrupt();
+ return false;
}
+ }
- void cancel() {
-
- if (!listening || connection == null) {
- return;
- }
+ private BiConsumer super T, ? super Throwable> propagate(CompletableFuture target) {
+ return (value, throwable) -> propagate(value, throwable, target);
+ }
- listening = false;
+ private void propagate(@Nullable T value, @Nullable Throwable throwable, CompletableFuture target) {
+ if (throwable != null) {
+ target.completeExceptionally(throwable);
+ } else {
+ target.complete(value);
+ }
+ }
- if (logger.isTraceEnabled()) {
- logger.trace("Cancelling Redis subscription...");
- }
+ private void dispatchSubscriptionNotification(Collection listeners, byte[] pattern, long count,
+ SubscriptionConsumer listenerConsumer) {
- Subscription sub = connection.getSubscription();
+ if (!CollectionUtils.isEmpty(listeners)) {
+ byte[] source = pattern.clone();
- if (sub != null) {
+ Executor executor = getRequiredTaskExecutor();
- synchronized (localMonitor) {
+ for (MessageListener messageListener : listeners) {
+ if (messageListener instanceof SubscriptionListener) {
+ executor.execute(() -> listenerConsumer.accept((SubscriptionListener) messageListener, source, count));
+ }
+ }
+ }
+ }
- if (logger.isTraceEnabled()) {
- logger.trace("Unsubscribing from all channels");
- }
+ /**
+ * Represents an operation that accepts three input arguments {@link SubscriptionListener},
+ * {@code channel or pattern}, and {@code count} and returns no result.
+ */
+ interface SubscriptionConsumer {
+ void accept(SubscriptionListener listener, byte[] channelOrPattern, long count);
+ }
- try {
- sub.close();
- } catch (Exception e) {
- logger.warn("Unable to unsubscribe from subscriptions", e);
- }
+ private void dispatchMessage(Collection listeners, Message message, @Nullable byte[] pattern) {
- if (subscriptionTaskRunning) {
- try {
- localMonitor.wait(subscriptionWait);
- } catch (InterruptedException e) {
- // Stop waiting
- Thread.currentThread().interrupt();
- }
- }
+ byte[] source = (pattern != null ? pattern.clone() : message.getChannel());
- if (!subscriptionTaskRunning) {
- closeConnection();
- } else {
- logger.warn("Unable to close connection. Subscription task still running");
- }
- }
- }
+ Executor executor = getRequiredTaskExecutor();
+ for (MessageListener messageListener : listeners) {
+ executor.execute(() -> processMessage(messageListener, message, source));
}
+ }
- void closeConnection() {
+ private boolean hasTopics() {
+ return !channelMapping.isEmpty() || !patternMapping.isEmpty();
+ }
- if (connection != null) {
- logger.trace("Closing connection");
- try {
- connection.close();
- } catch (Exception e) {
- logger.warn("Error closing subscription connection", e);
- }
- connection = null;
- }
+ private Subscriber getRequiredSubscriber() {
+
+ if (this.subscriber == null) {
+ throw new IllegalStateException(
+ "Subscriber not created. Configure RedisConnectionFactory to create a Subscriber.");
}
- void subscribeChannel(byte[]... channels) {
+ return subscriber;
+ }
- if (channels != null && channels.length > 0) {
- if (connection != null) {
- synchronized (localMonitor) {
- Subscription sub = connection.getSubscription();
- if (sub != null) {
- sub.subscribe(channels);
- }
- }
- }
- }
- }
+ private Executor getRequiredTaskExecutor() {
- void subscribePattern(byte[]... patterns) {
- if (patterns != null && patterns.length > 0) {
- if (connection != null) {
- synchronized (localMonitor) {
- Subscription sub = connection.getSubscription();
- if (sub != null) {
- sub.pSubscribe(patterns);
- }
- }
- }
- }
+ if (this.taskExecutor == null) {
+ throw new IllegalStateException("No executor configured");
}
- void unsubscribeChannel(byte[]... channels) {
- if (channels != null && channels.length > 0) {
- if (connection != null) {
- synchronized (localMonitor) {
- Subscription sub = connection.getSubscription();
- if (sub != null) {
- sub.unsubscribe(channels);
- }
- }
- }
- }
- }
+ return taskExecutor;
+ }
- void unsubscribePattern(byte[]... patterns) {
- if (patterns != null && patterns.length > 0) {
- if (connection != null) {
- synchronized (localMonitor) {
- Subscription sub = connection.getSubscription();
- if (sub != null) {
- sub.pUnsubscribe(patterns);
- }
- }
- }
- }
- }
+ @SuppressWarnings("ConstantConditions")
+ private byte[] serialize(Topic topic) {
+ return serializer.serialize(topic.getTopic());
}
/**
@@ -1021,110 +1111,263 @@ public void onPatternUnsubscribed(byte[] pattern, long count) {
}
}
- private void dispatchSubscriptionNotification(Collection listeners, byte[] pattern, long count,
- SubscriptionConsumer listenerConsumer) {
+ /**
+ * Topic subscriber controller. Keeps track of the actual Redis connection and provides entry points to initially
+ * subscribe to Redis topics and update subscriptions (add/remove).
+ *
+ * Actual subscription notifications are routed through {@link DispatchMessageListener} to multicast events to the
+ * actual listeners without blocking the event loop.
+ *
+ * @author Mark Paluch
+ * @since 3.0
+ */
+ class Subscriber {
- if (!CollectionUtils.isEmpty(listeners)) {
- byte[] source = pattern.clone();
+ private volatile @Nullable RedisConnection connection;
- for (MessageListener messageListener : listeners) {
- if (messageListener instanceof SubscriptionListener) {
- taskExecutor.execute(() -> listenerConsumer.accept((SubscriptionListener) messageListener, source, count));
+ private final RedisConnectionFactory connectionFactory;
+ private final Object localMonitor = new Object();
+ private final DispatchMessageListener delegateListener = new DispatchMessageListener();
+ private final SynchronizingMessageListener synchronizingMessageListener = new SynchronizingMessageListener(
+ delegateListener, delegateListener);
+
+ Subscriber(RedisConnectionFactory connectionFactory) {
+ this.connectionFactory = connectionFactory;
+ }
+
+ /**
+ * Perform the initial subscription.
+ *
+ * @param backOffExecution backoff execution to track the progress for retries.
+ * @param patterns patterns to subscribe to.
+ * @param channels channels to subscribe to.
+ * @return a future that is completed either successfully after establishing all subscriptions or exceptionally
+ * after an error or when running out of {@link BackOffExecution#STOP retries}.
+ */
+ public CompletableFuture initialize(BackOffExecution backOffExecution, Collection patterns,
+ Collection channels) {
+
+ synchronized (localMonitor) {
+
+ RedisConnection connection = connectionFactory.getConnection();
+ this.connection = connection;
+
+ if (connection.isSubscribed()) {
+
+ CompletableFuture failure = new CompletableFuture<>();
+ failure.completeExceptionally(
+ new IllegalStateException("Retrieved connection is already subscribed; aborting listening"));
+ return failure;
}
+
+ CompletableFuture initFuture = new CompletableFuture<>();
+
+ try {
+ eventuallyPerformSubscription(connection, backOffExecution, initFuture, patterns, channels);
+ } catch (Throwable t) {
+ handleSubscriptionException(initFuture, backOffExecution, t);
+ }
+
+ return initFuture;
}
}
- }
-
- /**
- * Represents an operation that accepts three input arguments {@link SubscriptionListener},
- * {@code channel or pattern}, and {@code count} and returns no result.
- */
- interface SubscriptionConsumer {
- void accept(SubscriptionListener listener, byte[] channelOrPattern, long count);
- }
- private void dispatchMessage(Collection listeners, Message message, @Nullable byte[] pattern) {
+ /**
+ * Performs a potentially asynchronous registration of a subscription.
+ */
+ void eventuallyPerformSubscription(RedisConnection connection, BackOffExecution backOffExecution,
+ CompletableFuture subscriptionDone, Collection patterns, Collection channels) {
- byte[] source = (pattern != null ? pattern.clone() : message.getChannel());
+ addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronizion(patterns, channels,
+ () -> subscriptionDone.complete(null)));
- for (MessageListener messageListener : listeners) {
- taskExecutor.execute(() -> processMessage(messageListener, message, source));
+ doSubscribe(connection, patterns, channels);
}
- }
- /**
- * Specify the interval between recovery attempts, in milliseconds. The default is 5000 ms, that is, 5 seconds.
- *
- * @see #handleSubscriptionException
- */
- public void setRecoveryInterval(long recoveryInterval) {
- this.recoveryInterval = recoveryInterval;
- }
+ /**
+ * Perform the actual subscription. Can be overridden by subclasses.
+ *
+ * @param connection the connection to use.
+ * @param patterns patterns to subscribe to.
+ * @param channels channels to subscribe to.
+ */
+ void doSubscribe(RedisConnection connection, Collection patterns, Collection channels) {
- public long getMaxSubscriptionRegistrationWaitingTime() {
- return maxSubscriptionRegistrationWaitingTime;
- }
+ if (!patterns.isEmpty()) {
+ connection.pSubscribe(synchronizingMessageListener, patterns.toArray(new byte[0][]));
+ }
- /**
- * Specify the max time to wait for subscription registrations, in milliseconds. The default is 2000ms, that
- * is, 2 second.
- *
- * @param maxSubscriptionRegistrationWaitingTime
- * @see #DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME
- */
- public void setMaxSubscriptionRegistrationWaitingTime(long maxSubscriptionRegistrationWaitingTime) {
- this.maxSubscriptionRegistrationWaitingTime = maxSubscriptionRegistrationWaitingTime;
- }
+ if (!channels.isEmpty()) {
+ if (patterns.isEmpty()) {
+ connection.subscribe(synchronizingMessageListener, channels.toArray(new byte[0][]));
+ } else {
+ subscribeChannel(channels.toArray(new byte[0][]));
+ }
+ }
+ }
- /**
- * @author Jennifer Hickey
- * @author Thomas Darimont Note: Placed here to avoid API exposure.
- */
- private static abstract class SpinBarrier {
+ void addSynchronization(SynchronizingMessageListener.SubscriptionSynchronizion synchronizer) {
+ this.synchronizingMessageListener.addSynchronization(synchronizer);
+ }
/**
- * Periodically tests, in 100ms intervals, for a condition until it is met or a timeout occurs.
- *
- * @param condition The condition to periodically test
- * @param timeout The timeout
- * @return true if condition passes, false if condition does not pass within timeout
+ * Cancel all subscriptions and close the connection.
*/
- static boolean waitFor(Condition condition, long timeout) {
+ public void cancel() {
+
+ synchronized (localMonitor) {
+
+ RedisConnection connection = this.connection;
+ if (connection == null) {
+ return;
+ }
- long startTime = System.currentTimeMillis();
+ if (logger.isTraceEnabled()) {
+ logger.trace("Cancelling Redis subscription...");
+ }
+
+ Subscription sub = connection.getSubscription();
+
+ if (sub != null) {
- try {
- while (!timedOut(startTime, timeout)) {
- if (condition.passes()) {
- return true;
+ if (logger.isTraceEnabled()) {
+ logger.trace("Unsubscribing from all channels");
}
- Thread.sleep(100);
+ try {
+ sub.close();
+ } catch (Exception e) {
+ logger.warn("Unable to unsubscribe from subscriptions", e);
+ }
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+
+ closeConnection();
}
+ }
- return false;
+ /**
+ * Close the current Redis connection.
+ */
+ public void closeConnection() {
+
+ synchronized (localMonitor) {
+
+ RedisConnection connection = this.connection;
+ this.connection = null;
+
+ if (connection != null) {
+ logger.trace("Closing connection");
+ try {
+ connection.close();
+ } catch (Exception e) {
+ logger.warn("Error closing subscription connection", e);
+ }
+ }
+ }
}
- private static boolean timedOut(long startTime, long timeout) {
- return (startTime + timeout) < System.currentTimeMillis();
+ /**
+ * Update an existing subscription by subscribing to additional {@code channels}.
+ *
+ * @param channels channels to subscribe to.
+ */
+ public void subscribeChannel(byte[]... channels) {
+ doWithSubscription(channels, Subscription::subscribe);
+ }
+
+ /**
+ * Update an existing subscription by subscribing to additional {@code patterns}.
+ *
+ * @param patterns patterns to subscribe to.
+ */
+ public void subscribePattern(byte[]... patterns) {
+ doWithSubscription(patterns, Subscription::pSubscribe);
+ }
+
+ /**
+ * Update an existing subscription by unsubscribing from {@code channels}.
+ *
+ * @param channels channels to unsubscribe from.
+ */
+ public void unsubscribeChannel(byte[]... channels) {
+ doWithSubscription(channels, Subscription::unsubscribe);
+ }
+
+ /**
+ * Update an existing subscription by unsubscribing from {@code patterns}.
+ *
+ * @param patterns patterns to unsubscribe from.
+ */
+ public void unsubscribePattern(byte[]... patterns) {
+ doWithSubscription(patterns, Subscription::pUnsubscribe);
+ }
+
+ private void doWithSubscription(byte[][] data, BiConsumer function) {
+
+ if (ObjectUtils.isEmpty(data)) {
+ return;
+ }
+
+ synchronized (localMonitor) {
+ RedisConnection connection = this.connection;
+ if (connection != null) {
+ Subscription sub = connection.getSubscription();
+ if (sub != null) {
+ function.accept(sub, data);
+ }
+ }
+ }
}
}
/**
- * A condition to test periodically, used in conjunction with
- * {@link org.springframework.data.redis.listener.RedisMessageListenerContainer.SpinBarrier}
+ * Blocking variant of a subscriber for connectors that block within the (p)subscribe method.
*
- * @author Jennifer Hickey
- * @author Thomas Darimont Note: Placed here to avoid API exposure.
+ * @author Mark Paluch
+ * @since 3.0
*/
- private static interface Condition {
+ class BlockingSubscriber extends Subscriber {
- /**
- * @return true if condition passes
- */
- boolean passes();
+ private final Executor executor;
+
+ BlockingSubscriber(RedisConnectionFactory connectionFactory, Executor executor) {
+ super(connectionFactory);
+ this.executor = executor;
+ }
+
+ @Override
+ protected void eventuallyPerformSubscription(RedisConnection connection, BackOffExecution backOffExecution,
+ CompletableFuture subscriptionDone, Collection patterns, Collection channels) {
+
+ Collection initiallySubscribeToChannels;
+ if (!patterns.isEmpty() && !channels.isEmpty()) {
+
+ initiallySubscribeToChannels = Collections.emptySet();
+ // perform channel subscription later as the first call to (p)subscribe blocks the client
+ addSynchronization(
+ new SynchronizingMessageListener.SubscriptionSynchronizion(patterns, Collections.emptySet(), () -> {
+ try {
+ subscribeChannel(channels.toArray(new byte[0][]));
+ } catch (Exception e) {
+ handleSubscriptionException(subscriptionDone, backOffExecution, e);
+ }
+ }));
+ } else {
+ initiallySubscribeToChannels = channels;
+ }
+
+ addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronizion(patterns, channels,
+ () -> subscriptionDone.complete(null)));
+
+ executor.execute(() -> {
+
+ try {
+ doSubscribe(connection, patterns, initiallySubscribeToChannels);
+ } catch (Throwable t) {
+ handleSubscriptionException(subscriptionDone, backOffExecution, t);
+ }
+ });
+ }
}
+
}
diff --git a/src/main/java/org/springframework/data/redis/listener/SynchronizingMessageListener.java b/src/main/java/org/springframework/data/redis/listener/SynchronizingMessageListener.java
new file mode 100644
index 0000000000..f51391ba38
--- /dev/null
+++ b/src/main/java/org/springframework/data/redis/listener/SynchronizingMessageListener.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.redis.listener;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.data.redis.connection.SubscriptionListener;
+import org.springframework.data.redis.connection.util.ByteArrayWrapper;
+import org.springframework.lang.Nullable;
+
+/**
+ * Synchronizing {@link MessageListener} and {@link SubscriptionListener} that allows notifying a {@link Runnable}
+ * (through {@link SubscriptionSynchronizion}) upon completing subscriptions to channels or patterns.
+ *
+ * @author Mark Paluch
+ * @since 3.0
+ */
+class SynchronizingMessageListener implements MessageListener, SubscriptionListener {
+
+ private final MessageListener messageListener;
+ private final SubscriptionListener subscriptionListener;
+ private final List synchronizations = new CopyOnWriteArrayList<>();
+
+ public SynchronizingMessageListener(MessageListener messageListener, SubscriptionListener subscriptionListener) {
+ this.messageListener = messageListener;
+ this.subscriptionListener = subscriptionListener;
+ }
+
+ /**
+ * Register a {@link SubscriptionSynchronizion}.
+ *
+ * @param synchronization must not be {@literal null}.
+ */
+ public void addSynchronization(SubscriptionSynchronizion synchronization) {
+ this.synchronizations.add(synchronization);
+ }
+
+ @Override
+ public void onMessage(Message message, @Nullable byte[] pattern) {
+ messageListener.onMessage(message, pattern);
+ }
+
+ @Override
+ public void onChannelSubscribed(byte[] channel, long count) {
+
+ subscriptionListener.onChannelSubscribed(channel, count);
+ handleSubscription(channel, SubscriptionSynchronizion::onChannelSubscribed);
+ }
+
+ @Override
+ public void onChannelUnsubscribed(byte[] channel, long count) {
+ subscriptionListener.onChannelUnsubscribed(channel, count);
+ }
+
+ @Override
+ public void onPatternSubscribed(byte[] pattern, long count) {
+
+ subscriptionListener.onPatternSubscribed(pattern, count);
+ handleSubscription(pattern, SubscriptionSynchronizion::onPatternSubscribed);
+ }
+
+ @Override
+ public void onPatternUnsubscribed(byte[] pattern, long count) {
+ subscriptionListener.onPatternUnsubscribed(pattern, count);
+ }
+
+ void handleSubscription(byte[] topic,
+ BiFunction synchronizerCallback) {
+
+ if (synchronizations.isEmpty()) {
+ return;
+ }
+
+ ByteArrayWrapper binaryChannel = new ByteArrayWrapper(topic);
+ List finalized = new ArrayList<>(synchronizations.size());
+
+ for (SubscriptionSynchronizion synchronizer : synchronizations) {
+
+ if (synchronizerCallback.apply(synchronizer, binaryChannel)) {
+ finalized.add(synchronizer);
+ }
+ }
+
+ synchronizations.removeAll(finalized);
+ }
+
+ /**
+ * Synchronization to await subscriptions for channels and patterns.
+ */
+ static class SubscriptionSynchronizion {
+
+ private static final AtomicIntegerFieldUpdater DONE = AtomicIntegerFieldUpdater
+ .newUpdater(SubscriptionSynchronizion.class, "done");
+
+ private static final int NOT_DONE = 0;
+ private static final int DONE_DONE = 0;
+
+ private volatile int done = NOT_DONE;
+ private final Set remainingPatterns;
+ private final Set remainingChannels;
+
+ private final Runnable doneCallback;
+
+ public SubscriptionSynchronizion(Collection remainingPatterns, Collection remainingChannels,
+ Runnable doneCallback) {
+
+ if (remainingPatterns.isEmpty()) {
+ this.remainingPatterns = Collections.emptySet();
+ } else {
+ this.remainingPatterns = ConcurrentHashMap.newKeySet(remainingPatterns.size());
+ this.remainingPatterns
+ .addAll(remainingPatterns.stream().map(ByteArrayWrapper::new).collect(Collectors.toList()));
+ }
+
+ if (remainingChannels.isEmpty()) {
+ this.remainingChannels = Collections.emptySet();
+ } else {
+ this.remainingChannels = ConcurrentHashMap.newKeySet(remainingChannels.size());
+ this.remainingChannels
+ .addAll(remainingChannels.stream().map(ByteArrayWrapper::new).collect(Collectors.toList()));
+ }
+
+ this.doneCallback = doneCallback;
+ }
+
+ boolean onChannelSubscribed(ByteArrayWrapper channel) {
+
+ if (DONE.get(this) == NOT_DONE) {
+ remainingChannels.remove(channel);
+ return postSubscribe();
+ }
+
+ return false;
+ }
+
+ boolean onPatternSubscribed(ByteArrayWrapper pattern) {
+
+ if (DONE.get(this) == NOT_DONE) {
+ remainingPatterns.remove(pattern);
+ return postSubscribe();
+ }
+
+ return false;
+ }
+
+ /**
+ * @return whether the synchronization is finished and can be removed.
+ */
+ private boolean postSubscribe() {
+
+ if (remainingChannels.isEmpty() && remainingPatterns.isEmpty() && DONE.compareAndSet(this, NOT_DONE, DONE_DONE)) {
+ this.doneCallback.run();
+
+ return true;
+ }
+
+ return false;
+ }
+ }
+}
diff --git a/src/test/java/org/springframework/data/redis/core/RedisKeyValueAdapterUnitTests.java b/src/test/java/org/springframework/data/redis/core/RedisKeyValueAdapterUnitTests.java
index 59550b2277..ab00326dc4 100644
--- a/src/test/java/org/springframework/data/redis/core/RedisKeyValueAdapterUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/core/RedisKeyValueAdapterUnitTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2021 the original author or authors.
+ * Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.springframework.data.redis.core;
import static org.assertj.core.api.Assertions.*;
@@ -38,6 +37,7 @@
import org.springframework.data.annotation.Id;
import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.connection.SubscriptionListener;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisKeyValueAdapter.EnableKeyspaceEvents;
import org.springframework.data.redis.core.convert.Bucket;
@@ -72,6 +72,22 @@ void setUp() throws Exception {
template.setConnectionFactory(jedisConnectionFactoryMock);
template.afterPropertiesSet();
+ doAnswer(it -> {
+
+ SubscriptionListener listener = it.getArgument(0);
+ listener.onChannelSubscribed(it.getArgument(1), 0);
+
+ return null;
+ }).when(redisConnectionMock).subscribe(any(), any());
+
+ doAnswer(it -> {
+
+ SubscriptionListener listener = it.getArgument(0);
+ listener.onPatternSubscribed(it.getArgument(1), 0);
+
+ return null;
+ }).when(redisConnectionMock).pSubscribe(any(), any());
+
when(jedisConnectionFactoryMock.getConnection()).thenReturn(redisConnectionMock);
Properties keyspaceEventsConfig = new Properties();
diff --git a/src/test/java/org/springframework/data/redis/listener/PubSubResubscribeTests.java b/src/test/java/org/springframework/data/redis/listener/PubSubResubscribeTests.java
index 76d12a5187..d60b8c745f 100644
--- a/src/test/java/org/springframework/data/redis/listener/PubSubResubscribeTests.java
+++ b/src/test/java/org/springframework/data/redis/listener/PubSubResubscribeTests.java
@@ -33,7 +33,6 @@
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.SyncTaskExecutor;
-import org.springframework.data.redis.SettingsUtils;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.extension.JedisConnectionFactoryExtension;
@@ -77,9 +76,6 @@ public PubSubResubscribeTests(RedisConnectionFactory connectionFactory) {
public static Collection