diff --git a/pom.xml b/pom.xml index 2486f880c8..ad2140fcf9 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-redis - 1.8.0.BUILD-SNAPSHOT + 1.8.0.DATAREDIS-531-SNAPSHOT Spring Data Redis diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java index fac83394d5..ee69e20fa4 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java @@ -46,6 +46,7 @@ import org.springframework.data.redis.connection.DataType; import org.springframework.data.redis.connection.FutureResult; import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisNode; import org.springframework.data.redis.connection.RedisPipelineException; import org.springframework.data.redis.connection.RedisSubscribedConnectionException; @@ -79,7 +80,6 @@ import redis.clients.jedis.Jedis; import redis.clients.jedis.Pipeline; import redis.clients.jedis.Protocol; -import redis.clients.jedis.Protocol.Command; import redis.clients.jedis.Queable; import redis.clients.jedis.Response; import redis.clients.jedis.ScanParams; @@ -87,6 +87,7 @@ import redis.clients.jedis.SortingParams; import redis.clients.jedis.Transaction; import redis.clients.jedis.ZParams; +import redis.clients.jedis.Protocol.Command; import redis.clients.jedis.exceptions.JedisDataException; import redis.clients.util.Pool; @@ -142,6 +143,9 @@ public class JedisConnection extends AbstractRedisConnection { private final Client client; private Transaction transaction; private final Pool pool; + private final RedisConnectionFactory connectionFactory; + private boolean closed = false; + /** * flag indicating whether the connection needs to be dropped or not */ @@ -181,7 +185,7 @@ public JedisStatusResult(Response resultHolder) { /** * Constructs a new JedisConnection instance. * - * @param jedis Jedis entity + * @param jedis Jedis entity, must not be {@literal null}. */ public JedisConnection(Jedis jedis) { this(jedis, null, 0); @@ -190,17 +194,34 @@ public JedisConnection(Jedis jedis) { /** * Constructs a new JedisConnection instance backed by a jedis pool. * - * @param jedis - * @param pool can be null, if no pool is used + * @param jedis must not be {@literal null}. + * @param pool may be {@literal null}, if no pool is used. + * @param dbIndex */ public JedisConnection(Jedis jedis, Pool pool, int dbIndex) { + this(jedis, pool, null, dbIndex); + } + + /** + * Constructs a new {@link JedisConnection} instance optionally backed by a jedis pool and a reference to its + * {@link RedisConnectionFactory}. + * + * @param jedis must not be {@literal null}. + * @param pool may be {@literal null}, if no pool is used. + * @param connectionFactory may be {@literal null}. + * @param dbIndex + * @since 1.8 + */ + public JedisConnection(Jedis jedis, Pool pool, RedisConnectionFactory connectionFactory, int dbIndex) { + + Assert.notNull(jedis, "Jedis must not be null!"); + this.jedis = jedis; // extract underlying connection for batch operations - client = (Client) ReflectionUtils.getField(CLIENT_FIELD, jedis); - + this.client = (Client) ReflectionUtils.getField(CLIENT_FIELD, jedis); this.pool = pool; - this.dbIndex = dbIndex; + this.connectionFactory = connectionFactory; // select the db // if this fail, do manual clean-up before propagating the exception @@ -267,7 +288,11 @@ public String toString() { } public void close() throws DataAccessException { + + this.closed = true; + super.close(); + // return the connection to the pool if (pool != null) { if (!broken) { @@ -3785,9 +3810,16 @@ protected ScanIteration doScan(long cursorId, ScanOptions options) { } ScanParams params = JedisConverters.toScanParams(options); - redis.clients.jedis.ScanResult result = jedis.scan(Long.toString(cursorId), params); - return new ScanIteration(Long.valueOf(result.getStringCursor()), - JedisConverters.stringListToByteList().convert(result.getResult())); + + JedisConnectionReference ref = getOrCreateConnectionReference(); + try { + + redis.clients.jedis.ScanResult result = ref.jedis.scan(Long.toString(cursorId), params); + return new ScanIteration(Long.valueOf(result.getStringCursor()), + JedisConverters.stringListToByteList().convert(result.getResult())); + } finally { + ref.release(); + } } }.open(); @@ -3823,9 +3855,15 @@ protected ScanIteration doScan(byte[] key, long cursorId, ScanOptions opt ScanParams params = JedisConverters.toScanParams(options); - ScanResult result = jedis.zscan(key, JedisConverters.toBytes(cursorId), params); - return new ScanIteration(Long.valueOf(result.getStringCursor()), - JedisConverters.tuplesToTuples().convert(result.getResult())); + JedisConnectionReference ref = getOrCreateConnectionReference(); + try { + + ScanResult result = ref.jedis.zscan(key, JedisConverters.toBytes(cursorId), params); + return new ScanIteration(Long.valueOf(result.getStringCursor()), + JedisConverters.tuplesToTuples().convert(result.getResult())); + } finally { + ref.release(); + } } }.open(); @@ -3860,8 +3898,14 @@ protected ScanIteration doScan(byte[] key, long cursorId, ScanOptions op ScanParams params = JedisConverters.toScanParams(options); - redis.clients.jedis.ScanResult result = jedis.sscan(key, JedisConverters.toBytes(cursorId), params); - return new ScanIteration(Long.valueOf(result.getStringCursor()), result.getResult()); + JedisConnectionReference ref = getOrCreateConnectionReference(); + try { + + redis.clients.jedis.ScanResult result = ref.jedis.sscan(key, JedisConverters.toBytes(cursorId), params); + return new ScanIteration(Long.valueOf(result.getStringCursor()), result.getResult()); + } finally { + ref.release(); + } } }.open(); } @@ -3895,8 +3939,14 @@ protected ScanIteration> doScan(byte[] key, long cursorId, ScanParams params = JedisConverters.toScanParams(options); - ScanResult> result = jedis.hscan(key, JedisConverters.toBytes(cursorId), params); - return new ScanIteration>(Long.valueOf(result.getStringCursor()), result.getResult()); + JedisConnectionReference ref = getOrCreateConnectionReference(); + try { + + ScanResult> result = ref.jedis.hscan(key, JedisConverters.toBytes(cursorId), params); + return new ScanIteration>(Long.valueOf(result.getStringCursor()), result.getResult()); + } finally { + ref.release(); + } } }.open(); } @@ -4125,7 +4175,78 @@ public void migrate(byte[] key, RedisNode target, int dbIndex, MigrateOption opt } catch (Exception ex) { throw convertJedisAccessException(ex); } + } + /** + * Returns {@link JedisConnectionReference} pointing either to {@code this} connection or a created connection by + * {@link RedisConnectionFactory} if this connection is closed. + *

+ * Connection references allow executing Redis commands even if the original connection is closed. This is required if + * operations hold a reference to this {@link JedisConnection} and are executed after this connection was + * closed/released back to the pool. + *

+ * Creating connection references after closing this connection requires the {@code connectionFactory} to be set. + * + * @return a {@link JedisConnectionReference} pointing to an active {@link JedisConnection}. + */ + private JedisConnectionReference getOrCreateConnectionReference() { + + if (!closed) { + return new JedisConnectionReference(this); + } + + if (connectionFactory == null) { + throw new IllegalStateException( + "Cannot obtain a Jedis connection: Connection closed and no connection factory reference provided"); + } + + return new JedisConnectionFactoryReference(connectionFactory); } + /** + * Connection reference for {@link Jedis}. Connection references can be released to clean up resources, if necessary. + * + * @author Mark Paluch + * @since 1.8 + */ + private static class JedisConnectionReference { + + final Jedis jedis; + final JedisConnection jedisConnection; + + JedisConnectionReference(JedisConnection jedisConnection) { + + this.jedisConnection = jedisConnection; + this.jedis = jedisConnection.jedis; + } + + public void release() { + doRelease(); + } + + /** + * Customization hook for cleaning up resources on when calling {@link #release()}. + */ + void doRelease() { + } + } + + /** + * {@link JedisConnectionReference} implementation to obtain a connection from a {@link RedisConnectionFactory}. + * Connections are release when {@link #release() releasing} this reference. + * + * @author Mark Paluch + * @since 1.8 + */ + private static class JedisConnectionFactoryReference extends JedisConnectionReference { + + JedisConnectionFactoryReference(RedisConnectionFactory redisConnectionFactory) { + super((JedisConnection) redisConnectionFactory.getConnection()); + } + + @Override + void doRelease() { + jedisConnection.close(); + } + } } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactory.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactory.java index 4db269dafe..c04b0da61b 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactory.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactory.java @@ -341,8 +341,8 @@ public RedisConnection getConnection() { } Jedis jedis = fetchJedisConnector(); - JedisConnection connection = (usePool ? new JedisConnection(jedis, pool, dbIndex) - : new JedisConnection(jedis, null, dbIndex)); + JedisConnection connection = (usePool ? new JedisConnection(jedis, pool, this, dbIndex) + : new JedisConnection(jedis, null, this, dbIndex)); connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults); return postProcessConnection(connection); } diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionIntegrationTests.java index 64615097c7..c9c0f8b3a2 100644 --- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2011-2015 the original author or authors. + * Copyright 2011-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,12 +16,17 @@ package org.springframework.data.redis.connection.jedis; -import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.Matchers.*; import static org.junit.Assert.*; +import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; @@ -45,6 +50,8 @@ import org.springframework.data.redis.connection.RedisSentinelConfiguration; import org.springframework.data.redis.connection.ReturnType; import org.springframework.data.redis.connection.StringRedisConnection.StringTuple; +import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.ScanOptions; import org.springframework.data.redis.test.util.RedisSentinelRule; import org.springframework.data.redis.test.util.RedisSentinelRule.SentinelsAvailable; import org.springframework.data.redis.test.util.RelaxedJUnit4ClassRunner; @@ -62,6 +69,7 @@ * @author Thomas Darimont * @author Christoph Strobl * @author David Liu + * @author Mark Paluch */ @RunWith(RelaxedJUnit4ClassRunner.class) @ContextConfiguration @@ -416,4 +424,103 @@ public void zRangeByScoreTest() { assertEquals("two", new String(zRangeByScore.iterator().next())); } + + /** + * @see DATAREDIS-531 + */ + @Test + public void scanShouldOperateOnActiveConnection() throws IOException { + + connection.set("key", "value"); + + Cursor scan = connection.scan(ScanOptions.NONE); + List list = collectToList(scan); + + assertThat(list, contains("key")); + } + + /** + * @see DATAREDIS-531 + */ + @Test + public void scanShouldOperateOnClosedConnection() throws IOException { + + connection.set("key", "value"); + connection.close(); + + Cursor scan = connection.scan(ScanOptions.NONE); + List list = collectToList(scan); + + assertThat(list, contains("key")); + } + + /** + * @see DATAREDIS-531 + */ + @Test + public void hscanShouldOperateOnClosedConnection() throws IOException { + + connection.hSet("key", "field", "value"); + connection.close(); + + Cursor> scan = connection.hScan("key", ScanOptions.NONE); + + List list = new ArrayList(); + + while (scan.hasNext()) { + Map.Entry entry = scan.next(); + list.add(entry.getKey()); + } + scan.close(); + + assertThat(list, contains("field")); + } + + /** + * @see DATAREDIS-531 + */ + @Test + public void sscanShouldOperateOnClosedConnection() throws IOException { + + connection.sAdd("key", "value"); + connection.close(); + + Cursor scan = connection.sScan("key".getBytes(), ScanOptions.NONE); + List list = collectToList(scan); + + assertThat(list, contains("value")); + } + + /** + * @see DATAREDIS-531 + */ + @Test + public void zscanShouldOperateOnClosedConnection() throws IOException { + + connection.zAdd("key", 10, "value"); + connection.close(); + + Cursor scan = connection.zScan("key", ScanOptions.NONE); + + List list = new ArrayList(); + + while (scan.hasNext()) { + StringTuple entry = scan.next(); + list.add(entry.getValueAsString()); + } + scan.close(); + + assertThat(list, contains("value")); + } + + private List collectToList(Cursor scan) throws IOException { + + List list = new ArrayList(); + + while (scan.hasNext()) { + list.add(new String(scan.next())); + } + scan.close(); + return list; + } }