Skip to content

Commit 29796c7

Browse files
committed
DATAREDIS-481 - Polishing.
Reduce visibility of DefaultRedisCacheWriter as trivial implementation. Replace ConnectionCallback with Java 8 functions. Increase visibility of the RedisCache constructor to allow subclassing. Move factory methods to create DefaultRedisCacheWriter to RedisCacheWriter interface. Refactor RedisCacheManagerConfigurator to stateful RedisCacheManagerBuilder. Remove RedisCache.setTransactionAware override in RedisCacheManagerConfigurator removing behavior not compliant with AbstractTransactionSupportingCacheManager. Terminate interrupted cache wait loop with exception. Javadoc, formatting, reorder methods.
1 parent 865bf4e commit 29796c7

File tree

8 files changed

+349
-223
lines changed

8 files changed

+349
-223
lines changed

src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java

Lines changed: 88 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -15,36 +15,36 @@
1515
*/
1616
package org.springframework.data.redis.cache;
1717

18-
import java.nio.charset.Charset;
18+
import java.nio.charset.StandardCharsets;
1919
import java.time.Duration;
2020
import java.util.concurrent.TimeUnit;
21+
import java.util.function.Consumer;
22+
import java.util.function.Function;
2123

22-
import org.springframework.data.redis.connection.RedisClusterConnection;
24+
import org.springframework.dao.PessimisticLockingFailureException;
2325
import org.springframework.data.redis.connection.RedisConnection;
2426
import org.springframework.data.redis.connection.RedisConnectionFactory;
2527
import org.springframework.data.redis.connection.RedisStringCommands.SetOption;
26-
import org.springframework.data.redis.connection.ReturnType;
2728
import org.springframework.data.redis.core.types.Expiration;
2829
import org.springframework.util.Assert;
2930

3031
/**
3132
* {@link RedisCacheWriter} implementation capable of reading/writing binary data from/to Redis in {@literal standalone}
3233
* and {@literal cluster} environments. Works upon a given {@link RedisConnectionFactory} to obtain the actual
3334
* {@link RedisConnection}. <br />
34-
* {@link DefaultRedisCacheWriter} can be used in {@link #lockingRedisCacheWriter(RedisConnectionFactory) locking} or
35-
* {@link #nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While {@literal non-locking} aims for
36-
* maximum performance it may result in overlapping, non atomic, command execution for operations spanning multiple
37-
* Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents command overlap by setting
38-
* an explicit lock key and checking against presence of this key which leads to additional requests and potential
39-
* command wait times.
35+
* {@link DefaultRedisCacheWriter} can be used in
36+
* {@link RedisCacheWriter#lockingRedisCacheWriter(RedisConnectionFactory) locking} or
37+
* {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While
38+
* {@literal non-locking} aims for maximum performance it may result in overlapping, non atomic, command execution for
39+
* operations spanning multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents
40+
* command overlap by setting an explicit lock key and checking against presence of this key which leads to additional
41+
* requests and potential command wait times.
4042
*
4143
* @author Christoph Strobl
44+
* @author Mark Paluch
4245
* @since 2.0
4346
*/
44-
public class DefaultRedisCacheWriter implements RedisCacheWriter {
45-
46-
private static final byte[] CLEAN_SCRIPT = "local keys = redis.call('KEYS', ARGV[1]); local keysCount = table.getn(keys); if(keysCount > 0) then for _, key in ipairs(keys) do redis.call('del', key); end; end; return keysCount;"
47-
.getBytes(Charset.forName("UTF-8"));
47+
class DefaultRedisCacheWriter implements RedisCacheWriter {
4848

4949
private final RedisConnectionFactory connectionFactory;
5050
private final Duration sleepTime;
@@ -70,30 +70,10 @@ public class DefaultRedisCacheWriter implements RedisCacheWriter {
7070
this.sleepTime = sleepTime;
7171
}
7272

73-
/**
74-
* Create new {@link RedisCacheWriter} without locking behavior.
75-
*
76-
* @param connectionFactory must not be {@literal null}.
77-
* @return new instance of {@link DefaultRedisCacheWriter}.
78-
*/
79-
public static DefaultRedisCacheWriter nonLockingRedisCacheWriter(RedisConnectionFactory connectionFactory) {
80-
81-
Assert.notNull(connectionFactory, "ConnectionFactory must not be null!");
82-
return new DefaultRedisCacheWriter(connectionFactory);
83-
}
84-
85-
/**
86-
* Create new {@link RedisCacheWriter} with locking behavior.
87-
*
88-
* @param connectionFactory must not be {@literal null}.
89-
* @return new instance of {@link DefaultRedisCacheWriter}.
73+
/*
74+
* (non-Javadoc)
75+
* @see org.springframework.data.redis.cache.RedisCacheWriter#put(java.lang.String, byte[], byte[], java.time.Duration)
9076
*/
91-
public static DefaultRedisCacheWriter lockingRedisCacheWriter(RedisConnectionFactory connectionFactory) {
92-
93-
Assert.notNull(connectionFactory, "ConnectionFactory must not be null!");
94-
return new DefaultRedisCacheWriter(connectionFactory, Duration.ofMillis(50));
95-
}
96-
9777
@Override
9878
public void put(String name, byte[] key, byte[] value, Duration ttl) {
9979

@@ -114,6 +94,10 @@ public void put(String name, byte[] key, byte[] value, Duration ttl) {
11494
});
11595
}
11696

97+
/*
98+
* (non-Javadoc)
99+
* @see org.springframework.data.redis.cache.RedisCacheWriter#get(java.lang.String, byte[])
100+
*/
117101
@Override
118102
public byte[] get(String name, byte[] key) {
119103

@@ -123,6 +107,10 @@ public byte[] get(String name, byte[] key) {
123107
return execute(name, connection -> connection.get(key));
124108
}
125109

110+
/*
111+
* (non-Javadoc)
112+
* @see org.springframework.data.redis.cache.RedisCacheWriter#putIfAbsent(java.lang.String, byte[], byte[], java.time.Duration)
113+
*/
126114
@Override
127115
public byte[] putIfAbsent(String name, byte[] key, byte[] value, Duration ttl) {
128116

@@ -156,6 +144,10 @@ public byte[] putIfAbsent(String name, byte[] key, byte[] value, Duration ttl) {
156144
});
157145
}
158146

147+
/*
148+
* (non-Javadoc)
149+
* @see org.springframework.data.redis.cache.RedisCacheWriter#remove(java.lang.String, byte[])
150+
*/
159151
@Override
160152
public void remove(String name, byte[] key) {
161153

@@ -165,46 +157,10 @@ public void remove(String name, byte[] key) {
165157
execute(name, connection -> connection.del(key));
166158
}
167159

168-
/**
169-
* Explicitly set a write lock on a cache.
170-
*
171-
* @param name the name of the cache to lock.
172-
*/
173-
public void lock(String name) {
174-
execute(name, connection -> doLock(name, connection));
175-
}
176-
177-
private Boolean doLock(String name, RedisConnection connection) {
178-
return connection.setNX(createCacheLockKey(name), new byte[] {});
179-
}
180-
181-
/**
182-
* Explicitly remove a write lock from a cache.
183-
*
184-
* @param name the name of the cache to unlock.
185-
*/
186-
public void unlock(String name) {
187-
executeWithoutLockCheck(connection -> doUnlock(name, connection));
188-
}
189-
190-
private Long doUnlock(String name, RedisConnection connection) {
191-
return connection.del(createCacheLockKey(name));
192-
}
193-
194-
/**
195-
* Check if a cache has set a lock.
196-
*
197-
* @param name the name of the cache to check for presence of a lock.
198-
* @return {@literal true} if lock found.
160+
/*
161+
* (non-Javadoc)
162+
* @see org.springframework.data.redis.cache.RedisCacheWriter#clean(java.lang.String, byte[])
199163
*/
200-
public boolean isLoked(String name) {
201-
return executeWithoutLockCheck(connection -> doCheckLock(name, connection));
202-
}
203-
204-
private boolean doCheckLock(String name, RedisConnection connection) {
205-
return connection.exists(createCacheLockKey(name));
206-
}
207-
208164
@Override
209165
public void clean(String name, byte[] pattern) {
210166

@@ -216,17 +172,16 @@ public void clean(String name, byte[] pattern) {
216172
boolean wasLocked = false;
217173

218174
try {
219-
if (connection instanceof RedisClusterConnection) {
220175

221-
if (isLockingCacheWriter()) {
222-
doLock(name, connection);
223-
wasLocked = true;
224-
}
176+
if (isLockingCacheWriter()) {
177+
doLock(name, connection);
178+
wasLocked = true;
179+
}
180+
181+
byte[][] keys = connection.keys(pattern).toArray(new byte[0][]);
225182

226-
byte[][] keys = connection.keys(pattern).stream().toArray(size -> new byte[size][]);
183+
if (keys.length > 0) {
227184
connection.del(keys);
228-
} else {
229-
connection.eval(CLEAN_SCRIPT, ReturnType.INTEGER, 0, pattern);
230185
}
231186
} finally {
232187

@@ -239,67 +194,92 @@ public void clean(String name, byte[] pattern) {
239194
});
240195
}
241196

197+
/**
198+
* Explicitly set a write lock on a cache.
199+
*
200+
* @param name the name of the cache to lock.
201+
*/
202+
void lock(String name) {
203+
execute(name, connection -> doLock(name, connection));
204+
}
205+
206+
/**
207+
* Explicitly remove a write lock from a cache.
208+
*
209+
* @param name the name of the cache to unlock.
210+
*/
211+
void unlock(String name) {
212+
executeLockFree(connection -> doUnlock(name, connection));
213+
}
214+
215+
private Boolean doLock(String name, RedisConnection connection) {
216+
return connection.setNX(createCacheLockKey(name), new byte[0]);
217+
}
218+
219+
private Long doUnlock(String name, RedisConnection connection) {
220+
return connection.del(createCacheLockKey(name));
221+
}
222+
223+
boolean doCheckLock(String name, RedisConnection connection) {
224+
return connection.exists(createCacheLockKey(name));
225+
}
226+
242227
/**
243228
* @return {@literal true} if {@link RedisCacheWriter} uses locks.
244229
*/
245-
public boolean isLockingCacheWriter() {
230+
private boolean isLockingCacheWriter() {
246231
return !sleepTime.isZero() && !sleepTime.isNegative();
247232
}
248233

249-
<T> T execute(String name, ConnectionCallback<T> callback) {
234+
private <T> T execute(String name, Function<RedisConnection, T> callback) {
250235

251236
RedisConnection connection = connectionFactory.getConnection();
252237
try {
253238

254239
checkAndPotentiallyWaitUntilUnlocked(name, connection);
255-
return callback.doWithConnection(connection);
240+
return callback.apply(connection);
256241
} finally {
257242
connection.close();
258243
}
259244
}
260245

261-
private <T> T executeWithoutLockCheck(ConnectionCallback<T> callback) {
246+
private void executeLockFree(Consumer<RedisConnection> callback) {
262247

263248
RedisConnection connection = connectionFactory.getConnection();
264249

265250
try {
266-
return callback.doWithConnection(connection);
251+
callback.accept(connection);
267252
} finally {
268253
connection.close();
269254
}
270255
}
271256

272257
private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection connection) {
273258

274-
if (isLockingCacheWriter()) {
259+
if (!isLockingCacheWriter()) {
260+
return;
261+
}
275262

276-
long timeout = sleepTime.toMillis();
263+
try {
277264

278265
while (doCheckLock(name, connection)) {
279-
try {
280-
Thread.sleep(timeout);
281-
} catch (InterruptedException ex) {
282-
Thread.currentThread().interrupt();
283-
}
266+
Thread.sleep(sleepTime.toMillis());
284267
}
268+
} catch (InterruptedException ex) {
269+
270+
// Re-interrupt current thread, to allow other participants to react.
271+
Thread.currentThread().interrupt();
272+
273+
throw new PessimisticLockingFailureException(String.format("Interrupted while waiting to unlock cache %s", name),
274+
ex);
285275
}
286276
}
287277

288-
private boolean shouldExpireWithin(Duration ttl) {
278+
private static boolean shouldExpireWithin(Duration ttl) {
289279
return ttl != null && !ttl.isZero() && !ttl.isNegative();
290280
}
291281

292-
byte[] createCacheLockKey(String name) {
293-
return (name + "~lock").getBytes(Charset.forName("UTF-8"));
294-
}
295-
296-
/**
297-
* @param <T>
298-
* @author Christoph Strobl
299-
* @since 2.0
300-
*/
301-
interface ConnectionCallback<T> {
302-
303-
T doWithConnection(RedisConnection connection);
282+
private static byte[] createCacheLockKey(String name) {
283+
return (name + "~lock").getBytes(StandardCharsets.UTF_8);
304284
}
305285
}

0 commit comments

Comments
 (0)