Skip to content

Commit 58d23de

Browse files
christophstroblmp911de
authored andcommitted
DATAREDIS-531 - Scan commands should use a dedicated bound connection.
SCAN, SSCAN, HSCAN and ZSCAN now reference a dedicated connection that is bound to the Cursor. This allows creation of multiple cursors for different threads without the risk of potentially sharing a single connection. As before the caller is responsible for closing the Cursor correctly after usage. Original pull request: #218.
1 parent 5fd719b commit 58d23de

20 files changed

+473
-34
lines changed

src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3790,6 +3790,10 @@ protected ScanIteration<byte[]> doScan(long cursorId, ScanOptions options) {
37903790
JedisConverters.stringListToByteList().convert(result.getResult()));
37913791
}
37923792

3793+
protected void doClose() {
3794+
JedisConnection.this.close();
3795+
};
3796+
37933797
}.open();
37943798

37953799
}
@@ -3828,6 +3832,10 @@ protected ScanIteration<Tuple> doScan(byte[] key, long cursorId, ScanOptions opt
38283832
JedisConverters.tuplesToTuples().convert(result.getResult()));
38293833
}
38303834

3835+
protected void doClose() {
3836+
JedisConnection.this.close();
3837+
};
3838+
38313839
}.open();
38323840
}
38333841

@@ -3863,6 +3871,10 @@ protected ScanIteration<byte[]> doScan(byte[] key, long cursorId, ScanOptions op
38633871
redis.clients.jedis.ScanResult<byte[]> result = jedis.sscan(key, JedisConverters.toBytes(cursorId), params);
38643872
return new ScanIteration<byte[]>(Long.valueOf(result.getStringCursor()), result.getResult());
38653873
}
3874+
3875+
protected void doClose() {
3876+
JedisConnection.this.close();
3877+
};
38663878
}.open();
38673879
}
38683880

@@ -3898,6 +3910,11 @@ protected ScanIteration<Entry<byte[], byte[]>> doScan(byte[] key, long cursorId,
38983910
ScanResult<Entry<byte[], byte[]>> result = jedis.hscan(key, JedisConverters.toBytes(cursorId), params);
38993911
return new ScanIteration<Map.Entry<byte[], byte[]>>(Long.valueOf(result.getStringCursor()), result.getResult());
39003912
}
3913+
3914+
protected void doClose() {
3915+
JedisConnection.this.close();
3916+
};
3917+
39013918
}.open();
39023919
}
39033920

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3719,6 +3719,11 @@ protected ScanIteration<byte[]> doScan(long cursorId, ScanOptions options) {
37193719

37203720
return new ScanIteration<byte[]>(Long.valueOf(nextCursorId), (keys));
37213721
}
3722+
3723+
protected void doClose() {
3724+
LettuceConnection.this.close();
3725+
}
3726+
37223727
}.open();
37233728

37243729
}
@@ -3759,6 +3764,10 @@ protected ScanIteration<Entry<byte[], byte[]>> doScan(byte[] key, long cursorId,
37593764
return new ScanIteration<Entry<byte[], byte[]>>(Long.valueOf(nextCursorId), values.entrySet());
37603765
}
37613766

3767+
protected void doClose() {
3768+
LettuceConnection.this.close();
3769+
}
3770+
37623771
}.open();
37633772
}
37643773

@@ -3798,6 +3807,11 @@ protected ScanIteration<byte[]> doScan(byte[] key, long cursorId, ScanOptions op
37983807
List<byte[]> values = failsafeReadScanValues(valueScanCursor.getValues(), null);
37993808
return new ScanIteration<byte[]>(Long.valueOf(nextCursorId), values);
38003809
}
3810+
3811+
protected void doClose() {
3812+
LettuceConnection.this.close();
3813+
}
3814+
38013815
}.open();
38023816
}
38033817

@@ -3839,6 +3853,11 @@ protected ScanIteration<Tuple> doScan(byte[] key, long cursorId, ScanOptions opt
38393853
List<Tuple> values = failsafeReadScanValues(result, LettuceConverters.scoredValuesToTupleList());
38403854
return new ScanIteration<Tuple>(Long.valueOf(nextCursorId), values);
38413855
}
3856+
3857+
protected void doClose() {
3858+
LettuceConnection.this.close();
3859+
}
3860+
38423861
}.open();
38433862
}
38443863

src/main/java/org/springframework/data/redis/core/DefaultHashOperations.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ public Map<byte[], byte[]> doInRedis(RedisConnection connection) {
235235
public Cursor<Entry<HK, HV>> scan(K key, final ScanOptions options) {
236236

237237
final byte[] rawKey = rawKey(key);
238-
return execute(new RedisCallback<Cursor<Map.Entry<HK, HV>>>() {
238+
return template.executeWithStickyConnection(new RedisCallback<Cursor<Map.Entry<HK, HV>>>() {
239239

240240
@Override
241241
public Cursor<Entry<HK, HV>> doInRedis(RedisConnection connection) throws DataAccessException {
@@ -268,7 +268,7 @@ public HV setValue(HV value) {
268268
});
269269
}
270270

271-
}, true);
271+
});
272272

273273
}
274274
}

src/main/java/org/springframework/data/redis/core/DefaultSetOperations.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ public Long doInRedis(RedisConnection connection) {
255255
public Cursor<V> scan(K key, final ScanOptions options) {
256256

257257
final byte[] rawKey = rawKey(key);
258-
return execute(new RedisCallback<Cursor<V>>() {
258+
return template.executeWithStickyConnection(new RedisCallback<Cursor<V>>() {
259259

260260
@Override
261261
public Cursor<V> doInRedis(RedisConnection connection) throws DataAccessException {
@@ -267,7 +267,7 @@ public V convert(byte[] source) {
267267
}
268268
});
269269
}
270-
}, true);
270+
});
271271

272272
}
273273
}

src/main/java/org/springframework/data/redis/core/DefaultZSetOperations.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -409,13 +409,13 @@ public Long doInRedis(RedisConnection connection) {
409409
public Cursor<TypedTuple<V>> scan(K key, final ScanOptions options) {
410410

411411
final byte[] rawKey = rawKey(key);
412-
Cursor<Tuple> cursor = execute(new RedisCallback<Cursor<Tuple>>() {
412+
Cursor<Tuple> cursor = template.executeWithStickyConnection(new RedisCallback<Cursor<Tuple>>() {
413413

414414
@Override
415415
public Cursor<Tuple> doInRedis(RedisConnection connection) throws DataAccessException {
416416
return connection.zScan(rawKey, options);
417417
}
418-
}, true);
418+
});
419419

420420
return new ConvertingCursor<Tuple, TypedTuple<V>>(cursor, new Converter<Tuple, TypedTuple<V>>() {
421421

src/main/java/org/springframework/data/redis/core/HashOperations.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,8 @@ public interface HashOperations<H, HK, HV> {
150150
RedisOperations<H, ?> getOperations();
151151

152152
/**
153-
* Use a {@link Cursor} to iterate over entries in hash at {@code key}.
153+
* Use a {@link Cursor} to iterate over entries in hash at {@code key}. <br />
154+
* <strong>Important:</strong> Call {@link Cursor#close()} when done to avoid resource leak.
154155
*
155156
* @since 1.4
156157
* @param key must not be {@literal null}.

src/main/java/org/springframework/data/redis/core/RedisOperations.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@
1515
*/
1616
package org.springframework.data.redis.core;
1717

18+
import java.io.Closeable;
1819
import java.util.Collection;
1920
import java.util.Date;
2021
import java.util.List;
2122
import java.util.Set;
2223
import java.util.concurrent.TimeUnit;
2324

2425
import org.springframework.data.redis.connection.DataType;
26+
import org.springframework.data.redis.connection.RedisConnection;
2527
import org.springframework.data.redis.core.query.SortQuery;
2628
import org.springframework.data.redis.core.script.RedisScript;
2729
import org.springframework.data.redis.core.types.RedisClientInfo;
@@ -129,6 +131,16 @@ public interface RedisOperations<K, V> {
129131
<T> T execute(RedisScript<T> script, RedisSerializer<?> argsSerializer, RedisSerializer<T> resultSerializer,
130132
List<K> keys, Object... args);
131133

134+
/**
135+
* Allocates and binds a new {@link RedisConnection} to the actual return type of the method. It is up to the caller
136+
* to free resources after use.
137+
*
138+
* @param callback must not be {@literal null}.
139+
* @return
140+
* @since 1.8
141+
*/
142+
<T extends Closeable> T executeWithStickyConnection(RedisCallback<T> callback);
143+
132144
Boolean hasKey(K key);
133145

134146
void delete(K key);

src/main/java/org/springframework/data/redis/core/RedisTemplate.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package org.springframework.data.redis.core;
1717

18+
import java.io.Closeable;
1819
import java.lang.reflect.Proxy;
1920
import java.util.ArrayList;
2021
import java.util.Collection;
@@ -304,6 +305,23 @@ public <T> T execute(RedisScript<T> script, RedisSerializer<?> argsSerializer, R
304305
return scriptExecutor.execute(script, argsSerializer, resultSerializer, keys, args);
305306
}
306307

308+
/*
309+
* (non-Javadoc)
310+
* @see org.springframework.data.redis.core.RedisOperations#executeWithStickyConnection(org.springframework.data.redis.core.RedisCallback)
311+
*/
312+
public <T extends Closeable> T executeWithStickyConnection(RedisCallback<T> callback) {
313+
314+
Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
315+
Assert.notNull(callback, "Callback object must not be null");
316+
317+
RedisConnectionFactory factory = getConnectionFactory();
318+
319+
RedisConnection connection = preProcessConnection(RedisConnectionUtils.doGetConnection(factory, true, false, false),
320+
false);
321+
322+
return callback.doInRedis(connection);
323+
}
324+
307325
private Object executeSession(SessionCallback<?> session) {
308326
return session.execute(this);
309327
}

src/main/java/org/springframework/data/redis/core/SetOperations.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2011-2014 the original author or authors.
2+
* Copyright 2011-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -75,8 +75,9 @@ public interface SetOperations<K, V> {
7575
RedisOperations<K, V> getOperations();
7676

7777
/**
78-
* Iterate over elements in set at {@code key}.
79-
*
78+
* Iterate over elements in set at {@code key}. <br />
79+
* <strong>Important:</strong> Call {@link Cursor#close()} when done to avoid resource leak.
80+
*
8081
* @since 1.4
8182
* @param key
8283
* @param options

src/main/java/org/springframework/data/redis/core/ZSetOperations.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,9 @@ public interface TypedTuple<V> extends Comparable<TypedTuple<V>> {
136136
RedisOperations<K, V> getOperations();
137137

138138
/**
139+
* Iterate over elements in zset at {@code key}. <br />
140+
* <strong>Important:</strong> Call {@link Cursor#close()} when done to avoid resource leak.
141+
*
139142
* @since 1.4
140143
* @param key
141144
* @param options

0 commit comments

Comments
 (0)