Skip to content

Commit 81af20a

Browse files
authored
Improve Delayer DSL (#8645)
* Improve Delayer DSL Move `groupId` option from a `delay()` method arg to the `DelayerEndpointSpec` to make it cleaner from code reading perspective * Expose new DSL method based on just a `DelayerEndpointSpec` for Kotlin &v Groovy * Deprecate multi-arg `delay()` methods in favor of `Consumer<DelayerEndpointSpec>`-based * * Fix language and code style
1 parent c4ee551 commit 81af20a

File tree

11 files changed

+128
-21
lines changed

11 files changed

+128
-21
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1178,12 +1178,18 @@ public B bridge(@Nullable Consumer<GenericEndpointSpec<BridgeHandler>> endpointC
11781178
/**
11791179
* Populate a {@link DelayHandler} to the current integration flow position
11801180
* with default options.
1181+
* Shortcut for:
1182+
* <pre class="code">
1183+
* {@code
1184+
* .delay(delayer -> delayer.messageGroupId(groupId))
1185+
* }
1186+
* </pre>
11811187
* @param groupId the {@code groupId} for delayed messages in the
11821188
* {@link org.springframework.integration.store.MessageGroupStore}.
11831189
* @return the current {@link BaseIntegrationFlowDefinition}.
11841190
*/
11851191
public B delay(String groupId) {
1186-
return delay(groupId, null);
1192+
return delay(delayer -> delayer.messageGroupId(groupId));
11871193
}
11881194

11891195
/**
@@ -1192,12 +1198,25 @@ public B delay(String groupId) {
11921198
* {@link org.springframework.integration.store.MessageGroupStore}.
11931199
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
11941200
* @return the current {@link BaseIntegrationFlowDefinition}.
1201+
* @deprecated since 6.2 in favor of {@link #delay(Consumer)}
11951202
* @see DelayerEndpointSpec
11961203
*/
1204+
@Deprecated(since = "6.2", forRemoval = true)
11971205
public B delay(String groupId, @Nullable Consumer<DelayerEndpointSpec> endpointConfigurer) {
11981206
return register(new DelayerEndpointSpec(new DelayHandler(groupId)), endpointConfigurer);
11991207
}
12001208

1209+
/**
1210+
* Populate a {@link DelayHandler} to the current integration flow position.
1211+
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
1212+
* @return the current {@link BaseIntegrationFlowDefinition}.
1213+
* @since 6.2
1214+
* @see DelayerEndpointSpec
1215+
*/
1216+
public B delay(Consumer<DelayerEndpointSpec> endpointConfigurer) {
1217+
return register(new DelayerEndpointSpec(), endpointConfigurer);
1218+
}
1219+
12011220
/**
12021221
* Populate a {@link org.springframework.integration.transformer.ContentEnricher}
12031222
* to the current integration flow position

spring-integration-core/src/main/java/org/springframework/integration/dsl/DelayerEndpointSpec.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -46,6 +46,10 @@ public class DelayerEndpointSpec extends ConsumerEndpointSpec<DelayerEndpointSpe
4646

4747
private final List<Advice> delayedAdvice = new LinkedList<>();
4848

49+
protected DelayerEndpointSpec() {
50+
this(new DelayHandler());
51+
}
52+
4953
protected DelayerEndpointSpec(DelayHandler delayHandler) {
5054
super(delayHandler);
5155
Assert.notNull(delayHandler, "'delayHandler' must not be null.");
@@ -225,4 +229,16 @@ public <P> DelayerEndpointSpec delayFunction(Function<Message<P>, Object> delayF
225229
return this;
226230
}
227231

232+
/**
233+
* Set a group id to manage delayed messages by this handler.
234+
* @param messageGroupId the group id for delayed messages.
235+
* @return the endpoint spec.
236+
* @since 6.2
237+
* @see DelayHandler#setMessageGroupId(String)
238+
*/
239+
public DelayerEndpointSpec messageGroupId(String messageGroupId) {
240+
this.handler.setMessageGroupId(messageGroupId);
241+
return this;
242+
}
243+
228244
}

spring-integration-core/src/main/java/org/springframework/integration/handler/DelayHandler.java

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,6 +28,8 @@
2828
import java.util.concurrent.ConcurrentMap;
2929
import java.util.concurrent.atomic.AtomicBoolean;
3030
import java.util.concurrent.atomic.AtomicInteger;
31+
import java.util.concurrent.locks.Lock;
32+
import java.util.concurrent.locks.ReentrantLock;
3133
import java.util.stream.Stream;
3234

3335
import org.aopalliance.aop.Advice;
@@ -98,10 +100,12 @@ public class DelayHandler extends AbstractReplyProducingMessageHandler implement
98100

99101
public static final long DEFAULT_RETRY_DELAY = 1_000;
100102

101-
private final String messageGroupId;
102-
103103
private final ConcurrentMap<String, AtomicInteger> deliveries = new ConcurrentHashMap<>();
104104

105+
private final Lock removeReleasedMessageLock = new ReentrantLock();
106+
107+
private String messageGroupId;
108+
105109
private long defaultDelay;
106110

107111
private Expression delayExpression;
@@ -126,6 +130,14 @@ public class DelayHandler extends AbstractReplyProducingMessageHandler implement
126130

127131
private long retryDelay = DEFAULT_RETRY_DELAY;
128132

133+
/**
134+
* Construct an instance with default options.
135+
* The {@link #messageGroupId}must then be provided via the setter.
136+
* @since 6.2
137+
*/
138+
public DelayHandler() {
139+
}
140+
129141
/**
130142
* Create a DelayHandler with the given 'messageGroupId' that is used as 'key' for
131143
* {@link MessageGroup} to store delayed Messages in the {@link MessageGroupStore}.
@@ -151,6 +163,15 @@ public DelayHandler(String messageGroupId, TaskScheduler taskScheduler) {
151163
setTaskScheduler(taskScheduler);
152164
}
153165

166+
/**
167+
* Set a group id to manage delayed messages by this handler.
168+
* @param messageGroupId the group id for delayed messages.
169+
* @since 6.2
170+
*/
171+
public void setMessageGroupId(String messageGroupId) {
172+
this.messageGroupId = messageGroupId;
173+
}
174+
154175
/**
155176
* Set the default delay in milliseconds. If no {@code delayExpression} property has
156177
* been provided, the default delay will be applied to all Messages. If a delay should
@@ -187,10 +208,10 @@ public void setDelayExpressionString(String delayExpression) {
187208

188209
/**
189210
* Specify whether {@code Exceptions} thrown by {@link #delayExpression} evaluation
190-
* should be ignored (only logged). In this case case the delayer will fall back to
191-
* the to the {@link #defaultDelay}. If this property is specified as {@code false},
211+
* should be ignored (only logged). In this case the delayer will fall back to
212+
* the {@link #defaultDelay}. If this property is specified as {@code false},
192213
* any {@link #delayExpression} evaluation {@code Exception} will be thrown to the
193-
* caller without falling back to the to the {@link #defaultDelay}. Default is
214+
* caller without falling back to the {@link #defaultDelay}. Default is
194215
* {@code true}.
195216
* @param ignoreExpressionFailures true if expression evaluation failures should be
196217
* ignored.
@@ -297,6 +318,8 @@ public IntegrationPatternType getIntegrationPatternType() {
297318

298319
@Override
299320
protected void doInit() {
321+
Assert.notNull(this.messageGroupId, "A 'messageGroupId' must be provided");
322+
300323
if (this.messageStore == null) {
301324
this.messageStore = new SimpleMessageStore();
302325
}
@@ -552,7 +575,8 @@ private void doReleaseMessage(Message<?> message) {
552575

553576
private boolean removeDelayedMessageFromMessageStore(Message<?> message) {
554577
if (this.messageStore instanceof SimpleMessageStore) {
555-
synchronized (this.messageGroupId) {
578+
this.removeReleasedMessageLock.lock();
579+
try {
556580
Collection<Message<?>> messages = this.messageStore.getMessageGroup(this.messageGroupId).getMessages();
557581
if (messages.contains(message)) {
558582
this.messageStore.removeMessagesFromGroup(this.messageGroupId, message);
@@ -562,6 +586,9 @@ private boolean removeDelayedMessageFromMessageStore(Message<?> message) {
562586
return false;
563587
}
564588
}
589+
finally {
590+
this.removeReleasedMessageLock.unlock();
591+
}
565592
}
566593
else {
567594
return ((MessageStore) this.messageStore).removeMessage(message.getHeaders().getId()) != null;

spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -561,10 +561,24 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
561561
/**
562562
* Populate a [DelayHandler] to the current integration flow position.
563563
*/
564+
@Deprecated("since 6.2",
565+
ReplaceWith("""
566+
delay {
567+
messageGroupId(groupId)
568+
}"""))
569+
@Suppress("DEPRECATION")
564570
fun delay(groupId: String, endpointConfigurer: DelayerEndpointSpec.() -> Unit = {}) {
565571
this.delegate.delay(groupId, endpointConfigurer)
566572
}
567573

574+
/**
575+
* Populate a [DelayHandler] to the current integration flow position.
576+
* @since 6.2
577+
*/
578+
fun delay(endpointConfigurer: DelayerEndpointSpec.() -> Unit) {
579+
this.delegate.delay(endpointConfigurer)
580+
}
581+
568582
/**
569583
* Populate a [org.springframework.integration.transformer.ContentEnricher]
570584
* to the current integration flow position
@@ -713,10 +727,11 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
713727
* Provide the [HeaderFilter] to the current [IntegrationFlow].
714728
*/
715729
@Deprecated("since 6.2",
716-
ReplaceWith("""headerFilter {
717-
patternMatch()
718-
headersToRemove()
719-
}"""))
730+
ReplaceWith("""
731+
headerFilter {
732+
patternMatch()
733+
headersToRemove()
734+
}"""))
720735
fun headerFilter(headersToRemove: String, patternMatch: Boolean = true) {
721736
headerFilter {
722737
patternMatch(patternMatch)

spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -801,7 +801,8 @@ public IntegrationFlow bridgeFlow2() {
801801
return IntegrationFlow.from("bridgeFlow2Input")
802802
.bridge(c -> c.autoStartup(false).id("bridge"))
803803
.fixedSubscriberChannel()
804-
.delay("delayer", d -> d
804+
.delay(d -> d
805+
.messageGroupId("delayer")
805806
.delayExpression("200")
806807
.advice(this.delayedAdvice)
807808
.messageStore(this.messageStore()))

spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,10 @@ class KotlinDslTests {
320320
wireTap {
321321
channel { queue("wireTapChannel") }
322322
}
323-
delay("delayGroup") { defaultDelay(100) }
323+
delay {
324+
messageGroupId("delayGroup")
325+
defaultDelay(100)
326+
}
324327
transform<String> { it.uppercase() }
325328
}
326329

spring-integration-event/src/test/java/org/springframework/integration/event/dsl/IntegrationFlowEventsTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,8 @@ public IntegrationFlow eventProducerFlow() {
184184
@Bean
185185
public IntegrationFlow delayFlow() {
186186
return flow -> flow
187-
.delay(GROUP_ID, e -> e
187+
.delay(e -> e
188+
.messageGroupId(GROUP_ID)
188189
.messageStore(messageGroupStore)
189190
.id("delayer"))
190191
.channel(MessageChannels.queue("delayedResults"));

spring-integration-groovy/src/main/groovy/org/springframework/integration/groovy/dsl/GroovyIntegrationFlowDefinition.groovy

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -618,7 +618,9 @@ class GroovyIntegrationFlowDefinition {
618618
* {@link org.springframework.integration.store.MessageGroupStore}.
619619
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
620620
* @see org.springframework.integration.dsl.DelayerEndpointSpec
621+
* @deprecated since 6.2 in favor of {@link #delay(groovy.lang.Closure)}
621622
*/
623+
@Deprecated(since = "6.2", forRemoval = true)
622624
GroovyIntegrationFlowDefinition delay(
623625
String groupId,
624626
@DelegatesTo(value = DelayerEndpointSpec, strategy = Closure.DELEGATE_FIRST)
@@ -629,6 +631,21 @@ class GroovyIntegrationFlowDefinition {
629631
this
630632
}
631633

634+
/**
635+
* Populate a {@link org.springframework.integration.handler.DelayHandler} to the current integration flow position.
636+
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
637+
* @see org.springframework.integration.dsl.DelayerEndpointSpec
638+
* @since 6.2
639+
*/
640+
GroovyIntegrationFlowDefinition delay(
641+
@DelegatesTo(value = DelayerEndpointSpec, strategy = Closure.DELEGATE_FIRST)
642+
@ClosureParams(value = SimpleType.class, options = 'org.springframework.integration.dsl.DelayerEndpointSpec')
643+
Closure<?> endpointConfigurer) {
644+
645+
this.delegate.delay createConfigurerIfAny(endpointConfigurer)
646+
this
647+
}
648+
632649
/**
633650
* Populate a {@link org.springframework.integration.transformer.ContentEnricher}
634651
* to the current integration flow position
@@ -657,7 +674,7 @@ class GroovyIntegrationFlowDefinition {
657674
GroovyIntegrationFlowDefinition enrichHeaders(
658675
@DelegatesTo(value = HeaderEnricherSpec, strategy = Closure.DELEGATE_FIRST)
659676
@ClosureParams(value = SimpleType.class, options = 'org.springframework.integration.dsl.HeaderEnricherSpec')
660-
Closure<?> enricherConfigurer = null) {
677+
Closure<?> enricherConfigurer) {
661678

662679
this.delegate.enrichHeaders createConfigurerIfAny(enricherConfigurer)
663680
this

spring-integration-groovy/src/test/groovy/org/springframework/integration/groovy/dsl/test/GroovyDslTests.groovy

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,10 @@ class GroovyDslTests {
314314
wireTap integrationFlow {
315315
channel { queue 'wireTapChannel' }
316316
}
317-
delay 'delayGroup', { defaultDelay 100 }
317+
delay {
318+
messageGroupId 'delayGroup'
319+
defaultDelay 100
320+
}
318321
transform String, { it.toUpperCase() }
319322
}
320323
}

src/reference/asciidoc/delayer.adoc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ If you need to determine the delay for each message, you can also provide the Sp
3333
@Bean
3434
public IntegrationFlow flow() {
3535
return IntegrationFlow.from("input")
36-
.delay("delayer.messageGroupId", d -> d
36+
.delay(d -> d
37+
.messageGroupId("delayer.messageGroupId")
3738
.defaultDelay(3_000L)
3839
.delayExpression("headers['delay']"))
3940
.channel("output")
@@ -46,7 +47,8 @@ public IntegrationFlow flow() {
4647
@Bean
4748
fun flow() =
4849
integrationFlow("input") {
49-
delay("delayer.messageGroupId") {
50+
delay {
51+
messageGroupId("delayer.messageGroupId")
5052
defaultDelay(3000L)
5153
delayExpression("headers['delay']")
5254
}

src/reference/asciidoc/groovy-dsl.adoc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ flowLambda() {
3535
wireTap integrationFlow {
3636
channel { queue 'wireTapChannel' }
3737
}
38-
delay 'delayGroup', { defaultDelay 100 }
38+
delay {
39+
messageGroupId 'delayGroup'
40+
defaultDelay 100
41+
}
3942
transform String, { it.toUpperCase() }
4043
}
4144
}

0 commit comments

Comments
 (0)