future) {
+
try {
future.get(getMaxSubscriptionRegistrationWaitingTime(), TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
+ } catch (InterruptedException cause) {
Thread.currentThread().interrupt();
- } catch (ExecutionException | TimeoutException e) {
- // ignore
+ } catch (ExecutionException | TimeoutException ignore) {
}
}
@@ -443,7 +434,7 @@ public boolean isListening() {
* Return whether this container is currently active, that is, whether it has been set up but not shut down yet.
*/
public final boolean isActive() {
- return afterPropertiesSet;
+ return this.afterPropertiesSet;
}
/**
@@ -453,7 +444,7 @@ public final boolean isActive() {
*/
@Nullable
public RedisConnectionFactory getConnectionFactory() {
- return connectionFactory;
+ return this.connectionFactory;
}
/**
@@ -462,6 +453,7 @@ public RedisConnectionFactory getConnectionFactory() {
public void setConnectionFactory(RedisConnectionFactory connectionFactory) {
Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
+
this.connectionFactory = connectionFactory;
}
@@ -471,14 +463,24 @@ public void setBeanName(String name) {
}
/**
- * Sets the task executor used for running the message listeners when messages are received. If no task executor is
- * set, an instance of {@link SimpleAsyncTaskExecutor} will be used by default. The task executor can be adjusted
- * depending on the work done by the listeners and the number of messages coming in.
+ * Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default
+ * there will be no ErrorHandler so that error-level logging is the only result.
+ */
+ public void setErrorHandler(ErrorHandler errorHandler) {
+ this.errorHandler = errorHandler;
+ }
+
+ /**
+ * Attaches the given listeners (and their topics) to the container.
+ *
+ * Note: it's possible to call this method while the container is running forcing a reinitialization of the container.
+ * Note however that this might cause some messages to be lost (while the container reinitializes) - hence calling
+ * this method at runtime is considered advanced usage.
*
- * @param taskExecutor The taskExecutor to set.
+ * @param listeners map of message listeners and their associated topics
*/
- public void setTaskExecutor(Executor taskExecutor) {
- this.taskExecutor = taskExecutor;
+ public void setMessageListeners(Map extends MessageListener, Collection extends Topic>> listeners) {
+ initMapping(listeners);
}
/**
@@ -496,34 +498,24 @@ public void setSubscriptionExecutor(Executor subscriptionExecutor) {
}
/**
- * Sets the serializer for converting the {@link Topic}s into low-level channels and patterns. By default,
- * {@link StringRedisSerializer} is used.
+ * Sets the task executor used for running the message listeners when messages are received. If no task executor is
+ * set, an instance of {@link SimpleAsyncTaskExecutor} will be used by default. The task executor can be adjusted
+ * depending on the work done by the listeners and the number of messages coming in.
*
- * @param serializer The serializer to set.
- */
- public void setTopicSerializer(RedisSerializer serializer) {
- this.serializer = serializer;
- }
-
- /**
- * Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default
- * there will be no ErrorHandler so that error-level logging is the only result.
+ * @param taskExecutor The taskExecutor to set.
*/
- public void setErrorHandler(ErrorHandler errorHandler) {
- this.errorHandler = errorHandler;
+ public void setTaskExecutor(Executor taskExecutor) {
+ this.taskExecutor = taskExecutor;
}
/**
- * Attaches the given listeners (and their topics) to the container.
- *
- * Note: it's possible to call this method while the container is running forcing a reinitialization of the container.
- * Note however that this might cause some messages to be lost (while the container reinitializes) - hence calling
- * this method at runtime is considered advanced usage.
+ * Sets the serializer for converting the {@link Topic}s into low-level channels and patterns. By default,
+ * {@link StringRedisSerializer} is used.
*
- * @param listeners map of message listeners and their associated topics
+ * @param serializer The serializer to set.
*/
- public void setMessageListeners(Map extends MessageListener, Collection extends Topic>> listeners) {
- initMapping(listeners);
+ public void setTopicSerializer(RedisSerializer serializer) {
+ this.serializer = serializer;
}
/**
@@ -585,10 +577,12 @@ public void removeMessageListener(@Nullable MessageListener listener, Topic topi
public void removeMessageListener(MessageListener listener) {
Assert.notNull(listener, "MessageListener must not be null");
+
removeMessageListener(listener, Collections.emptySet());
}
private void initMapping(Map extends MessageListener, Collection extends Topic>> listeners) {
+
// stop the listener if currently running
if (isRunning()) {
stop();
@@ -605,7 +599,7 @@ private void initMapping(Map extends MessageListener, Collection extends Top
}
// resume activity
- if (afterPropertiesSet) {
+ if (this.afterPropertiesSet) {
start();
}
}
@@ -618,46 +612,32 @@ private void addListener(MessageListener listener, Collection extends Topic> t
List channels = new ArrayList<>(topics.size());
List patterns = new ArrayList<>(topics.size());
- boolean trace = logger.isTraceEnabled();
-
// add listener mapping
Set set = listenerTopics.get(listener);
+
if (set == null) {
set = new CopyOnWriteArraySet<>();
listenerTopics.put(listener, set);
}
+
set.addAll(topics);
for (Topic topic : topics) {
- ByteArrayWrapper holder = new ByteArrayWrapper(serialize(topic));
+ ByteArrayWrapper serializedTopic = new ByteArrayWrapper(serialize(topic));
if (topic instanceof ChannelTopic) {
- Collection collection = channelMapping.get(holder);
- if (collection == null) {
- collection = new CopyOnWriteArraySet<>();
- channelMapping.put(holder, collection);
- }
+ Collection collection = resolveMessageListeners(this.channelMapping, serializedTopic);
collection.add(listener);
- channels.add(holder.getArray());
-
- if (trace)
- logger.trace("Adding listener '" + listener + "' on channel '" + topic.getTopic() + "'");
+ channels.add(serializedTopic.getArray());
+ logTrace(() -> "Adding listener '" + listener + "' on channel '" + topic.getTopic() + "'");
}
-
else if (topic instanceof PatternTopic) {
- Collection collection = patternMapping.get(holder);
- if (collection == null) {
- collection = new CopyOnWriteArraySet<>();
- patternMapping.put(holder, collection);
- }
+ Collection collection = resolveMessageListeners(this.patternMapping, serializedTopic);
collection.add(listener);
- patterns.add(holder.getArray());
-
- if (trace)
- logger.trace("Adding listener '" + listener + "' for pattern '" + topic.getTopic() + "'");
+ patterns.add(serializedTopic.getArray());
+ logTrace(() -> "Adding listener '" + listener + "' for pattern '" + topic.getTopic() + "'");
}
-
else {
throw new IllegalArgumentException("Unknown topic type '" + topic.getClass() + "'");
}
@@ -665,31 +645,66 @@ else if (topic instanceof PatternTopic) {
boolean wasListening = isListening();
if (isRunning()) {
+
lazyListen();
// check the current listening state
if (wasListening) {
+
CompletableFuture future = new CompletableFuture<>();
- getRequiredSubscriber().addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronizion(patterns,
- channels, () -> future.complete(null)));
+ getRequiredSubscriber().addSynchronization(newSubscriptionSynchronization(patterns, channels,
+ () -> future.complete(null)));
getRequiredSubscriber().subscribeChannel(channels.toArray(new byte[channels.size()][]));
getRequiredSubscriber().subscribePattern(patterns.toArray(new byte[patterns.size()][]));
try {
future.join();
- } catch (CompletionException e) {
+ } catch (CompletionException cause) {
- if (e.getCause() instanceof DataAccessException) {
- throw new RedisListenerExecutionFailedException(e.getMessage(), e.getCause());
+ if (cause.getCause() instanceof DataAccessException) {
+ throw new RedisListenerExecutionFailedException(cause.getMessage(), cause.getCause());
}
- throw e;
+ throw cause;
}
}
}
}
+ private void logDebug(Supplier message) {
+
+ if (this.logger.isDebugEnabled()) {
+ this.logger.debug(message.get());
+ }
+ }
+
+ private void logTrace(Supplier message) {
+
+ if (this.logger.isTraceEnabled()) {
+ this.logger.trace(message.get());
+ }
+ }
+
+ private SynchronizingMessageListener.SubscriptionSynchronizion newSubscriptionSynchronization(
+ Collection patterns, Collection channels, Runnable doneCallback) {
+
+ return new SynchronizingMessageListener.SubscriptionSynchronizion(patterns, channels, doneCallback);
+ }
+
+ private Collection resolveMessageListeners(
+ Map> mapping, ByteArrayWrapper topic) {
+
+ Collection messageListeners = mapping.get(topic);
+
+ if (messageListeners == null) {
+ messageListeners = new CopyOnWriteArraySet<>();
+ mapping.put(topic, messageListeners);
+ }
+
+ return messageListeners;
+ }
+
private void removeListener(@Nullable MessageListener listener, Collection extends Topic> topics) {
Assert.notNull(topics, "Topics must not be null");
@@ -725,14 +740,14 @@ private void removeListener(@Nullable MessageListener listener, Collection ext
}
for (Topic topic : topics) {
+
ByteArrayWrapper holder = new ByteArrayWrapper(serialize(topic));
if (topic instanceof ChannelTopic) {
remove(listener, topic, holder, channelMapping, channelsToRemove);
if (trace) {
- String msg = "listener '" + listener + "'";
- logger.trace("Removing " + msg + " from channel '" + topic.getTopic() + "'");
+ logger.trace("Removing listener '" + listener + "' from channel '" + topic.getTopic() + "'");
}
}
@@ -740,8 +755,7 @@ else if (topic instanceof PatternTopic) {
remove(listener, topic, holder, patternMapping, patternsToRemove);
if (trace) {
- String msg = "listener '" + listener + "'";
- logger.trace("Removing " + msg + " from pattern '" + topic.getTopic() + "'");
+ logger.trace("Removing listener '" + listener + "' from pattern '" + topic.getTopic() + "'");
}
}
}
@@ -779,6 +793,7 @@ private void remove(MessageListener listener, Topic topic, ByteArrayWrapper hold
listenerTopics.remove(messageListener);
}
}
+
// if we removed everything, remove the empty holder collection
if (listeners.isEmpty()) {
mapping.remove(holder);
@@ -807,11 +822,12 @@ public void setRecoveryInterval(long recoveryInterval) {
public void setRecoveryBackoff(BackOff recoveryInterval) {
Assert.notNull(recoveryInterval, "Recovery interval must not be null");
+
this.backOff = recoveryInterval;
}
public long getMaxSubscriptionRegistrationWaitingTime() {
- return maxSubscriptionRegistrationWaitingTime;
+ return this.maxSubscriptionRegistrationWaitingTime;
}
/**
@@ -841,10 +857,11 @@ private Subscriber createSubscriber(RedisConnectionFactory connectionFactory, Ex
* @see #handleListenerException
*/
protected void processMessage(MessageListener listener, Message message, byte[] source) {
+
try {
listener.onMessage(message, source);
- } catch (Throwable ex) {
- handleListenerException(ex);
+ } catch (Throwable cause) {
+ handleListenerException(cause);
}
}
@@ -853,31 +870,33 @@ protected void processMessage(MessageListener listener, Message message, byte[]
*
* The default implementation logs the exception at error level. This can be overridden in subclasses.
*
- * @param ex the exception to handle
+ * @param cause the exception to handle
*/
- protected void handleListenerException(Throwable ex) {
+ protected void handleListenerException(Throwable cause) {
+
if (isActive()) {
// Regular case: failed while active.
// Invoke ErrorHandler if available.
- invokeErrorHandler(ex);
+ invokeErrorHandler(cause);
} else {
// Rare case: listener thread failed after container shutdown.
// Log at debug level, to avoid spamming the shutdown logger.
- logger.debug("Listener exception after container shutdown", ex);
+ logger.debug("Listener exception after container shutdown", cause);
}
}
/**
* Invoke the registered ErrorHandler, if any. Log at error level otherwise.
*
- * @param ex the uncaught error that arose during message processing.
+ * @param cause the uncaught error that arose during message processing.
* @see #setErrorHandler
*/
- protected void invokeErrorHandler(Throwable ex) {
+ protected void invokeErrorHandler(Throwable cause) {
+
if (this.errorHandler != null) {
- this.errorHandler.handleError(ex);
+ this.errorHandler.handleError(cause);
} else if (logger.isWarnEnabled()) {
- logger.warn("Execution of message listener failed, and no ErrorHandler has been set", ex);
+ logger.warn("Execution of message listener failed, and no ErrorHandler has been set", cause);
}
}
@@ -924,6 +943,7 @@ protected void handleSubscriptionException(CompletableFuture future, BackO
if (isRunning()) { // log only if the container is still running to prevent close errors from logging
logger.error("SubscriptionTask aborted with exception:", ex);
}
+
future.completeExceptionally(ex);
}
@@ -939,7 +959,6 @@ private boolean potentiallyRecover(BackOffExecution backOffExecution, Runnable r
}
try {
-
if (subscriptionExecutor instanceof ScheduledExecutorService) {
((ScheduledExecutorService) subscriptionExecutor).schedule(retryRunnable, recoveryInterval,
TimeUnit.MILLISECONDS);
@@ -961,6 +980,7 @@ private boolean potentiallyRecover(BackOffExecution backOffExecution, Runnable r
}
private void propagate(@Nullable T value, @Nullable Throwable throwable, CompletableFuture target) {
+
if (throwable != null) {
target.completeExceptionally(throwable);
} else {
@@ -972,8 +992,8 @@ private void dispatchSubscriptionNotification(Collection listen
SubscriptionConsumer listenerConsumer) {
if (!CollectionUtils.isEmpty(listeners)) {
- byte[] source = pattern.clone();
+ byte[] source = pattern.clone();
Executor executor = getRequiredTaskExecutor();
for (MessageListener messageListener : listeners) {
@@ -987,34 +1007,30 @@ private void dispatchSubscriptionNotification(Collection listen
private void dispatchMessage(Collection listeners, Message message, @Nullable byte[] pattern) {
byte[] source = (pattern != null ? pattern.clone() : message.getChannel());
-
Executor executor = getRequiredTaskExecutor();
+
for (MessageListener messageListener : listeners) {
executor.execute(() -> processMessage(messageListener, message, source));
}
}
private boolean hasTopics() {
- return !channelMapping.isEmpty() || !patternMapping.isEmpty();
+ return !this.channelMapping.isEmpty() || !this.patternMapping.isEmpty();
}
private Subscriber getRequiredSubscriber() {
- if (this.subscriber == null) {
- throw new IllegalStateException(
- "Subscriber not created; Configure RedisConnectionFactory to create a Subscriber");
- }
+ Assert.state(this.subscriber != null,
+ "Subscriber not created; Configure RedisConnectionFactory to create a Subscriber");
- return subscriber;
+ return this.subscriber;
}
private Executor getRequiredTaskExecutor() {
- if (this.taskExecutor == null) {
- throw new IllegalStateException("No executor configured");
- }
+ Assert.state(this.taskExecutor != null, "No executor configured");
- return taskExecutor;
+ return this.taskExecutor;
}
@SuppressWarnings("ConstantConditions")
@@ -1212,7 +1228,7 @@ public CompletableFuture initialize(BackOffExecution backOffExecution, Col
void eventuallyPerformSubscription(RedisConnection connection, BackOffExecution backOffExecution,
CompletableFuture subscriptionDone, Collection patterns, Collection channels) {
- addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronizion(patterns, channels,
+ addSynchronization(newSubscriptionSynchronization(patterns, channels,
() -> subscriptionDone.complete(null)));
doSubscribe(connection, patterns, channels);
@@ -1408,23 +1424,24 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff
CompletableFuture subscriptionDone, Collection patterns, Collection channels) {
Collection initiallySubscribeToChannels;
+
if (!patterns.isEmpty() && !channels.isEmpty()) {
initiallySubscribeToChannels = Collections.emptySet();
+
// perform channel subscription later as the first call to (p)subscribe blocks the client
- addSynchronization(
- new SynchronizingMessageListener.SubscriptionSynchronizion(patterns, Collections.emptySet(), () -> {
- try {
- subscribeChannel(channels.toArray(new byte[0][]));
- } catch (Exception e) {
- handleSubscriptionException(subscriptionDone, backOffExecution, e);
- }
- }));
+ addSynchronization(newSubscriptionSynchronization(patterns, Collections.emptySet(), () -> {
+ try {
+ subscribeChannel(channels.toArray(new byte[0][]));
+ } catch (Exception cause) {
+ handleSubscriptionException(subscriptionDone, backOffExecution, cause);
+ }
+ }));
} else {
initiallySubscribeToChannels = channels;
}
- addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronizion(patterns, channels,
+ addSynchronization(newSubscriptionSynchronization(patterns, channels,
() -> subscriptionDone.complete(null)));
executor.execute(() -> {
@@ -1433,11 +1450,10 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff
doSubscribe(connection, patterns, initiallySubscribeToChannels);
closeConnection();
unsubscribeFuture.complete(null);
- } catch (Throwable t) {
- handleSubscriptionException(subscriptionDone, backOffExecution, t);
+ } catch (Throwable cause) {
+ handleSubscriptionException(subscriptionDone, backOffExecution, cause);
}
});
}
}
-
}