Skip to content

Commit d7b8d9b

Browse files
tgrallmp911de
authored andcommitted
DATAREDIS-1042 - Allow consumer group creation without an existing stream.
Original pull request: #527.
1 parent c4623ef commit d7b8d9b

17 files changed

+260
-13
lines changed

src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
* @author Thomas Darimont
6868
* @author Mark Paluch
6969
* @author Ninad Divadkar
70+
* @author Tugdual Grall
7071
*/
7172
public class DefaultStringRedisConnection implements StringRedisConnection, DecoratedRedisConnection {
7273

@@ -3691,6 +3692,15 @@ public String xGroupCreate(String key, ReadOffset readOffset, String group) {
36913692
return convertAndReturn(delegate.xGroupCreate(serialize(key), group, readOffset), identityConverter);
36923693
}
36933694

3695+
/*
3696+
* (non-Javadoc)
3697+
* @see org.springframework.data.redis.connection.StringRedisConnection#xGroupCreate(java.lang.String, org.springframework.data.redis.connection.RedisStreamCommands.ReadOffset, java.lang.String, boolean)
3698+
*/
3699+
@Override
3700+
public String xGroupCreate(String key, ReadOffset readOffset, String group, boolean mkStream) {
3701+
return convertAndReturn(delegate.xGroupCreate(serialize(key), group, readOffset, mkStream), identityConverter);
3702+
}
3703+
36943704
/*
36953705
* (non-Javadoc)
36963706
* @see org.springframework.data.redis.connection.StringRedisConnection#xGroupDelConsumer(java.lang.String, org.springframework.data.redis.connection.RedisStreamCommands.Consumer)
@@ -3888,6 +3898,15 @@ public String xGroupCreate(byte[] key, String groupName, ReadOffset readOffset)
38883898
return delegate.xGroupCreate(key, groupName, readOffset);
38893899
}
38903900

3901+
/*
3902+
* (non-Javadoc)
3903+
* @see org.springframework.data.redis.connection.RedisStreamCommands#xGroupCreate(byte[], org.springframework.data.redis.connection.RedisStreamCommands.ReadOffset, java.lang.String, boolean)
3904+
*/
3905+
@Override
3906+
public String xGroupCreate(byte[] key, String groupName, ReadOffset readOffset, boolean mkStream) {
3907+
return delegate.xGroupCreate(key, groupName, readOffset, mkStream);
3908+
}
3909+
38913910
/*
38923911
* (non-Javadoc)
38933912
* @see org.springframework.data.redis.connection.RedisStreamCommands#xGroupDelConsumer(byte[], org.springframework.data.redis.connection.RedisStreamCommands.Consumer)

src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
*
5555
* @author Christoph Strobl
5656
* @author Mark Paluch
57+
* @author Tugdual Grall
5758
* @since 2.0
5859
*/
5960
public interface DefaultedRedisConnection extends RedisConnection {
@@ -484,6 +485,13 @@ default String xGroupCreate(byte[] key, String groupName, ReadOffset readOffset)
484485
return streamCommands().xGroupCreate(key, groupName, readOffset);
485486
}
486487

488+
/** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
489+
@Override
490+
@Deprecated
491+
default String xGroupCreate(byte[] key, String groupName, ReadOffset readOffset, boolean mkStream) {
492+
return streamCommands().xGroupCreate(key, groupName, readOffset, mkStream);
493+
}
494+
487495
/** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
488496
@Override
489497
@Deprecated

src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
*
5757
* @author Mark Paluch
5858
* @author Christoph Strobl
59+
* @author Tugdual Grall
5960
* @since 2.2
6061
*/
6162
public interface ReactiveStreamCommands {
@@ -1134,15 +1135,22 @@ class GroupCommand extends KeyCommand {
11341135
private final @Nullable String groupName;
11351136
private final @Nullable String consumerName;
11361137
private final @Nullable ReadOffset offset;
1138+
private final boolean mkStream;
11371139

11381140
public GroupCommand(@Nullable ByteBuffer key, GroupCommandAction action, @Nullable String groupName,
1139-
@Nullable String consumerName, @Nullable ReadOffset offset) {
1141+
@Nullable String consumerName, @Nullable ReadOffset offset, boolean mkStream) {
11401142

11411143
super(key);
11421144
this.action = action;
11431145
this.groupName = groupName;
11441146
this.consumerName = consumerName;
11451147
this.offset = offset;
1148+
this.mkStream = mkStream;
1149+
}
1150+
1151+
public GroupCommand(@Nullable ByteBuffer key, GroupCommandAction action, @Nullable String groupName,
1152+
@Nullable String consumerName, @Nullable ReadOffset offset) {
1153+
this(key, action, groupName, consumerName, offset, false);
11461154
}
11471155

11481156
public static GroupCommand createGroup(String group) {
@@ -1161,6 +1169,10 @@ public static GroupCommand deleteConsumer(Consumer consumer) {
11611169
return new GroupCommand(null, GroupCommandAction.DELETE_CONSUMER, consumer.getGroup(), consumer.getName(), null);
11621170
}
11631171

1172+
public GroupCommand makeStream(boolean mkStream) {
1173+
return new GroupCommand(getKey(), action, groupName, consumerName, offset,mkStream);
1174+
}
1175+
11641176
public GroupCommand at(ReadOffset offset) {
11651177
return new GroupCommand(getKey(), action, groupName, consumerName, offset);
11661178
}
@@ -1173,6 +1185,10 @@ public GroupCommand fromGroup(String groupName) {
11731185
return new GroupCommand(getKey(), action, groupName, consumerName, offset);
11741186
}
11751187

1188+
public boolean getMkStream() {
1189+
return this.mkStream;
1190+
}
1191+
11761192
@Nullable
11771193
public ReadOffset getReadOffset() {
11781194
return this.offset;
@@ -1211,6 +1227,20 @@ default Mono<String> xGroupCreate(ByteBuffer key, String groupName, ReadOffset r
12111227
.map(CommandResponse::getOutput);
12121228
}
12131229

1230+
/**
1231+
* Create a consumer group.
1232+
*
1233+
* @param key key the {@literal key} the stream is stored at.
1234+
* @param groupName name of the consumer group to create.
1235+
* @param readOffset the offset to start at.
1236+
* @param mkStream if true the group will create the stream if needed (MKSTREAM)
1237+
* @return the {@link Mono} emitting {@literal ok} if successful.
1238+
*/
1239+
default Mono<String> xGroupCreate(ByteBuffer key, String groupName, ReadOffset readOffset, boolean mkStream) {
1240+
return xGroup(Mono.just(GroupCommand.createGroup(groupName).forStream(key).at(readOffset).makeStream(mkStream))).next()
1241+
.map(CommandResponse::getOutput);
1242+
}
1243+
12141244
/**
12151245
* Delete a consumer from a consumer group.
12161246
*

src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
*
4040
* @author Mark Paluch
4141
* @author Christoph Strobl
42+
* @author Tugdual Grall
4243
* @see <a href="https://redis.io/topics/streams-intro">Redis Documentation - Streams</a>
4344
* @since 2.2
4445
*/
@@ -464,6 +465,18 @@ default Long xDel(byte[] key, String... recordIds) {
464465
@Nullable
465466
String xGroupCreate(byte[] key, String groupName, ReadOffset readOffset);
466467

468+
/**
469+
* Create a consumer group.
470+
*
471+
* @param key the {@literal key} the stream is stored at.
472+
* @param groupName name of the consumer group to create.
473+
* @param readOffset the offset to start at.
474+
* @param mkStream if true the group will create the stream if not already present (MKSTREAM)
475+
* @return {@literal ok} if successful. {@literal null} when used in pipeline / transaction.
476+
*/
477+
@Nullable
478+
String xGroupCreate(byte[] key, String groupName, ReadOffset readOffset, boolean mkStream);
479+
467480
/**
468481
* Delete a consumer from a consumer group.
469482
*

src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
* @author David Liu
6161
* @author Mark Paluch
6262
* @author Ninad Divadkar
63+
* @author Tugdual Grall
6364
* @see RedisCallback
6465
* @see RedisSerializer
6566
* @see StringRedisTemplate
@@ -2109,6 +2110,19 @@ default Long xDel(String key, String... entryIds) {
21092110
@Nullable
21102111
String xGroupCreate(String key, ReadOffset readOffset, String group);
21112112

2113+
/**
2114+
* Create a consumer group.
2115+
*
2116+
* @param key
2117+
* @param readOffset
2118+
* @param group name of the consumer group.
2119+
* @param mkStream if true the group will create the stream if needed (MKSTREAM)
2120+
* @since
2121+
* @return {@literal true} if successful. {@literal null} when used in pipeline / transaction.
2122+
*/
2123+
@Nullable
2124+
String xGroupCreate(String key, ReadOffset readOffset, String group, boolean mkStream);
2125+
21122126
/**
21132127
* Delete a consumer from a consumer group.
21142128
*

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.lettuce.core.XAddArgs;
1919
import io.lettuce.core.XClaimArgs;
20+
import io.lettuce.core.XGroupCreateArgs;
2021
import io.lettuce.core.XReadArgs;
2122
import io.lettuce.core.XReadArgs.StreamOffset;
2223
import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
@@ -51,6 +52,7 @@
5152
* {@link ReactiveStreamCommands} implementation for {@literal Lettuce}.
5253
*
5354
* @author Mark Paluch
55+
* @author Tugdual Grall
5456
* @since 2.2
5557
*/
5658
class LettuceReactiveStreamCommands implements ReactiveStreamCommands {
@@ -188,8 +190,12 @@ public Flux<CommandResponse<GroupCommand, String>> xGroup(Publisher<GroupCommand
188190

189191
StreamOffset offset = StreamOffset.from(command.getKey(), command.getReadOffset().getOffset());
190192

191-
return cmd.xgroupCreate(offset, ByteUtils.getByteBuffer(command.getGroupName()))
192-
.map(it -> new CommandResponse<>(command, it));
193+
return cmd.xgroupCreate(offset,
194+
ByteUtils.getByteBuffer(command.getGroupName()),
195+
XGroupCreateArgs.Builder.mkstream( command.getMkStream()))
196+
.map(it ->
197+
new CommandResponse<>(command, it)
198+
);
193199
}
194200

195201
if (command.getAction().equals(GroupCommandAction.DELETE_CONSUMER)) {

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.lettuce.core.XAddArgs;
1919
import io.lettuce.core.XClaimArgs;
20+
import io.lettuce.core.XGroupCreateArgs;
2021
import io.lettuce.core.XReadArgs;
2122
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
2223
import io.lettuce.core.cluster.api.sync.RedisClusterCommands;
@@ -47,6 +48,7 @@
4748

4849
/**
4950
* @author Mark Paluch
51+
* @author Tugdual Grall
5052
* @since 2.2
5153
*/
5254
@RequiredArgsConstructor
@@ -216,6 +218,15 @@ public Long xDel(byte[] key, RecordId... recordIds) {
216218
*/
217219
@Override
218220
public String xGroupCreate(byte[] key, String groupName, ReadOffset readOffset) {
221+
return xGroupCreate(key, groupName, readOffset, false);
222+
}
223+
224+
/*
225+
* (non-Javadoc)
226+
* @see org.springframework.data.redis.connection.RedisStreamCommands#xGroupCreate(byte[], org.springframework.data.redis.connection.RedisStreamCommands.ReadOffset, java.lang.String, boolean)
227+
*/
228+
@Override
229+
public String xGroupCreate(byte[] key, String groupName, ReadOffset readOffset, boolean mkSteam) {
219230

220231
Assert.notNull(key, "Key must not be null!");
221232
Assert.hasText(groupName, "Group name must not be null or empty!");
@@ -226,15 +237,17 @@ public String xGroupCreate(byte[] key, String groupName, ReadOffset readOffset)
226237

227238
if (isPipelined()) {
228239
pipeline(connection
229-
.newLettuceResult(getAsyncConnection().xgroupCreate(streamOffset, LettuceConverters.toBytes(groupName))));
240+
.newLettuceResult(getAsyncConnection().xgroupCreate(streamOffset, LettuceConverters.toBytes(groupName),
241+
XGroupCreateArgs.Builder.mkstream(mkSteam))));
230242
return null;
231243
}
232244
if (isQueueing()) {
233245
transaction(connection
234-
.newLettuceResult(getAsyncConnection().xgroupCreate(streamOffset, LettuceConverters.toBytes(groupName))));
246+
.newLettuceResult(getAsyncConnection().xgroupCreate(streamOffset, LettuceConverters.toBytes(groupName),
247+
XGroupCreateArgs.Builder.mkstream(mkSteam))));
235248
return null;
236249
}
237-
return getConnection().xgroupCreate(streamOffset, LettuceConverters.toBytes(groupName));
250+
return getConnection().xgroupCreate(streamOffset, LettuceConverters.toBytes(groupName), XGroupCreateArgs.Builder.mkstream(mkSteam));
238251
} catch (Exception ex) {
239252
throw convertLettuceAccessException(ex);
240253
}

src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
*
5656
* @author Mark Paluch
5757
* @author Christoph Strobl
58+
* @author Tugdual Grall
5859
* @since 2.2
5960
*/
6061
class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperations<K, HK, HV> {
@@ -162,12 +163,17 @@ public Mono<Long> delete(K key, RecordId... recordIds) {
162163

163164
@Override
164165
public Mono<String> createGroup(K key, ReadOffset readOffset, String group) {
166+
return createMono(connection -> connection.xGroupCreate(rawKey(key), group, readOffset, false));
167+
}
168+
169+
@Override
170+
public Mono<String> createGroup(K key, ReadOffset readOffset, String group, boolean mkStream) {
165171

166172
Assert.notNull(key, "Key must not be null!");
167173
Assert.notNull(readOffset, "ReadOffset must not be null!");
168174
Assert.notNull(group, "Group must not be null!");
169175

170-
return createMono(connection -> connection.xGroupCreate(rawKey(key), group, readOffset));
176+
return createMono(connection -> connection.xGroupCreate(rawKey(key), group, readOffset, mkStream));
171177
}
172178

173179
@Override

src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
*
5151
* @author Mark Paluch
5252
* @author Christoph Strobl
53+
* @autor Tugdual Grall
5354
* @since 2.2
5455
*/
5556
class DefaultStreamOperations<K, HK, HV> extends AbstractOperations<K, Object> implements StreamOperations<K, HK, HV> {
@@ -162,6 +163,20 @@ public String createGroup(K key, ReadOffset readOffset, String group) {
162163
return execute(connection -> connection.xGroupCreate(rawKey, group, readOffset), true);
163164
}
164165

166+
/*
167+
* (non-Javadoc)
168+
* @see org.springframework.data.redis.core.StreamOperations#createGroup(java.lang.Object, org.springframework.data.redis.connection.RedisStreamCommands.ReadOffset, java.lang.String, boolean)
169+
*/
170+
@Override
171+
public String createGroup(K key, ReadOffset readOffset, String group, boolean mkStream) {
172+
byte[] rawKey = rawKey(key);
173+
if (!mkStream) {
174+
return createGroup(key, readOffset, group);
175+
} else {
176+
return execute(connection -> connection.xGroupCreate(rawKey, group, readOffset, true), true);
177+
}
178+
}
179+
165180
/*
166181
* (non-Javadoc)
167182
* @see org.springframework.data.redis.core.StreamOperations#deleteConsumer(java.lang.Object, org.springframework.data.redis.connection.RedisStreamCommands.Consumer)

src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
*
3838
* @author Mark Paluch
3939
* @author Christoph Strobl
40+
* @author Tugdual Grall
4041
* @since 2.2
4142
*/
4243
public interface ReactiveStreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV> {
@@ -169,6 +170,19 @@ default Mono<String> createGroup(K key, String group) {
169170
return createGroup(key, ReadOffset.latest(), group);
170171
}
171172

173+
/**
174+
* Create a consumer group at the {@link ReadOffset#latest() latest offset}.
175+
*
176+
* @param key the {@literal key} the stream is stored at.
177+
* @param group name of the consumer group.
178+
* @param mkStream if true the group will create the stream if not already present (MKSTREAM)
179+
* @return the {@link Mono} emitting {@literal OK} if successful.. {@literal null} when used in pipeline /
180+
* transaction.
181+
*/
182+
default Mono<String> createGroup(K key, String group, boolean mkStream) {
183+
return createGroup(key, ReadOffset.latest(), group, mkStream);
184+
}
185+
172186
/**
173187
* Create a consumer group.
174188
*
@@ -179,6 +193,16 @@ default Mono<String> createGroup(K key, String group) {
179193
*/
180194
Mono<String> createGroup(K key, ReadOffset readOffset, String group);
181195

196+
/**
197+
* Create a consumer group.
198+
*
199+
* @param key the {@literal key} the stream is stored at.
200+
* @param readOffset the {@link ReadOffset} to apply.
201+
* @param group name of the consumer group.
202+
* @param mkStream if true the group will create the stream if needed (MKSTREAM)
203+
* @return the {@link Mono} emitting {@literal OK} if successful.
204+
*/
205+
Mono<String> createGroup(K key, ReadOffset readOffset, String group, boolean mkStream);
182206
/**
183207
* Delete a consumer from a consumer group.
184208
*

0 commit comments

Comments
 (0)