Skip to content

Commit 38d3576

Browse files
mp911dechristophstrobl
authored andcommitted
Add reactive support for ZDIFF, ZDIFFSTORE, ZINTER, and ZUNION commands.
See: #2041 & #2042 Original Pull Request: #2097
1 parent 8dc6f01 commit 38d3576

File tree

8 files changed

+1537
-347
lines changed

8 files changed

+1537
-347
lines changed

src/main/java/org/springframework/data/redis/connection/ReactiveZSetCommands.java

Lines changed: 739 additions & 199 deletions
Large diffs are not rendered by default.

src/main/java/org/springframework/data/redis/connection/RedisZSetCommands.java

Lines changed: 0 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1174,34 +1174,6 @@ default Long zRemRangeByScore(byte[] key, double min, double max) {
11741174
@Nullable
11751175
Set<byte[]> zInter(byte[]... sets);
11761176

1177-
/**
1178-
* Intersect sorted {@code sets}.
1179-
*
1180-
* @param aggregate must not be {@literal null}.
1181-
* @param weights must not be {@literal null}.
1182-
* @param sets must not be {@literal null}.
1183-
* @return {@literal null} when used in pipeline / transaction.
1184-
* @since 2.6
1185-
* @see <a href="https://redis.io/commands/zinter">Redis Documentation: ZINTER</a>
1186-
*/
1187-
@Nullable
1188-
default Set<byte[]> zInter(Aggregate aggregate, int[] weights, byte[]... sets) {
1189-
return zInter(aggregate, Weights.of(weights), sets);
1190-
}
1191-
1192-
/**
1193-
* Intersect sorted {@code sets}.
1194-
*
1195-
* @param aggregate must not be {@literal null}.
1196-
* @param weights must not be {@literal null}.
1197-
* @param sets must not be {@literal null}.
1198-
* @return {@literal null} when used in pipeline / transaction.
1199-
* @since 2.6
1200-
* @see <a href="https://redis.io/commands/zinter">Redis Documentation: ZINTER</a>
1201-
*/
1202-
@Nullable
1203-
Set<byte[]> zInter(Aggregate aggregate, Weights weights, byte[]... sets);
1204-
12051177
/**
12061178
* Intersect sorted {@code sets}.
12071179
*
@@ -1293,34 +1265,6 @@ default Long zInterStore(byte[] destKey, Aggregate aggregate, int[] weights, byt
12931265
@Nullable
12941266
Set<byte[]> zUnion(byte[]... sets);
12951267

1296-
/**
1297-
* Union sorted {@code sets}.
1298-
*
1299-
* @param aggregate must not be {@literal null}.
1300-
* @param weights must not be {@literal null}.
1301-
* @param sets must not be {@literal null}.
1302-
* @return {@literal null} when used in pipeline / transaction.
1303-
* @since 2.6
1304-
* @see <a href="https://redis.io/commands/zunion">Redis Documentation: ZUNION</a>
1305-
*/
1306-
@Nullable
1307-
default Set<byte[]> zUnion(Aggregate aggregate, int[] weights, byte[]... sets) {
1308-
return zUnion(aggregate, Weights.of(weights), sets);
1309-
}
1310-
1311-
/**
1312-
* Union sorted {@code sets}.
1313-
*
1314-
* @param aggregate must not be {@literal null}.
1315-
* @param weights must not be {@literal null}.
1316-
* @param sets must not be {@literal null}.
1317-
* @return {@literal null} when used in pipeline / transaction.
1318-
* @since 2.6
1319-
* @see <a href="https://redis.io/commands/zunion">Redis Documentation: ZUNION</a>
1320-
*/
1321-
@Nullable
1322-
Set<byte[]> zUnion(Aggregate aggregate, Weights weights, byte[]... sets);
1323-
13241268
/**
13251269
* Union sorted {@code sets}.
13261270
*

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveClusterZSetCommands.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,13 @@ class LettuceReactiveClusterZSetCommands extends LettuceReactiveZSetCommands imp
4141
super(connection);
4242
}
4343

44-
/* (non-Javadoc)
44+
/*
45+
* (non-Javadoc)
4546
* @see org.springframework.data.redis.connection.lettuce.LettuceReactiveZSetCommands#zUnionStore(org.reactivestreams.Publisher)
4647
*/
4748
@Override
48-
public Flux<NumericResponse<ZUnionStoreCommand, Long>> zUnionStore(Publisher<ZUnionStoreCommand> commands) {
49+
public Flux<NumericResponse<ZAggregateStoreCommand, Long>> zUnionStore(
50+
Publisher<? extends ZAggregateStoreCommand> commands) {
4951

5052
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
5153

@@ -60,11 +62,13 @@ public Flux<NumericResponse<ZUnionStoreCommand, Long>> zUnionStore(Publisher<ZUn
6062
}));
6163
}
6264

63-
/* (non-Javadoc)
65+
/*
66+
* (non-Javadoc)
6467
* @see org.springframework.data.redis.connection.lettuce.LettuceReactiveZSetCommands#zInterStore(org.reactivestreams.Publisher)
6568
*/
6669
@Override
67-
public Flux<NumericResponse<ZInterStoreCommand, Long>> zInterStore(Publisher<ZInterStoreCommand> commands) {
70+
public Flux<NumericResponse<ZAggregateStoreCommand, Long>> zInterStore(
71+
Publisher<? extends ZAggregateStoreCommand> commands) {
6872
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
6973

7074
Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty.");

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveZSetCommands.java

Lines changed: 167 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -200,22 +200,18 @@ public Flux<CommandResponse<ZRangeCommand, Flux<Tuple>>> zRange(Publisher<ZRange
200200
if (ObjectUtils.nullSafeEquals(command.getDirection(), Direction.ASC)) {
201201
if (command.isWithScores()) {
202202

203-
result = cmd.zrangeWithScores(command.getKey(), start, stop)
204-
.map(this::toTuple);
203+
result = cmd.zrangeWithScores(command.getKey(), start, stop).map(this::toTuple);
205204
} else {
206205

207-
result = cmd.zrange(command.getKey(), start, stop)
208-
.map(value -> toTuple(value, Double.NaN));
206+
result = cmd.zrange(command.getKey(), start, stop).map(value -> toTuple(value, Double.NaN));
209207
}
210208
} else {
211209
if (command.isWithScores()) {
212210

213-
result = cmd.zrevrangeWithScores(command.getKey(), start, stop)
214-
.map(this::toTuple);
211+
result = cmd.zrevrangeWithScores(command.getKey(), start, stop).map(this::toTuple);
215212
} else {
216213

217-
result = cmd.zrevrange(command.getKey(), start, stop)
218-
.map(value -> toTuple(value, Double.NaN));
214+
result = cmd.zrevrange(command.getKey(), start, stop).map(value -> toTuple(value, Double.NaN));
219215
}
220216
}
221217

@@ -247,8 +243,7 @@ public Flux<CommandResponse<ZRangeByScoreCommand, Flux<Tuple>>> zRangeByScore(
247243
if (command.isWithScores()) {
248244

249245
if (!isLimited) {
250-
result = cmd.zrangebyscoreWithScores(command.getKey(), range)
251-
.map(this::toTuple);
246+
result = cmd.zrangebyscoreWithScores(command.getKey(), range).map(this::toTuple);
252247
} else {
253248
result = cmd
254249
.zrangebyscoreWithScores(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get()))
@@ -257,8 +252,7 @@ public Flux<CommandResponse<ZRangeByScoreCommand, Flux<Tuple>>> zRangeByScore(
257252
} else {
258253

259254
if (!isLimited) {
260-
result = cmd.zrangebyscore(command.getKey(), range)
261-
.map(value -> toTuple(value, Double.NaN));
255+
result = cmd.zrangebyscore(command.getKey(), range).map(value -> toTuple(value, Double.NaN));
262256
} else {
263257

264258
result = cmd.zrangebyscore(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get()))
@@ -272,20 +266,16 @@ public Flux<CommandResponse<ZRangeByScoreCommand, Flux<Tuple>>> zRangeByScore(
272266
if (command.isWithScores()) {
273267

274268
if (!isLimited) {
275-
result = cmd.zrevrangebyscoreWithScores(command.getKey(), range)
276-
.map(this::toTuple);
269+
result = cmd.zrevrangebyscoreWithScores(command.getKey(), range).map(this::toTuple);
277270
} else {
278271

279-
result = cmd
280-
.zrevrangebyscoreWithScores(command.getKey(), range,
281-
LettuceConverters.toLimit(command.getLimit().get()))
282-
.map(this::toTuple);
272+
result = cmd.zrevrangebyscoreWithScores(command.getKey(), range,
273+
LettuceConverters.toLimit(command.getLimit().get())).map(this::toTuple);
283274
}
284275
} else {
285276

286277
if (!isLimited) {
287-
result = cmd.zrevrangebyscore(command.getKey(), range)
288-
.map(value -> toTuple(value, Double.NaN));
278+
result = cmd.zrevrangebyscore(command.getKey(), range).map(value -> toTuple(value, Double.NaN));
289279
} else {
290280

291281
result = cmd.zrevrangebyscore(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get()))
@@ -520,26 +510,99 @@ public Flux<NumericResponse<ZRemRangeByLexCommand, Long>> zRemRangeByLex(Publish
520510

521511
/*
522512
* (non-Javadoc)
523-
* @see org.springframework.data.redis.connection.ReactiveZSetCommands#zUnionStore(org.reactivestreams.Publisher)
513+
* @see org.springframework.data.redis.connection.ReactiveZSetCommands#zDiff(Publisher)
514+
*/
515+
@Override
516+
public Flux<CommandResponse<ZDiffCommand, Flux<ByteBuffer>>> zDiff(Publisher<? extends ZDiffCommand> commands) {
517+
518+
return connection.execute(cmd -> Flux.from(commands).map(command -> {
519+
520+
Assert.notEmpty(command.getKeys(), "Keys must not be null or empty!");
521+
522+
ByteBuffer[] sourceKeys = command.getKeys().toArray(new ByteBuffer[0]);
523+
return new CommandResponse<>(command, cmd.zdiff(sourceKeys));
524+
}));
525+
}
526+
527+
/*
528+
* (non-Javadoc)
529+
* @see org.springframework.data.redis.connection.ReactiveZSetCommands#zDiffWithScores(Publisher)
530+
*/
531+
@Override
532+
public Flux<CommandResponse<ZDiffCommand, Flux<Tuple>>> zDiffWithScores(Publisher<? extends ZDiffCommand> commands) {
533+
534+
return connection.execute(cmd -> Flux.from(commands).map(command -> {
535+
536+
Assert.notEmpty(command.getKeys(), "Keys must not be null or empty!");
537+
538+
ByteBuffer[] sourceKeys = command.getKeys().toArray(new ByteBuffer[0]);
539+
return new CommandResponse<>(command, cmd.zdiffWithScores(sourceKeys).map(this::toTuple));
540+
}));
541+
}
542+
543+
/*
544+
* (non-Javadoc)
545+
* @see org.springframework.data.redis.connection.ReactiveZSetCommands#zDiffStore(Publisher)
524546
*/
525547
@Override
526-
public Flux<NumericResponse<ZUnionStoreCommand, Long>> zUnionStore(Publisher<ZUnionStoreCommand> commands) {
548+
public Flux<NumericResponse<ZDiffStoreCommand, Long>> zDiffStore(Publisher<ZDiffStoreCommand> commands) {
527549

528550
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
529551

530552
Assert.notNull(command.getKey(), "Destination key must not be null!");
531553
Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty!");
532554

555+
ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(new ByteBuffer[0]);
556+
return cmd.zdiffstore(command.getKey(), sourceKeys).map(value -> new NumericResponse<>(command, value));
557+
}));
558+
}
559+
560+
/*
561+
* (non-Javadoc)
562+
* @see org.springframework.data.redis.connection.ReactiveZSetCommands#zInter(Publisher)
563+
*/
564+
@Override
565+
public Flux<CommandResponse<ZAggregateCommand, Flux<ByteBuffer>>> zInter(
566+
Publisher<? extends ZAggregateCommand> commands) {
567+
568+
return connection.execute(cmd -> Flux.from(commands).map(command -> {
569+
570+
Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty!");
571+
533572
ZStoreArgs args = null;
534573
if (command.getAggregateFunction().isPresent() || !command.getWeights().isEmpty()) {
535574
args = zStoreArgs(command.getAggregateFunction().isPresent() ? command.getAggregateFunction().get() : null,
536575
command.getWeights());
537576
}
538577

539-
ByteBuffer[] sourceKeys = command.getSourceKeys().stream().toArray(ByteBuffer[]::new);
540-
Mono<Long> result = args != null ? cmd.zunionstore(command.getKey(), args, sourceKeys)
541-
: cmd.zunionstore(command.getKey(), sourceKeys);
542-
return result.map(value -> new NumericResponse<>(command, value));
578+
ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(new ByteBuffer[0]);
579+
Flux<ByteBuffer> result = args != null ? cmd.zinter(args, sourceKeys) : cmd.zinter(sourceKeys);
580+
return new CommandResponse<>(command, result);
581+
}));
582+
}
583+
584+
/*
585+
* (non-Javadoc)
586+
* @see org.springframework.data.redis.connection.ReactiveZSetCommands#zInterWithScores(Publisher)
587+
*/
588+
@Override
589+
public Flux<CommandResponse<ZAggregateCommand, Flux<Tuple>>> zInterWithScores(
590+
Publisher<? extends ZAggregateCommand> commands) {
591+
592+
return connection.execute(cmd -> Flux.from(commands).map(command -> {
593+
594+
Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty!");
595+
596+
ZStoreArgs args = null;
597+
if (command.getAggregateFunction().isPresent() || !command.getWeights().isEmpty()) {
598+
args = zStoreArgs(command.getAggregateFunction().isPresent() ? command.getAggregateFunction().get() : null,
599+
command.getWeights());
600+
}
601+
602+
ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(new ByteBuffer[0]);
603+
Flux<ScoredValue<ByteBuffer>> result = args != null ? cmd.zinterWithScores(args, sourceKeys)
604+
: cmd.zinterWithScores(sourceKeys);
605+
return new CommandResponse<>(command, result.map(this::toTuple));
543606
}));
544607
}
545608

@@ -548,7 +611,8 @@ public Flux<NumericResponse<ZUnionStoreCommand, Long>> zUnionStore(Publisher<ZUn
548611
* @see org.springframework.data.redis.connection.ReactiveZSetCommands#zInterStore(org.reactivestreams.Publisher)
549612
*/
550613
@Override
551-
public Flux<NumericResponse<ZInterStoreCommand, Long>> zInterStore(Publisher<ZInterStoreCommand> commands) {
614+
public Flux<NumericResponse<ZAggregateStoreCommand, Long>> zInterStore(
615+
Publisher<? extends ZAggregateStoreCommand> commands) {
552616

553617
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
554618

@@ -561,13 +625,88 @@ public Flux<NumericResponse<ZInterStoreCommand, Long>> zInterStore(Publisher<ZIn
561625
command.getWeights());
562626
}
563627

564-
ByteBuffer[] sourceKeys = command.getSourceKeys().stream().toArray(ByteBuffer[]::new);
628+
ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(new ByteBuffer[0]);
565629
Mono<Long> result = args != null ? cmd.zinterstore(command.getKey(), args, sourceKeys)
566630
: cmd.zinterstore(command.getKey(), sourceKeys);
567631
return result.map(value -> new NumericResponse<>(command, value));
568632
}));
569633
}
570634

635+
/*
636+
* (non-Javadoc)
637+
* @see org.springframework.data.redis.connection.ReactiveZSetCommands#zUnion(org.reactivestreams.Publisher)
638+
*/
639+
@Override
640+
public Flux<CommandResponse<ZAggregateCommand, Flux<ByteBuffer>>> zUnion(
641+
Publisher<? extends ZAggregateCommand> commands) {
642+
643+
return connection.execute(cmd -> Flux.from(commands).map(command -> {
644+
645+
Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty!");
646+
647+
ZStoreArgs args = null;
648+
if (command.getAggregateFunction().isPresent() || !command.getWeights().isEmpty()) {
649+
args = zStoreArgs(command.getAggregateFunction().isPresent() ? command.getAggregateFunction().get() : null,
650+
command.getWeights());
651+
}
652+
653+
ByteBuffer[] sourceKeys = command.getSourceKeys().stream().toArray(ByteBuffer[]::new);
654+
Flux<ByteBuffer> result = args != null ? cmd.zunion(args, sourceKeys) : cmd.zunion(sourceKeys);
655+
return new CommandResponse<>(command, result);
656+
}));
657+
}
658+
659+
/*
660+
* (non-Javadoc)
661+
* @see org.springframework.data.redis.connection.ReactiveZSetCommands#zUnion(org.reactivestreams.Publisher)
662+
*/
663+
@Override
664+
public Flux<CommandResponse<ZAggregateCommand, Flux<Tuple>>> zUnionWithScores(
665+
Publisher<? extends ZAggregateCommand> commands) {
666+
667+
return connection.execute(cmd -> Flux.from(commands).map(command -> {
668+
669+
Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty!");
670+
671+
ZStoreArgs args = null;
672+
if (command.getAggregateFunction().isPresent() || !command.getWeights().isEmpty()) {
673+
args = zStoreArgs(command.getAggregateFunction().isPresent() ? command.getAggregateFunction().get() : null,
674+
command.getWeights());
675+
}
676+
677+
ByteBuffer[] sourceKeys = command.getSourceKeys().stream().toArray(ByteBuffer[]::new);
678+
Flux<ScoredValue<ByteBuffer>> result = args != null ? cmd.zunionWithScores(args, sourceKeys)
679+
: cmd.zunionWithScores(sourceKeys);
680+
return new CommandResponse<>(command, result.map(this::toTuple));
681+
}));
682+
}
683+
684+
/*
685+
* (non-Javadoc)
686+
* @see org.springframework.data.redis.connection.ReactiveZSetCommands#zUnionStore(org.reactivestreams.Publisher)
687+
*/
688+
@Override
689+
public Flux<NumericResponse<ZAggregateStoreCommand, Long>> zUnionStore(
690+
Publisher<? extends ZAggregateStoreCommand> commands) {
691+
692+
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
693+
694+
Assert.notNull(command.getKey(), "Destination key must not be null!");
695+
Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty!");
696+
697+
ZStoreArgs args = null;
698+
if (command.getAggregateFunction().isPresent() || !command.getWeights().isEmpty()) {
699+
args = zStoreArgs(command.getAggregateFunction().isPresent() ? command.getAggregateFunction().get() : null,
700+
command.getWeights());
701+
}
702+
703+
ByteBuffer[] sourceKeys = command.getSourceKeys().stream().toArray(ByteBuffer[]::new);
704+
Mono<Long> result = args != null ? cmd.zunionstore(command.getKey(), args, sourceKeys)
705+
: cmd.zunionstore(command.getKey(), sourceKeys);
706+
return result.map(value -> new NumericResponse<>(command, value));
707+
}));
708+
}
709+
571710
/*
572711
* (non-Javadoc)
573712
* @see org.springframework.data.redis.connection.ReactiveZSetCommands#zRangeByLex(org.reactivestreams.Publisher)

0 commit comments

Comments
 (0)