Skip to content

Commit 08ed1c4

Browse files
committed
Capture subscriber initialization exceptions in init future.
We now capture exceptions during the connection initialization and subscription to avoid getting the container into an invalid state. Closes #2335
1 parent e2e68aa commit 08ed1c4

File tree

2 files changed

+75
-16
lines changed

2 files changed

+75
-16
lines changed

src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.springframework.context.SmartLifecycle;
4545
import org.springframework.core.task.SimpleAsyncTaskExecutor;
4646
import org.springframework.core.task.TaskExecutor;
47+
import org.springframework.dao.DataAccessException;
4748
import org.springframework.data.redis.RedisConnectionFailureException;
4849
import org.springframework.data.redis.connection.ConnectionUtils;
4950
import org.springframework.data.redis.connection.Message;
@@ -53,6 +54,7 @@
5354
import org.springframework.data.redis.connection.Subscription;
5455
import org.springframework.data.redis.connection.SubscriptionListener;
5556
import org.springframework.data.redis.connection.util.ByteArrayWrapper;
57+
import org.springframework.data.redis.listener.adapter.RedisListenerExecutionFailedException;
5658
import org.springframework.data.redis.serializer.RedisSerializer;
5759
import org.springframework.data.redis.serializer.StringRedisSerializer;
5860
import org.springframework.lang.Nullable;
@@ -181,7 +183,6 @@ public void afterPropertiesSet() {
181183
subscriptionExecutor = taskExecutor;
182184
}
183185

184-
185186
this.subscriber = createSubscriber(connectionFactory, this.subscriptionExecutor);
186187

187188
afterPropertiesSet = true;
@@ -269,6 +270,11 @@ private void lazyListen() {
269270
} catch (InterruptedException e) {
270271
Thread.currentThread().interrupt();
271272
} catch (ExecutionException e) {
273+
274+
if (e.getCause() instanceof DataAccessException) {
275+
throw new RedisListenerExecutionFailedException(e.getMessage(), e.getCause());
276+
}
277+
272278
throw new CompletionException(e.getCause());
273279
} catch (TimeoutException e) {
274280
throw new IllegalStateException("Subscription registration timeout exceeded.", e);
@@ -670,7 +676,16 @@ else if (topic instanceof PatternTopic) {
670676
getRequiredSubscriber().subscribeChannel(channels.toArray(new byte[channels.size()][]));
671677
getRequiredSubscriber().subscribePattern(patterns.toArray(new byte[patterns.size()][]));
672678

673-
future.join();
679+
try {
680+
future.join();
681+
} catch (CompletionException e) {
682+
683+
if (e.getCause() instanceof DataAccessException) {
684+
throw new RedisListenerExecutionFailedException(e.getMessage(), e.getCause());
685+
}
686+
687+
throw e;
688+
}
674689
}
675690
}
676691
}
@@ -1166,23 +1181,25 @@ public CompletableFuture<Void> initialize(BackOffExecution backOffExecution, Col
11661181

11671182
synchronized (localMonitor) {
11681183

1169-
RedisConnection connection = connectionFactory.getConnection();
1170-
this.connection = connection;
1171-
1172-
if (connection.isSubscribed()) {
1184+
CompletableFuture<Void> initFuture = new CompletableFuture<>();
1185+
try {
1186+
RedisConnection connection = connectionFactory.getConnection();
1187+
this.connection = connection;
11731188

1174-
CompletableFuture<Void> failure = new CompletableFuture<>();
1175-
failure.completeExceptionally(
1176-
new IllegalStateException("Retrieved connection is already subscribed; aborting listening"));
1177-
return failure;
1178-
}
1189+
if (connection.isSubscribed()) {
11791190

1180-
CompletableFuture<Void> initFuture = new CompletableFuture<>();
1191+
initFuture.completeExceptionally(
1192+
new IllegalStateException("Retrieved connection is already subscribed; aborting listening"));
1193+
return initFuture;
1194+
}
11811195

1182-
try {
1183-
eventuallyPerformSubscription(connection, backOffExecution, initFuture, patterns, channels);
1184-
} catch (Throwable t) {
1185-
handleSubscriptionException(initFuture, backOffExecution, t);
1196+
try {
1197+
eventuallyPerformSubscription(connection, backOffExecution, initFuture, patterns, channels);
1198+
} catch (Throwable t) {
1199+
handleSubscriptionException(initFuture, backOffExecution, t);
1200+
}
1201+
} catch (RuntimeException e) {
1202+
initFuture.completeExceptionally(e);
11861203
}
11871204

11881205
return initFuture;

src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@
2525
import org.junit.jupiter.api.Test;
2626

2727
import org.springframework.core.task.SyncTaskExecutor;
28+
import org.springframework.data.redis.RedisConnectionFailureException;
2829
import org.springframework.data.redis.connection.RedisConnection;
2930
import org.springframework.data.redis.connection.RedisConnectionFactory;
3031
import org.springframework.data.redis.connection.Subscription;
3132
import org.springframework.data.redis.connection.SubscriptionListener;
3233
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
3334
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
35+
import org.springframework.data.redis.listener.adapter.RedisListenerExecutionFailedException;
3436

3537
/**
3638
* Unit tests for {@link RedisMessageListenerContainer}.
@@ -106,6 +108,46 @@ void containerShouldStopGracefullyOnUnsubscribeErrors() {
106108
verify(connectionMock).close();
107109
}
108110

111+
@Test // GH-2335
112+
void containerStartShouldReportFailureOnRedisUnavailability() {
113+
114+
when(connectionFactoryMock.getConnection()).thenThrow(new RedisConnectionFailureException("Booh!"));
115+
116+
doAnswer(it -> {
117+
118+
Runnable r = it.getArgument(0);
119+
r.run();
120+
return null;
121+
}).when(executorMock).execute(any());
122+
123+
container.addMessageListener(adapter, new ChannelTopic("a"));
124+
assertThatExceptionOfType(RedisListenerExecutionFailedException.class).isThrownBy(() -> container.start());
125+
126+
assertThat(container.isRunning()).isTrue();
127+
assertThat(container.isListening()).isFalse();
128+
}
129+
130+
@Test // GH-2335
131+
void containerListenShouldReportFailureOnRedisUnavailability() {
132+
133+
when(connectionFactoryMock.getConnection()).thenThrow(new RedisConnectionFailureException("Booh!"));
134+
135+
doAnswer(it -> {
136+
137+
Runnable r = it.getArgument(0);
138+
r.run();
139+
return null;
140+
}).when(executorMock).execute(any());
141+
142+
container.start();
143+
144+
assertThatExceptionOfType(RedisListenerExecutionFailedException.class)
145+
.isThrownBy(() -> container.addMessageListener(adapter, new ChannelTopic("a")));
146+
147+
assertThat(container.isRunning()).isTrue();
148+
assertThat(container.isListening()).isFalse();
149+
}
150+
109151
@Test // GH-964
110152
void failsOnDuplicateInit() {
111153
assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> container.afterPropertiesSet());

0 commit comments

Comments
 (0)