Skip to content

DATAREDIS-531 - Scan commands should use a dedicated bound connection. #218

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>1.8.0.BUILD-SNAPSHOT</version>
<version>1.8.0.DATAREDIS-531-SNAPSHOT</version>

<name>Spring Data Redis</name>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3790,6 +3790,10 @@ protected ScanIteration<byte[]> doScan(long cursorId, ScanOptions options) {
JedisConverters.stringListToByteList().convert(result.getResult()));
}

protected void doClose() {
JedisConnection.this.close();
};

}.open();

}
Expand Down Expand Up @@ -3828,6 +3832,10 @@ protected ScanIteration<Tuple> doScan(byte[] key, long cursorId, ScanOptions opt
JedisConverters.tuplesToTuples().convert(result.getResult()));
}

protected void doClose() {
JedisConnection.this.close();
};

}.open();
}

Expand Down Expand Up @@ -3863,6 +3871,10 @@ protected ScanIteration<byte[]> doScan(byte[] key, long cursorId, ScanOptions op
redis.clients.jedis.ScanResult<byte[]> result = jedis.sscan(key, JedisConverters.toBytes(cursorId), params);
return new ScanIteration<byte[]>(Long.valueOf(result.getStringCursor()), result.getResult());
}

protected void doClose() {
JedisConnection.this.close();
};
}.open();
}

Expand Down Expand Up @@ -3898,6 +3910,11 @@ protected ScanIteration<Entry<byte[], byte[]>> doScan(byte[] key, long cursorId,
ScanResult<Entry<byte[], byte[]>> result = jedis.hscan(key, JedisConverters.toBytes(cursorId), params);
return new ScanIteration<Map.Entry<byte[], byte[]>>(Long.valueOf(result.getStringCursor()), result.getResult());
}

protected void doClose() {
JedisConnection.this.close();
};

}.open();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3719,6 +3719,11 @@ protected ScanIteration<byte[]> doScan(long cursorId, ScanOptions options) {

return new ScanIteration<byte[]>(Long.valueOf(nextCursorId), (keys));
}

protected void doClose() {
LettuceConnection.this.close();
}

}.open();

}
Expand Down Expand Up @@ -3759,6 +3764,10 @@ protected ScanIteration<Entry<byte[], byte[]>> doScan(byte[] key, long cursorId,
return new ScanIteration<Entry<byte[], byte[]>>(Long.valueOf(nextCursorId), values.entrySet());
}

protected void doClose() {
LettuceConnection.this.close();
}

}.open();
}

Expand Down Expand Up @@ -3798,6 +3807,11 @@ protected ScanIteration<byte[]> doScan(byte[] key, long cursorId, ScanOptions op
List<byte[]> values = failsafeReadScanValues(valueScanCursor.getValues(), null);
return new ScanIteration<byte[]>(Long.valueOf(nextCursorId), values);
}

protected void doClose() {
LettuceConnection.this.close();
}

}.open();
}

Expand Down Expand Up @@ -3839,6 +3853,11 @@ protected ScanIteration<Tuple> doScan(byte[] key, long cursorId, ScanOptions opt
List<Tuple> values = failsafeReadScanValues(result, LettuceConverters.scoredValuesToTupleList());
return new ScanIteration<Tuple>(Long.valueOf(nextCursorId), values);
}

protected void doClose() {
LettuceConnection.this.close();
}

}.open();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public Map<byte[], byte[]> doInRedis(RedisConnection connection) {
public Cursor<Entry<HK, HV>> scan(K key, final ScanOptions options) {

final byte[] rawKey = rawKey(key);
return execute(new RedisCallback<Cursor<Map.Entry<HK, HV>>>() {
return template.executeWithStickyConnection(new RedisCallback<Cursor<Map.Entry<HK, HV>>>() {

@Override
public Cursor<Entry<HK, HV>> doInRedis(RedisConnection connection) throws DataAccessException {
Expand Down Expand Up @@ -268,7 +268,7 @@ public HV setValue(HV value) {
});
}

}, true);
});

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public Long doInRedis(RedisConnection connection) {
public Cursor<V> scan(K key, final ScanOptions options) {

final byte[] rawKey = rawKey(key);
return execute(new RedisCallback<Cursor<V>>() {
return template.executeWithStickyConnection(new RedisCallback<Cursor<V>>() {

@Override
public Cursor<V> doInRedis(RedisConnection connection) throws DataAccessException {
Expand All @@ -267,7 +267,7 @@ public V convert(byte[] source) {
}
});
}
}, true);
});

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -409,13 +409,13 @@ public Long doInRedis(RedisConnection connection) {
public Cursor<TypedTuple<V>> scan(K key, final ScanOptions options) {

final byte[] rawKey = rawKey(key);
Cursor<Tuple> cursor = execute(new RedisCallback<Cursor<Tuple>>() {
Cursor<Tuple> cursor = template.executeWithStickyConnection(new RedisCallback<Cursor<Tuple>>() {

@Override
public Cursor<Tuple> doInRedis(RedisConnection connection) throws DataAccessException {
return connection.zScan(rawKey, options);
}
}, true);
});

return new ConvertingCursor<Tuple, TypedTuple<V>>(cursor, new Converter<Tuple, TypedTuple<V>>() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ public interface HashOperations<H, HK, HV> {
RedisOperations<H, ?> getOperations();

/**
* Use a {@link Cursor} to iterate over entries in hash at {@code key}.
* Use a {@link Cursor} to iterate over entries in hash at {@code key}. <br />
* <strong>Important:</strong> Call {@link Cursor#close()} when done to avoid resource leak.
*
* @since 1.4
* @param key must not be {@literal null}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
*/
package org.springframework.data.redis.core;

import java.io.Closeable;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.query.SortQuery;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.core.types.RedisClientInfo;
Expand Down Expand Up @@ -129,6 +131,16 @@ public interface RedisOperations<K, V> {
<T> T execute(RedisScript<T> script, RedisSerializer<?> argsSerializer, RedisSerializer<T> resultSerializer,
List<K> keys, Object... args);

/**
* Allocates and binds a new {@link RedisConnection} to the actual return type of the method. It is up to the caller
* to free resources after use.
*
* @param callback must not be {@literal null}.
* @return
* @since 1.8
*/
<T extends Closeable> T executeWithStickyConnection(RedisCallback<T> callback);

Boolean hasKey(K key);

void delete(K key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.springframework.data.redis.core;

import java.io.Closeable;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -304,6 +305,23 @@ public <T> T execute(RedisScript<T> script, RedisSerializer<?> argsSerializer, R
return scriptExecutor.execute(script, argsSerializer, resultSerializer, keys, args);
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.core.RedisOperations#executeWithStickyConnection(org.springframework.data.redis.core.RedisCallback)
*/
public <T extends Closeable> T executeWithStickyConnection(RedisCallback<T> callback) {

Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
Assert.notNull(callback, "Callback object must not be null");

RedisConnectionFactory factory = getConnectionFactory();

RedisConnection connection = preProcessConnection(RedisConnectionUtils.doGetConnection(factory, true, false, false),
false);

return callback.doInRedis(connection);
}

private Object executeSession(SessionCallback<?> session) {
return session.execute(this);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2014 the original author or authors.
* Copyright 2011-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -75,8 +75,9 @@ public interface SetOperations<K, V> {
RedisOperations<K, V> getOperations();

/**
* Iterate over elements in set at {@code key}.
*
* Iterate over elements in set at {@code key}. <br />
* <strong>Important:</strong> Call {@link Cursor#close()} when done to avoid resource leak.
*
* @since 1.4
* @param key
* @param options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ public interface TypedTuple<V> extends Comparable<TypedTuple<V>> {
RedisOperations<K, V> getOperations();

/**
* Iterate over elements in zset at {@code key}. <br />
* <strong>Important:</strong> Call {@link Cursor#close()} when done to avoid resource leak.
*
* @since 1.4
* @param key
* @param options
Expand Down
Loading