diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java index befdfffe50..1c9dfb92c1 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java @@ -116,6 +116,7 @@ * @author Chris Bono * @author John Blum * @author Zhian Chen + * @author UHyeon Jeong */ public class LettuceConnectionFactory implements RedisConnectionFactory, ReactiveRedisConnectionFactory, InitializingBean, DisposableBean, SmartLifecycle { @@ -979,6 +980,9 @@ public void stop() { dispose(reactiveConnectionProvider); reactiveConnectionProvider = null; + dispose(clusterCommandExecutor); + clusterCommandExecutor = null; + if (client != null) { try { Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod(); @@ -1012,20 +1016,7 @@ public void afterPropertiesSet() { @Override public void destroy() { - stop(); - this.client = null; - - ClusterCommandExecutor clusterCommandExecutor = this.clusterCommandExecutor; - - if (clusterCommandExecutor != null) { - try { - clusterCommandExecutor.destroy(); - this.clusterCommandExecutor = null; - } catch (Exception ex) { - log.warn("Cannot properly close cluster command executor", ex); - } - } this.state.set(State.DESTROYED); } @@ -1043,6 +1034,16 @@ private void dispose(@Nullable LettuceConnectionProvider connectionProvider) { } } + private void dispose(@Nullable ClusterCommandExecutor commandExecutor) { + if (commandExecutor != null) { + try { + commandExecutor.destroy(); + } catch (Exception ex) { + log.warn("Cannot properly close cluster command executor", ex); + } + } + } + @Override public RedisConnection getConnection() { diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettucePoolingConnectionProvider.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettucePoolingConnectionProvider.java index 086c9ce763..641a0e2e82 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettucePoolingConnectionProvider.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettucePoolingConnectionProvider.java @@ -31,11 +31,13 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.springframework.beans.factory.DisposableBean; +import org.springframework.context.SmartLifecycle; import org.springframework.data.redis.connection.PoolException; import org.springframework.util.Assert; @@ -56,13 +58,16 @@ * @author Mark Paluch * @author Christoph Strobl * @author Asmir Mustafic + * @author UHyeon Jeong * @since 2.0 * @see #getConnection(Class) */ -class LettucePoolingConnectionProvider implements LettuceConnectionProvider, RedisClientProvider, DisposableBean { +class LettucePoolingConnectionProvider implements LettuceConnectionProvider, RedisClientProvider, DisposableBean, + SmartLifecycle { private static final Log log = LogFactory.getLog(LettucePoolingConnectionProvider.class); + private final AtomicReference state = new AtomicReference<>(State.CREATED); private final LettuceConnectionProvider connectionProvider; private final GenericObjectPoolConfig> poolConfig; private final Map, GenericObjectPool>> poolRef = new ConcurrentHashMap<>( @@ -76,6 +81,10 @@ class LettucePoolingConnectionProvider implements LettuceConnectionProvider, Red private final Map, AsyncPool>> asyncPools = new ConcurrentHashMap<>(32); private final BoundedPoolConfig asyncPoolConfig; + enum State { + CREATED, STARTING, STARTED, STOPPING, STOPPED, DESTROYED; + } + LettucePoolingConnectionProvider(LettuceConnectionProvider connectionProvider, LettucePoolingClientConfiguration clientConfiguration) { @@ -206,39 +215,51 @@ public CompletableFuture releaseAsync(StatefulConnection connection) @Override public void destroy() throws Exception { + stop(); + state.set(State.DESTROYED); + } - List> futures = new ArrayList<>(); - if (!poolRef.isEmpty() || !asyncPoolRef.isEmpty()) { - log.warn("LettucePoolingConnectionProvider contains unreleased connections"); - } - if (!inProgressAsyncPoolRef.isEmpty()) { + @Override + public void start() { + state.set(State.STARTED); + } - log.warn("LettucePoolingConnectionProvider has active connection retrievals"); - inProgressAsyncPoolRef.forEach((k, v) -> futures.add(k.thenApply(StatefulConnection::closeAsync))); - } + @Override + public void stop() { + if (state.compareAndSet(State.STARTED, State.STOPPING)) { + List> futures = new ArrayList<>(); + if (!poolRef.isEmpty() || !asyncPoolRef.isEmpty()) { + log.warn("LettucePoolingConnectionProvider contains unreleased connections"); + } - if (!poolRef.isEmpty()) { + if (!inProgressAsyncPoolRef.isEmpty()) { - poolRef.forEach((connection, pool) -> pool.returnObject(connection)); - poolRef.clear(); - } + log.warn("LettucePoolingConnectionProvider has active connection retrievals"); + inProgressAsyncPoolRef.forEach((k, v) -> futures.add(k.thenApply(StatefulConnection::closeAsync))); + } - if (!asyncPoolRef.isEmpty()) { + if (!poolRef.isEmpty()) { - asyncPoolRef.forEach((connection, pool) -> futures.add(pool.release(connection))); - asyncPoolRef.clear(); - } + poolRef.forEach((connection, pool) -> pool.returnObject(connection)); + poolRef.clear(); + } + + if (!asyncPoolRef.isEmpty()) { + + asyncPoolRef.forEach((connection, pool) -> futures.add(pool.release(connection))); + asyncPoolRef.clear(); + } - pools.forEach((type, pool) -> pool.close()); + pools.forEach((type, pool) -> pool.close()); - CompletableFuture + CompletableFuture .allOf(futures.stream().map(it -> it.exceptionally(LettuceFutureUtils.ignoreErrors())) - .toArray(CompletableFuture[]::new)) // + .toArray(CompletableFuture[]::new)) // .thenCompose(ignored -> { CompletableFuture[] poolClose = asyncPools.values().stream().map(AsyncPool::closeAsync) - .map(it -> it.exceptionally(LettuceFutureUtils.ignoreErrors())).toArray(CompletableFuture[]::new); + .map(it -> it.exceptionally(LettuceFutureUtils.ignoreErrors())).toArray(CompletableFuture[]::new); return CompletableFuture.allOf(poolClose); }) // @@ -248,6 +269,18 @@ public void destroy() throws Exception { }) // .join(); - pools.clear(); + pools.clear(); + } + state.set(State.STOPPED); + } + + @Override + public boolean isRunning() { + return State.STARTED.equals(this.state.get()); + } + + @Override + public boolean isAutoStartup() { + return true; } }