Skip to content

DATAREDIS-531 - Fix deferred ScanCursor command execution using Jedis. #217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>1.8.0.BUILD-SNAPSHOT</version>
<version>1.8.0.DATAREDIS-531-SNAPSHOT</version>

<name>Spring Data Redis</name>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.FutureResult;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisNode;
import org.springframework.data.redis.connection.RedisPipelineException;
import org.springframework.data.redis.connection.RedisSubscribedConnectionException;
Expand Down Expand Up @@ -79,14 +80,14 @@
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.Queable;
import redis.clients.jedis.Response;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
import redis.clients.jedis.SortingParams;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.ZParams;
import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.util.Pool;

Expand Down Expand Up @@ -142,6 +143,9 @@ public class JedisConnection extends AbstractRedisConnection {
private final Client client;
private Transaction transaction;
private final Pool<Jedis> pool;
private final RedisConnectionFactory connectionFactory;
private boolean closed = false;

/**
* flag indicating whether the connection needs to be dropped or not
*/
Expand Down Expand Up @@ -181,7 +185,7 @@ public JedisStatusResult(Response<?> resultHolder) {
/**
* Constructs a new <code>JedisConnection</code> instance.
*
* @param jedis Jedis entity
* @param jedis Jedis entity, must not be {@literal null}.
*/
public JedisConnection(Jedis jedis) {
this(jedis, null, 0);
Expand All @@ -190,17 +194,34 @@ public JedisConnection(Jedis jedis) {
/**
* Constructs a new <code>JedisConnection</code> instance backed by a jedis pool.
*
* @param jedis
* @param pool can be null, if no pool is used
* @param jedis must not be {@literal null}.
* @param pool may be {@literal null}, if no pool is used.
* @param dbIndex
*/
public JedisConnection(Jedis jedis, Pool<Jedis> pool, int dbIndex) {
this(jedis, pool, null, dbIndex);
}

/**
* Constructs a new {@link JedisConnection} instance optionally backed by a jedis pool and a reference to its
* {@link RedisConnectionFactory}.
*
* @param jedis must not be {@literal null}.
* @param pool may be {@literal null}, if no pool is used.
* @param connectionFactory may be {@literal null}.
* @param dbIndex
* @since 1.8
*/
public JedisConnection(Jedis jedis, Pool<Jedis> pool, RedisConnectionFactory connectionFactory, int dbIndex) {

Assert.notNull(jedis, "Jedis must not be null!");

this.jedis = jedis;
// extract underlying connection for batch operations
client = (Client) ReflectionUtils.getField(CLIENT_FIELD, jedis);

this.client = (Client) ReflectionUtils.getField(CLIENT_FIELD, jedis);
this.pool = pool;

this.dbIndex = dbIndex;
this.connectionFactory = connectionFactory;

// select the db
// if this fail, do manual clean-up before propagating the exception
Expand Down Expand Up @@ -267,7 +288,11 @@ public String toString() {
}

public void close() throws DataAccessException {

this.closed = true;

super.close();

// return the connection to the pool
if (pool != null) {
if (!broken) {
Expand Down Expand Up @@ -3785,9 +3810,16 @@ protected ScanIteration<byte[]> doScan(long cursorId, ScanOptions options) {
}

ScanParams params = JedisConverters.toScanParams(options);
redis.clients.jedis.ScanResult<String> result = jedis.scan(Long.toString(cursorId), params);
return new ScanIteration<byte[]>(Long.valueOf(result.getStringCursor()),
JedisConverters.stringListToByteList().convert(result.getResult()));

JedisConnectionReference ref = getOrCreateConnectionReference();
try {

redis.clients.jedis.ScanResult<String> result = ref.jedis.scan(Long.toString(cursorId), params);
return new ScanIteration<byte[]>(Long.valueOf(result.getStringCursor()),
JedisConverters.stringListToByteList().convert(result.getResult()));
} finally {
ref.release();
}
}

}.open();
Expand Down Expand Up @@ -3823,9 +3855,15 @@ protected ScanIteration<Tuple> doScan(byte[] key, long cursorId, ScanOptions opt

ScanParams params = JedisConverters.toScanParams(options);

ScanResult<redis.clients.jedis.Tuple> result = jedis.zscan(key, JedisConverters.toBytes(cursorId), params);
return new ScanIteration<RedisZSetCommands.Tuple>(Long.valueOf(result.getStringCursor()),
JedisConverters.tuplesToTuples().convert(result.getResult()));
JedisConnectionReference ref = getOrCreateConnectionReference();
try {

ScanResult<redis.clients.jedis.Tuple> result = ref.jedis.zscan(key, JedisConverters.toBytes(cursorId), params);
return new ScanIteration<RedisZSetCommands.Tuple>(Long.valueOf(result.getStringCursor()),
JedisConverters.tuplesToTuples().convert(result.getResult()));
} finally {
ref.release();
}
}

}.open();
Expand Down Expand Up @@ -3860,8 +3898,14 @@ protected ScanIteration<byte[]> doScan(byte[] key, long cursorId, ScanOptions op

ScanParams params = JedisConverters.toScanParams(options);

redis.clients.jedis.ScanResult<byte[]> result = jedis.sscan(key, JedisConverters.toBytes(cursorId), params);
return new ScanIteration<byte[]>(Long.valueOf(result.getStringCursor()), result.getResult());
JedisConnectionReference ref = getOrCreateConnectionReference();
try {

redis.clients.jedis.ScanResult<byte[]> result = ref.jedis.sscan(key, JedisConverters.toBytes(cursorId), params);
return new ScanIteration<byte[]>(Long.valueOf(result.getStringCursor()), result.getResult());
} finally {
ref.release();
}
}
}.open();
}
Expand Down Expand Up @@ -3895,8 +3939,14 @@ protected ScanIteration<Entry<byte[], byte[]>> doScan(byte[] key, long cursorId,

ScanParams params = JedisConverters.toScanParams(options);

ScanResult<Entry<byte[], byte[]>> result = jedis.hscan(key, JedisConverters.toBytes(cursorId), params);
return new ScanIteration<Map.Entry<byte[], byte[]>>(Long.valueOf(result.getStringCursor()), result.getResult());
JedisConnectionReference ref = getOrCreateConnectionReference();
try {

ScanResult<Entry<byte[], byte[]>> result = ref.jedis.hscan(key, JedisConverters.toBytes(cursorId), params);
return new ScanIteration<Map.Entry<byte[], byte[]>>(Long.valueOf(result.getStringCursor()), result.getResult());
} finally {
ref.release();
}
}
}.open();
}
Expand Down Expand Up @@ -4125,7 +4175,78 @@ public void migrate(byte[] key, RedisNode target, int dbIndex, MigrateOption opt
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
}

/**
* Returns {@link JedisConnectionReference} pointing either to {@code this} connection or a created connection by
* {@link RedisConnectionFactory} if this connection is closed.
* <p>
* Connection references allow executing Redis commands even if the original connection is closed. This is required if
* operations hold a reference to this {@link JedisConnection} and are executed after this connection was
* closed/released back to the pool.
* <p>
* Creating connection references after closing this connection requires the {@code connectionFactory} to be set.
*
* @return a {@link JedisConnectionReference} pointing to an active {@link JedisConnection}.
*/
private JedisConnectionReference getOrCreateConnectionReference() {

if (!closed) {
return new JedisConnectionReference(this);
}

if (connectionFactory == null) {
throw new IllegalStateException(
"Cannot obtain a Jedis connection: Connection closed and no connection factory reference provided");
}

return new JedisConnectionFactoryReference(connectionFactory);
}

/**
* Connection reference for {@link Jedis}. Connection references can be released to clean up resources, if necessary.
*
* @author Mark Paluch
* @since 1.8
*/
private static class JedisConnectionReference {

final Jedis jedis;
final JedisConnection jedisConnection;

JedisConnectionReference(JedisConnection jedisConnection) {

this.jedisConnection = jedisConnection;
this.jedis = jedisConnection.jedis;
}

public void release() {
doRelease();
}

/**
* Customization hook for cleaning up resources on when calling {@link #release()}.
*/
void doRelease() {
}
}

/**
* {@link JedisConnectionReference} implementation to obtain a connection from a {@link RedisConnectionFactory}.
* Connections are release when {@link #release() releasing} this reference.
*
* @author Mark Paluch
* @since 1.8
*/
private static class JedisConnectionFactoryReference extends JedisConnectionReference {

JedisConnectionFactoryReference(RedisConnectionFactory redisConnectionFactory) {
super((JedisConnection) redisConnectionFactory.getConnection());
}

@Override
void doRelease() {
jedisConnection.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,8 @@ public RedisConnection getConnection() {
}

Jedis jedis = fetchJedisConnector();
JedisConnection connection = (usePool ? new JedisConnection(jedis, pool, dbIndex)
: new JedisConnection(jedis, null, dbIndex));
JedisConnection connection = (usePool ? new JedisConnection(jedis, pool, this, dbIndex)
: new JedisConnection(jedis, null, this, dbIndex));
connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults);
return postProcessConnection(connection);
}
Expand Down
Loading