Skip to content

Improve Delayer DSL #8645

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1178,12 +1178,18 @@ public B bridge(@Nullable Consumer<GenericEndpointSpec<BridgeHandler>> endpointC
/**
* Populate a {@link DelayHandler} to the current integration flow position
* with default options.
* Shortcut for:
* <pre class="code">
* {@code
* .delay(delayer -> delayer.messageGroupId(groupId))
* }
* </pre>
* @param groupId the {@code groupId} for delayed messages in the
* {@link org.springframework.integration.store.MessageGroupStore}.
* @return the current {@link BaseIntegrationFlowDefinition}.
*/
public B delay(String groupId) {
return delay(groupId, null);
return delay(delayer -> delayer.messageGroupId(groupId));
}

/**
Expand All @@ -1192,12 +1198,25 @@ public B delay(String groupId) {
* {@link org.springframework.integration.store.MessageGroupStore}.
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
* @return the current {@link BaseIntegrationFlowDefinition}.
* @deprecated since 6.2 in favor of {@link #delay(Consumer)}
* @see DelayerEndpointSpec
*/
@Deprecated(since = "6.2", forRemoval = true)
public B delay(String groupId, @Nullable Consumer<DelayerEndpointSpec> endpointConfigurer) {
return register(new DelayerEndpointSpec(new DelayHandler(groupId)), endpointConfigurer);
}

/**
* Populate a {@link DelayHandler} to the current integration flow position.
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
* @return the current {@link BaseIntegrationFlowDefinition}.
* @since 6.2
* @see DelayerEndpointSpec
*/
public B delay(Consumer<DelayerEndpointSpec> endpointConfigurer) {
return register(new DelayerEndpointSpec(), endpointConfigurer);
}

/**
* Populate a {@link org.springframework.integration.transformer.ContentEnricher}
* to the current integration flow position
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,6 +46,10 @@ public class DelayerEndpointSpec extends ConsumerEndpointSpec<DelayerEndpointSpe

private final List<Advice> delayedAdvice = new LinkedList<>();

protected DelayerEndpointSpec() {
this(new DelayHandler());
}

protected DelayerEndpointSpec(DelayHandler delayHandler) {
super(delayHandler);
Assert.notNull(delayHandler, "'delayHandler' must not be null.");
Expand Down Expand Up @@ -225,4 +229,16 @@ public <P> DelayerEndpointSpec delayFunction(Function<Message<P>, Object> delayF
return this;
}

/**
* Set a group id to manage delayed messages by this handler.
* @param messageGroupId the group id for delayed messages.
* @return the endpoint spec.
* @since 6.2
* @see DelayHandler#setMessageGroupId(String)
*/
public DelayerEndpointSpec messageGroupId(String messageGroupId) {
this.handler.setMessageGroupId(messageGroupId);
return this;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,8 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;

import org.aopalliance.aop.Advice;
Expand Down Expand Up @@ -98,10 +100,12 @@ public class DelayHandler extends AbstractReplyProducingMessageHandler implement

public static final long DEFAULT_RETRY_DELAY = 1_000;

private final String messageGroupId;

private final ConcurrentMap<String, AtomicInteger> deliveries = new ConcurrentHashMap<>();

private final Lock removeReleasedMessageLock = new ReentrantLock();

private String messageGroupId;

private long defaultDelay;

private Expression delayExpression;
Expand All @@ -126,6 +130,14 @@ public class DelayHandler extends AbstractReplyProducingMessageHandler implement

private long retryDelay = DEFAULT_RETRY_DELAY;

/**
* Construct an instance with default options.
* The {@link #messageGroupId}must then be provided via the setter.
* @since 6.2
*/
public DelayHandler() {
}

/**
* Create a DelayHandler with the given 'messageGroupId' that is used as 'key' for
* {@link MessageGroup} to store delayed Messages in the {@link MessageGroupStore}.
Expand All @@ -151,6 +163,15 @@ public DelayHandler(String messageGroupId, TaskScheduler taskScheduler) {
setTaskScheduler(taskScheduler);
}

/**
* Set a group id to manage delayed messages by this handler.
* @param messageGroupId the group id for delayed messages.
* @since 6.2
*/
public void setMessageGroupId(String messageGroupId) {
this.messageGroupId = messageGroupId;
}

/**
* Set the default delay in milliseconds. If no {@code delayExpression} property has
* been provided, the default delay will be applied to all Messages. If a delay should
Expand Down Expand Up @@ -187,10 +208,10 @@ public void setDelayExpressionString(String delayExpression) {

/**
* Specify whether {@code Exceptions} thrown by {@link #delayExpression} evaluation
* should be ignored (only logged). In this case case the delayer will fall back to
* the to the {@link #defaultDelay}. If this property is specified as {@code false},
* should be ignored (only logged). In this case the delayer will fall back to
* the {@link #defaultDelay}. If this property is specified as {@code false},
* any {@link #delayExpression} evaluation {@code Exception} will be thrown to the
* caller without falling back to the to the {@link #defaultDelay}. Default is
* caller without falling back to the {@link #defaultDelay}. Default is
* {@code true}.
* @param ignoreExpressionFailures true if expression evaluation failures should be
* ignored.
Expand Down Expand Up @@ -297,6 +318,8 @@ public IntegrationPatternType getIntegrationPatternType() {

@Override
protected void doInit() {
Assert.notNull(this.messageGroupId, "A 'messageGroupId' must be provided");

if (this.messageStore == null) {
this.messageStore = new SimpleMessageStore();
}
Expand Down Expand Up @@ -552,7 +575,8 @@ private void doReleaseMessage(Message<?> message) {

private boolean removeDelayedMessageFromMessageStore(Message<?> message) {
if (this.messageStore instanceof SimpleMessageStore) {
synchronized (this.messageGroupId) {
this.removeReleasedMessageLock.lock();
try {
Collection<Message<?>> messages = this.messageStore.getMessageGroup(this.messageGroupId).getMessages();
if (messages.contains(message)) {
this.messageStore.removeMessagesFromGroup(this.messageGroupId, message);
Expand All @@ -562,6 +586,9 @@ private boolean removeDelayedMessageFromMessageStore(Message<?> message) {
return false;
}
}
finally {
this.removeReleasedMessageLock.unlock();
}
}
else {
return ((MessageStore) this.messageStore).removeMessage(message.getHeaders().getId()) != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,10 +561,24 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
/**
* Populate a [DelayHandler] to the current integration flow position.
*/
@Deprecated("since 6.2",
ReplaceWith("""
delay {
messageGroupId(groupId)
}"""))
@Suppress("DEPRECATION")
fun delay(groupId: String, endpointConfigurer: DelayerEndpointSpec.() -> Unit = {}) {
this.delegate.delay(groupId, endpointConfigurer)
}

/**
* Populate a [DelayHandler] to the current integration flow position.
* @since 6.2
*/
fun delay(endpointConfigurer: DelayerEndpointSpec.() -> Unit) {
this.delegate.delay(endpointConfigurer)
}

/**
* Populate a [org.springframework.integration.transformer.ContentEnricher]
* to the current integration flow position
Expand Down Expand Up @@ -713,10 +727,11 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
* Provide the [HeaderFilter] to the current [IntegrationFlow].
*/
@Deprecated("since 6.2",
ReplaceWith("""headerFilter {
patternMatch()
headersToRemove()
}"""))
ReplaceWith("""
headerFilter {
patternMatch()
headersToRemove()
}"""))
fun headerFilter(headersToRemove: String, patternMatch: Boolean = true) {
headerFilter {
patternMatch(patternMatch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,8 @@ public IntegrationFlow bridgeFlow2() {
return IntegrationFlow.from("bridgeFlow2Input")
.bridge(c -> c.autoStartup(false).id("bridge"))
.fixedSubscriberChannel()
.delay("delayer", d -> d
.delay(d -> d
.messageGroupId("delayer")
.delayExpression("200")
.advice(this.delayedAdvice)
.messageStore(this.messageStore()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,10 @@ class KotlinDslTests {
wireTap {
channel { queue("wireTapChannel") }
}
delay("delayGroup") { defaultDelay(100) }
delay {
messageGroupId("delayGroup")
defaultDelay(100)
}
transform<String> { it.uppercase() }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ public IntegrationFlow eventProducerFlow() {
@Bean
public IntegrationFlow delayFlow() {
return flow -> flow
.delay(GROUP_ID, e -> e
.delay(e -> e
.messageGroupId(GROUP_ID)
.messageStore(messageGroupStore)
.id("delayer"))
.channel(MessageChannels.queue("delayedResults"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,9 @@ class GroovyIntegrationFlowDefinition {
* {@link org.springframework.integration.store.MessageGroupStore}.
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
* @see org.springframework.integration.dsl.DelayerEndpointSpec
* @deprecated since 6.2 in favor of {@link #delay(groovy.lang.Closure)}
*/
@Deprecated(since = "6.2", forRemoval = true)
GroovyIntegrationFlowDefinition delay(
String groupId,
@DelegatesTo(value = DelayerEndpointSpec, strategy = Closure.DELEGATE_FIRST)
Expand All @@ -629,6 +631,21 @@ class GroovyIntegrationFlowDefinition {
this
}

/**
* Populate a {@link org.springframework.integration.handler.DelayHandler} to the current integration flow position.
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
* @see org.springframework.integration.dsl.DelayerEndpointSpec
* @since 6.2
*/
GroovyIntegrationFlowDefinition delay(
@DelegatesTo(value = DelayerEndpointSpec, strategy = Closure.DELEGATE_FIRST)
@ClosureParams(value = SimpleType.class, options = 'org.springframework.integration.dsl.DelayerEndpointSpec')
Closure<?> endpointConfigurer) {

this.delegate.delay createConfigurerIfAny(endpointConfigurer)
this
}

/**
* Populate a {@link org.springframework.integration.transformer.ContentEnricher}
* to the current integration flow position
Expand Down Expand Up @@ -657,7 +674,7 @@ class GroovyIntegrationFlowDefinition {
GroovyIntegrationFlowDefinition enrichHeaders(
@DelegatesTo(value = HeaderEnricherSpec, strategy = Closure.DELEGATE_FIRST)
@ClosureParams(value = SimpleType.class, options = 'org.springframework.integration.dsl.HeaderEnricherSpec')
Closure<?> enricherConfigurer = null) {
Closure<?> enricherConfigurer) {

this.delegate.enrichHeaders createConfigurerIfAny(enricherConfigurer)
this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,10 @@ class GroovyDslTests {
wireTap integrationFlow {
channel { queue 'wireTapChannel' }
}
delay 'delayGroup', { defaultDelay 100 }
delay {
messageGroupId 'delayGroup'
defaultDelay 100
}
transform String, { it.toUpperCase() }
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/reference/asciidoc/delayer.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ If you need to determine the delay for each message, you can also provide the Sp
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from("input")
.delay("delayer.messageGroupId", d -> d
.delay(d -> d
.messageGroupId("delayer.messageGroupId")
.defaultDelay(3_000L)
.delayExpression("headers['delay']"))
.channel("output")
Expand All @@ -46,7 +47,8 @@ public IntegrationFlow flow() {
@Bean
fun flow() =
integrationFlow("input") {
delay("delayer.messageGroupId") {
delay {
messageGroupId("delayer.messageGroupId")
defaultDelay(3000L)
delayExpression("headers['delay']")
}
Expand Down
5 changes: 4 additions & 1 deletion src/reference/asciidoc/groovy-dsl.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ flowLambda() {
wireTap integrationFlow {
channel { queue 'wireTapChannel' }
}
delay 'delayGroup', { defaultDelay 100 }
delay {
messageGroupId 'delayGroup'
defaultDelay 100
}
transform String, { it.toUpperCase() }
}
}
Expand Down