Skip to content

GH-8981: Add UnicastingDispatcher.failoverStrategy option #8982

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package org.springframework.integration.channel;

import java.util.function.Predicate;

import org.springframework.integration.dispatcher.LoadBalancingStrategy;
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
import org.springframework.integration.dispatcher.UnicastingDispatcher;
Expand Down Expand Up @@ -60,12 +62,26 @@ public DirectChannel(@Nullable LoadBalancingStrategy loadBalancingStrategy) {
/**
* Specify whether the channel's dispatcher should have failover enabled.
* By default, it will. Set this value to 'false' to disable it.
* Overrides {@link #setFailoverStrategy(Predicate)} option.
* In other words: or this, or that option has to be set.
* @param failover The failover boolean.
*/
public void setFailover(boolean failover) {
this.dispatcher.setFailover(failover);
}

/**
* Configure a strategy whether the channel's dispatcher should have failover enabled
* for the exception thrown.
* Overrides {@link #setFailover(boolean)} option.
* In other words: or this, or that option has to be set.
* @param failoverStrategy The failover boolean.
* @since 6.3
*/
public void setFailoverStrategy(Predicate<Exception> failoverStrategy) {
this.dispatcher.setFailoverStrategy(failoverStrategy);
}

/**
* Specify the maximum number of subscribers supported by the
* channel's dispatcher.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.integration.channel;

import java.util.concurrent.Executor;
import java.util.function.Predicate;

import org.springframework.integration.dispatcher.LoadBalancingStrategy;
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
Expand Down Expand Up @@ -49,7 +50,7 @@ public class ExecutorChannel extends AbstractExecutorChannel {

private final LoadBalancingStrategy loadBalancingStrategy;

private boolean failover = true;
private Predicate<Exception> failoverStrategy = (exception) -> true;

/**
* Create an ExecutorChannel that delegates to the provided
Expand Down Expand Up @@ -88,8 +89,20 @@ public ExecutorChannel(Executor executor, @Nullable LoadBalancingStrategy loadBa
* @param failover The failover boolean.
*/
public void setFailover(boolean failover) {
this.failover = failover;
getDispatcher().setFailover(failover);
setFailoverStrategy((exception) -> failover);
}

/**
* Configure a strategy whether the channel's dispatcher should have failover enabled
* for the exception thrown.
* Overrides {@link #setFailover(boolean)} option.
* In other words: or this, or that option has to be set.
* @param failoverStrategy The failover boolean.
* @since 6.3
*/
public void setFailoverStrategy(Predicate<Exception> failoverStrategy) {
this.failoverStrategy = failoverStrategy;
getDispatcher().setFailoverStrategy(failoverStrategy);
}

@Override
Expand All @@ -107,7 +120,7 @@ public final void onInit() {
this.executor = new ErrorHandlingTaskExecutor(this.executor, errorHandler);
}
UnicastingDispatcher unicastingDispatcher = new UnicastingDispatcher(this.executor);
unicastingDispatcher.setFailover(this.failover);
unicastingDispatcher.setFailoverStrategy(this.failoverStrategy);
if (this.maxSubscribers == null) {
this.maxSubscribers = getIntegrationProperties().getChannelsMaxUnicastSubscribers();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import java.util.concurrent.ThreadFactory;
import java.util.function.Function;
import java.util.function.Predicate;

import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.dispatcher.LoadBalancingStrategy;
Expand Down Expand Up @@ -99,6 +100,18 @@ public void setFailover(boolean failover) {
getDispatcher().setFailover(failover);
}

/**
* Configure a strategy whether the channel's dispatcher should have failover enabled
* for the exception thrown.
* Overrides {@link #setFailover(boolean)} option.
* In other words: or this, or that option has to be set.
* @param failoverStrategy The failover boolean.
* @since 6.3
*/
public void setFailoverStrategy(Predicate<Exception> failoverStrategy) {
getDispatcher().setFailoverStrategy(failoverStrategy);
}

/**
* Provide a {@link LoadBalancingStrategy} for the {@link PartitionedDispatcher}.
* @param loadBalancingStrategy The load balancing strategy implementation.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Predicate;

import org.springframework.integration.util.ErrorHandlingTaskExecutor;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -67,7 +68,7 @@ public class PartitionedDispatcher extends AbstractDispatcher {

private ThreadFactory threadFactory = new CustomizableThreadFactory("partition-thread-");

private boolean failover = true;
private Predicate<Exception> failoverStrategy = (exception) -> true;

@Nullable
private LoadBalancingStrategy loadBalancingStrategy;
Expand Down Expand Up @@ -108,7 +109,20 @@ public void setThreadFactory(ThreadFactory threadFactory) {
* @param failover The failover boolean.
*/
public void setFailover(boolean failover) {
this.failover = failover;
setFailoverStrategy((exception) -> failover);
}

/**
* Configure a strategy whether the channel's dispatcher should have failover enabled
* for the exception thrown.
* Overrides {@link #setFailover(boolean)} option.
* In other words: or this, or that option has to be set.
* @param failoverStrategy The failover boolean.
* @since 6.3
*/
public void setFailoverStrategy(Predicate<Exception> failoverStrategy) {
Assert.notNull(failoverStrategy, "'failoverStrategy' must not be null");
this.failoverStrategy = failoverStrategy;
}

/**
Expand Down Expand Up @@ -179,7 +193,7 @@ private UnicastingDispatcher newPartition() {
this.executors.add(executor);
DelegateDispatcher delegateDispatcher =
new DelegateDispatcher(new ErrorHandlingTaskExecutor(executor, this.errorHandler));
delegateDispatcher.setFailover(this.failover);
delegateDispatcher.setFailoverStrategy(this.failoverStrategy);
delegateDispatcher.setLoadBalancingStrategy(this.loadBalancingStrategy);
delegateDispatcher.setMessageHandlingTaskDecorator(this.messageHandlingTaskDecorator);
return delegateDispatcher;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Predicate;

import org.springframework.integration.MessageDispatchingException;
import org.springframework.integration.support.utils.IntegrationUtils;
Expand Down Expand Up @@ -58,7 +59,7 @@ public class UnicastingDispatcher extends AbstractDispatcher {

private final Executor executor;

private boolean failover = true;
private Predicate<Exception> failoverStrategy = (exception) -> true;

private LoadBalancingStrategy loadBalancingStrategy;

Expand All @@ -77,10 +78,25 @@ public UnicastingDispatcher(@Nullable Executor executor) {
* Specify whether this dispatcher should failover when a single
* {@link MessageHandler} throws an Exception. The default value is
* <code>true</code>.
* Overrides {@link #setFailoverStrategy(Predicate)} option.
* In other words: or this, or that option has to be set.
* @param failover The failover boolean.
*/
public void setFailover(boolean failover) {
this.failover = failover;
setFailoverStrategy((exception) -> failover);
}

/**
* Configure a strategy whether the channel's dispatcher should have failover enabled
* for the exception thrown.
* Overrides {@link #setFailover(boolean)} option.
* In other words: or this, or that option has to be set.
* @param failoverStrategy The failover boolean.
* @since 6.3
*/
public void setFailoverStrategy(Predicate<Exception> failoverStrategy) {
Assert.notNull(failoverStrategy, "'failoverStrategy' must not be null");
this.failoverStrategy = failoverStrategy;
}

/**
Expand Down Expand Up @@ -154,10 +170,15 @@ private boolean doDispatch(Message<?> message) {
}
exceptions.add(runtimeException);
boolean isLast = !handlerIterator.hasNext();
if (!isLast && this.failover) {
boolean failover = this.failoverStrategy.test(ex);

if (!isLast && failover) {
logExceptionBeforeFailOver(ex, handler, message);
}
handleExceptions(exceptions, message, isLast);

if (isLast || !failover) {
handleExceptions(exceptions, message);
}
}
}
return success;
Expand Down Expand Up @@ -187,22 +208,12 @@ else if (this.logger.isInfoEnabled()) {
}
}

/**
* Handles Exceptions that occur while dispatching. If this dispatcher has
* failover enabled, it will only throw an Exception when the handler list
* is exhausted. The 'isLast' flag will be <em>true</em> if the
* Exception occurred during the final iteration of the MessageHandlers.
* If failover is disabled for this dispatcher, it will re-throw any
* Exception immediately.
*/
private void handleExceptions(List<RuntimeException> allExceptions, Message<?> message, boolean isLast) {
if (isLast || !this.failover) {
if (allExceptions.size() == 1) {
throw allExceptions.get(0);
}
throw new AggregateMessageDeliveryException(message,
"All attempts to deliver Message to MessageHandlers failed.", allExceptions);
private void handleExceptions(List<RuntimeException> allExceptions, Message<?> message) {
if (allExceptions.size() == 1) {
throw allExceptions.get(0);
}
throw new AggregateMessageDeliveryException(message,
"All attempts to deliver Message to MessageHandlers failed.", allExceptions);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,8 +28,8 @@ public class DirectChannelSpec extends LoadBalancingChannelSpec<DirectChannelSpe
@Override
protected DirectChannel doGet() {
this.channel = new DirectChannel(this.loadBalancingStrategy);
if (this.failover != null) {
this.channel.setFailover(this.failover);
if (this.failoverStrategy != null) {
this.channel.setFailoverStrategy(this.failoverStrategy);
}
if (this.maxSubscribers != null) {
this.channel.setMaxSubscribers(this.maxSubscribers);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,8 +36,8 @@ protected ExecutorChannelSpec(Executor executor) {
@Override
protected ExecutorChannel doGet() {
this.channel = new ExecutorChannel(this.executor, this.loadBalancingStrategy);
if (this.failover != null) {
this.channel.setFailover(this.failover);
if (this.failoverStrategy != null) {
this.channel.setFailoverStrategy(this.failoverStrategy);
}
if (this.maxSubscribers != null) {
this.channel.setMaxSubscribers(this.maxSubscribers);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package org.springframework.integration.dsl;

import java.util.function.Predicate;

import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.dispatcher.LoadBalancingStrategy;
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
Expand All @@ -34,7 +36,7 @@ public abstract class LoadBalancingChannelSpec<S extends MessageChannelSpec<S, C

protected LoadBalancingStrategy loadBalancingStrategy = new RoundRobinLoadBalancingStrategy(); // NOSONAR

protected Boolean failover; // NOSONAR
protected Predicate<Exception> failoverStrategy; // NOSONAR

protected Integer maxSubscribers; // NOSONAR

Expand All @@ -46,8 +48,20 @@ public S loadBalancer(LoadBalancingStrategy loadBalancingStrategyToSet) {
return _this();
}

public S failover(Boolean failoverToSet) {
this.failover = failoverToSet;
public S failover(boolean failoverToSet) {
return failoverStrategy((exception) -> failoverToSet);
}

/**
* Configure a strategy whether the channel's dispatcher should have failover enabled
* for the exception thrown.
* Overrides {@link #failover(boolean)} option.
* In other words: or this, or that option has to be set.
* @param failoverStrategy The failover boolean.
* @since 6.3
*/
public S failoverStrategy(Predicate<Exception> failoverStrategy) {
this.failoverStrategy = failoverStrategy;
return _this();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,8 +63,8 @@ protected PartitionedChannel doGet() {
this.channel = new PartitionedChannel(this.partitionCount);
}
this.channel.setLoadBalancingStrategy(this.loadBalancingStrategy);
if (this.failover != null) {
this.channel.setFailover(this.failover);
if (this.failoverStrategy != null) {
this.channel.setFailoverStrategy(this.failoverStrategy);
}
if (this.maxSubscribers != null) {
this.channel.setMaxSubscribers(this.maxSubscribers);
Expand Down
Loading