|
27 | 27 | import java.util.Collection;
|
28 | 28 | import java.util.Collections;
|
29 | 29 | import java.util.Date;
|
| 30 | +import java.util.concurrent.ConcurrentHashMap; |
| 31 | +import java.util.concurrent.ConcurrentMap; |
| 32 | +import java.util.concurrent.CountDownLatch; |
| 33 | +import java.util.concurrent.atomic.AtomicInteger; |
30 | 34 | import java.util.function.Consumer;
|
| 35 | +import java.util.stream.Stream; |
31 | 36 |
|
32 | 37 | import org.junit.jupiter.api.BeforeEach;
|
33 | 38 |
|
@@ -377,15 +382,47 @@ void cacheShouldFailOnNonConvertibleCacheKey() {
|
377 | 382 | assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> cache.put(key, sample));
|
378 | 383 | }
|
379 | 384 |
|
380 |
| - void doWithConnection(Consumer<RedisConnection> callback) { |
381 |
| - RedisConnection connection = connectionFactory.getConnection(); |
382 |
| - try { |
383 |
| - callback.accept(connection); |
384 |
| - } finally { |
385 |
| - connection.close(); |
386 |
| - } |
| 385 | + @ParameterizedRedisTest // GH-2079 |
| 386 | + void multipleThreadsLoadValueOnce() { |
| 387 | + |
| 388 | + int threadCount = 5; |
| 389 | + |
| 390 | + ConcurrentMap<Integer, Integer> valuesByThreadId = new ConcurrentHashMap<>(threadCount); |
| 391 | + |
| 392 | + CountDownLatch waiter = new CountDownLatch(threadCount); |
| 393 | + |
| 394 | + AtomicInteger threadIds = new AtomicInteger(0); |
| 395 | + |
| 396 | + AtomicInteger currentValueForKey = new AtomicInteger(0); |
| 397 | + |
| 398 | + Stream.generate(threadIds::getAndIncrement) |
| 399 | + .limit(threadCount) |
| 400 | + .parallel() |
| 401 | + .forEach((threadId) -> { |
| 402 | + waiter.countDown(); |
| 403 | + try { |
| 404 | + waiter.await(); |
| 405 | + } catch (InterruptedException e) { |
| 406 | + e.printStackTrace(); |
| 407 | + } |
| 408 | + Integer valueForThread = cache.get("key", currentValueForKey::incrementAndGet); |
| 409 | + valuesByThreadId.put(threadId, valueForThread); |
| 410 | + }); |
| 411 | + |
| 412 | + valuesByThreadId.forEach((thread, valueForThread) -> { |
| 413 | + assertThat(valueForThread).isEqualTo(currentValueForKey.get()); |
| 414 | + }); |
387 | 415 | }
|
388 | 416 |
|
| 417 | + void doWithConnection(Consumer<RedisConnection> callback) { |
| 418 | + RedisConnection connection = connectionFactory.getConnection(); |
| 419 | + try { |
| 420 | + callback.accept(connection); |
| 421 | + } finally { |
| 422 | + connection.close(); |
| 423 | + } |
| 424 | + } |
| 425 | + |
389 | 426 | @Data
|
390 | 427 | @NoArgsConstructor
|
391 | 428 | @AllArgsConstructor
|
|
0 commit comments