Skip to content

Commit f4676ed

Browse files
mp911dechristophstrobl
authored andcommitted
DATAREDIS-1138 - Fix detection whether XREAD is blocking.
We now correctly check if a BLOCK option is configured using a timeout of zero or higher. Previously we only checked if the configured value is greater than zero and didn't consider that a timeout of zero blocks indefinitely. Original Pull Request: #528
1 parent def86fc commit f4676ed

File tree

4 files changed

+49
-6
lines changed

4 files changed

+49
-6
lines changed

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ public Flux<CommandResponse<ReadCommand, Flux<ByteBufferRecord>>> read(Publisher
322322

323323
StreamReadOptions readOptions = command.getReadOptions();
324324

325-
if (readOptions.getBlock() != null && readOptions.getBlock() > 0) {
325+
if (readOptions.isBlocking()) {
326326
return new CommandResponse<>(command, connection.executeDedicated(cmd -> doRead(command, readOptions, cmd)));
327327
}
328328

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,7 @@ public List<ByteRecord> xRead(StreamReadOptions readOptions, StreamOffset<byte[]
516516
XReadArgs.StreamOffset<byte[]>[] streamOffsets = toStreamOffsets(streams);
517517
XReadArgs args = StreamConverters.toReadArgs(readOptions);
518518

519-
if (isBlocking(readOptions)) {
519+
if (readOptions.isBlocking()) {
520520

521521
try {
522522
if (isPipelined()) {
@@ -568,7 +568,7 @@ public List<ByteRecord> xReadGroup(Consumer consumer, StreamReadOptions readOpti
568568
XReadArgs args = StreamConverters.toReadArgs(readOptions);
569569
io.lettuce.core.Consumer<byte[]> lettuceConsumer = toConsumer(consumer);
570570

571-
if (isBlocking(readOptions)) {
571+
if (readOptions.isBlocking()) {
572572

573573
try {
574574
if (isPipelined()) {
@@ -699,9 +699,6 @@ private DataAccessException convertLettuceAccessException(Exception ex) {
699699
return connection.convertLettuceAccessException(ex);
700700
}
701701

702-
private static boolean isBlocking(StreamReadOptions readOptions) {
703-
return readOptions.getBlock() != null && readOptions.getBlock() > 0;
704-
}
705702

706703
@SuppressWarnings("unchecked")
707704
private static XReadArgs.StreamOffset<byte[]>[] toStreamOffsets(StreamOffset<byte[]>[] streams) {

src/main/java/org/springframework/data/redis/connection/stream/StreamReadOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,4 +105,12 @@ public StreamReadOptions count(long count) {
105105

106106
return new StreamReadOptions(block, count, noack);
107107
}
108+
109+
/**
110+
* @return {@literal true} if the arguments indicate a blocking read.
111+
* @since 2.3
112+
*/
113+
public boolean isBlocking() {
114+
return getBlock() != null && getBlock() >= 0;
115+
}
108116
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.redis.connection.stream;
17+
18+
import static org.assertj.core.api.Assertions.*;
19+
20+
import java.time.Duration;
21+
22+
import org.junit.Test;
23+
24+
/**
25+
* Unit tests for {@link StreamReadOptions}.
26+
*
27+
* @author Mark Paluch
28+
*/
29+
public class StreamReadOptionsUnitTests {
30+
31+
@Test // DATAREDIS-1138
32+
public void shouldConsiderBlocking() {
33+
34+
assertThat(StreamReadOptions.empty().isBlocking()).isFalse();
35+
assertThat(StreamReadOptions.empty().block(Duration.ofSeconds(1)).isBlocking()).isTrue();
36+
assertThat(StreamReadOptions.empty().block(Duration.ZERO).isBlocking()).isTrue();
37+
}
38+
}

0 commit comments

Comments
 (0)