From 85ef53eb2ab13d8d25368b114e467e3fc33c8ee7 Mon Sep 17 00:00:00 2001 From: abilan Date: Tue, 21 Mar 2023 15:33:14 -0400 Subject: [PATCH] Do not block by default Currently, many timeouts in the project are like `-1` or other negative value with a meaning to wait indefinitely. According to distributed systems design and bad demo developing experience it is not OK to block forever. * Rework most of the timeouts in the framework to be `30` seconds. Only one remained as `1` seconds is a `PollingConsumer` where it is better to not block even for those 30 seconds when no messages in the queue, but let the polling task be rescheduled. * Remove the `MessagingGatewaySupport.replyTimeout` propagation down to the `PollingConsumer` correlator where it was a `-1` before and blocked the polling thread on the `Queue.poll()`. This fixed the problem with a single thread in a pool for auto-configured `TaskScheduler`. Now with 1 seconds wait time we are able to switch to other scheduled tasks even with only 1 thread in the pool --- .../amqp/config/spring-integration-amqp.xsd | 8 +-- .../integration/annotation/Aggregator.java | 6 +- .../integration/annotation/Filter.java | 4 +- .../integration/annotation/Gateway.java | 10 ++-- .../annotation/MessagingGateway.java | 9 +-- .../integration/annotation/Router.java | 4 +- .../annotation/ServiceActivator.java | 4 +- .../integration/annotation/Splitter.java | 4 +- .../integration/annotation/Transformer.java | 4 +- .../config/xml/DefaultRouterParser.java | 6 +- .../context/IntegrationContextUtils.java | 14 ++++- .../integration/dsl/GatewayProxySpec.java | 4 +- .../integration/dsl/ScatterGatherSpec.java | 4 +- .../gateway/GatewayProxyFactoryBean.java | 60 +++++++------------ .../gateway/MessagingGatewaySupport.java | 11 ++-- .../AbstractMessageProducingHandler.java | 6 +- .../router/AbstractMessageRouter.java | 12 ++-- .../scattergather/ScatterGatherHandler.java | 4 +- .../transformer/ContentEnricher.java | 6 +- .../integration/config/spring-integration.xsd | 45 +++++++------- .../CorrelatingMessageHandlerTests.java | 50 ++++++++-------- .../config/ResequencerParserTests.java | 6 +- .../annotation/AggregatorAnnotationTests.java | 10 ++-- .../config/xml/HeaderEnricherParserTests.java | 4 +- .../configuration/EnableIntegrationTests.java | 4 +- .../gateway/MessagingGatewayTests.java | 16 ++--- ...ractReplyProducingMessageHandlerTests.java | 24 ++++---- .../handler/MessageHandlerChainTests.java | 57 ++++++++---------- .../RecipientListRouterParserTests.java | 14 ++--- .../file/config/spring-integration-file.xsd | 6 +- .../ftp/config/spring-integration-ftp.xsd | 6 +- .../http/config/spring-integration-http.xsd | 6 +- .../ip/config/spring-integration-ip.xsd | 10 +--- .../jdbc/config/spring-integration-jdbc.xsd | 12 ++-- .../jms/config/spring-integration-jms.xsd | 6 +- .../JpaOutboundGatewayFactoryBean.java | 8 +-- .../jpa/config/spring-integration-jpa.xsd | 11 ++-- .../kafka/config/spring-integration-kafka.xsd | 6 +- .../config/MongoDbOutboundGatewayParser.java | 4 +- .../config/spring-integration-mongodb.xsd | 6 +- .../MongoDbOutboundGatewayParserTests.java | 3 +- .../redis/config/spring-integration-redis.xsd | 18 ++---- .../sftp/config/spring-integration-sftp.xsd | 6 +- .../smb/config/spring-integration-smb.xsd | 6 +- .../config/spring-integration-webflux.xsd | 6 +- .../ws/config/spring-integration-ws.xsd | 6 +- .../SimpleWebServiceInboundGatewayTests.java | 37 ++++++------ src/reference/asciidoc/aggregator.adoc | 2 +- src/reference/asciidoc/claim-check.adoc | 4 +- .../asciidoc/content-enrichment.adoc | 2 +- src/reference/asciidoc/gateway.adoc | 24 ++++---- src/reference/asciidoc/http.adoc | 2 +- src/reference/asciidoc/jdbc.adoc | 1 - src/reference/asciidoc/jpa.adoc | 1 - src/reference/asciidoc/resequencer.adoc | 5 +- src/reference/asciidoc/router.adoc | 1 - src/reference/asciidoc/scatter-gather.adoc | 6 +- src/reference/asciidoc/whats-new.adoc | 3 + 58 files changed, 284 insertions(+), 340 deletions(-) diff --git a/spring-integration-amqp/src/main/resources/org/springframework/integration/amqp/config/spring-integration-amqp.xsd b/spring-integration-amqp/src/main/resources/org/springframework/integration/amqp/config/spring-integration-amqp.xsd index 770522f3a6a..bf514627342 100644 --- a/spring-integration-amqp/src/main/resources/org/springframework/integration/amqp/config/spring-integration-amqp.xsd +++ b/spring-integration-amqp/src/main/resources/org/springframework/integration/amqp/config/spring-integration-amqp.xsd @@ -721,7 +721,7 @@ property set to TRUE. - + @@ -752,7 +750,7 @@ property set to TRUE. which itself is a Map. This can only be provided if the 'header-mapper' reference is not being set directly. The values in this list can also be simple patterns to be matched against the header names (e.g. "foo*" or "*foo"). - A special token 'STANDARD_REPLY_HEADERS' represents all the standard AMQP headers (replyTo, correlationId etc); + A special token 'STANDARD_REPLY_HEADERS' represents all the standard AMQP headers (replyTo, correlationId etc.); it is included by default. If you wish to add your own headers, you must also include this token if you wish the standard headers to also be mapped. To map all non-standard headers the 'NON_STANDARD_HEADERS' token can be used. ]]> diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Aggregator.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Aggregator.java index 673a33ab51b..469159c2957 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Aggregator.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Aggregator.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,7 +61,7 @@ /** * Specify the maximum amount of time in milliseconds to wait when sending a reply * {@link org.springframework.messaging.Message} to the {@link #outputChannel()}. - * Defaults to {@code -1} - blocking indefinitely. + * Defaults to {@code 30} seconds. * It is applied only if the output channel has some 'sending' limitations, e.g. * {@link org.springframework.integration.channel.QueueChannel} with * a fixed 'capacity' and is currently full. @@ -78,7 +78,7 @@ * or {@code replyChannel} from message headers. Messages are expired when their containing * {@link org.springframework.integration.store.MessageGroup} expires. One of the ways of expiring MessageGroups * is by configuring a {@link org.springframework.integration.store.MessageGroupStoreReaper}. - * However MessageGroups can alternatively be expired by simply calling + * However, MessageGroups can alternatively be expired by simply calling * {@code MessageGroupStore.expireMessageGroup(groupId)}. That could be accomplished via a ControlBus operation * or by simply invoking that method if you have a reference to the * {@link org.springframework.integration.store.MessageGroupStore} instance. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Filter.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Filter.java index bab42461361..7e82c3a3b2e 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Filter.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Filter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -100,7 +100,7 @@ /** * Specify the maximum amount of time in milliseconds to wait when sending a reply * {@link org.springframework.messaging.Message} to the {@link #outputChannel()}. - * Defaults to {@code -1} - blocking indefinitely. + * Defaults to {@code 30} seconds. * It is applied only if the output channel has some 'sending' limitations, e.g. * {@link org.springframework.integration.channel.QueueChannel} with * fixed a 'capacity'. In this case a {@link org.springframework.messaging.MessageDeliveryException} is thrown. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Gateway.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Gateway.java index 3a56ce1f1f1..41dcc0a78d9 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Gateway.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Gateway.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,8 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import org.springframework.integration.context.IntegrationContextUtils; + /** * Indicates that an interface method is capable of mapping its parameters * to a message or message payload. These method-level annotations are detected @@ -82,7 +84,7 @@ * @return the timeout. * @see #requestTimeoutExpression() */ - long requestTimeout() default Long.MIN_VALUE; + long requestTimeout() default IntegrationContextUtils.DEFAULT_TIMEOUT; /** * Specify a SpEL Expression to determine the timeout (ms) when sending to the request @@ -101,10 +103,10 @@ * @return the timeout. * @see #replyTimeoutExpression() */ - long replyTimeout() default Long.MIN_VALUE; + long replyTimeout() default IntegrationContextUtils.DEFAULT_TIMEOUT; /** - * Specify a SpEL Expression to determine the the time (ms) that the thread sending + * Specify a SpEL Expression to determine the time (ms) that the thread sending * the request will wait for a reply. The timer starts when the thread returns to the * gateway, not when the request message is sent. Overrides the encompassing gateway's * default reply timeout. Overrides {@link #replyTimeout()}. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/MessagingGateway.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/MessagingGateway.java index f1d8e362981..426a232dfe2 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/MessagingGateway.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/MessagingGateway.java @@ -22,6 +22,7 @@ import java.lang.annotation.Target; import org.springframework.core.annotation.AliasFor; +import org.springframework.integration.context.IntegrationContextUtils; /** * A stereotype annotation to provide an Integration Messaging Gateway Proxy @@ -103,17 +104,17 @@ * See {@link Gateway#requestTimeout()} for per-method configuration. * @return the suggested timeout in milliseconds, if any */ - String defaultRequestTimeout() default "-9223372036854775808"; + String defaultRequestTimeout() default IntegrationContextUtils.DEFAULT_TIMEOUT_STRING; /** * Allows to specify how long this gateway will wait for the reply {@code Message} - * before returning. By default, it will wait indefinitely. {@code null} is returned if - * the gateway times out. Value is specified in milliseconds; it can be a simple long + * before returning. The {@code null} is returned if the gateway times out. + * Value is specified in milliseconds; it can be a simple long * value or a SpEL expression; array variable #args is available. * See {@link Gateway#replyTimeout()} for per-method configuration. * @return the suggested timeout in milliseconds, if any */ - String defaultReplyTimeout() default "-9223372036854775808"; + String defaultReplyTimeout() default IntegrationContextUtils.DEFAULT_TIMEOUT_STRING; /** * Provide a reference to an implementation of {@link java.util.concurrent.Executor} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Router.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Router.java index 7fb3487171b..e8235c1708e 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Router.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Router.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -123,7 +123,7 @@ /** * Specify the maximum amount of time in milliseconds to wait when sending a reply * {@link org.springframework.messaging.Message} to the {@code outputChannel}. - * Defaults to {@code -1} - blocking indefinitely. + * Defaults to {@code 30} seconds. * It is applied only if the output channel has some 'sending' limitations, e.g. * {@link org.springframework.integration.channel.QueueChannel} with * fixed a 'capacity'. In this case a {@link org.springframework.messaging.MessageDeliveryException} is thrown. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java index 1dee83c3bc2..e8c2a9c0016 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -86,7 +86,7 @@ /** * Specify the maximum amount of time in milliseconds to wait when sending a reply * {@link org.springframework.messaging.Message} to the {@code outputChannel}. - * Defaults to {@code -1} - blocking indefinitely. + * Defaults to {@code 30} seconds. * It is applied only if the output channel has some 'sending' limitations, e.g. * {@link org.springframework.integration.channel.QueueChannel} with * fixed a 'capacity'. In this case a {@link org.springframework.messaging.MessageDeliveryException} is thrown. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Splitter.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Splitter.java index 5312994b430..9aad44c962c 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Splitter.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Splitter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -88,7 +88,7 @@ /** * Specify the maximum amount of time in milliseconds to wait when sending a reply * {@link org.springframework.messaging.Message} to the {@code outputChannel}. - * Defaults to {@code -1} - blocking indefinitely. + * Defaults to {@code 30} seconds. * It is applied only if the output channel has some 'sending' limitations, e.g. * {@link org.springframework.integration.channel.QueueChannel} with * fixed a 'capacity'. In this case a {@link org.springframework.messaging.MessageDeliveryException} is thrown. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Transformer.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Transformer.java index 12b676eb019..522e27c0c49 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Transformer.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Transformer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -64,7 +64,7 @@ /** * Specify the maximum amount of time in milliseconds to wait when sending a reply * {@link org.springframework.messaging.Message} to the {@code outputChannel}. - * Defaults to {@code -1} - blocking indefinitely. + * Defaults to {@code 30} seconds. * It is applied only if the output channel has some 'sending' limitations, e.g. * {@link org.springframework.integration.channel.QueueChannel} with * fixed a 'capacity'. In this case a {@link org.springframework.messaging.MessageDeliveryException} is thrown. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/DefaultRouterParser.java b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/DefaultRouterParser.java index ee98922c456..581d0d39ad6 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/DefaultRouterParser.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/DefaultRouterParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,14 +50,14 @@ boolean hasDefaultOption() { protected void postProcess(BeanDefinitionBuilder builder, Element element, ParserContext parserContext) { List mappingElements = DomUtils.getChildElementsByTagName(element, "mapping"); if (!CollectionUtils.isEmpty(mappingElements)) { - ManagedMap channelMappings = new ManagedMap(); + ManagedMap channelMappings = new ManagedMap<>(); for (Element mappingElement : mappingElements) { channelMappings.put(mappingElement.getAttribute("value"), mappingElement.getAttribute("channel")); } builder.addPropertyValue("channelMappings", channelMappings); } IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "default-output-channel"); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "timeout"); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "send-timeout"); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "resolution-required"); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "apply-sequence"); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "ignore-send-failures"); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java b/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java index 7d82d35b7c7..aba92e2ca2a 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -101,6 +101,18 @@ public abstract class IntegrationContextUtils { public static final String LIST_MESSAGE_HANDLER_FACTORY_BEAN_NAME = "integrationListMessageHandlerMethodFactory"; + /** + * The default timeout for blocking operations like send and receive messages. + * @since 6.1 + */ + public static final long DEFAULT_TIMEOUT = 30000L; + + /** + * A string representation for {@link #DEFAULT_TIMEOUT}, e.g. for annotation attributes. + * @since 6.1 + */ + public static final String DEFAULT_TIMEOUT_STRING = "" + DEFAULT_TIMEOUT; + /** * @param beanFactory BeanFactory for lookup, must not be null. * @return The {@link MetadataStore} bean whose name is "metadataStore". diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/GatewayProxySpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/GatewayProxySpec.java index 3b7b161ef2c..0585962c0d0 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/GatewayProxySpec.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/GatewayProxySpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -145,7 +145,7 @@ public GatewayProxySpec requestTimeout(long requestTimeout) { /** * Allows to specify how long this gateway will wait for the reply {@code Message} - * before returning. By default, it will wait indefinitely. {@code null} is returned if + * before returning. By default, it will wait 30 seconds. {@code null} is returned if * the gateway times out. Value is specified in milliseconds. * @param replyTimeout the timeout for replies in milliseconds. * @return current {@link GatewayProxySpec}. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/ScatterGatherSpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/ScatterGatherSpec.java index 96e58504cb0..3f7fb684ea2 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/ScatterGatherSpec.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/ScatterGatherSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,7 +50,7 @@ public ScatterGatherSpec gatherChannel(MessageChannel gatherChannel) { * Specify a timeout (in milliseconds) for the * {@link org.springframework.messaging.PollableChannel#receive(long)} operation * to wait for gathering results to output. - * Defaults to {@code -1} - to wait indefinitely. + * Defaults to {@code 30} seconds. * @param gatherTimeout the {@link org.springframework.messaging.PollableChannel} receive timeout. * @return the current {@link ScatterGatherSpec} instance. */ diff --git a/spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java b/spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java index 624c621f547..bdb953d78dc 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -782,14 +782,9 @@ private Expression extractRequestTimeoutFromAnnotationOrMetadata(@Nullable Gatew Expression requestTimeout = this.defaultRequestTimeout; if (gatewayAnnotation != null) { - /* - * INT-2636 Unspecified annotation attributes should not - * override the default values supplied by explicit configuration. - * There is a small risk that someone has used Long.MIN_VALUE explicitly - * to indicate an indefinite timeout on a gateway method and that will - * no longer work as expected; they will need to use, say, -1 instead. - */ - if (requestTimeout == null || gatewayAnnotation.requestTimeout() != Long.MIN_VALUE) { + if (requestTimeout == null || + gatewayAnnotation.requestTimeout() != IntegrationContextUtils.DEFAULT_TIMEOUT) { + requestTimeout = new ValueExpression<>(gatewayAnnotation.requestTimeout()); } if (StringUtils.hasText(gatewayAnnotation.requestTimeoutExpression())) { @@ -813,14 +808,7 @@ private Expression extractReplyTimeoutFromAnnotationOrMetadata(@Nullable Gateway Expression replyTimeout = this.defaultReplyTimeout; if (gatewayAnnotation != null) { - /* - * INT-2636 Unspecified annotation attributes should not - * override the default values supplied by explicit configuration. - * There is a small risk that someone has used Long.MIN_VALUE explicitly - * to indicate an indefinite timeout on a gateway method and that will - * no longer work as expected; they will need to use, say, -1 instead. - */ - if (replyTimeout == null || gatewayAnnotation.replyTimeout() != Long.MIN_VALUE) { + if (replyTimeout == null || gatewayAnnotation.replyTimeout() != IntegrationContextUtils.DEFAULT_TIMEOUT) { replyTimeout = new ValueExpression<>(gatewayAnnotation.replyTimeout()); } if (StringUtils.hasText(gatewayAnnotation.replyTimeoutExpression())) { @@ -968,31 +956,27 @@ private void channels(@Nullable String requestChannelName, @Nullable String repl private void timeouts(@Nullable Expression requestTimeout, @Nullable Expression replyTimeout, GatewayMethodInboundMessageMapper messageMapper, MethodInvocationGateway gateway) { - if (requestTimeout == null) { - gateway.setRequestTimeout(-1); - } - else if (requestTimeout instanceof ValueExpression) { - Long timeout = requestTimeout.getValue(Long.class); - if (timeout != null) { - gateway.setRequestTimeout(timeout); + if (requestTimeout != null) { + if (requestTimeout instanceof ValueExpression) { + Long timeout = requestTimeout.getValue(Long.class); + if (timeout != null) { + gateway.setRequestTimeout(timeout); + } } - } - else { - messageMapper.setSendTimeoutExpression(requestTimeout); - } - if (replyTimeout == null) { - gateway.setReplyTimeout(-1); - } - else if (replyTimeout instanceof ValueExpression) { - Long timeout = replyTimeout.getValue(Long.class); - if (timeout != null) { - gateway.setReplyTimeout(timeout); + else { + messageMapper.setSendTimeoutExpression(requestTimeout); } } - else { - messageMapper.setReplyTimeoutExpression(replyTimeout); - } if (replyTimeout != null) { + if (replyTimeout instanceof ValueExpression) { + Long timeout = replyTimeout.getValue(Long.class); + if (timeout != null) { + gateway.setReplyTimeout(timeout); + } + } + else { + messageMapper.setReplyTimeoutExpression(replyTimeout); + } gateway.setReceiveTimeoutExpression(replyTimeout); } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java b/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java index d439e6c1da6..5f048953637 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java @@ -33,6 +33,7 @@ import org.springframework.integration.IntegrationPatternType; import org.springframework.integration.MessageTimeoutException; import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel; +import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.core.MessagingTemplate; import org.springframework.integration.endpoint.AbstractEndpoint; import org.springframework.integration.endpoint.EventDrivenConsumer; @@ -91,8 +92,6 @@ public abstract class MessagingGatewaySupport extends AbstractEndpoint implements org.springframework.integration.support.management.TrackableComponent, IntegrationInboundManagement, IntegrationPattern { - private static final long DEFAULT_TIMEOUT = 1000L; - protected final ConvertingMessagingTemplate messagingTemplate; // NOSONAR private final SimpleMessageConverter messageConverter = new SimpleMessageConverter(); @@ -122,7 +121,7 @@ public abstract class MessagingGatewaySupport extends AbstractEndpoint private String errorChannelName; - private long replyTimeout = DEFAULT_TIMEOUT; + private long replyTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT; private InboundMessageMapper requestMapper = new DefaultRequestMapper(); @@ -165,7 +164,7 @@ public MessagingGatewaySupport() { public MessagingGatewaySupport(boolean errorOnTimeout) { ConvertingMessagingTemplate template = new ConvertingMessagingTemplate(); template.setMessageConverter(this.messageConverter); - template.setSendTimeout(DEFAULT_TIMEOUT); + template.setSendTimeout(IntegrationContextUtils.DEFAULT_TIMEOUT); template.setReceiveTimeout(this.replyTimeout); this.messagingTemplate = template; this.errorOnTimeout = errorOnTimeout; @@ -908,9 +907,7 @@ protected void registerReplyMessageCorrelatorIfNecessary() { correlator = new EventDrivenConsumer((SubscribableChannel) replyChan, handler); } else if (replyChan instanceof PollableChannel) { - PollingConsumer endpoint = new PollingConsumer((PollableChannel) replyChan, handler); - endpoint.setReceiveTimeout(this.replyTimeout); - correlator = endpoint; + correlator = new PollingConsumer((PollableChannel) replyChan, handler); } else if (replyChan instanceof ReactiveStreamsSubscribableChannel) { correlator = new ReactiveStreamsConsumer(replyChan, (Subscriber>) handler); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java index bb0790df73e..a96ba7c88fe 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2022 the original author or authors. + * Copyright 2014-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -87,6 +87,10 @@ public abstract class AbstractMessageProducingHandler extends AbstractMessageHan private boolean noHeadersPropagation; + { + this.messagingTemplate.setSendTimeout(IntegrationContextUtils.DEFAULT_TIMEOUT); + } + /** * Set the timeout for sending reply Messages. * @param sendTimeout The send timeout. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java b/spring-integration-core/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java index 5528cdbcf3d..f412c9ad386 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ import org.springframework.core.convert.ConversionService; import org.springframework.core.convert.support.DefaultConversionService; import org.springframework.integration.IntegrationPatternType; +import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.core.MessagingTemplate; import org.springframework.integration.handler.AbstractMessageHandler; import org.springframework.integration.support.management.IntegrationManagedResource; @@ -47,6 +48,8 @@ @IntegrationManagedResource public abstract class AbstractMessageRouter extends AbstractMessageHandler implements MessageRouter { + private final MessagingTemplate messagingTemplate = new MessagingTemplate(); + private volatile MessageChannel defaultOutputChannel; private volatile String defaultOutputChannelName; @@ -55,8 +58,9 @@ public abstract class AbstractMessageRouter extends AbstractMessageHandler imple private volatile boolean applySequence; - private final MessagingTemplate messagingTemplate = new MessagingTemplate(); - + { + this.messagingTemplate.setSendTimeout(IntegrationContextUtils.DEFAULT_TIMEOUT); + } /** * Set the default channel where Messages should be sent if channel resolution @@ -96,7 +100,7 @@ public void setDefaultOutputChannelName(String defaultOutputChannelName) { /** * Set the timeout for sending a message to the resolved channel. - * By default, there is no timeout, meaning the send will block indefinitely. + * By default, 30 seconds timeout. * @param timeout The timeout. * @since 4.3 */ diff --git a/spring-integration-core/src/main/java/org/springframework/integration/scattergather/ScatterGatherHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/scattergather/ScatterGatherHandler.java index f8145e8eee6..e565eabc273 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/scattergather/ScatterGatherHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/scattergather/ScatterGatherHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2020 the original author or authors. + * Copyright 2014-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,7 +67,7 @@ public class ScatterGatherHandler extends AbstractReplyProducingMessageHandler i private String errorChannelName = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME; - private long gatherTimeout = -1; + private long gatherTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT; private AbstractEndpoint gatherEndpoint; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/transformer/ContentEnricher.java b/spring-integration-core/src/main/java/org/springframework/integration/transformer/ContentEnricher.java index 7d54c467ae3..f7b90d09b9e 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/transformer/ContentEnricher.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/transformer/ContentEnricher.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -187,7 +187,7 @@ public void setErrorChannelName(String errorChannelName) { /** * Set the timeout value for sending request messages. If not explicitly configured, - * the default is one second. + * the default is 30 seconds. * @param requestTimeout the timeout value in milliseconds. Must not be null. */ public void setRequestTimeout(Long requestTimeout) { @@ -197,7 +197,7 @@ public void setRequestTimeout(Long requestTimeout) { /** * Set the timeout value for receiving reply messages. If not explicitly configured, - * the default is one second. + * the default is 30 seconds. * @param replyTimeout the timeout value in milliseconds. Must not be null. */ public void setReplyTimeout(Long replyTimeout) { diff --git a/spring-integration-core/src/main/resources/org/springframework/integration/config/spring-integration.xsd b/spring-integration-core/src/main/resources/org/springframework/integration/config/spring-integration.xsd index 9f9f9931d7c..66d33f72ff0 100644 --- a/spring-integration-core/src/main/resources/org/springframework/integration/config/spring-integration.xsd +++ b/spring-integration-core/src/main/resources/org/springframework/integration/config/spring-integration.xsd @@ -687,24 +687,24 @@ - + - + @@ -779,26 +779,25 @@ ]]> - + - + - + @@ -932,13 +931,12 @@ - + - + Maximum amount of time in milliseconds to wait when sending a message to the channel if such channel may block. For example, a Queue Channel can block until space is available if its maximum capacity has been reached. - Defaults to '-1' - blocking indefinitely. @@ -3640,8 +3637,7 @@ Specify the maximum amount of time in milliseconds to wait when sending Messages to the target MessageChannels if blocking is possible (e.g. a bounded queue channel that is currently full). - By default the send will block indefinitely. - Synonym for 'timeout' - only one can be supplied. + Defaults to 30 seconds to not block the process indefinitely. @@ -4775,11 +4771,11 @@ The list of component name patterns you want to track (e.g., tracked-components - + Allows to specify how long the Scatter-Gather will wait for reply Messages for gathering. - By default it will wait indefinitely. Value is specified in milliseconds. + Value is specified in milliseconds. @@ -4789,8 +4785,8 @@ The list of component name patterns you want to track (e.g., tracked-components Specify whether the Scatter-Gather must return a non-null value. This value is 'true' by default, hence a ReplyRequiredException will be thrown when the underlying aggregator returns a null value after 'gather-timeout'. - Note, if 'null' is a possibility, the 'gather-timeout' should be specified to avoid an indefinite - wait. + Note, if 'null' is a possibility, the 'gather-timeout' should not be specified as negative + to avoid an indefinite wait. @@ -4807,7 +4803,7 @@ The list of component name patterns you want to track (e.g., tracked-components - Consumer Endpoint name(s) or patterns. To specify more than one name(pattern) use ','  + Consumer Endpoint name(s) or patterns. To specify more than one name(pattern) use ',' (e.g. endpoint="xxx, xxx*, *xxx, *xxx*, xxx*yyy"). The endpoint 'id' is used to retrieve the target endpoint's 'MessageHandler' bean (using its '.handler' suffix), @@ -5070,10 +5066,11 @@ The list of component name patterns you want to track (e.g., tracked-components Specify the maximum amount of time in milliseconds to wait when sending a reply - Message to the output channel. Defaults to '-1' - blocking indefinitely. + Message to the output channel. It is applied only if the output channel has some 'sending' limitations, e.g. QueueChannel with fixed a 'capacity'. In this case a MessageDeliveryException is thrown. The 'send-timeout' is ignored in case of AbstractSubscribableChannel implementations. + Defaults to 30 seconds to not block the process indefinitely. diff --git a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/CorrelatingMessageHandlerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/CorrelatingMessageHandlerTests.java index 7918cb24e2a..db5f376e99e 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/CorrelatingMessageHandlerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/CorrelatingMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,19 +16,14 @@ package org.springframework.integration.aggregator; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.mockito.internal.stubbing.answers.ThrowsException; -import org.mockito.junit.MockitoJUnitRunner; import org.springframework.integration.store.MessageGroup; import org.springframework.integration.store.MessageGroupStore; @@ -42,50 +37,53 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** * @author Iwein Fuld * @author Dave Syer + * @author Artme Bilan */ -@RunWith(MockitoJUnitRunner.class) public class CorrelatingMessageHandlerTests { private AggregatingMessageHandler handler; - @Mock private CorrelationStrategy correlationStrategy; private final ReleaseStrategy ReleaseStrategy = new SequenceSizeReleaseStrategy(); - @Mock private MessageGroupProcessor processor; - @Mock private MessageChannel outputChannel; private final MessageGroupStore store = new SimpleMessageStore(); - @Before + @BeforeEach public void initializeSubject() { + correlationStrategy = mock(CorrelationStrategy.class); + processor = mock(MessageGroupProcessor.class); + outputChannel = mock(MessageChannel.class); handler = new AggregatingMessageHandler(processor, store, correlationStrategy, ReleaseStrategy); handler.setOutputChannel(outputChannel); } @Test - public void bufferCompletesNormally() throws Exception { + public void bufferCompletesNormally() { String correlationKey = "key"; Message message1 = testMessage(correlationKey, 1, 2); Message message2 = testMessage(correlationKey, 2, 2); when(correlationStrategy.getCorrelationKey(isA(Message.class))).thenReturn(correlationKey); - when(processor.processMessageGroup(any(MessageGroup.class))).thenReturn(MessageBuilder.withPayload("grouped").build()); - when(outputChannel.send(any(Message.class))).thenReturn(true); + when(processor.processMessageGroup(any(MessageGroup.class))) + .thenReturn(MessageBuilder.withPayload("grouped").build()); + when(outputChannel.send(any(Message.class), eq(30000L))).thenReturn(true); handler.handleMessage(message1); @@ -135,21 +133,19 @@ public void shouldNotPruneWhileCompleting() throws Exception { String correlationKey = "key"; final Message message1 = testMessage(correlationKey, 1, 2); final Message message2 = testMessage(correlationKey, 2, 2); - final List> storedMessages = new ArrayList>(); final CountDownLatch bothMessagesHandled = new CountDownLatch(2); when(correlationStrategy.getCorrelationKey(isA(Message.class))).thenReturn(correlationKey); - when(processor.processMessageGroup(any(MessageGroup.class))).thenReturn(MessageBuilder.withPayload("grouped").build()); - when(outputChannel.send(any(Message.class))).thenReturn(true); + when(processor.processMessageGroup(any(MessageGroup.class))) + .thenReturn(MessageBuilder.withPayload("grouped").build()); + when(outputChannel.send(any(Message.class), eq(30000L))).thenReturn(true); handler.handleMessage(message1); bothMessagesHandled.countDown(); - storedMessages.add(message1); ExecutorService exec = Executors.newSingleThreadExecutor(); exec.submit(() -> { handler.handleMessage(message2); - storedMessages.add(message2); bothMessagesHandled.countDown(); }); @@ -160,7 +156,7 @@ public void shouldNotPruneWhileCompleting() throws Exception { } @Test - public void testNullCorrelationKey() throws Exception { + public void testNullCorrelationKey() { final Message message1 = MessageBuilder.withPayload("foo").build(); when(correlationStrategy.getCorrelationKey(isA(Message.class))).thenReturn(null); try { @@ -169,7 +165,8 @@ public void testNullCorrelationKey() throws Exception { } catch (MessageHandlingException e) { Throwable cause = e.getCause(); - boolean pass = cause instanceof IllegalStateException && cause.getMessage().toLowerCase().contains("null correlation"); + boolean pass = cause instanceof IllegalStateException + && cause.getMessage().toLowerCase().contains("null correlation"); if (!pass) { throw e; } @@ -178,8 +175,11 @@ public void testNullCorrelationKey() throws Exception { private Message testMessage(String correlationKey, int sequenceNumber, int sequenceSize) { - return MessageBuilder.withPayload("test" + sequenceNumber).setCorrelationId(correlationKey).setSequenceNumber( - sequenceNumber).setSequenceSize(sequenceSize).build(); + return MessageBuilder.withPayload("test" + sequenceNumber) + .setCorrelationId(correlationKey) + .setSequenceNumber(sequenceNumber) + .setSequenceSize(sequenceSize) + .build(); } } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/ResequencerParserTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/ResequencerParserTests.java index 00072465b63..16e9cb6a226 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/ResequencerParserTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/ResequencerParserTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -59,9 +59,7 @@ void testDefaultResequencerProperties() { ResequencingMessageHandler resequencer = TestUtils.getPropertyValue(endpoint, "handler", ResequencingMessageHandler.class); assertThat(getPropertyValue(resequencer, "outputChannel")).isNull(); - assertThat(getPropertyValue( - resequencer, "messagingTemplate.sendTimeout")) - .as("The ResequencerEndpoint is not set with the appropriate timeout value").isEqualTo(-1L); + assertThat(getPropertyValue(resequencer, "messagingTemplate.sendTimeout")).isEqualTo(30000L); assertThat(getPropertyValue(resequencer, "sendPartialResultOnExpiry")) .as("The ResequencerEndpoint is not configured with the appropriate 'send partial results on " + "timeout'" + diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java index e212c61846e..3493d977906 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ import java.lang.reflect.Method; -import org.junit.Test; +import org.junit.jupiter.api.Test; import org.springframework.beans.DirectFieldAccessor; import org.springframework.context.ApplicationContext; @@ -51,7 +51,7 @@ public void testAnnotationWithDefaultSettings() { assertThat(getPropertyValue(aggregator, "releaseStrategy") instanceof SimpleSequenceSizeReleaseStrategy) .isTrue(); assertThat(getPropertyValue(aggregator, "outputChannel")).isNull(); - assertThat(getPropertyValue(aggregator, "messagingTemplate.sendTimeout")).isEqualTo(-1L); + assertThat(getPropertyValue(aggregator, "messagingTemplate.sendTimeout")).isEqualTo(30000L); assertThat(getPropertyValue(aggregator, "sendPartialResultOnExpiry")).isEqualTo(false); context.close(); } @@ -98,8 +98,8 @@ public void testAnnotationWithCustomCorrelationStrategy() throws Exception { Object correlationStrategy = getPropertyValue(aggregator, "correlationStrategy"); assertThat(correlationStrategy instanceof MethodInvokingCorrelationStrategy).isTrue(); MethodInvokingCorrelationStrategy releaseStrategyAdapter = (MethodInvokingCorrelationStrategy) correlationStrategy; - DirectFieldAccessor processorAccessor = new DirectFieldAccessor(new DirectFieldAccessor(new DirectFieldAccessor(releaseStrategyAdapter) - .getPropertyValue("processor")).getPropertyValue("delegate")); + DirectFieldAccessor processorAccessor = + new DirectFieldAccessor(TestUtils.getPropertyValue(releaseStrategyAdapter, "processor.delegate")); Object targetObject = processorAccessor.getPropertyValue("targetObject"); assertThat(targetObject).isSameAs(context.getBean(endpointName)); assertThat(processorAccessor.getPropertyValue("handlerMethods")).isNull(); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/HeaderEnricherParserTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/HeaderEnricherParserTests.java index 0fb6fc76a2e..07b1380be75 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/HeaderEnricherParserTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/HeaderEnricherParserTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -49,7 +49,7 @@ class HeaderEnricherParserTests { void sendTimeoutDefault() { Object endpoint = context.getBean("headerEnricherWithDefaults"); long sendTimeout = TestUtils.getPropertyValue(endpoint, "handler.messagingTemplate.sendTimeout", Long.class); - assertThat(sendTimeout).isEqualTo(-1L); + assertThat(sendTimeout).isEqualTo(30000L); } @Test diff --git a/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java index 770060a3bb9..3990919ef46 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2022 the original author or authors. + * Copyright 2014-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -639,7 +639,7 @@ public void testMetaAnnotations() { assertThat(TestUtils.getPropertyValue(consumer, "handler.outputChannelName")).isEqualTo("annOutput"); assertThat(TestUtils.getPropertyValue(consumer, "handler.discardChannelName")).isEqualTo("annOutput"); assertThat(TestUtils.getPropertyValue(consumer, "trigger.period")).isEqualTo(Duration.ofSeconds(1)); - assertThat(TestUtils.getPropertyValue(consumer, "handler.messagingTemplate.sendTimeout")).isEqualTo(-1L); + assertThat(TestUtils.getPropertyValue(consumer, "handler.messagingTemplate.sendTimeout")).isEqualTo(30000L); assertThat(TestUtils.getPropertyValue(consumer, "handler.sendPartialResultOnExpiry", Boolean.class)).isFalse(); consumer = this.context.getBean("annotationTestService.annAgg2.aggregator", PollingConsumer.class); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/gateway/MessagingGatewayTests.java b/spring-integration-core/src/test/java/org/springframework/integration/gateway/MessagingGatewayTests.java index c147abacfc9..a82af959c62 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/gateway/MessagingGatewayTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/gateway/MessagingGatewayTests.java @@ -91,9 +91,9 @@ public void tearDown() { @Test public void sendMessage() { - Mockito.when(requestChannel.send(messageMock, 1000L)).thenReturn(true); + Mockito.when(requestChannel.send(messageMock, 30000L)).thenReturn(true); this.messagingGateway.send(messageMock); - Mockito.verify(requestChannel).send(messageMock, 1000L); + Mockito.verify(requestChannel).send(messageMock, 30000L); } @Test @@ -109,10 +109,10 @@ public void sendObject() { Mockito.doAnswer(invocation -> { assertThat(((Message) invocation.getArguments()[0]).getPayload()).isEqualTo("test"); return true; - }).when(requestChannel).send(Mockito.any(Message.class), Mockito.eq(1000L)); + }).when(requestChannel).send(Mockito.any(Message.class), Mockito.eq(30000L)); this.messagingGateway.send("test"); - Mockito.verify(requestChannel).send(Mockito.any(Message.class), Mockito.eq(1000L)); + Mockito.verify(requestChannel).send(Mockito.any(Message.class), Mockito.eq(30000L)); } @Test @@ -136,17 +136,17 @@ public void sendMessage_null() { @Test public void receiveMessage() { - Mockito.when(replyChannel.receive(1000L)).thenReturn(messageMock); + Mockito.when(replyChannel.receive(30000L)).thenReturn(messageMock); Mockito.when(messageMock.getPayload()).thenReturn("test"); assertThat(this.messagingGateway.receive()).isEqualTo("test"); - Mockito.verify(replyChannel).receive(1000L); + Mockito.verify(replyChannel).receive(30000L); } @Test public void receiveMessage_null() { - Mockito.when(replyChannel.receive(1000L)).thenReturn(null); + Mockito.when(replyChannel.receive(30000L)).thenReturn(null); assertThat(this.messagingGateway.receive()).isNull(); - Mockito.verify(replyChannel).receive(1000L); + Mockito.verify(replyChannel).receive(30000L); } /* send and receive tests */ diff --git a/spring-integration-core/src/test/java/org/springframework/integration/handler/AbstractReplyProducingMessageHandlerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/handler/AbstractReplyProducingMessageHandlerTests.java index 84fa34ac5f9..25604ab17d7 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/handler/AbstractReplyProducingMessageHandlerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/handler/AbstractReplyProducingMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,11 +18,9 @@ import java.util.Collections; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; @@ -32,8 +30,10 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.willReturn; +import static org.mockito.Mockito.mock; /** * @author Iwein Fuld @@ -43,7 +43,6 @@ * @author Artem Bilan * @author Oleg Zhurakousky */ -@RunWith(MockitoJUnitRunner.class) public class AbstractReplyProducingMessageHandlerTests { private final AbstractReplyProducingMessageHandler handler = new AbstractReplyProducingMessageHandler() { @@ -57,9 +56,12 @@ protected Object handleRequestMessage(Message requestMessage) { private final Message message = MessageBuilder.withPayload("test").build(); - @Mock - private final MessageChannel channel = null; + private MessageChannel channel; + @BeforeEach + void setup() { + channel = mock(MessageChannel.class); + } @Test public void errorMessageShouldContainChannelName() { @@ -91,7 +93,7 @@ protected Object handleRequestMessage(Message requestMessage) { handler.setOutputChannel(this.channel); assertThat(handler.getNotPropagatedHeaders()).contains("f*", "*r"); ArgumentCaptor> captor = ArgumentCaptor.forClass(Message.class); - willReturn(true).given(this.channel).send(captor.capture()); + willReturn(true).given(this.channel).send(captor.capture(), eq(30000L)); handler.handleMessage(MessageBuilder.withPayload("hello") .setHeader("foo", "FOO") .setHeader("bar", "BAR") @@ -119,7 +121,7 @@ protected Object handleRequestMessage(Message requestMessage) { assertThat(handler.getNotPropagatedHeaders()).contains("boom"); handler.setOutputChannel(this.channel); ArgumentCaptor> captor = ArgumentCaptor.forClass(Message.class); - willReturn(true).given(this.channel).send(captor.capture()); + willReturn(true).given(this.channel).send(captor.capture(), eq(30000L)); handler.handleMessage(MessageBuilder.withPayload("hello") .setHeader("boom", "FOO") .setHeader("bar", "BAR") @@ -149,7 +151,7 @@ protected Object handleRequestMessage(Message requestMessage) { handler.setOutputChannel(this.channel); assertThat(handler.getNotPropagatedHeaders()).contains("foo", "b*r"); ArgumentCaptor> captor = ArgumentCaptor.forClass(Message.class); - willReturn(true).given(this.channel).send(captor.capture()); + willReturn(true).given(this.channel).send(captor.capture(), eq(30000L)); handler.handleMessage( MessageBuilder.withPayload("hello") .setHeader("foo", "FOO") diff --git a/spring-integration-core/src/test/java/org/springframework/integration/handler/MessageHandlerChainTests.java b/spring-integration-core/src/test/java/org/springframework/integration/handler/MessageHandlerChainTests.java index 7c65ed4c6f9..328bc95ab2d 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/handler/MessageHandlerChainTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/handler/MessageHandlerChainTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,12 +19,9 @@ import java.util.ArrayList; import java.util.List; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.mockito.Mockito; -import org.mockito.junit.MockitoJUnitRunner; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.support.DefaultListableBeanFactory; @@ -35,6 +32,8 @@ import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; /** @@ -43,21 +42,12 @@ * @author Gary Russell * @author Artem Bilan */ -@RunWith(MockitoJUnitRunner.class) public class MessageHandlerChainTests { private final Message message = MessageBuilder.withPayload("foo").build(); - @Mock private MessageChannel outputChannel; - @Mock - private MessageHandler handler1; - - @Mock - private MessageHandler handler2; - - @Mock private MessageHandler handler3; private ProducingHandlerStub producer1; @@ -66,9 +56,13 @@ public class MessageHandlerChainTests { private ProducingHandlerStub producer3; - @Before + @BeforeEach public void setup() { - Mockito.when(outputChannel.send(Mockito.any(Message.class))).thenReturn(true); + outputChannel = mock(MessageChannel.class); + MessageHandler handler1 = mock(MessageHandler.class); + MessageHandler handler2 = mock(MessageHandler.class); + handler3 = mock(MessageHandler.class); + Mockito.when(outputChannel.send(Mockito.any(Message.class), eq(30000L))).thenReturn(true); producer1 = new ProducingHandlerStub(handler1); producer2 = new ProducingHandlerStub(handler2); producer3 = new ProducingHandlerStub(handler3); @@ -76,7 +70,7 @@ public void setup() { @Test public void chainWithOutputChannel() { - List handlers = new ArrayList(); + List handlers = new ArrayList<>(); handlers.add(producer1); handlers.add(producer2); handlers.add(producer3); @@ -86,12 +80,12 @@ public void chainWithOutputChannel() { chain.setOutputChannel(outputChannel); chain.setBeanFactory(mock(BeanFactory.class)); chain.handleMessage(message); - Mockito.verify(outputChannel).send(Mockito.eq(message)); + Mockito.verify(outputChannel).send(Mockito.eq(message), eq(30000L)); } - @Test(expected = IllegalArgumentException.class) + @Test public void chainWithOutputChannelButLastHandlerDoesNotProduceReplies() { - List handlers = new ArrayList(); + List handlers = new ArrayList<>(); handlers.add(producer1); handlers.add(producer2); handlers.add(handler3); @@ -100,12 +94,12 @@ public void chainWithOutputChannelButLastHandlerDoesNotProduceReplies() { chain.setHandlers(handlers); chain.setOutputChannel(outputChannel); chain.setBeanFactory(mock(BeanFactory.class)); - chain.afterPropertiesSet(); + assertThatIllegalArgumentException().isThrownBy(chain::afterPropertiesSet); } @Test public void chainWithoutOutputChannelButLastHandlerDoesNotProduceReplies() { - List handlers = new ArrayList(); + List handlers = new ArrayList<>(); handlers.add(producer1); handlers.add(producer2); handlers.add(handler3); @@ -119,7 +113,7 @@ public void chainWithoutOutputChannelButLastHandlerDoesNotProduceReplies() { @Test public void chainForwardsToReplyChannel() { Message message = MessageBuilder.withPayload("test").setReplyChannel(outputChannel).build(); - List handlers = new ArrayList(); + List handlers = new ArrayList<>(); handlers.add(producer1); handlers.add(producer2); handlers.add(producer3); @@ -128,7 +122,7 @@ public void chainForwardsToReplyChannel() { chain.setHandlers(handlers); chain.setBeanFactory(mock(BeanFactory.class)); chain.handleMessage(message); - Mockito.verify(outputChannel).send(Mockito.any(Message.class)); + Mockito.verify(outputChannel).send(Mockito.any(Message.class), eq(30000L)); } @Test @@ -136,7 +130,7 @@ public void chainResolvesReplyChannelName() { Message message = MessageBuilder.withPayload("test").setReplyChannelName("testChannel").build(); DefaultListableBeanFactory beanFactory = new DefaultListableBeanFactory(); beanFactory.registerSingleton("testChannel", outputChannel); - List handlers = new ArrayList(); + List handlers = new ArrayList<>(); handlers.add(producer1); handlers.add(producer2); handlers.add(producer3); @@ -145,14 +139,14 @@ public void chainResolvesReplyChannelName() { chain.setHandlers(handlers); chain.setBeanFactory(beanFactory); chain.handleMessage(message); - Mockito.verify(outputChannel).send(Mockito.eq(message)); + Mockito.verify(outputChannel).send(Mockito.eq(message), eq(30000L)); } - @Test(expected = IllegalArgumentException.class) // INT-1175 + @Test public void chainRejectsDuplicateHandlers() { DefaultListableBeanFactory beanFactory = new DefaultListableBeanFactory(); beanFactory.registerSingleton("testChannel", outputChannel); - List handlers = new ArrayList(); + List handlers = new ArrayList<>(); handlers.add(producer1); handlers.add(producer2); handlers.add(producer1); @@ -160,10 +154,11 @@ public void chainRejectsDuplicateHandlers() { chain.setBeanName("testChain"); chain.setHandlers(handlers); chain.setBeanFactory(beanFactory); - chain.afterPropertiesSet(); + assertThatIllegalArgumentException().isThrownBy(chain::afterPropertiesSet); } - private static class ProducingHandlerStub extends IntegrationObjectSupport implements MessageHandler, MessageProducer { + private static class ProducingHandlerStub extends IntegrationObjectSupport + implements MessageHandler, MessageProducer { private volatile MessageChannel output; diff --git a/spring-integration-core/src/test/java/org/springframework/integration/router/config/RecipientListRouterParserTests.java b/spring-integration-core/src/test/java/org/springframework/integration/router/config/RecipientListRouterParserTests.java index 2fa5edec171..c232d68c0be 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/router/config/RecipientListRouterParserTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/router/config/RecipientListRouterParserTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,8 +16,7 @@ package org.springframework.integration.router.config; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.Test; import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.factory.annotation.Autowired; @@ -29,8 +28,7 @@ import org.springframework.messaging.MessageChannel; import org.springframework.messaging.PollableChannel; import org.springframework.messaging.support.GenericMessage; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import static org.assertj.core.api.Assertions.assertThat; @@ -41,8 +39,7 @@ * * @since 1.0.3 */ -@RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration +@SpringJUnitConfig public class RecipientListRouterParserTests { @Autowired @@ -76,8 +73,7 @@ public void simpleRouter() { assertThat(handler.getClass()).isEqualTo(RecipientListRouter.class); RecipientListRouter router = (RecipientListRouter) handler; DirectFieldAccessor accessor = new DirectFieldAccessor(router); - assertThat(new DirectFieldAccessor( - accessor.getPropertyValue("messagingTemplate")).getPropertyValue("sendTimeout")).isEqualTo(-1L); + assertThat(TestUtils.getPropertyValue(router, "messagingTemplate.sendTimeout")).isEqualTo(30000L); assertThat(accessor.getPropertyValue("applySequence")).isEqualTo(Boolean.FALSE); assertThat(accessor.getPropertyValue("ignoreSendFailures")).isEqualTo(Boolean.FALSE); } diff --git a/spring-integration-file/src/main/resources/org/springframework/integration/file/config/spring-integration-file.xsd b/spring-integration-file/src/main/resources/org/springframework/integration/file/config/spring-integration-file.xsd index 053c7a619b2..be23a63ad8e 100644 --- a/spring-integration-file/src/main/resources/org/springframework/integration/file/config/spring-integration-file.xsd +++ b/spring-integration-file/src/main/resources/org/springframework/integration/file/config/spring-integration-file.xsd @@ -410,7 +410,7 @@ Only files matching this regular expression will be picked up by this adapter. - + diff --git a/spring-integration-ftp/src/main/resources/org/springframework/integration/ftp/config/spring-integration-ftp.xsd b/spring-integration-ftp/src/main/resources/org/springframework/integration/ftp/config/spring-integration-ftp.xsd index 973712aa7a8..7422ed5e77e 100644 --- a/spring-integration-ftp/src/main/resources/org/springframework/integration/ftp/config/spring-integration-ftp.xsd +++ b/spring-integration-ftp/src/main/resources/org/springframework/integration/ftp/config/spring-integration-ftp.xsd @@ -343,7 +343,7 @@ - + diff --git a/spring-integration-http/src/main/resources/org/springframework/integration/http/config/spring-integration-http.xsd b/spring-integration-http/src/main/resources/org/springframework/integration/http/config/spring-integration-http.xsd index a4690b647e1..766991b43fc 100644 --- a/spring-integration-http/src/main/resources/org/springframework/integration/http/config/spring-integration-http.xsd +++ b/spring-integration-http/src/main/resources/org/springframework/integration/http/config/spring-integration-http.xsd @@ -498,7 +498,7 @@ ]]> - + diff --git a/spring-integration-ip/src/main/resources/org/springframework/integration/ip/config/spring-integration-ip.xsd b/spring-integration-ip/src/main/resources/org/springframework/integration/ip/config/spring-integration-ip.xsd index 2721dae3d3b..81aaba17c1d 100644 --- a/spring-integration-ip/src/main/resources/org/springframework/integration/ip/config/spring-integration-ip.xsd +++ b/spring-integration-ip/src/main/resources/org/springframework/integration/ip/config/spring-integration-ip.xsd @@ -422,7 +422,7 @@ - + diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/config/spring-integration-jdbc.xsd b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/config/spring-integration-jdbc.xsd index b8b83d62d56..8253df938c1 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/config/spring-integration-jdbc.xsd +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/config/spring-integration-jdbc.xsd @@ -462,7 +462,7 @@ - + @@ -944,7 +942,7 @@ - + diff --git a/spring-integration-jms/src/main/resources/org/springframework/integration/jms/config/spring-integration-jms.xsd b/spring-integration-jms/src/main/resources/org/springframework/integration/jms/config/spring-integration-jms.xsd index 89f343d459b..687ba804386 100644 --- a/spring-integration-jms/src/main/resources/org/springframework/integration/jms/config/spring-integration-jms.xsd +++ b/spring-integration-jms/src/main/resources/org/springframework/integration/jms/config/spring-integration-jms.xsd @@ -874,7 +874,7 @@ - + diff --git a/spring-integration-jpa/src/main/java/org/springframework/integration/jpa/outbound/JpaOutboundGatewayFactoryBean.java b/spring-integration-jpa/src/main/java/org/springframework/integration/jpa/outbound/JpaOutboundGatewayFactoryBean.java index 254ffd53e20..11f546201f8 100644 --- a/spring-integration-jpa/src/main/java/org/springframework/integration/jpa/outbound/JpaOutboundGatewayFactoryBean.java +++ b/spring-integration-jpa/src/main/java/org/springframework/integration/jpa/outbound/JpaOutboundGatewayFactoryBean.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.integration.jpa.outbound; import org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean; +import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.jpa.core.JpaExecutor; import org.springframework.integration.jpa.support.OutboundGatewayType; @@ -43,7 +44,7 @@ public class JpaOutboundGatewayFactoryBean extends AbstractSimpleMessageHandlerF private boolean producesReply = true; - private long replyTimeout; + private long replyTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT; private boolean requiresReply = false; @@ -61,9 +62,8 @@ public void setProducesReply(boolean producesReply) { /** * Specifies the time the gateway will wait to send the result to the reply channel. - * Only applies when the reply channel itself might block the send + * Only applies when the reply channel itself might block the 'send' operation * (for example a bounded QueueChannel that is currently full). - * By default the Gateway will wait indefinitely. * @param replyTimeout The timeout in milliseconds */ public void setReplyTimeout(long replyTimeout) { diff --git a/spring-integration-jpa/src/main/resources/org/springframework/integration/jpa/config/spring-integration-jpa.xsd b/spring-integration-jpa/src/main/resources/org/springframework/integration/jpa/config/spring-integration-jpa.xsd index 1eab3352090..caff1a3d925 100644 --- a/spring-integration-jpa/src/main/resources/org/springframework/integration/jpa/config/spring-integration-jpa.xsd +++ b/spring-integration-jpa/src/main/resources/org/springframework/integration/jpa/config/spring-integration-jpa.xsd @@ -54,7 +54,7 @@ - + @@ -367,7 +366,7 @@ - + diff --git a/spring-integration-kafka/src/main/resources/org/springframework/integration/kafka/config/spring-integration-kafka.xsd b/spring-integration-kafka/src/main/resources/org/springframework/integration/kafka/config/spring-integration-kafka.xsd index 1fe69766f99..ea92c621cbc 100644 --- a/spring-integration-kafka/src/main/resources/org/springframework/integration/kafka/config/spring-integration-kafka.xsd +++ b/spring-integration-kafka/src/main/resources/org/springframework/integration/kafka/config/spring-integration-kafka.xsd @@ -160,7 +160,7 @@ - + diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/config/MongoDbOutboundGatewayParser.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/config/MongoDbOutboundGatewayParser.java index 1aec4e9f4a7..e5a197032b0 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/config/MongoDbOutboundGatewayParser.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/config/MongoDbOutboundGatewayParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,7 +43,7 @@ protected BeanDefinitionBuilder parseHandler(Element element, ParserContext pars MongoParserUtils.processCommonAttributes(element, parserContext, builder); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "reply-timeout"); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "reply-timeout", "sendTimeout"); IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "reply-channel", "outputChannel"); String collectionCallback = element.getAttribute("collection-callback"); diff --git a/spring-integration-mongodb/src/main/resources/org/springframework/integration/mongodb/config/spring-integration-mongodb.xsd b/spring-integration-mongodb/src/main/resources/org/springframework/integration/mongodb/config/spring-integration-mongodb.xsd index 59b8649ea6c..2403ea04d00 100644 --- a/spring-integration-mongodb/src/main/resources/org/springframework/integration/mongodb/config/spring-integration-mongodb.xsd +++ b/spring-integration-mongodb/src/main/resources/org/springframework/integration/mongodb/config/spring-integration-mongodb.xsd @@ -175,7 +175,7 @@ - + diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/config/MongoDbOutboundGatewayParserTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/config/MongoDbOutboundGatewayParserTests.java index 052518e7c47..54899122619 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/config/MongoDbOutboundGatewayParserTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/config/MongoDbOutboundGatewayParserTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -72,6 +72,7 @@ public void minimalConfig() { assertThat(TestUtils.getPropertyValue(gateway, "collectionNameExpression")) .isInstanceOf(LiteralExpression.class); assertThat(TestUtils.getPropertyValue(gateway, "collectionNameExpression.literalValue")).isEqualTo("foo"); + assertThat(TestUtils.getPropertyValue(gateway, "messagingTemplate.sendTimeout")).isEqualTo(30000L); assertThat(endpoint).isInstanceOf(PollingConsumer.class); MessageHandler handler = TestUtils.getPropertyValue(endpoint, "handler", MessageHandler.class); diff --git a/spring-integration-redis/src/main/resources/org/springframework/integration/redis/config/spring-integration-redis.xsd b/spring-integration-redis/src/main/resources/org/springframework/integration/redis/config/spring-integration-redis.xsd index b2723adecf1..04d02e23798 100644 --- a/spring-integration-redis/src/main/resources/org/springframework/integration/redis/config/spring-integration-redis.xsd +++ b/spring-integration-redis/src/main/resources/org/springframework/integration/redis/config/spring-integration-redis.xsd @@ -514,7 +514,7 @@ - + @@ -729,7 +727,7 @@ - + @@ -862,7 +858,7 @@ - + diff --git a/spring-integration-sftp/src/main/resources/org/springframework/integration/sftp/config/spring-integration-sftp.xsd b/spring-integration-sftp/src/main/resources/org/springframework/integration/sftp/config/spring-integration-sftp.xsd index 5e556e5ea2f..4a591abe92e 100644 --- a/spring-integration-sftp/src/main/resources/org/springframework/integration/sftp/config/spring-integration-sftp.xsd +++ b/spring-integration-sftp/src/main/resources/org/springframework/integration/sftp/config/spring-integration-sftp.xsd @@ -336,7 +336,7 @@ - + diff --git a/spring-integration-smb/src/main/resources/org/springframework/integration/smb/config/spring-integration-smb.xsd b/spring-integration-smb/src/main/resources/org/springframework/integration/smb/config/spring-integration-smb.xsd index 7a4b257edfe..59301addd13 100644 --- a/spring-integration-smb/src/main/resources/org/springframework/integration/smb/config/spring-integration-smb.xsd +++ b/spring-integration-smb/src/main/resources/org/springframework/integration/smb/config/spring-integration-smb.xsd @@ -297,7 +297,7 @@ - + diff --git a/spring-integration-webflux/src/main/resources/org/springframework/integration/webflux/config/spring-integration-webflux.xsd b/spring-integration-webflux/src/main/resources/org/springframework/integration/webflux/config/spring-integration-webflux.xsd index d2a04069c6d..946c665cd6f 100644 --- a/spring-integration-webflux/src/main/resources/org/springframework/integration/webflux/config/spring-integration-webflux.xsd +++ b/spring-integration-webflux/src/main/resources/org/springframework/integration/webflux/config/spring-integration-webflux.xsd @@ -498,7 +498,7 @@ - + diff --git a/spring-integration-ws/src/main/resources/org/springframework/integration/ws/config/spring-integration-ws.xsd b/spring-integration-ws/src/main/resources/org/springframework/integration/ws/config/spring-integration-ws.xsd index efb98b70827..d0a8ee1afa8 100644 --- a/spring-integration-ws/src/main/resources/org/springframework/integration/ws/config/spring-integration-ws.xsd +++ b/spring-integration-ws/src/main/resources/org/springframework/integration/ws/config/spring-integration-ws.xsd @@ -66,7 +66,7 @@ - + diff --git a/spring-integration-ws/src/test/java/org/springframework/integration/ws/SimpleWebServiceInboundGatewayTests.java b/spring-integration-ws/src/test/java/org/springframework/integration/ws/SimpleWebServiceInboundGatewayTests.java index d885113bdc2..02b257d6fd2 100644 --- a/spring-integration-ws/src/test/java/org/springframework/integration/ws/SimpleWebServiceInboundGatewayTests.java +++ b/spring-integration-ws/src/test/java/org/springframework/integration/ws/SimpleWebServiceInboundGatewayTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,11 +24,8 @@ import javax.xml.transform.stream.StreamResult; import javax.xml.transform.stream.StreamSource; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.mockito.stubbing.Answer; import org.springframework.beans.factory.BeanFactory; @@ -40,6 +37,7 @@ import org.springframework.ws.context.MessageContext; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; @@ -50,22 +48,16 @@ /** * @author Iwein Fuld + * @author Artem Bilan */ -@RunWith(MockitoJUnitRunner.class) public class SimpleWebServiceInboundGatewayTests { private final SimpleWebServiceInboundGateway gateway = new SimpleWebServiceInboundGateway(); - @Mock private MessageContext context; - @Mock private WebServiceMessage request; - @Mock - private WebServiceMessage response; - - @Mock private MessageChannel requestChannel; private final MessageChannel replyChannel = new DirectChannel(); @@ -78,13 +70,17 @@ public class SimpleWebServiceInboundGatewayTests { private final Result payloadResult = new StreamResult(output); - @Before + @BeforeEach public void setup() { + this.context = mock(MessageContext.class); + this.request = mock(WebServiceMessage.class); + this.requestChannel = mock(MessageChannel.class); gateway.setRequestChannel(requestChannel); gateway.setReplyChannel(replyChannel); gateway.setBeanFactory(mock(BeanFactory.class)); gateway.start(); + WebServiceMessage response = mock(WebServiceMessage.class); when(context.getResponse()).thenReturn(response); when(response.getPayloadResult()).thenReturn(payloadResult); when(context.getRequest()).thenReturn(request); @@ -92,22 +88,23 @@ public void setup() { @Test public void invokePoxSourceWithReply() throws Exception { - when(requestChannel.send(isA(Message.class), eq(1000L))).thenAnswer( - withReplyTo(replyChannel)); + when(requestChannel.send(isA(Message.class), eq(30000L))) + .thenAnswer(withReplyTo(replyChannel)); when(request.getPayloadSource()).thenReturn(payloadSource); gateway.start(); gateway.invoke(context); - verify(requestChannel).send(argThat(m -> m.getPayload().equals(payloadSource)), eq(1000L)); + verify(requestChannel).send(argThat(m -> m.getPayload().equals(payloadSource)), eq(30000L)); assertThat(output.toString().endsWith(input)).isTrue(); } - @Test(expected = MessageDeliveryException.class) - public void invokePoxSourceTimeout() throws Exception { + @Test + public void invokePoxSourceTimeout() { gateway.setRequestTimeout(10); gateway.setReplyTimeout(10); when(requestChannel.send(isA(Message.class), anyLong())).thenReturn(false); when(request.getPayloadSource()).thenReturn(payloadSource); - gateway.invoke(context); + assertThatExceptionOfType(MessageDeliveryException.class) + .isThrownBy(() -> gateway.invoke(context)); } diff --git a/src/reference/asciidoc/aggregator.adoc b/src/reference/asciidoc/aggregator.adoc index eb26b982c17..407aeb8178f 100644 --- a/src/reference/asciidoc/aggregator.adoc +++ b/src/reference/asciidoc/aggregator.adoc @@ -431,7 +431,7 @@ It serves only as an indicator of whether to discard or send to the output or re Optional (the default is `false`). NOTE: This attribute might more properly be called `send-partial-result-on-timeout`, because the group may not actually expire if `expire-groups-upon-timeout` is set to `false`. <9> The timeout interval to wait when sending a reply `Message` to the `output-channel` or `discard-channel`. -Defaults to `-1`, which results in blocking indefinitely. +Defaults to `30` seconds. It is applied only if the output channel has some 'sending' limitations, such as a `QueueChannel` with a fixed 'capacity'. In this case, a `MessageDeliveryException` is thrown. For `AbstractSubscribableChannel` implementations, the `send-timeout` is ignored . diff --git a/src/reference/asciidoc/claim-check.adoc b/src/reference/asciidoc/claim-check.adoc index 3e247c66dce..57dd69c866f 100644 --- a/src/reference/asciidoc/claim-check.adoc +++ b/src/reference/asciidoc/claim-check.adoc @@ -80,7 +80,7 @@ Optional. This attribute is not available inside a `Chain` element. Optional. <7> Specifies the maximum amount of time (in milliseconds) to wait when sending a reply message to the output channel. -Defaults to `-1` -- blocking indefinitely. +Defaults to `30` seconds. This attribute is not available inside a `Chain` element. Optional. <8> Defines a poller. @@ -150,7 +150,7 @@ This setting is useful when Message can be "`claimed`" only once. It defaults to `false`. Optional. <8> Specifies the maximum amount of time (in milliseconds) to wait when sending a reply message to the output channel. -It defaults to `-1` -- blocking indefinitely. +It defaults to `30` seconds. This attribute is not available inside a `Chain` element. Optional. <9> Defines a poller. diff --git a/src/reference/asciidoc/content-enrichment.adoc b/src/reference/asciidoc/content-enrichment.adoc index 635fb16704c..979a9dcc63c 100644 --- a/src/reference/asciidoc/content-enrichment.adoc +++ b/src/reference/asciidoc/content-enrichment.adoc @@ -328,7 +328,7 @@ Optional. <9> Maximum amount of time in milliseconds to wait when sending a message to the channel, if the channel might block. For example, a queue channel can block until space is available, if its maximum capacity has been reached. Internally, the `send()` timeout is set on the `MessagingTemplate` and ultimately applied when invoking the send operation on the `MessageChannel`. -By default, the `send() timeout is set to '-1', which can cause the send operation on the `MessageChannel`, depending on the implementation, to block indefinitely. +By default, the `send()` timeout is set to '30'. Optional. <10> Boolean value indicating whether any payload that implements `Cloneable` should be cloned prior to sending the message to the request channel for acquiring the enriching data. The cloned version would be used as the target payload for the ultimate reply. diff --git a/src/reference/asciidoc/gateway.adoc b/src/reference/asciidoc/gateway.adoc index cdf3c35add3..35b22af90ac 100644 --- a/src/reference/asciidoc/gateway.adoc +++ b/src/reference/asciidoc/gateway.adoc @@ -48,7 +48,7 @@ With this configuration defined, the `cafeService` can now be injected into othe See the <<./samples.adoc#samples,"`Samples`">> Appendix for an example that uses the `gateway` element (in the Cafe demo). The defaults in the preceding configuration are applied to all methods on the gateway interface. -If a reply timeout is not specified, the calling thread waits indefinitely for a reply. +If a reply timeout is not specified, the calling thread waits for a reply for 30 seconds. See <>. The defaults can be overridden for individual methods. @@ -480,14 +480,16 @@ This means that there might be a chance that a message that was initiated by a g Some service activator method might result in an exception, thus providing no reply (as we do not generate null messages). In other words, multiple scenarios can cause a reply message to never come. That is perfectly natural in messaging systems. -However, think about the implication on the gateway method. The gateway's method input arguments were incorporated into a message and sent downstream. +However, think about the implication on the gateway method. +The gateway's method input arguments were incorporated into a message and sent downstream. The reply message would be converted to a return value of the gateway's method. So you might want to ensure that, for each gateway call, there is always a reply message. -Otherwise, your gateway method might never return and hang indefinitely. +Otherwise, your gateway method might never return and hang indefinitely if `reply-timeout` is set to negative value. One way to handle this situation is by using an asynchronous gateway (explained later in this section). -Another way of handling it is to explicitly set the `reply-timeout` attribute. +Another way of handling it is to rely on a default `reply-timeout` as a `30` seconds. That way, the gateway does not hang any longer than the time specified by the `reply-timeout` and returns 'null' if that timeout does elapse. -Finally, you might want to consider setting downstream flags, such as 'requires-reply', on a service-activator or 'throw-exceptions-on-rejection' on a filter. These options are discussed in more detail in the final section of this chapter. +Finally, you might want to consider setting downstream flags, such as 'requires-reply', on a service-activator or 'throw-exceptions-on-rejection' on a filter. +These options are discussed in more detail in the final section of this chapter. NOTE: If the downstream flow returns an `ErrorMessage`, its `payload` (a `Throwable`) is treated as a regular downstream error. If there is an `error-channel` configured, it is sent to the error flow. @@ -841,7 +843,7 @@ You should understand that the reply message (if produced) is sent to a reply ch ===== Downstream Component Returns 'null' Sync Gateway -- single-threaded:: -If a component downstream returns 'null' and no `reply-timeout` has been configured, the gateway method call hangs indefinitely, unless a `reply-timeout` has been configured or the `requires-reply` attribute has been set on the downstream component (for example, a service activator) that might return 'null'. +If a component downstream returns 'null' and the `reply-timeout` has been configured to negative value, the gateway method call hangs indefinitely, unless the `requires-reply` attribute has been set on the downstream component (for example, a service activator) that might return 'null'. In this case, an exception would be thrown and propagated to the gateway. Sync Gateway -- multi-threaded:: The behavior is the same as the previous case. @@ -849,7 +851,7 @@ The behavior is the same as the previous case. ===== Downstream Component Return Signature is 'void' While Gateway Method Signature Is Non-void Sync Gateway -- single-threaded:: -If a component downstream returns 'void' and no `reply-timeout` has been configured, the gateway method call hangs indefinitely unless a `reply-timeout` has been configured. +If a component downstream returns 'void' and the `reply-timeout` has been configured to negative value, the gateway method call hangs indefinitely. Sync Gateway -- multi-threaded:: The behavior is the same as the previous case. @@ -861,8 +863,9 @@ Sync Gateway -- multi-threaded:: The behavior is the same as the previous case. IMPORTANT: You should understand that, by default, `reply-timeout` is unbounded. -Consequently, if you do not explicitly set the `reply-timeout`, your gateway method invocation might hang indefinitely. +Consequently, if you set the `reply-timeout` to negative value, your gateway method invocation might hang indefinitely. So, to make sure you analyze your flow and if there is even a remote possibility of one of these scenarios to occur, you should set the `reply-timeout` attribute to a "'safe'" value. +It is `30` seconds by default. Even better, you can set the `requires-reply` attribute of the downstream component to 'true' to ensure a timely response, as produced by the throwing of an exception as soon as that downstream component returns null internally. However, you should also realize that there are some scenarios (see <>) where `reply-timeout` does not help. That means it is also important to analyze your message flow and decide when to use a synchronous gateway rather than an asynchronous gateway. @@ -873,11 +876,6 @@ Likewise, when dealing with a Filter, you can set the `throw-exception-on-reject In both of these cases, the resulting flow behaves like it contain a service activator with the 'requires-reply' attribute. In other words, it helps to ensure a timely response from the gateway method invocation. -NOTE: `reply-timeout` is unbounded for `` elements (created by the `GatewayProxyFactoryBean`). -Inbound gateways for external integration (WS, HTTP, and so on) share many characteristics and attributes with these gateways. -However, for those inbound gateways, the default `reply-timeout` is 1000 milliseconds (one second). -If a downstream asynchronous hand-off is made to another thread, you may need to increase this attribute to allow enough time for the flow to complete before the gateway times out. - IMPORTANT: You should understand that the timer starts when the thread returns to the gateway -- that is, when the flow completes or a message is handed off to another thread. At that time, the calling thread starts waiting for the reply. If the flow was completely synchronous, the reply is immediately available. diff --git a/src/reference/asciidoc/http.adoc b/src/reference/asciidoc/http.adoc index 4799683260d..7cc6fda9255 100644 --- a/src/reference/asciidoc/http.adoc +++ b/src/reference/asciidoc/http.adoc @@ -899,7 +899,7 @@ For the _HTTP Outbound Gateway_, the XML Schema defines only the _reply-timeout_ The _reply-timeout_ maps to the _sendTimeout_ property of the _org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler_ class. More precisely, the property is set on the extended `AbstractReplyProducingMessageHandler` class, which ultimately sets the property on the `MessagingTemplate`. -The value of the _sendTimeout_ property defaults to "-1" and will be applied to the connected `MessageChannel`. +The value of the _sendTimeout_ property defaults to `30` seconds and will be applied to the connected `MessageChannel`. This means, that depending on the implementation, the Message Channel's _send_ method may block indefinitely. Furthermore, the _sendTimeout_ property is only used, when the actual MessageChannel implementation has a blocking send (such as 'full' bounded QueueChannel). diff --git a/src/reference/asciidoc/jdbc.adoc b/src/reference/asciidoc/jdbc.adoc index 0a051b7bf64..38d9215065e 100644 --- a/src/reference/asciidoc/jdbc.adoc +++ b/src/reference/asciidoc/jdbc.adoc @@ -985,7 +985,6 @@ Optional. <3> Lets you specify how long this gateway waits for the reply message to be sent successfully before throwing an exception. Keep in mind that, when sending to a `DirectChannel`, the invocation occurs in the sender's thread. Consequently, the failing of the send operation may be caused by other components further downstream. -By default, the gateway waits indefinitely. The value is specified in milliseconds. Optional. <4> Indicates whether this procedure's return value should be included. diff --git a/src/reference/asciidoc/jpa.adoc b/src/reference/asciidoc/jpa.adoc index c4baf64eab4..269613f4d4a 100644 --- a/src/reference/asciidoc/jpa.adoc +++ b/src/reference/asciidoc/jpa.adoc @@ -926,7 +926,6 @@ If this attribute is not defined, the request message must have a `replyChannel` Optional. <3> Specifies the time the gateway waits to send the result to the reply channel. Only applies when the reply channel itself might block the send operation (for example, a bounded `QueueChannel` that is currently full). -By default, the gateway waits indefinitely. The value is specified in milliseconds. Optional. ==== diff --git a/src/reference/asciidoc/resequencer.adoc b/src/reference/asciidoc/resequencer.adoc index 27321033609..56bd1bd673a 100644 --- a/src/reference/asciidoc/resequencer.adoc +++ b/src/reference/asciidoc/resequencer.adoc @@ -69,16 +69,15 @@ Optional. <6> A reference to a `MessageGroupStore` that can be used to store groups of messages under their correlation key until they are complete. Optional. (The default is a volatile in-memory store.) -<7> Whether, upon the expiration of the group, the ordered group should be sent out (even if some of the messages are missing). +<7> Whether, upon the expiration of the group, the ordered group should be sent out (even if some messages are missing). Optional. (The default is false.) See <<./aggregator.adoc#reaper,Managing State in an Aggregator: `MessageGroupStore`>>. <8> The timeout interval to wait when sending a reply `Message` to the `output-channel` or `discard-channel`. -Defaults to `-1`, which blocks indefinitely. It is applied only if the output channel has some 'sending' limitations, such as a `QueueChannel` with a fixed 'capacity'. In this case, a `MessageDeliveryException` is thrown. The `send-timeout` is ignored for `AbstractSubscribableChannel` implementations. -For `group-timeout(-expression)`, the `MessageDeliveryException` from the scheduled expire task leads this task to be rescheduled. +For `group-timeout(-expression)`, the `MessageDeliveryException` from the scheduled expired task leads this task to be rescheduled. Optional. <9> A reference to a bean that implements the message correlation (grouping) algorithm. The bean can be an implementation of the `CorrelationStrategy` interface or a POJO. diff --git a/src/reference/asciidoc/router.adoc b/src/reference/asciidoc/router.adoc index 91454011f33..94889a986f7 100644 --- a/src/reference/asciidoc/router.adoc +++ b/src/reference/asciidoc/router.adoc @@ -380,7 +380,6 @@ This attribute defaults to `false`. `timeout`:: The `timeout` attribute specifies the maximum amount of time in milliseconds to wait when sending messages to the target Message Channels. -By default, the send operation blocks indefinitely. [[router-common-parameters-top]] ===== Top-Level (Outside a Chain) diff --git a/src/reference/asciidoc/scatter-gather.adoc b/src/reference/asciidoc/scatter-gather.adoc index 8923421efed..0af291c5388 100644 --- a/src/reference/asciidoc/scatter-gather.adoc +++ b/src/reference/asciidoc/scatter-gather.adoc @@ -139,13 +139,12 @@ By default, the `send()` blocks for one second. It applies only if the output channel has some 'sending' limitations -- for example, a `QueueChannel` with a fixed 'capacity' that is full. In this case, a `MessageDeliveryException` is thrown. The `send-timeout` is ignored for `AbstractSubscribableChannel` implementations. -For `group-timeout(-expression)`, the `MessageDeliveryException` from the scheduled expire task leads this task to be rescheduled. +For `group-timeout(-expression)`, the `MessageDeliveryException` from the scheduled expired task leads this task to be rescheduled. Optional. <10> Lets you specify how long the scatter-gather waits for the reply message before returning. -By default, it waits indefinitely. +By default, it waits for `30` seconds. 'null' is returned if the reply times out. Optional. -It defaults to `-1`, meaning to wait indefinitely. <11> Specifies whether the scatter-gather must return a non-null value. This value is `true` by default. Consequently, a `ReplyRequiredException` is thrown when the underlying aggregator returns a null value after `gather-timeout`. @@ -214,4 +213,3 @@ This way errors from the `AggregatingMessageHandler` are going to be propagated For successful operation, a `gatherResultChannel`, `originalReplyChannel` and `originalErrorChannel` headers must be transferred back to replies from scatter recipient subflows. In this case a reasonable, finite `gatherTimeout` must be configured for the `ScatterGatherHandler`. Otherwise, it is going to be blocked waiting for a reply from the gatherer forever, by default. - diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 34bc21f6a97..0a7dbe39ab5 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -33,6 +33,9 @@ See <<./zip.adoc#zip,Zip Support>> for more information. - The `MessageFilter` now emits a warning into logs when message is silently discarded and dropped. See <<./filter.adoc#filter, Filter>> for more information. + - The default timeout for send and receive operations in gateways and replying channel adapters has been changed from infinity to `30` seconds. +Only one left as a `1` second is a `receiveTimeout` for `PollingConsumer` to not block a scheduler thread too long and let other queued tasks to be performed with the `TaskScheduler`. + [[x6.1-web-sockets]] === Web Sockets Changes