15
15
*/
16
16
package org .springframework .data .redis .cache ;
17
17
18
+ import java .nio .ByteBuffer ;
18
19
import java .nio .charset .StandardCharsets ;
19
20
import java .time .Duration ;
21
+ import java .util .concurrent .CompletableFuture ;
20
22
import java .util .concurrent .TimeUnit ;
23
+ import java .util .function .BiFunction ;
21
24
import java .util .function .Consumer ;
22
25
import java .util .function .Function ;
26
+ import java .util .function .Supplier ;
23
27
24
28
import org .springframework .dao .PessimisticLockingFailureException ;
29
+ import org .springframework .data .redis .connection .ReactiveRedisConnection ;
30
+ import org .springframework .data .redis .connection .ReactiveRedisConnectionFactory ;
25
31
import org .springframework .data .redis .connection .RedisConnection ;
26
32
import org .springframework .data .redis .connection .RedisConnectionFactory ;
27
33
import org .springframework .data .redis .connection .RedisStringCommands .SetOption ;
28
34
import org .springframework .data .redis .core .types .Expiration ;
35
+ import org .springframework .data .redis .util .ByteUtils ;
29
36
import org .springframework .lang .Nullable ;
30
37
import org .springframework .util .Assert ;
31
38
39
+ import reactor .core .publisher .Mono ;
40
+
32
41
/**
33
42
* {@link RedisCacheWriter} implementation capable of reading/writing binary data from/to Redis in {@literal standalone}
34
43
* and {@literal cluster} environments, and uses a given {@link RedisConnectionFactory} to obtain the actual
@@ -114,8 +123,8 @@ public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
114
123
Assert .notNull (key , "Key must not be null" );
115
124
116
125
byte [] result = shouldExpireWithin (ttl )
117
- ? execute (name , connection -> connection .stringCommands ().getEx (key , Expiration .from (ttl )))
118
- : execute (name , connection -> connection .stringCommands ().get (key ));
126
+ ? execute (name , connection -> connection .stringCommands ().getEx (key , Expiration .from (ttl )))
127
+ : execute (name , connection -> connection .stringCommands ().get (key ));
119
128
120
129
statistics .incGets (name );
121
130
@@ -128,6 +137,77 @@ public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
128
137
return result ;
129
138
}
130
139
140
+ @ Override
141
+ public Mono <byte []> retrieve (String name , byte [] key , @ Nullable Duration ttl ) {
142
+
143
+ Assert .notNull (name , "Name must not be null" );
144
+ Assert .notNull (key , "Key must not be null" );
145
+
146
+ Mono <byte []> result = nonBlockingExecutionStrategy (name ).apply (key , ttl );
147
+
148
+ result = result .doOnSuccess (byteBuffer -> {
149
+ if (byteBuffer != null ) {
150
+ statistics .incHits (name );
151
+ }
152
+ else {
153
+ statistics .incMisses (name );
154
+ }
155
+ }).doFirst (() -> statistics .incGets (name ));
156
+
157
+ return result ;
158
+ }
159
+
160
+ private BiFunction <byte [], Duration , Mono <byte []>> nonBlockingExecutionStrategy (String cacheName ) {
161
+ return isReactiveAvailable () ? reactiveExecutionStrategy (cacheName ) : asyncExecutionStrategy (cacheName );
162
+ }
163
+
164
+ // Execution Strategy applied when Jedis (non-Reactive driver) is used.
165
+ // Treats API consistently (that is, using Reactive types, such as Mono) at the RedisCacheWriter level
166
+ // whether "technically Reactive" or not; clearly Jedis is not "Reactive".
167
+ private BiFunction <byte [], Duration , Mono <byte []>> asyncExecutionStrategy (String cacheName ) {
168
+
169
+ return (key , ttl ) -> {
170
+
171
+ Supplier <byte []> getKey = () -> execute (cacheName , connection -> connection .stringCommands ().get (key ));
172
+
173
+ Supplier <byte []> getKeyWithExpiration = () -> execute (cacheName , connection ->
174
+ connection .stringCommands ().getEx (key , Expiration .from (ttl )));
175
+
176
+ // NOTE: CompletableFuture.supplyAsync(:Supplier) is necessary in this case to prevent blocking
177
+ // on Mono.subscribe(:Consumer).
178
+ return shouldExpireWithin (ttl )
179
+ ? Mono .fromFuture (CompletableFuture .supplyAsync (getKeyWithExpiration ))
180
+ : Mono .fromFuture (CompletableFuture .supplyAsync (getKey ));
181
+ };
182
+ }
183
+
184
+ // Execution Strategy applied when Lettuce (Reactive driver) is used.
185
+ // Be careful to do this in a "non-blocking way", but still taking the "named" cache lock into consideration.
186
+ private BiFunction <byte [], Duration , Mono <byte []>> reactiveExecutionStrategy (String cacheName ) {
187
+
188
+ return (key , ttl ) -> {
189
+
190
+ ByteBuffer wrappedKey = ByteBuffer .wrap (key );
191
+
192
+ Mono <ByteBuffer > result = shouldExpireWithin (ttl )
193
+ ? executeReactively (connection -> connection .stringCommands ().getEx (wrappedKey , Expiration .from (ttl )))
194
+ : executeReactively (connection -> connection .stringCommands ().get (wrappedKey ));
195
+
196
+ // Do the same lock check as the regular Cache.get(key); be careful of blocking!
197
+ result = result .doFirst (() -> executeLockFree (connection ->
198
+ checkAndPotentiallyWaitUntilUnlocked (cacheName , connection )));
199
+
200
+ @ SuppressWarnings ("all" )
201
+ Mono <byte []> byteArrayResult = result .map (DefaultRedisCacheWriter ::nullSafeGetBytes );
202
+
203
+ return byteArrayResult ;
204
+ };
205
+ }
206
+
207
+ @ Nullable
208
+ private static byte [] nullSafeGetBytes (@ Nullable ByteBuffer value ) {
209
+ return value != null ? ByteUtils .getBytes (value ) : null ;
210
+ }
131
211
@ Override
132
212
public void put (String name , byte [] key , byte [] value , @ Nullable Duration ttl ) {
133
213
@@ -308,6 +388,18 @@ private void executeLockFree(Consumer<RedisConnection> callback) {
308
388
}
309
389
}
310
390
391
+ private <T > T executeReactively (Function <ReactiveRedisConnection , T > callback ) {
392
+
393
+ ReactiveRedisConnection connection = getReactiveRedisConnectionFactory ().getReactiveConnection ();
394
+
395
+ try {
396
+ return callback .apply (connection );
397
+ }
398
+ finally {
399
+ connection .closeLater ();
400
+ }
401
+ }
402
+
311
403
private void checkAndPotentiallyWaitUntilUnlocked (String name , RedisConnection connection ) {
312
404
313
405
if (!isLockingCacheWriter ()) {
@@ -333,11 +425,19 @@ private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection c
333
425
}
334
426
}
335
427
428
+ private boolean isReactiveAvailable () {
429
+ return this .connectionFactory instanceof ReactiveRedisConnectionFactory ;
430
+ }
431
+
432
+ private ReactiveRedisConnectionFactory getReactiveRedisConnectionFactory () {
433
+ return (ReactiveRedisConnectionFactory ) this .connectionFactory ;
434
+ }
435
+
336
436
private static byte [] createCacheLockKey (String name ) {
337
437
return (name + "~lock" ).getBytes (StandardCharsets .UTF_8 );
338
438
}
339
439
340
- private boolean isTrue (@ Nullable Boolean value ) {
440
+ private static boolean isTrue (@ Nullable Boolean value ) {
341
441
return Boolean .TRUE .equals (value );
342
442
}
343
443
0 commit comments