Skip to content

Commit 3c0ca2c

Browse files
committed
feat(impv): Add idle argument to xPending
1 parent 06f3591 commit 3c0ca2c

File tree

2 files changed

+109
-5
lines changed

2 files changed

+109
-5
lines changed

src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java

Lines changed: 102 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
* @author Tugdual Grall
4242
* @author Dengliming
4343
* @author Mark John Moreno
44+
* @author Jeonggyu Choi
4445
* @see <a href="https://redis.io/topics/streams-intro">Redis Documentation - Streams</a>
4546
* @since 2.2
4647
*/
@@ -706,6 +707,25 @@ default PendingMessages xPending(byte[] key, String groupName, Range<?> range, L
706707
return xPending(key, groupName, XPendingOptions.range(range, count));
707708
}
708709

710+
/**
711+
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a
712+
* {@literal consumer group} and over a given {@link Duration} of idle time.
713+
*
714+
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
715+
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
716+
* @param range the range of messages ids to search within. Must not be {@literal null}.
717+
* @param count limit the number of results. Must not be {@literal null}.
718+
* @param idle the minimum idle time to filter pending messages. Must not be {@literal null}.
719+
* @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline /
720+
* transaction.
721+
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
722+
* @since 3.5
723+
*/
724+
@Nullable
725+
default PendingMessages xPending(byte[] key, String groupName, Range<?> range, Long count, Duration idle) {
726+
return xPending(key, groupName, XPendingOptions.range(range, count).idle(idle));
727+
}
728+
709729
/**
710730
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
711731
* {@link Consumer} within a {@literal consumer group}.
@@ -723,6 +743,24 @@ default PendingMessages xPending(byte[] key, Consumer consumer, Range<?> range,
723743
return xPending(key, consumer.getGroup(), consumer.getName(), range, count);
724744
}
725745

746+
/**
747+
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
748+
* {@link Consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
749+
*
750+
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
751+
* @param consumer the name of the {@link Consumer}. Must not be {@literal null}.
752+
* @param range the range of messages ids to search within. Must not be {@literal null}.
753+
* @param count limit the number of results. Must not be {@literal null}.
754+
* @param idle the minimum idle time to filter pending messages. Must not be {@literal null}.
755+
* @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
756+
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
757+
* @since 3.5
758+
*/
759+
@Nullable
760+
default PendingMessages xPending(byte[] key, Consumer consumer, Range<?> range, Long count, Duration idle) {
761+
return xPending(key, consumer.getGroup(), consumer.getName(), range, count, idle);
762+
}
763+
726764
/**
727765
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
728766
* {@literal consumer} within a {@literal consumer group}.
@@ -742,6 +780,27 @@ default PendingMessages xPending(byte[] key, String groupName, String consumerNa
742780
return xPending(key, groupName, XPendingOptions.range(range, count).consumer(consumerName));
743781
}
744782

783+
/**
784+
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
785+
* {@literal consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
786+
*
787+
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
788+
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
789+
* @param consumerName the name of the {@literal consumer}. Must not be {@literal null}.
790+
* @param range the range of messages ids to search within. Must not be {@literal null}.
791+
* @param count limit the number of results. Must not be {@literal null}.
792+
* @param idle the minimum idle time to filter pending messages. Must not be {@literal null}.
793+
* @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null}
794+
* when used in pipeline / transaction.
795+
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
796+
* @since 3.5
797+
*/
798+
@Nullable
799+
default PendingMessages xPending(byte[] key, String groupName, String consumerName, Range<?> range, Long count,
800+
Duration idle) {
801+
return xPending(key, groupName, XPendingOptions.range(range, count).consumer(consumerName).idle(idle));
802+
}
803+
745804
/**
746805
* Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions
747806
* options}.
@@ -761,19 +820,23 @@ default PendingMessages xPending(byte[] key, String groupName, String consumerNa
761820
* Value Object holding parameters for obtaining pending messages.
762821
*
763822
* @author Christoph Strobl
823+
* @author Jeonggyu Choi
764824
* @since 2.3
765825
*/
766826
class XPendingOptions {
767827

768828
private final @Nullable String consumerName;
769829
private final Range<?> range;
770830
private final @Nullable Long count;
831+
private final @Nullable Duration idle;
771832

772-
private XPendingOptions(@Nullable String consumerName, Range<?> range, @Nullable Long count) {
833+
private XPendingOptions(@Nullable String consumerName, Range<?> range, @Nullable Long count,
834+
@Nullable Duration idle) {
773835

774836
this.range = range;
775837
this.count = count;
776838
this.consumerName = consumerName;
839+
this.idle = idle;
777840
}
778841

779842
/**
@@ -782,7 +845,7 @@ private XPendingOptions(@Nullable String consumerName, Range<?> range, @Nullable
782845
* @return new instance of {@link XPendingOptions}.
783846
*/
784847
public static XPendingOptions unbounded() {
785-
return new XPendingOptions(null, Range.unbounded(), null);
848+
return new XPendingOptions(null, Range.unbounded(), null, null);
786849
}
787850

788851
/**
@@ -795,7 +858,7 @@ public static XPendingOptions unbounded(Long count) {
795858

796859
Assert.isTrue(count > -1, "Count must not be negative");
797860

798-
return new XPendingOptions(null, Range.unbounded(), count);
861+
return new XPendingOptions(null, Range.unbounded(), count, null);
799862
}
800863

801864
/**
@@ -810,7 +873,7 @@ public static XPendingOptions range(Range<?> range, Long count) {
810873
Assert.notNull(range, "Range must not be null");
811874
Assert.isTrue(count > -1, "Count must not be negative");
812875

813-
return new XPendingOptions(null, range, count);
876+
return new XPendingOptions(null, range, count, null);
814877
}
815878

816879
/**
@@ -820,7 +883,20 @@ public static XPendingOptions range(Range<?> range, Long count) {
820883
* @return new instance of {@link XPendingOptions}.
821884
*/
822885
public XPendingOptions consumer(String consumerName) {
823-
return new XPendingOptions(consumerName, range, count);
886+
return new XPendingOptions(consumerName, range, count, idle);
887+
}
888+
889+
/**
890+
* Append given idle time.
891+
*
892+
* @param idle must not be {@literal null}.
893+
* @return new instance of {@link} XPendingOptions}.
894+
*/
895+
public XPendingOptions idle(Duration idle) {
896+
897+
Assert.notNull(idle, "Idle must not be null");
898+
899+
return new XPendingOptions(consumerName, range, count, idle);
824900
}
825901

826902
/**
@@ -846,6 +922,20 @@ public String getConsumerName() {
846922
return consumerName;
847923
}
848924

925+
@Nullable
926+
public Duration getIdle() {
927+
return idle;
928+
}
929+
930+
@Nullable
931+
public Long getIdleMillis() {
932+
if (idle == null) {
933+
return null;
934+
}
935+
936+
return idle.toMillis();
937+
}
938+
849939
/**
850940
* @return {@literal true} if a consumer name is present.
851941
*/
@@ -859,6 +949,13 @@ public boolean hasConsumer() {
859949
public boolean isLimited() {
860950
return count != null;
861951
}
952+
953+
/**
954+
* @return {@literal true} if idle time is set.
955+
*/
956+
public boolean hasIdle() {
957+
return idle != null;
958+
}
862959
}
863960

864961
/**

src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,11 @@ void xPendingOptionsRangeShouldThrowExceptionWhenCountIsNegative() {
4646

4747
assertThatIllegalArgumentException().isThrownBy(() -> XPendingOptions.range(range, -1L));
4848
}
49+
50+
@Test // GH-2046
51+
void xPendingOptionsIdleShouldThrowExceptionWhenIdleIsNull() {
52+
XPendingOptions xPendingOptions = XPendingOptions.unbounded();
53+
54+
assertThatIllegalArgumentException().isThrownBy(() -> xPendingOptions.idle(null));
55+
}
4956
}

0 commit comments

Comments
 (0)