Skip to content

Commit f9e85c6

Browse files
DATAREDIS-481 - Hacking: Alternative RedisCache implementation.
1 parent f2cf9fc commit f9e85c6

File tree

6 files changed

+980
-0
lines changed

6 files changed

+980
-0
lines changed
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
/*
2+
* Copyright 2017 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.redis.cache;
17+
18+
import java.nio.charset.Charset;
19+
import java.time.Duration;
20+
import java.util.concurrent.TimeUnit;
21+
22+
import org.springframework.data.redis.connection.RedisClusterConnection;
23+
import org.springframework.data.redis.connection.RedisConnection;
24+
import org.springframework.data.redis.connection.RedisConnectionFactory;
25+
import org.springframework.data.redis.connection.RedisStringCommands.SetOption;
26+
import org.springframework.data.redis.connection.ReturnType;
27+
import org.springframework.data.redis.core.types.Expiration;
28+
import org.springframework.util.Assert;
29+
30+
/**
31+
* @author Christoph Strobl
32+
* @since 2.0
33+
*/
34+
class DefaultRedisCacheWriter implements RedisCacheWriter {
35+
36+
private static final byte[] CLEAN_SCRIPT = "local keys = redis.call('KEYS', ARGV[1]); local keysCount = table.getn(keys); if(keysCount > 0) then for _, key in ipairs(keys) do redis.call('del', key); end; end; return keysCount;"
37+
.getBytes(Charset.forName("UTF-8"));
38+
39+
private final RedisConnectionFactory connectionFactory;
40+
private final Duration lockTimeout;
41+
42+
/**
43+
* @param connectionFactory must not be {@literal null}.
44+
*/
45+
DefaultRedisCacheWriter(RedisConnectionFactory connectionFactory) {
46+
this(connectionFactory, Duration.ZERO);
47+
}
48+
49+
/**
50+
* @param connectionFactory must not be {@literal null}.
51+
* @param lockTimeout must not be {@literal null}. Use {@link Duration#ZERO} to disable locking.
52+
*/
53+
DefaultRedisCacheWriter(RedisConnectionFactory connectionFactory, Duration lockTimeout) {
54+
55+
Assert.notNull(connectionFactory, "ConnectionFactory must not be null!");
56+
Assert.notNull(lockTimeout, "LockTimeout must not be null!");
57+
58+
this.connectionFactory = connectionFactory;
59+
this.lockTimeout = lockTimeout;
60+
}
61+
62+
@Override
63+
public void put(String name, byte[] key, byte[] value, Duration ttl) {
64+
65+
execute(name, connection -> {
66+
67+
if (shouldExpireWithin(ttl)) {
68+
connection.set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), SetOption.upsert());
69+
} else {
70+
connection.set(key, value);
71+
}
72+
73+
return "OK";
74+
});
75+
}
76+
77+
@Override
78+
public byte[] get(String name, byte[] key) {
79+
return execute(name, connection -> connection.get(key));
80+
}
81+
82+
@Override
83+
public byte[] putIfAbsent(String name, byte[] key, byte[] value, Duration ttl) {
84+
85+
return execute(name, connection -> {
86+
87+
if (connection.setNX(key, value)) {
88+
89+
if (shouldExpireWithin(ttl)) {
90+
connection.pExpire(key, ttl.toMillis());
91+
}
92+
return null;
93+
}
94+
95+
return connection.get(key);
96+
});
97+
}
98+
99+
@Override
100+
public void remove(String name, byte[] key) {
101+
execute(name, connection -> connection.del(key));
102+
}
103+
104+
public void lock(String name) {
105+
executeWithoutLockCheck(connection -> doLock(name, connection));
106+
}
107+
108+
private Boolean doLock(String name, RedisConnection connection) {
109+
return connection.setNX(createCacheLockKey(name), new byte[] {});
110+
}
111+
112+
public void unlock(String name) {
113+
executeWithoutLockCheck(connection -> doUnlock(name, connection));
114+
}
115+
116+
private Long doUnlock(String name, RedisConnection connection) {
117+
return connection.del(createCacheLockKey(name));
118+
}
119+
120+
public boolean isLoked(String name) {
121+
return executeWithoutLockCheck(connection -> doCheckLock(name, connection));
122+
}
123+
124+
private boolean doCheckLock(String name, RedisConnection connection) {
125+
return connection.exists(createCacheLockKey(name));
126+
}
127+
128+
@Override
129+
public void clean(String name, byte[] pattern) {
130+
131+
RedisConnection connection = connectionFactory.getConnection();
132+
133+
if (isLockingCacheWriter()) {
134+
doLock(name, connection);
135+
}
136+
137+
try {
138+
if (connection instanceof RedisClusterConnection) {
139+
140+
byte[][] keys = connection.keys(pattern).stream().toArray(size -> new byte[size][]);
141+
connection.del(keys);
142+
} else {
143+
connection.eval(CLEAN_SCRIPT, ReturnType.INTEGER, 0, pattern);
144+
}
145+
} finally {
146+
147+
if (isLockingCacheWriter()) {
148+
doUnlock(name, connection);
149+
}
150+
connection.close();
151+
}
152+
}
153+
154+
public <T> T execute(String name, ConnectionCallback<T> callback) {
155+
156+
RedisConnection connection = connectionFactory.getConnection();
157+
try {
158+
159+
checkAndPotentiallyWaitForLock(name, connection);
160+
return callback.doWithConnection(connection);
161+
} finally {
162+
connection.close();
163+
}
164+
}
165+
166+
private <T> T executeWithoutLockCheck(ConnectionCallback<T> callback) {
167+
168+
RedisConnection connection = connectionFactory.getConnection();
169+
170+
try {
171+
return callback.doWithConnection(connection);
172+
} finally {
173+
connection.close();
174+
}
175+
}
176+
177+
public boolean isLockingCacheWriter() {
178+
return !lockTimeout.isZero() && !lockTimeout.isNegative();
179+
}
180+
181+
private void checkAndPotentiallyWaitForLock(String name, RedisConnection connection) {
182+
183+
if (isLockingCacheWriter()) {
184+
185+
long timeout = lockTimeout.toMillis();
186+
187+
while (doCheckLock(name, connection)) {
188+
try {
189+
Thread.sleep(timeout);
190+
} catch (InterruptedException ex) {
191+
Thread.currentThread().interrupt();
192+
}
193+
}
194+
}
195+
}
196+
197+
private boolean shouldExpireWithin(Duration ttl) {
198+
return ttl != null && !ttl.isZero() && !ttl.isNegative();
199+
}
200+
201+
byte[] createCacheLockKey(String name) {
202+
return (name + "~lock").getBytes(Charset.forName("UTF-8"));
203+
}
204+
205+
interface ConnectionCallback<T> {
206+
T doWithConnection(RedisConnection connection);
207+
}
208+
}

0 commit comments

Comments
 (0)