Skip to content

Commit 8c59586

Browse files
committed
DATAREDIS-1042 - Polishing.
Set MKSTREAM by default when calling createGroup on the operations API as that should be the default behavior in the first place. Reformat code. Add since tags. Original pull request: #527.
1 parent d7b8d9b commit 8c59586

File tree

14 files changed

+27
-128
lines changed

14 files changed

+27
-128
lines changed

src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1185,7 +1185,7 @@ public GroupCommand fromGroup(String groupName) {
11851185
return new GroupCommand(getKey(), action, groupName, consumerName, offset);
11861186
}
11871187

1188-
public boolean getMkStream() {
1188+
public boolean isMkStream() {
11891189
return this.mkStream;
11901190
}
11911191

@@ -1235,6 +1235,7 @@ default Mono<String> xGroupCreate(ByteBuffer key, String groupName, ReadOffset r
12351235
* @param readOffset the offset to start at.
12361236
* @param mkStream if true the group will create the stream if needed (MKSTREAM)
12371237
* @return the {@link Mono} emitting {@literal ok} if successful.
1238+
* @since 2.3
12381239
*/
12391240
default Mono<String> xGroupCreate(ByteBuffer key, String groupName, ReadOffset readOffset, boolean mkStream) {
12401241
return xGroup(Mono.just(GroupCommand.createGroup(groupName).forStream(key).at(readOffset).makeStream(mkStream))).next()

src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,7 @@ default Long xDel(byte[] key, String... recordIds) {
473473
* @param readOffset the offset to start at.
474474
* @param mkStream if true the group will create the stream if not already present (MKSTREAM)
475475
* @return {@literal ok} if successful. {@literal null} when used in pipeline / transaction.
476+
* @since 2.3
476477
*/
477478
@Nullable
478479
String xGroupCreate(byte[] key, String groupName, ReadOffset readOffset, boolean mkStream);

src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2101,7 +2101,7 @@ default Long xDel(String key, String... entryIds) {
21012101
/**
21022102
* Create a consumer group.
21032103
*
2104-
* @param key
2104+
* @param key the stream key.
21052105
* @param readOffset
21062106
* @param group name of the consumer group.
21072107
* @since 2.2
@@ -2113,12 +2113,13 @@ default Long xDel(String key, String... entryIds) {
21132113
/**
21142114
* Create a consumer group.
21152115
*
2116-
* @param key
2116+
* @param key the stream key.
21172117
* @param readOffset
21182118
* @param group name of the consumer group.
21192119
* @param mkStream if true the group will create the stream if needed (MKSTREAM)
21202120
* @since
21212121
* @return {@literal true} if successful. {@literal null} when used in pipeline / transaction.
2122+
* @since 2.3
21222123
*/
21232124
@Nullable
21242125
String xGroupCreate(String key, ReadOffset readOffset, String group, boolean mkStream);

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ public Flux<CommandResponse<GroupCommand, String>> xGroup(Publisher<GroupCommand
192192

193193
return cmd.xgroupCreate(offset,
194194
ByteUtils.getByteBuffer(command.getGroupName()),
195-
XGroupCreateArgs.Builder.mkstream( command.getMkStream()))
195+
XGroupCreateArgs.Builder.mkstream(command.isMkStream()))
196196
.map(it ->
197197
new CommandResponse<>(command, it)
198198
);

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -236,18 +236,17 @@ public String xGroupCreate(byte[] key, String groupName, ReadOffset readOffset,
236236
XReadArgs.StreamOffset<byte[]> streamOffset = XReadArgs.StreamOffset.from(key, readOffset.getOffset());
237237

238238
if (isPipelined()) {
239-
pipeline(connection
240-
.newLettuceResult(getAsyncConnection().xgroupCreate(streamOffset, LettuceConverters.toBytes(groupName),
241-
XGroupCreateArgs.Builder.mkstream(mkSteam))));
239+
pipeline(connection.newLettuceResult(getAsyncConnection().xgroupCreate(streamOffset,
240+
LettuceConverters.toBytes(groupName), XGroupCreateArgs.Builder.mkstream(mkSteam))));
242241
return null;
243242
}
244243
if (isQueueing()) {
245-
transaction(connection
246-
.newLettuceResult(getAsyncConnection().xgroupCreate(streamOffset, LettuceConverters.toBytes(groupName),
247-
XGroupCreateArgs.Builder.mkstream(mkSteam))));
244+
transaction(connection.newLettuceResult(getAsyncConnection().xgroupCreate(streamOffset,
245+
LettuceConverters.toBytes(groupName), XGroupCreateArgs.Builder.mkstream(mkSteam))));
248246
return null;
249247
}
250-
return getConnection().xgroupCreate(streamOffset, LettuceConverters.toBytes(groupName), XGroupCreateArgs.Builder.mkstream(mkSteam));
248+
return getConnection().xgroupCreate(streamOffset, LettuceConverters.toBytes(groupName),
249+
XGroupCreateArgs.Builder.mkstream(mkSteam));
251250
} catch (Exception ex) {
252251
throw convertLettuceAccessException(ex);
253252
}
@@ -712,7 +711,6 @@ private DataAccessException convertLettuceAccessException(Exception ex) {
712711
return connection.convertLettuceAccessException(ex);
713712
}
714713

715-
716714
@SuppressWarnings("unchecked")
717715
private static XReadArgs.StreamOffset<byte[]>[] toStreamOffsets(StreamOffset<byte[]>[] streams) {
718716

src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
*
5656
* @author Mark Paluch
5757
* @author Christoph Strobl
58-
* @author Tugdual Grall
5958
* @since 2.2
6059
*/
6160
class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperations<K, HK, HV> {
@@ -163,17 +162,12 @@ public Mono<Long> delete(K key, RecordId... recordIds) {
163162

164163
@Override
165164
public Mono<String> createGroup(K key, ReadOffset readOffset, String group) {
166-
return createMono(connection -> connection.xGroupCreate(rawKey(key), group, readOffset, false));
167-
}
168-
169-
@Override
170-
public Mono<String> createGroup(K key, ReadOffset readOffset, String group, boolean mkStream) {
171165

172166
Assert.notNull(key, "Key must not be null!");
173167
Assert.notNull(readOffset, "ReadOffset must not be null!");
174168
Assert.notNull(group, "Group must not be null!");
175169

176-
return createMono(connection -> connection.xGroupCreate(rawKey(key), group, readOffset, mkStream));
170+
return createMono(connection -> connection.xGroupCreate(rawKey(key), group, readOffset, true));
177171
}
178172

179173
@Override

src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
*
5151
* @author Mark Paluch
5252
* @author Christoph Strobl
53-
* @autor Tugdual Grall
5453
* @since 2.2
5554
*/
5655
class DefaultStreamOperations<K, HK, HV> extends AbstractOperations<K, Object> implements StreamOperations<K, HK, HV> {
@@ -160,21 +159,7 @@ public Long delete(K key, RecordId... recordIds) {
160159
public String createGroup(K key, ReadOffset readOffset, String group) {
161160

162161
byte[] rawKey = rawKey(key);
163-
return execute(connection -> connection.xGroupCreate(rawKey, group, readOffset), true);
164-
}
165-
166-
/*
167-
* (non-Javadoc)
168-
* @see org.springframework.data.redis.core.StreamOperations#createGroup(java.lang.Object, org.springframework.data.redis.connection.RedisStreamCommands.ReadOffset, java.lang.String, boolean)
169-
*/
170-
@Override
171-
public String createGroup(K key, ReadOffset readOffset, String group, boolean mkStream) {
172-
byte[] rawKey = rawKey(key);
173-
if (!mkStream) {
174-
return createGroup(key, readOffset, group);
175-
} else {
176-
return execute(connection -> connection.xGroupCreate(rawKey, group, readOffset, true), true);
177-
}
162+
return execute(connection -> connection.xGroupCreate(rawKey, group, readOffset, true), true);
178163
}
179164

180165
/*

src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
*
3838
* @author Mark Paluch
3939
* @author Christoph Strobl
40-
* @author Tugdual Grall
4140
* @since 2.2
4241
*/
4342
public interface ReactiveStreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV> {
@@ -159,7 +158,8 @@ default Mono<Long> delete(Record<K, ?> record) {
159158
Mono<Long> delete(K key, RecordId... recordIds);
160159

161160
/**
162-
* Create a consumer group at the {@link ReadOffset#latest() latest offset}.
161+
* Create a consumer group at the {@link ReadOffset#latest() latest offset}. This command creates the stream if it
162+
* does not already exist.
163163
*
164164
* @param key the {@literal key} the stream is stored at.
165165
* @param group name of the consumer group.
@@ -171,20 +171,7 @@ default Mono<String> createGroup(K key, String group) {
171171
}
172172

173173
/**
174-
* Create a consumer group at the {@link ReadOffset#latest() latest offset}.
175-
*
176-
* @param key the {@literal key} the stream is stored at.
177-
* @param group name of the consumer group.
178-
* @param mkStream if true the group will create the stream if not already present (MKSTREAM)
179-
* @return the {@link Mono} emitting {@literal OK} if successful.. {@literal null} when used in pipeline /
180-
* transaction.
181-
*/
182-
default Mono<String> createGroup(K key, String group, boolean mkStream) {
183-
return createGroup(key, ReadOffset.latest(), group, mkStream);
184-
}
185-
186-
/**
187-
* Create a consumer group.
174+
* Create a consumer group. This command creates the stream if it does not already exist.
188175
*
189176
* @param key the {@literal key} the stream is stored at.
190177
* @param readOffset the {@link ReadOffset} to apply.
@@ -193,16 +180,6 @@ default Mono<String> createGroup(K key, String group, boolean mkStream) {
193180
*/
194181
Mono<String> createGroup(K key, ReadOffset readOffset, String group);
195182

196-
/**
197-
* Create a consumer group.
198-
*
199-
* @param key the {@literal key} the stream is stored at.
200-
* @param readOffset the {@link ReadOffset} to apply.
201-
* @param group name of the consumer group.
202-
* @param mkStream if true the group will create the stream if needed (MKSTREAM)
203-
* @return the {@link Mono} emitting {@literal OK} if successful.
204-
*/
205-
Mono<String> createGroup(K key, ReadOffset readOffset, String group, boolean mkStream);
206183
/**
207184
* Delete a consumer from a consumer group.
208185
*

src/main/java/org/springframework/data/redis/core/StreamOperations.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
*
3737
* @author Mark Paluch
3838
* @author Christoph Strobl
39-
* @author Tugdual Grall
4039
* @since 2.2
4140
*/
4241
public interface StreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV> {
@@ -156,7 +155,8 @@ default Long delete(Record<K, ?> record) {
156155
Long delete(K key, RecordId... recordIds);
157156

158157
/**
159-
* Create a consumer group at the {@link ReadOffset#latest() latest offset}.
158+
* Create a consumer group at the {@link ReadOffset#latest() latest offset}. This command creates the stream if it
159+
* does not already exist.
160160
*
161161
* @param key the {@literal key} the stream is stored at.
162162
* @param group name of the consumer group.
@@ -167,7 +167,7 @@ default String createGroup(K key, String group) {
167167
}
168168

169169
/**
170-
* Create a consumer group.
170+
* Create a consumer group. This command creates the stream if it does not already exist.
171171
*
172172
* @param key the {@literal key} the stream is stored at.
173173
* @param readOffset the {@link ReadOffset} to apply.
@@ -177,18 +177,6 @@ default String createGroup(K key, String group) {
177177
@Nullable
178178
String createGroup(K key, ReadOffset readOffset, String group);
179179

180-
/**
181-
* Create a consumer group.
182-
*
183-
* @param key the {@literal key} the stream is stored at.
184-
* @param readOffset the {@link ReadOffset} to apply.
185-
* @param group name of the consumer group.
186-
* @param mkStream if true the group will create the stream if needed (MKSTREAM)
187-
* @return {@literal OK} if successful. {@literal null} when used in pipeline / transaction.
188-
*/
189-
@Nullable
190-
String createGroup(K key, ReadOffset readOffset, String group, boolean mkStream);
191-
192180
/**
193181
* Delete a consumer from a consumer group.
194182
*

src/main/kotlin/org/springframework/data/redis/core/ReactiveStreamOperationsExtensions.kt

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -113,15 +113,6 @@ suspend fun <K : Any, HK : Any, HV : Any> ReactiveStreamOperations<K, HK, HV>.de
113113
suspend fun <K : Any, HK : Any, HV : Any> ReactiveStreamOperations<K, HK, HV>.createGroupAndAwait(key: K, group: String): String =
114114
createGroup(key, group).awaitSingle()
115115

116-
/**
117-
* Coroutines variant of [ReactiveStreamOperations.createGroup].
118-
*
119-
* @author Mark Paluch
120-
* @since 2.2
121-
*/
122-
suspend fun <K : Any, HK : Any, HV : Any> ReactiveStreamOperations<K, HK, HV>.createGroupAndAwait(key: K, group: String, mkStream: Boolean): String =
123-
createGroup(key, group, mkStream).awaitSingle()
124-
125116
/**
126117
* Coroutines variant of [ReactiveStreamOperations.createGroup].
127118
*
@@ -131,15 +122,6 @@ suspend fun <K : Any, HK : Any, HV : Any> ReactiveStreamOperations<K, HK, HV>.cr
131122
suspend fun <K : Any, HK : Any, HV : Any> ReactiveStreamOperations<K, HK, HV>.createGroupAndAwait(key: K, readOffset: ReadOffset, group: String): String =
132123
createGroup(key, readOffset, group).awaitSingle()
133124

134-
/**
135-
* Coroutines variant of [ReactiveStreamOperations.createGroup].
136-
*
137-
* @author Tugdual Grall
138-
* @since
139-
*/
140-
suspend fun <K : Any, HK : Any, HV : Any> ReactiveStreamOperations<K, HK, HV>.createGroupAndAwait(key: K, readOffset: ReadOffset, group: String, mkStream: Boolean): String =
141-
createGroup(key, readOffset, group, mkStream).awaitSingle()
142-
143125
/**
144126
* Coroutines variant of [ReactiveStreamOperations.deleteConsumer].
145127
*

src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3064,7 +3064,7 @@ public void xReadGroupShouldReadMessage() {
30643064
@WithRedisDriver({ RedisDriver.LETTUCE })
30653065
public void xGroupCreateShouldWorkWithAndWithoutExistingStream() {
30663066

3067-
actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group",true));
3067+
actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group", true));
30683068
actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
30693069

30703070
actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
@@ -3276,7 +3276,7 @@ public void xClaim() throws InterruptedException {
32763276
@WithRedisDriver({ RedisDriver.LETTUCE })
32773277
public void xinfo() {
32783278

3279-
actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group-without-stream",true));
3279+
actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group-without-stream", true));
32803280
actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
32813281
actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_3, VALUE_3)));
32823282
actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));

src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static org.junit.Assume.*;
2020

2121
import io.lettuce.core.XReadArgs;
22-
import org.springframework.data.redis.RedisSystemException;
2322
import reactor.test.StepVerifier;
2423

2524
import java.time.Duration;
@@ -29,7 +28,9 @@
2928
import org.junit.Before;
3029
import org.junit.Ignore;
3130
import org.junit.Test;
31+
3232
import org.springframework.data.domain.Range;
33+
import org.springframework.data.redis.RedisSystemException;
3334
import org.springframework.data.redis.RedisTestProfileValueSource;
3435
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
3536
import org.springframework.data.redis.connection.RedisZSetCommands.Limit;

src/test/java/org/springframework/data/redis/stream/StreamReceiverIntegrationTests.java

Lines changed: 2 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
* Integration tests for {@link StreamReceiver}.
5656
*
5757
* @author Mark Paluch
58-
* @author Tugdual Grall
5958
*/
6059
public class StreamReceiverIntegrationTests {
6160

@@ -213,9 +212,8 @@ public void shouldReceiveAsConsumerGroupMessages() {
213212
Flux<MapRecord<String, String, String>> messages = receiver.receive(Consumer.from("my-group", "my-consumer-id"),
214213
StreamOffset.create("my-stream", ReadOffset.lastConsumed()));
215214

216-
// required to initialize stream
217-
redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("key", "value"));
218215
redisTemplate.opsForStream().createGroup("my-stream", ReadOffset.from("0-0"), "my-group");
216+
redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("key", "value"));
219217
redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("key2", "value2"));
220218

221219
messages.as(StepVerifier::create) //
@@ -246,9 +244,8 @@ public void shouldStopReceivingOnError() {
246244
Flux<MapRecord<String, String, String>> messages = receiver.receive(Consumer.from("my-group", "my-consumer-id"),
247245
StreamOffset.create("my-stream", ReadOffset.lastConsumed()));
248246

249-
// required to initialize stream
250-
redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("key", "value"));
251247
redisTemplate.opsForStream().createGroup("my-stream", ReadOffset.from("0-0"), "my-group");
248+
redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("key", "value"));
252249

253250
messages.as(StepVerifier::create) //
254251
.expectNextCount(1) //
@@ -257,31 +254,6 @@ public void shouldStopReceivingOnError() {
257254
.verify(Duration.ofSeconds(5));
258255
}
259256

260-
@Test // DATAREDIS-864
261-
public void shouldCreateGroupWithOrWithoutExistingStream() {
262-
StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(connectionFactory);
263-
264-
Flux<MapRecord<String, String, String>> messages = receiver.receive(Consumer.from("my-group", "my-consumer-id"),
265-
StreamOffset.create("my-stream", ReadOffset.lastConsumed()));
266-
267-
// required to initialize stream
268-
redisTemplate.opsForStream().createGroup("my-stream", ReadOffset.from("0-0"), "my-group", true);
269-
redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("key", "value"));
270-
redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("key2", "value2"));
271-
272-
messages.as(StepVerifier::create) //
273-
.consumeNextWith(it -> {
274-
assertThat(it.getStream()).isEqualTo("my-stream");
275-
assertThat(it.getValue()).containsValue("value");
276-
}).consumeNextWith(it -> {
277-
278-
assertThat(it.getStream()).isEqualTo("my-stream");
279-
assertThat(it.getValue()).containsValue("value2");
280-
}) //
281-
.thenCancel() //
282-
.verify(Duration.ofSeconds(5));
283-
}
284-
285257
@Data
286258
@AllArgsConstructor
287259
static class LoginEvent {

src/test/kotlin/org/springframework/data/redis/core/ReactiveStreamOperationsExtensionsUnitTests.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import reactor.core.publisher.Mono
3535
*
3636
* @author Mark Paluch
3737
* @author Sebastien Deleuze
38-
* @author Tugdual Grall
3938
*/
4039
class ReactiveStreamOperationsExtensionsUnitTests {
4140

0 commit comments

Comments
 (0)