diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/DirectChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/DirectChannel.java index 21169c47777..7707b1ff113 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/DirectChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/DirectChannel.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.integration.channel; +import java.util.function.Predicate; + import org.springframework.integration.dispatcher.LoadBalancingStrategy; import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy; import org.springframework.integration.dispatcher.UnicastingDispatcher; @@ -60,12 +62,26 @@ public DirectChannel(@Nullable LoadBalancingStrategy loadBalancingStrategy) { /** * Specify whether the channel's dispatcher should have failover enabled. * By default, it will. Set this value to 'false' to disable it. + * Overrides {@link #setFailoverStrategy(Predicate)} option. + * In other words: or this, or that option has to be set. * @param failover The failover boolean. */ public void setFailover(boolean failover) { this.dispatcher.setFailover(failover); } + /** + * Configure a strategy whether the channel's dispatcher should have failover enabled + * for the exception thrown. + * Overrides {@link #setFailover(boolean)} option. + * In other words: or this, or that option has to be set. + * @param failoverStrategy The failover boolean. + * @since 6.3 + */ + public void setFailoverStrategy(Predicate failoverStrategy) { + this.dispatcher.setFailoverStrategy(failoverStrategy); + } + /** * Specify the maximum number of subscribers supported by the * channel's dispatcher. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/ExecutorChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/ExecutorChannel.java index 2a90db4b2f7..0c674e36fdb 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/ExecutorChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/ExecutorChannel.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.integration.channel; import java.util.concurrent.Executor; +import java.util.function.Predicate; import org.springframework.integration.dispatcher.LoadBalancingStrategy; import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy; @@ -49,7 +50,7 @@ public class ExecutorChannel extends AbstractExecutorChannel { private final LoadBalancingStrategy loadBalancingStrategy; - private boolean failover = true; + private Predicate failoverStrategy = (exception) -> true; /** * Create an ExecutorChannel that delegates to the provided @@ -88,8 +89,20 @@ public ExecutorChannel(Executor executor, @Nullable LoadBalancingStrategy loadBa * @param failover The failover boolean. */ public void setFailover(boolean failover) { - this.failover = failover; - getDispatcher().setFailover(failover); + setFailoverStrategy((exception) -> failover); + } + + /** + * Configure a strategy whether the channel's dispatcher should have failover enabled + * for the exception thrown. + * Overrides {@link #setFailover(boolean)} option. + * In other words: or this, or that option has to be set. + * @param failoverStrategy The failover boolean. + * @since 6.3 + */ + public void setFailoverStrategy(Predicate failoverStrategy) { + this.failoverStrategy = failoverStrategy; + getDispatcher().setFailoverStrategy(failoverStrategy); } @Override @@ -107,7 +120,7 @@ public final void onInit() { this.executor = new ErrorHandlingTaskExecutor(this.executor, errorHandler); } UnicastingDispatcher unicastingDispatcher = new UnicastingDispatcher(this.executor); - unicastingDispatcher.setFailover(this.failover); + unicastingDispatcher.setFailoverStrategy(this.failoverStrategy); if (this.maxSubscribers == null) { this.maxSubscribers = getIntegrationProperties().getChannelsMaxUnicastSubscribers(); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java index f8bc59ea98b..2dabfd5d887 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import java.util.concurrent.ThreadFactory; import java.util.function.Function; +import java.util.function.Predicate; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.dispatcher.LoadBalancingStrategy; @@ -99,6 +100,18 @@ public void setFailover(boolean failover) { getDispatcher().setFailover(failover); } + /** + * Configure a strategy whether the channel's dispatcher should have failover enabled + * for the exception thrown. + * Overrides {@link #setFailover(boolean)} option. + * In other words: or this, or that option has to be set. + * @param failoverStrategy The failover boolean. + * @since 6.3 + */ + public void setFailoverStrategy(Predicate failoverStrategy) { + getDispatcher().setFailoverStrategy(failoverStrategy); + } + /** * Provide a {@link LoadBalancingStrategy} for the {@link PartitionedDispatcher}. * @param loadBalancingStrategy The load balancing strategy implementation. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java index afb8de28595..9b19885830e 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; +import java.util.function.Predicate; import org.springframework.integration.util.ErrorHandlingTaskExecutor; import org.springframework.lang.Nullable; @@ -67,7 +68,7 @@ public class PartitionedDispatcher extends AbstractDispatcher { private ThreadFactory threadFactory = new CustomizableThreadFactory("partition-thread-"); - private boolean failover = true; + private Predicate failoverStrategy = (exception) -> true; @Nullable private LoadBalancingStrategy loadBalancingStrategy; @@ -108,7 +109,20 @@ public void setThreadFactory(ThreadFactory threadFactory) { * @param failover The failover boolean. */ public void setFailover(boolean failover) { - this.failover = failover; + setFailoverStrategy((exception) -> failover); + } + + /** + * Configure a strategy whether the channel's dispatcher should have failover enabled + * for the exception thrown. + * Overrides {@link #setFailover(boolean)} option. + * In other words: or this, or that option has to be set. + * @param failoverStrategy The failover boolean. + * @since 6.3 + */ + public void setFailoverStrategy(Predicate failoverStrategy) { + Assert.notNull(failoverStrategy, "'failoverStrategy' must not be null"); + this.failoverStrategy = failoverStrategy; } /** @@ -179,7 +193,7 @@ private UnicastingDispatcher newPartition() { this.executors.add(executor); DelegateDispatcher delegateDispatcher = new DelegateDispatcher(new ErrorHandlingTaskExecutor(executor, this.errorHandler)); - delegateDispatcher.setFailover(this.failover); + delegateDispatcher.setFailoverStrategy(this.failoverStrategy); delegateDispatcher.setLoadBalancingStrategy(this.loadBalancingStrategy); delegateDispatcher.setMessageHandlingTaskDecorator(this.messageHandlingTaskDecorator); return delegateDispatcher; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/UnicastingDispatcher.java b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/UnicastingDispatcher.java index 6988bab6939..699aed54981 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/UnicastingDispatcher.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/UnicastingDispatcher.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.Executor; +import java.util.function.Predicate; import org.springframework.integration.MessageDispatchingException; import org.springframework.integration.support.utils.IntegrationUtils; @@ -58,7 +59,7 @@ public class UnicastingDispatcher extends AbstractDispatcher { private final Executor executor; - private boolean failover = true; + private Predicate failoverStrategy = (exception) -> true; private LoadBalancingStrategy loadBalancingStrategy; @@ -77,10 +78,25 @@ public UnicastingDispatcher(@Nullable Executor executor) { * Specify whether this dispatcher should failover when a single * {@link MessageHandler} throws an Exception. The default value is * true. + * Overrides {@link #setFailoverStrategy(Predicate)} option. + * In other words: or this, or that option has to be set. * @param failover The failover boolean. */ public void setFailover(boolean failover) { - this.failover = failover; + setFailoverStrategy((exception) -> failover); + } + + /** + * Configure a strategy whether the channel's dispatcher should have failover enabled + * for the exception thrown. + * Overrides {@link #setFailover(boolean)} option. + * In other words: or this, or that option has to be set. + * @param failoverStrategy The failover boolean. + * @since 6.3 + */ + public void setFailoverStrategy(Predicate failoverStrategy) { + Assert.notNull(failoverStrategy, "'failoverStrategy' must not be null"); + this.failoverStrategy = failoverStrategy; } /** @@ -154,10 +170,15 @@ private boolean doDispatch(Message message) { } exceptions.add(runtimeException); boolean isLast = !handlerIterator.hasNext(); - if (!isLast && this.failover) { + boolean failover = this.failoverStrategy.test(ex); + + if (!isLast && failover) { logExceptionBeforeFailOver(ex, handler, message); } - handleExceptions(exceptions, message, isLast); + + if (isLast || !failover) { + handleExceptions(exceptions, message); + } } } return success; @@ -187,22 +208,12 @@ else if (this.logger.isInfoEnabled()) { } } - /** - * Handles Exceptions that occur while dispatching. If this dispatcher has - * failover enabled, it will only throw an Exception when the handler list - * is exhausted. The 'isLast' flag will be true if the - * Exception occurred during the final iteration of the MessageHandlers. - * If failover is disabled for this dispatcher, it will re-throw any - * Exception immediately. - */ - private void handleExceptions(List allExceptions, Message message, boolean isLast) { - if (isLast || !this.failover) { - if (allExceptions.size() == 1) { - throw allExceptions.get(0); - } - throw new AggregateMessageDeliveryException(message, - "All attempts to deliver Message to MessageHandlers failed.", allExceptions); + private void handleExceptions(List allExceptions, Message message) { + if (allExceptions.size() == 1) { + throw allExceptions.get(0); } + throw new AggregateMessageDeliveryException(message, + "All attempts to deliver Message to MessageHandlers failed.", allExceptions); } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/DirectChannelSpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/DirectChannelSpec.java index 74e956fe24e..8fc58f51b68 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/DirectChannelSpec.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/DirectChannelSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,8 +28,8 @@ public class DirectChannelSpec extends LoadBalancingChannelSpec failoverStrategy; // NOSONAR protected Integer maxSubscribers; // NOSONAR @@ -46,8 +48,20 @@ public S loadBalancer(LoadBalancingStrategy loadBalancingStrategyToSet) { return _this(); } - public S failover(Boolean failoverToSet) { - this.failover = failoverToSet; + public S failover(boolean failoverToSet) { + return failoverStrategy((exception) -> failoverToSet); + } + + /** + * Configure a strategy whether the channel's dispatcher should have failover enabled + * for the exception thrown. + * Overrides {@link #failover(boolean)} option. + * In other words: or this, or that option has to be set. + * @param failoverStrategy The failover boolean. + * @since 6.3 + */ + public S failoverStrategy(Predicate failoverStrategy) { + this.failoverStrategy = failoverStrategy; return _this(); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/PartitionedChannelSpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/PartitionedChannelSpec.java index 0e8d31ee834..bc7d66bba3d 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/PartitionedChannelSpec.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/PartitionedChannelSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -63,8 +63,8 @@ protected PartitionedChannel doGet() { this.channel = new PartitionedChannel(this.partitionCount); } this.channel.setLoadBalancingStrategy(this.loadBalancingStrategy); - if (this.failover != null) { - this.channel.setFailover(this.failover); + if (this.failoverStrategy != null) { + this.channel.setFailoverStrategy(this.failoverStrategy); } if (this.maxSubscribers != null) { this.channel.setMaxSubscribers(this.maxSubscribers); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/channel/PartitionedChannelTests.java b/spring-integration-core/src/test/java/org/springframework/integration/channel/PartitionedChannelTests.java index 6aed78cf8ef..23e773c6adb 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/channel/PartitionedChannelTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/channel/PartitionedChannelTests.java @@ -21,6 +21,8 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.IntStream; import org.assertj.core.api.InstanceOfAssertFactories; @@ -76,7 +78,16 @@ public void afterMessageHandled(Message message, MessageChannel ch, MessageHa MultiValueMap> partitionedMessages = new LinkedMultiValueMap<>(); - partitionedChannel.subscribe((message) -> partitionedMessages.add(Thread.currentThread().getName(), message)); + Lock partitionsLock = new ReentrantLock(); + partitionedChannel.subscribe((message) -> { + partitionsLock.lock(); + try { + partitionedMessages.add(Thread.currentThread().getName(), message); + } + finally { + partitionsLock.unlock(); + } + }); partitionedChannel.send(MessageBuilder.withPayload("test1").setHeader("partitionKey", "1").build()); partitionedChannel.send(MessageBuilder.withPayload("test2").setHeader("partitionKey", "2").build()); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/channel/config/DispatchingChannelParserTests.java b/spring-integration-core/src/test/java/org/springframework/integration/channel/config/DispatchingChannelParserTests.java index c8f65a7a142..8d7e31ff730 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/channel/config/DispatchingChannelParserTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/channel/config/DispatchingChannelParserTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,9 +19,9 @@ import java.util.Collection; import java.util.Iterator; import java.util.Map; +import java.util.function.Predicate; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.Test; import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.factory.annotation.Autowired; @@ -37,18 +37,20 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.Mockito.mock; /** * @author Mark Fisher * @author Oleg Zhurakousky + * @author Artem Bilan + * * @since 1.0.3 */ -@RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration +@SpringJUnitConfig public class DispatchingChannelParserTests { @Autowired @@ -60,50 +62,60 @@ public class DispatchingChannelParserTests { @Test public void taskExecutorOnly() { MessageChannel channel = channels.get("taskExecutorOnly"); - assertThat(channel.getClass()).isEqualTo(ExecutorChannel.class); + assertThat(channel).isInstanceOf(ExecutorChannel.class); Object executor = getDispatcherProperty("executor", channel); - assertThat(executor.getClass()).isEqualTo(ErrorHandlingTaskExecutor.class); + assertThat(executor).isInstanceOf(ErrorHandlingTaskExecutor.class); assertThat(new DirectFieldAccessor(executor).getPropertyValue("executor")) .isSameAs(context.getBean("taskExecutor")); - assertThat((Boolean) getDispatcherProperty("failover", channel)).isTrue(); - assertThat(getDispatcherProperty("loadBalancingStrategy", channel).getClass()) - .isEqualTo(RoundRobinLoadBalancingStrategy.class); + @SuppressWarnings("unchecked") + Predicate failoverStrategy = (Predicate) getDispatcherProperty("failoverStrategy", channel); + assertThat(failoverStrategy.test(mock())).isTrue(); + assertThat(getDispatcherProperty("loadBalancingStrategy", channel)) + .isInstanceOf(RoundRobinLoadBalancingStrategy.class); } @Test public void failoverFalse() { MessageChannel channel = channels.get("failoverFalse"); - assertThat(channel.getClass()).isEqualTo(DirectChannel.class); - assertThat((Boolean) getDispatcherProperty("failover", channel)).isFalse(); - assertThat(getDispatcherProperty("loadBalancingStrategy", channel).getClass()) - .isEqualTo(RoundRobinLoadBalancingStrategy.class); + assertThat(channel).isInstanceOf(DirectChannel.class); + @SuppressWarnings("unchecked") + Predicate failoverStrategy = (Predicate) getDispatcherProperty("failoverStrategy", channel); + assertThat(failoverStrategy.test(mock())).isFalse(); + assertThat(getDispatcherProperty("loadBalancingStrategy", channel)) + .isInstanceOf(RoundRobinLoadBalancingStrategy.class); } @Test public void failoverTrue() { MessageChannel channel = channels.get("failoverTrue"); - assertThat(channel.getClass()).isEqualTo(DirectChannel.class); - assertThat((Boolean) getDispatcherProperty("failover", channel)).isTrue(); - assertThat(getDispatcherProperty("loadBalancingStrategy", channel).getClass()) - .isEqualTo(RoundRobinLoadBalancingStrategy.class); + assertThat(channel).isInstanceOf(DirectChannel.class); + @SuppressWarnings("unchecked") + Predicate failoverStrategy = (Predicate) getDispatcherProperty("failoverStrategy", channel); + assertThat(failoverStrategy.test(mock())).isTrue(); + assertThat(getDispatcherProperty("loadBalancingStrategy", channel)) + .isInstanceOf(RoundRobinLoadBalancingStrategy.class); } @Test public void loadBalancerDisabled() { MessageChannel channel = channels.get("loadBalancerDisabled"); - assertThat(channel.getClass()).isEqualTo(DirectChannel.class); - assertThat((Boolean) getDispatcherProperty("failover", channel)).isTrue(); + assertThat(channel).isInstanceOf(DirectChannel.class); + @SuppressWarnings("unchecked") + Predicate failoverStrategy = (Predicate) getDispatcherProperty("failoverStrategy", channel); + assertThat(failoverStrategy.test(mock())).isTrue(); assertThat(getDispatcherProperty("loadBalancingStrategy", channel)).isNull(); } @Test public void loadBalancerDisabledAndTaskExecutor() { MessageChannel channel = channels.get("loadBalancerDisabledAndTaskExecutor"); - assertThat(channel.getClass()).isEqualTo(ExecutorChannel.class); - assertThat((Boolean) getDispatcherProperty("failover", channel)).isTrue(); + assertThat(channel).isInstanceOf(ExecutorChannel.class); + @SuppressWarnings("unchecked") + Predicate failoverStrategy = (Predicate) getDispatcherProperty("failoverStrategy", channel); + assertThat(failoverStrategy.test(mock())).isTrue(); assertThat(getDispatcherProperty("loadBalancingStrategy", channel)).isNull(); Object executor = getDispatcherProperty("executor", channel); - assertThat(executor.getClass()).isEqualTo(ErrorHandlingTaskExecutor.class); + assertThat(executor).isInstanceOf(ErrorHandlingTaskExecutor.class); assertThat(new DirectFieldAccessor(executor).getPropertyValue("executor")) .isSameAs(context.getBean("taskExecutor")); } @@ -111,12 +123,14 @@ public void loadBalancerDisabledAndTaskExecutor() { @Test public void roundRobinLoadBalancerAndTaskExecutor() { MessageChannel channel = channels.get("roundRobinLoadBalancerAndTaskExecutor"); - assertThat(channel.getClass()).isEqualTo(ExecutorChannel.class); - assertThat((Boolean) getDispatcherProperty("failover", channel)).isTrue(); - assertThat(getDispatcherProperty("loadBalancingStrategy", channel).getClass()) - .isEqualTo(RoundRobinLoadBalancingStrategy.class); + assertThat(channel).isInstanceOf(ExecutorChannel.class); + @SuppressWarnings("unchecked") + Predicate failoverStrategy = (Predicate) getDispatcherProperty("failoverStrategy", channel); + assertThat(failoverStrategy.test(mock())).isTrue(); + assertThat(getDispatcherProperty("loadBalancingStrategy", channel)) + .isInstanceOf(RoundRobinLoadBalancingStrategy.class); Object executor = getDispatcherProperty("executor", channel); - assertThat(executor.getClass()).isEqualTo(ErrorHandlingTaskExecutor.class); + assertThat(executor).isInstanceOf(ErrorHandlingTaskExecutor.class); assertThat(new DirectFieldAccessor(executor).getPropertyValue("executor")) .isSameAs(context.getBean("taskExecutor")); } @@ -126,19 +140,15 @@ public void loadBalancerRef() { MessageChannel channel = channels.get("lbRefChannel"); LoadBalancingStrategy lbStrategy = TestUtils.getPropertyValue(channel, "dispatcher.loadBalancingStrategy", LoadBalancingStrategy.class); - assertThat(lbStrategy instanceof SampleLoadBalancingStrategy).isTrue(); + assertThat(lbStrategy).isInstanceOf(SampleLoadBalancingStrategy.class); } @Test public void loadBalancerRefFailWithLoadBalancer() { - - try { - new ClassPathXmlApplicationContext("ChannelWithLoadBalancerRef-fail-config.xml", this.getClass()).close(); - } - catch (BeanDefinitionParsingException e) { - assertThat(e.getMessage()).contains("'load-balancer' and 'load-balancer-ref' are mutually exclusive"); - } - + assertThatExceptionOfType(BeanDefinitionParsingException.class) + .isThrownBy(() -> new ClassPathXmlApplicationContext("ChannelWithLoadBalancerRef-fail-config.xml", + getClass())) + .withMessageContaining("'load-balancer' and 'load-balancer-ref' are mutually exclusive"); } private static Object getDispatcherProperty(String propertyName, MessageChannel channel) { @@ -153,5 +163,7 @@ public static class SampleLoadBalancingStrategy implements LoadBalancingStrategy public Iterator getHandlerIterator(Message message, Collection handlers) { return handlers.iterator(); } + } + } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dispatcher/FailOverDispatcherTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dispatcher/FailOverDispatcherTests.java index e7d5f114428..4db091f5698 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dispatcher/FailOverDispatcherTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dispatcher/FailOverDispatcherTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Test; +import org.junit.jupiter.api.Test; import org.springframework.beans.factory.BeanFactory; import org.springframework.integration.MessageRejectedException; @@ -32,6 +32,7 @@ import org.springframework.messaging.support.GenericMessage; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.Mockito.mock; /** @@ -69,12 +70,8 @@ public void noDuplicateSubscriptions() { MessageHandler target = new CountingTestEndpoint(counter, false); dispatcher.addHandler(target); dispatcher.addHandler(target); - try { - dispatcher.dispatch(new GenericMessage<>("test")); - } - catch (Exception e) { - // ignore - } + assertThatExceptionOfType(MessageRejectedException.class) + .isThrownBy(() -> dispatcher.dispatch(new GenericMessage<>("test"))); assertThat(counter.get()).as("target should not have duplicate subscriptions").isEqualTo(1); } @@ -89,12 +86,8 @@ public void removeConsumerBeforeSend() { dispatcher.addHandler(target2); dispatcher.addHandler(target3); dispatcher.removeHandler(target2); - try { - dispatcher.dispatch(new GenericMessage<>("test")); - } - catch (Exception e) { - // ignore - } + assertThatExceptionOfType(AggregateMessageDeliveryException.class) + .isThrownBy(() -> dispatcher.dispatch(new GenericMessage<>("test"))); assertThat(counter.get()).isEqualTo(2); } @@ -108,46 +101,31 @@ public void removeConsumerBetweenSends() { dispatcher.addHandler(target1); dispatcher.addHandler(target2); dispatcher.addHandler(target3); - try { - dispatcher.dispatch(new GenericMessage<>("test1")); - } - catch (Exception e) { - // ignore - } + assertThatExceptionOfType(AggregateMessageDeliveryException.class) + .isThrownBy(() -> dispatcher.dispatch(new GenericMessage<>("test1"))); assertThat(counter.get()).isEqualTo(3); dispatcher.removeHandler(target2); - try { - dispatcher.dispatch(new GenericMessage<>("test2")); - } - catch (Exception e) { - // ignore - } + assertThatExceptionOfType(AggregateMessageDeliveryException.class) + .isThrownBy(() -> dispatcher.dispatch(new GenericMessage<>("test2"))); assertThat(counter.get()).isEqualTo(5); dispatcher.removeHandler(target1); - try { - dispatcher.dispatch(new GenericMessage<>("test3")); - } - catch (Exception e) { - // ignore - } + assertThatExceptionOfType(MessageRejectedException.class) + .isThrownBy(() -> dispatcher.dispatch(new GenericMessage<>("test3"))); assertThat(counter.get()).isEqualTo(6); } - @Test(expected = MessageDeliveryException.class) + @Test public void removeConsumerLastTargetCausesDeliveryException() { UnicastingDispatcher dispatcher = new UnicastingDispatcher(); final AtomicInteger counter = new AtomicInteger(); MessageHandler target = new CountingTestEndpoint(counter, false); dispatcher.addHandler(target); - try { - dispatcher.dispatch(new GenericMessage<>("test1")); - } - catch (Exception e) { - // ignore - } + assertThatExceptionOfType(MessageRejectedException.class) + .isThrownBy(() -> dispatcher.dispatch(new GenericMessage<>("test1"))); assertThat(counter.get()).isEqualTo(1); dispatcher.removeHandler(target); - dispatcher.dispatch(new GenericMessage<>("test2")); + assertThatExceptionOfType(MessageDeliveryException.class) + .isThrownBy(() -> dispatcher.dispatch(new GenericMessage<>("test2"))); } @Test @@ -188,15 +166,26 @@ public void allHandlersReturnFalse() { dispatcher.addHandler(target1); dispatcher.addHandler(target2); dispatcher.addHandler(target3); - try { - assertThat(dispatcher.dispatch(new GenericMessage<>("test"))).isFalse(); - } - catch (Exception e) { - // ignore - } + assertThatExceptionOfType(AggregateMessageDeliveryException.class) + .isThrownBy(() -> dispatcher.dispatch(new GenericMessage<>("test"))); assertThat(counter.get()).as("each target should have been invoked").isEqualTo(3); } + @Test + public void failoverStrategyRejects() { + UnicastingDispatcher dispatcher = new UnicastingDispatcher(); + dispatcher.setFailoverStrategy((exception) -> !(exception instanceof MessageRejectedException)); + AtomicInteger counter = new AtomicInteger(); + MessageHandler target1 = new CountingTestEndpoint(counter, false); + MessageHandler target2 = new CountingTestEndpoint(counter, true); + MessageHandler target3 = new CountingTestEndpoint(counter, true); + dispatcher.addHandler(target1); + dispatcher.addHandler(target2); + dispatcher.addHandler(target3); + assertThatExceptionOfType(MessageRejectedException.class) + .isThrownBy(() -> dispatcher.dispatch(new GenericMessage<>("test"))); + assertThat(counter.get()).as("only first should have been invoked").isEqualTo(1); + } private static ServiceActivatingHandler createConsumer(Object object) { ServiceActivatingHandler handler = new ServiceActivatingHandler(object); diff --git a/src/reference/antora/modules/ROOT/pages/channel/configuration.adoc b/src/reference/antora/modules/ROOT/pages/channel/configuration.adoc index bfc957c9c2f..b34d9cfc394 100644 --- a/src/reference/antora/modules/ROOT/pages/channel/configuration.adoc +++ b/src/reference/antora/modules/ROOT/pages/channel/configuration.adoc @@ -112,6 +112,10 @@ XML:: ---- ====== +Starting with version 6.3, all the `MessageChannel` implementations based on the `UnicastingDispatcher` can be configured with a `Predicate failoverStrategy` instead of plain `failover` option. +This predicate makes a decision to failover or not to the next `MessageHandler` based on an exception thrown from the current one. +The more complex error analysis should be done using xref:router/implementations.adoc#router-implementations-exception-router[`ErrorMessageExceptionTypeRouter`]. + [[channel-datatype-channel]] == Datatype Channel Configuration diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index 73400235dd9..3fb6bc06de4 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -23,6 +23,9 @@ The `MessageHistory` header is now mutable, append-only container. And all the subsequent tracks don't create new message - only their entry is added to existing message history header. See xref:message-history.adoc[Message History Chapter] for more information. +All the `MessageChannel` implementations based on the `UnicastingDispatcher` now can be configured with a `Predicate failoverStrategy` for dynamic decision for the failover on the exception thrown from the current `MessageHandler`. +See xref:channel/configuration.adoc[Message Channel Configuration] for more information. + [[x6.3-security-changes]] === Security Support Changes