Skip to content

Commit c1e0745

Browse files
committed
Fix blocking bug in RedisCache.retrieve(..) when waiting on the cache lock.
Closes #2650
1 parent b587361 commit c1e0745

File tree

2 files changed

+56
-12
lines changed

2 files changed

+56
-12
lines changed

src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,13 +189,17 @@ private BiFunction<byte[], Duration, Mono<byte[]>> reactiveExecutionStrategy(Str
189189

190190
ByteBuffer wrappedKey = ByteBuffer.wrap(key);
191191

192-
Mono<ByteBuffer> result = shouldExpireWithin(ttl)
192+
// Do the same lock check as the regular Cache.get(key); be careful of blocking!
193+
Mono<ByteBuffer> cacheLockCheckMono = Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
194+
executeLockFree(connection -> checkAndPotentiallyWaitUntilUnlocked(cacheName, connection));
195+
return ByteBuffer.wrap(new byte[0]);
196+
}));
197+
198+
Mono<ByteBuffer> getMono = shouldExpireWithin(ttl)
193199
? executeReactively(connection -> connection.stringCommands().getEx(wrappedKey, Expiration.from(ttl)))
194200
: executeReactively(connection -> connection.stringCommands().get(wrappedKey));
195201

196-
// Do the same lock check as the regular Cache.get(key); be careful of blocking!
197-
result = result.doFirst(() -> executeLockFree(connection ->
198-
checkAndPotentiallyWaitUntilUnlocked(cacheName, connection)));
202+
Mono<ByteBuffer> result = cacheLockCheckMono.then(getMono);
199203

200204
@SuppressWarnings("all")
201205
Mono<byte[]> byteArrayResult = result.map(DefaultRedisCacheWriter::nullSafeGetBytes);

src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,6 @@
2020
import static org.assertj.core.api.Assumptions.assumeThat;
2121
import static org.awaitility.Awaitility.await;
2222

23-
import io.netty.util.concurrent.DefaultThreadFactory;
24-
25-
import reactor.core.publisher.Mono;
26-
2723
import java.io.Serializable;
2824
import java.nio.charset.StandardCharsets;
2925
import java.time.Duration;
@@ -48,6 +44,7 @@
4844
import java.util.stream.IntStream;
4945

5046
import org.junit.jupiter.api.BeforeEach;
47+
5148
import org.springframework.cache.Cache.ValueWrapper;
5249
import org.springframework.cache.interceptor.SimpleKey;
5350
import org.springframework.cache.interceptor.SimpleKeyGenerator;
@@ -62,6 +59,10 @@
6259
import org.springframework.data.redis.test.extension.parametrized.ParameterizedRedisTest;
6360
import org.springframework.lang.Nullable;
6461

62+
import io.netty.util.concurrent.DefaultThreadFactory;
63+
64+
import reactor.core.publisher.Mono;
65+
6566
/**
6667
* Tests for {@link RedisCache} with {@link DefaultRedisCacheWriter} using different {@link RedisSerializer} and
6768
* {@link RedisConnectionFactory} pairs.
@@ -569,9 +570,9 @@ void cacheGetWithTimeToIdleExpirationAfterEntryExpiresShouldReturnNull() {
569570
assertThat(cache.get(this.cacheKey, Person.class)).isNull();
570571
}
571572

572-
@ParameterizedRedisTest // Gh-2650
573+
@ParameterizedRedisTest // GH-2650
573574
@SuppressWarnings("unchecked")
574-
void retrieveReturnsCachedValueCorrectly() throws Exception {
575+
void retrieveReturnsCachedValue() throws Exception {
575576

576577
doWithConnection(connection -> connection.stringCommands().set(this.binaryCacheKey, this.binarySample));
577578

@@ -581,10 +582,43 @@ void retrieveReturnsCachedValueCorrectly() throws Exception {
581582

582583
assertThat(value).isNotNull();
583584
assertThat(value.get()).isEqualTo(this.sample);
585+
assertThat(value).isDone();
586+
}
587+
588+
@ParameterizedRedisTest // GH-2650
589+
@SuppressWarnings("unchecked")
590+
void retrieveReturnsCachedValueWhenLockIsReleased() throws Exception {
591+
592+
String mockValue = "MockValue";
593+
String testValue = "TestValue";
594+
595+
byte[] binaryCacheValue = this.serializer.serialize(testValue);
596+
597+
doWithConnection(connection -> connection.stringCommands().set(this.binaryCacheKey, binaryCacheValue));
598+
599+
RedisCache cache = new RedisCache("cache", usingLockingRedisCacheWriter(Duration.ofMillis(5L)),
600+
usingRedisCacheConfiguration());
601+
602+
RedisCacheWriter cacheWriter = cache.getCacheWriter();
603+
604+
assertThat(cacheWriter).isInstanceOf(DefaultRedisCacheWriter.class);
605+
606+
((DefaultRedisCacheWriter) cacheWriter).lock("cache");
607+
608+
CompletableFuture<String> value = (CompletableFuture<String>) cache.retrieve(this.key);
609+
610+
assertThat(value).isNotNull();
611+
assertThat(value.getNow(mockValue)).isEqualTo(mockValue);
612+
assertThat(value).isNotDone();
613+
614+
((DefaultRedisCacheWriter) cacheWriter).unlock("cache");
615+
616+
assertThat(value.get(15L, TimeUnit.MILLISECONDS)).isEqualTo(testValue);
617+
assertThat(value).isDone();
584618
}
585619

586-
@ParameterizedRedisTest // Gh-2650
587-
void retrieveReturnsLoadedValueCorrectly() throws Exception {
620+
@ParameterizedRedisTest // GH-2650
621+
void retrieveReturnsLoadedValue() throws Exception {
588622

589623
RedisCache cache = new RedisCache("cache", usingLockingRedisCacheWriter(), usingRedisCacheConfiguration());
590624

@@ -608,6 +642,7 @@ void retrieveReturnsLoadedValueCorrectly() throws Exception {
608642
assertThat(loaded.get()).isFalse();
609643
assertThat(value.get()).isEqualTo(jon);
610644
assertThat(loaded.get()).isTrue();
645+
assertThat(value).isDone();
611646
}
612647

613648
private RedisCacheConfiguration usingRedisCacheConfiguration() {
@@ -629,6 +664,11 @@ private RedisCacheWriter usingLockingRedisCacheWriter() {
629664
return RedisCacheWriter.lockingRedisCacheWriter(this.connectionFactory);
630665
}
631666

667+
private RedisCacheWriter usingLockingRedisCacheWriter(Duration sleepTime) {
668+
return RedisCacheWriter.lockingRedisCacheWriter(this.connectionFactory, sleepTime,
669+
RedisCacheWriter.TtlFunction.persistent(), BatchStrategies.keys());
670+
}
671+
632672
private RedisCacheWriter usingNonLockingRedisCacheWriter() {
633673
return RedisCacheWriter.nonLockingRedisCacheWriter(this.connectionFactory);
634674
}

0 commit comments

Comments
 (0)