Skip to content

Commit ed0576c

Browse files
committed
Support executor qualification with @async#value
Prior to this change, Spring's @async annotation support was tied to a single AsyncTaskExecutor bean, meaning that all methods marked with @async were forced to use the same executor. This is an undesirable limitation, given that certain methods may have different priorities, etc. This leads to the need to (optionally) qualify which executor should handle each method. This is similar to the way that Spring's @transactional annotation was originally tied to a single PlatformTransactionManager, but in Spring 3.0 was enhanced to allow for a qualifier via the #value attribute, e.g. @transactional("ptm1") public void m() { ... } where "ptm1" is either the name of a PlatformTransactionManager bean or a qualifier value associated with a PlatformTransactionManager bean, e.g. via the <qualifier> element in XML or the @qualifier annotation. This commit introduces the same approach to @async and its relationship to underlying executor beans. As always, the following syntax remains supported @async public void m() { ... } indicating that calls to #m will be delegated to the "default" executor, i.e. the executor provided to <task:annotation-driven executor="..."/> or the executor specified when authoring a @configuration class that implements AsyncConfigurer and its #getAsyncExecutor method. However, it now also possible to qualify which executor should be used on a method-by-method basis, e.g. @async("e1") public void m() { ... } indicating that calls to #m will be delegated to the executor bean named or otherwise qualified as "e1". Unlike the default executor which is specified up front at configuration time as described above, the "e1" executor bean is looked up within the container on the first execution of #m and then cached in association with that method for the lifetime of the container. Class-level use of Async#value behaves as expected, indicating that all methods within the annotated class should be executed with the named executor. In the case of both method- and class-level annotations, any method-level #value overrides any class level #value. This commit introduces the following major changes: - Add @async#value attribute for executor qualification - Introduce AsyncExecutionAspectSupport as a common base class for both MethodInterceptor- and AspectJ-based async aspects. This base class provides common structure for specifying the default executor (#setExecutor) as well as logic for determining (and caching) which executor should execute a given method (#determineAsyncExecutor) and an abstract method to allow subclasses to provide specific strategies for executor qualification (#getExecutorQualifier). - Introduce AnnotationAsyncExecutionInterceptor as a specialization of the existing AsyncExecutionInterceptor to allow for introspection of the @async annotation and its #value attribute for a given method. Note that this new subclass was necessary for packaging reasons - the original AsyncExecutionInterceptor lives in org.springframework.aop and therefore does not have visibility to the @async annotation in org.springframework.scheduling.annotation. This new subclass replaces usage of AsyncExecutionInterceptor throughout the framework, though the latter remains usable and undeprecated for compatibility with any existing third-party extensions. - Add documentation to spring-task-3.2.xsd and reference manual explaining @async executor qualification - Add tests covering all new functionality Note that the public API of all affected components remains backward- compatible. Issue: SPR-6847
1 parent 3fb1187 commit ed0576c

File tree

15 files changed

+559
-45
lines changed

15 files changed

+559
-45
lines changed
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Copyright 2002-2012 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.aop.interceptor;
18+
19+
import java.lang.reflect.Method;
20+
21+
import java.util.HashMap;
22+
import java.util.Map;
23+
import java.util.concurrent.Executor;
24+
25+
import org.springframework.beans.BeansException;
26+
import org.springframework.beans.factory.BeanFactory;
27+
import org.springframework.beans.factory.BeanFactoryAware;
28+
import org.springframework.beans.factory.BeanFactoryUtils;
29+
import org.springframework.core.task.AsyncTaskExecutor;
30+
import org.springframework.core.task.support.TaskExecutorAdapter;
31+
import org.springframework.util.Assert;
32+
import org.springframework.util.StringUtils;
33+
34+
/**
35+
* Base class for asynchronous method execution aspects, such as
36+
* {@link org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor}
37+
* or {@link org.springframework.scheduling.aspectj.AnnotationAsyncExecutionAspect}.
38+
*
39+
* <p>Provides support for <i>executor qualification</i> on a method-by-method basis.
40+
* {@code AsyncExecutionAspectSupport} objects must be constructed with a default {@code
41+
* Executor}, but each individual method may further qualify a specific {@code Executor}
42+
* bean to be used when executing it, e.g. through an annotation attribute.
43+
*
44+
* @author Chris Beams
45+
* @since 3.2
46+
*/
47+
public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
48+
49+
private final Map<Method, AsyncTaskExecutor> executors = new HashMap<Method, AsyncTaskExecutor>();
50+
51+
private Executor defaultExecutor;
52+
53+
private BeanFactory beanFactory;
54+
55+
56+
/**
57+
* Create a new {@link AsyncExecutionAspectSupport}, using the provided default
58+
* executor unless individual async methods indicate via qualifier that a more
59+
* specific executor should be used.
60+
* @param defaultExecutor the executor to use when executing asynchronous methods
61+
*/
62+
public AsyncExecutionAspectSupport(Executor defaultExecutor) {
63+
this.setExecutor(defaultExecutor);
64+
}
65+
66+
67+
/**
68+
* Supply the executor to be used when executing async methods.
69+
* @param defaultExecutor the {@code Executor} (typically a Spring {@code
70+
* AsyncTaskExecutor} or {@link java.util.concurrent.ExecutorService}) to delegate to
71+
* unless a more specific executor has been requested via a qualifier on the async
72+
* method, in which case the executor will be looked up at invocation time against the
73+
* enclosing bean factory.
74+
* @see #getExecutorQualifier
75+
* @see #setBeanFactory(BeanFactory)
76+
*/
77+
public void setExecutor(Executor defaultExecutor) {
78+
this.defaultExecutor = defaultExecutor;
79+
}
80+
81+
/**
82+
* Set the {@link BeanFactory} to be used when looking up executors by qualifier.
83+
*/
84+
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
85+
this.beanFactory = beanFactory;
86+
}
87+
88+
/**
89+
* Return the qualifier or bean name of the executor to be used when executing the
90+
* given async method, typically specified in the form of an annotation attribute.
91+
* Returning an empty string or {@code null} indicates that no specific executor has
92+
* been specified and that the {@linkplain #setExecutor(Executor) default executor}
93+
* should be used.
94+
* @param method the method to inspect for executor qualifier metadata
95+
* @return the qualifier if specified, otherwise empty string or {@code null}
96+
* @see #determineAsyncExecutor(Method)
97+
*/
98+
protected abstract String getExecutorQualifier(Method method);
99+
100+
/**
101+
* Determine the specific executor to use when executing the given method.
102+
* @returns the executor to use (never {@code null})
103+
*/
104+
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
105+
if (!this.executors.containsKey(method)) {
106+
Executor executor = this.defaultExecutor;
107+
108+
String qualifier = getExecutorQualifier(method);
109+
if (StringUtils.hasLength(qualifier)) {
110+
Assert.notNull(this.beanFactory,
111+
"BeanFactory must be set on " + this.getClass().getSimpleName() +
112+
" to access qualified executor [" + qualifier + "]");
113+
executor = BeanFactoryUtils.qualifiedBeanOfType(this.beanFactory, Executor.class, qualifier);
114+
}
115+
116+
if (executor instanceof AsyncTaskExecutor) {
117+
this.executors.put(method, (AsyncTaskExecutor) executor);
118+
}
119+
else if (executor instanceof Executor) {
120+
this.executors.put(method, new TaskExecutorAdapter(executor));
121+
}
122+
}
123+
124+
return this.executors.get(method);
125+
}
126+
127+
}

spring-aop/src/main/java/org/springframework/aop/interceptor/AsyncExecutionInterceptor.java

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.aop.interceptor;
1818

19+
import java.lang.reflect.Method;
20+
1921
import java.util.concurrent.Callable;
2022
import java.util.concurrent.Executor;
2123
import java.util.concurrent.Future;
@@ -25,8 +27,6 @@
2527

2628
import org.springframework.core.Ordered;
2729
import org.springframework.core.task.AsyncTaskExecutor;
28-
import org.springframework.core.task.support.TaskExecutorAdapter;
29-
import org.springframework.util.Assert;
3030
import org.springframework.util.ReflectionUtils;
3131

3232
/**
@@ -44,39 +44,39 @@
4444
* (like Spring's {@link org.springframework.scheduling.annotation.AsyncResult}
4545
* or EJB 3.1's <code>javax.ejb.AsyncResult</code>).
4646
*
47+
* <p>As of Spring 3.2 the {@code AnnotationAsyncExecutionInterceptor} subclass is
48+
* preferred for use due to its support for executor qualification in conjunction with
49+
* Spring's {@code @Async} annotation.
50+
*
4751
* @author Juergen Hoeller
52+
* @author Chris Beams
4853
* @since 3.0
4954
* @see org.springframework.scheduling.annotation.Async
5055
* @see org.springframework.scheduling.annotation.AsyncAnnotationAdvisor
56+
* @see org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor
5157
*/
52-
public class AsyncExecutionInterceptor implements MethodInterceptor, Ordered {
53-
54-
private final AsyncTaskExecutor asyncExecutor;
55-
58+
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport
59+
implements MethodInterceptor, Ordered {
5660

5761
/**
5862
* Create a new {@code AsyncExecutionInterceptor}.
5963
* @param executor the {@link Executor} (typically a Spring {@link AsyncTaskExecutor}
6064
* or {@link java.util.concurrent.ExecutorService}) to delegate to.
6165
*/
62-
public AsyncExecutionInterceptor(AsyncTaskExecutor asyncExecutor) {
63-
Assert.notNull(asyncExecutor, "TaskExecutor must not be null");
64-
this.asyncExecutor = asyncExecutor;
66+
public AsyncExecutionInterceptor(Executor executor) {
67+
super(executor);
6568
}
6669

6770

6871
/**
69-
* Create a new AsyncExecutionInterceptor.
70-
* @param asyncExecutor the <code>java.util.concurrent</code> Executor
71-
* to delegate to (typically a {@link java.util.concurrent.ExecutorService}
72+
* Intercept the given method invocation, submit the actual calling of the method to
73+
* the correct task executor and return immediately to the caller.
74+
* @param invocation the method to intercept and make asynchronous
75+
* @return {@link Future} if the original method returns {@code Future}; {@code null}
76+
* otherwise.
7277
*/
73-
public AsyncExecutionInterceptor(Executor asyncExecutor) {
74-
this.asyncExecutor = new TaskExecutorAdapter(asyncExecutor);
75-
}
76-
77-
7878
public Object invoke(final MethodInvocation invocation) throws Throwable {
79-
Future<?> result = this.asyncExecutor.submit(
79+
Future<?> result = this.determineAsyncExecutor(invocation.getMethod()).submit(
8080
new Callable<Object>() {
8181
public Object call() throws Exception {
8282
try {
@@ -99,6 +99,20 @@ public Object call() throws Exception {
9999
}
100100
}
101101

102+
/**
103+
* {@inheritDoc}
104+
* <p>This implementation is a no-op for compatibility in Spring 3.2. Subclasses may
105+
* override to provide support for extracting qualifier information, e.g. via an
106+
* annotation on the given method.
107+
* @return always {@code null}
108+
* @see #determineAsyncExecutor(Method)
109+
* @since 3.2
110+
*/
111+
@Override
112+
protected String getExecutorQualifier(Method method) {
113+
return null;
114+
}
115+
102116
public int getOrder() {
103117
return Ordered.HIGHEST_PRECEDENCE;
104118
}

spring-aspects/src/main/java/org/springframework/scheduling/aspectj/AbstractAsyncExecutionAspect.aj

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ import java.util.concurrent.Executor;
2121
import java.util.concurrent.Future;
2222

2323
import org.aspectj.lang.reflect.MethodSignature;
24+
25+
import org.springframework.aop.interceptor.AsyncExecutionAspectSupport;
2426
import org.springframework.core.task.AsyncTaskExecutor;
25-
import org.springframework.core.task.SimpleAsyncTaskExecutor;
26-
import org.springframework.core.task.support.TaskExecutorAdapter;
2727

2828
/**
2929
* Abstract aspect that routes selected methods asynchronously.
@@ -34,19 +34,18 @@ import org.springframework.core.task.support.TaskExecutorAdapter;
3434
*
3535
* @author Ramnivas Laddad
3636
* @author Juergen Hoeller
37+
* @author Chris Beams
3738
* @since 3.0.5
3839
*/
39-
public abstract aspect AbstractAsyncExecutionAspect {
40-
41-
private AsyncTaskExecutor asyncExecutor;
40+
public abstract aspect AbstractAsyncExecutionAspect extends AsyncExecutionAspectSupport {
4241

43-
public void setExecutor(Executor executor) {
44-
if (executor instanceof AsyncTaskExecutor) {
45-
this.asyncExecutor = (AsyncTaskExecutor) executor;
46-
}
47-
else {
48-
this.asyncExecutor = new TaskExecutorAdapter(executor);
49-
}
42+
/**
43+
* Create an {@code AnnotationAsyncExecutionAspect} with a {@code null} default
44+
* executor, which should instead be set via {@code #aspectOf} and
45+
* {@link #setExecutor(Executor)}.
46+
*/
47+
public AbstractAsyncExecutionAspect() {
48+
super(null);
5049
}
5150

5251
/**
@@ -57,7 +56,9 @@ public abstract aspect AbstractAsyncExecutionAspect {
5756
* otherwise.
5857
*/
5958
Object around() : asyncMethod() {
60-
if (this.asyncExecutor == null) {
59+
MethodSignature methodSignature = (MethodSignature) thisJoinPointStaticPart.getSignature();
60+
AsyncTaskExecutor executor = determineAsyncExecutor(methodSignature.getMethod());
61+
if (executor == null) {
6162
return proceed();
6263
}
6364
Callable<Object> callable = new Callable<Object>() {
@@ -68,8 +69,8 @@ public abstract aspect AbstractAsyncExecutionAspect {
6869
}
6970
return null;
7071
}};
71-
Future<?> result = this.asyncExecutor.submit(callable);
72-
if (Future.class.isAssignableFrom(((MethodSignature) thisJoinPointStaticPart.getSignature()).getReturnType())) {
72+
Future<?> result = executor.submit(callable);
73+
if (Future.class.isAssignableFrom(methodSignature.getReturnType())) {
7374
return result;
7475
}
7576
else {

spring-aspects/src/main/java/org/springframework/scheduling/aspectj/AnnotationAsyncExecutionAspect.aj

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@
1616

1717
package org.springframework.scheduling.aspectj;
1818

19+
import java.lang.reflect.Method;
20+
1921
import java.util.concurrent.Future;
22+
23+
import org.springframework.core.annotation.AnnotationUtils;
2024
import org.springframework.scheduling.annotation.Async;
2125

2226
/**
@@ -31,6 +35,7 @@ import org.springframework.scheduling.annotation.Async;
3135
* constraint, it produces only a warning.
3236
*
3337
* @author Ramnivas Laddad
38+
* @author Chris Beams
3439
* @since 3.0.5
3540
*/
3641
public aspect AnnotationAsyncExecutionAspect extends AbstractAsyncExecutionAspect {
@@ -43,6 +48,28 @@ public aspect AnnotationAsyncExecutionAspect extends AbstractAsyncExecutionAspec
4348

4449
public pointcut asyncMethod() : asyncMarkedMethod() || asyncTypeMarkedMethod();
4550

51+
/**
52+
* {@inheritDoc}
53+
* <p>This implementation inspects the given method and its declaring class for the
54+
* {@code @Async} annotation, returning the qualifier value expressed by
55+
* {@link Async#value()}. If {@code @Async} is specified at both the method and class level, the
56+
* method's {@code #value} takes precedence (even if empty string, indicating that
57+
* the default executor should be used preferentially).
58+
* @return the qualifier if specified, otherwise empty string indicating that the
59+
* {@linkplain #setExecutor(Executor) default executor} should be used
60+
* @see #determineAsyncExecutor(Method)
61+
*/
62+
@Override
63+
protected String getExecutorQualifier(Method method) {
64+
// maintainer's note: changes made here should also be made in
65+
// AnnotationAsyncExecutionInterceptor#getExecutorQualifier
66+
Async async = AnnotationUtils.findAnnotation(method, Async.class);
67+
if (async == null) {
68+
async = AnnotationUtils.findAnnotation(method.getDeclaringClass(), Async.class);
69+
}
70+
return async == null ? null : async.value();
71+
}
72+
4673
declare error:
4774
execution(@Async !(void||Future) *(..)):
4875
"Only methods that return void or Future may have an @Async annotation";

spring-aspects/src/test/java/org/springframework/scheduling/aspectj/AnnotationAsyncExecutionAspectTests.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,16 @@
2222

2323
import org.junit.Before;
2424
import org.junit.Test;
25+
26+
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
27+
import org.springframework.beans.factory.support.RootBeanDefinition;
2528
import org.springframework.core.task.SimpleAsyncTaskExecutor;
2629
import org.springframework.scheduling.annotation.Async;
2730
import org.springframework.scheduling.annotation.AsyncResult;
31+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
32+
33+
import static org.hamcrest.CoreMatchers.not;
34+
import static org.hamcrest.Matchers.startsWith;
2835

2936
import static org.junit.Assert.*;
3037

@@ -104,6 +111,22 @@ public void methodReturningNonVoidNonFutureInAsyncClassGetsRoutedSynchronously()
104111
assertEquals(0, executor.submitCompleteCounter);
105112
}
106113

114+
@Test
115+
public void qualifiedAsyncMethodsAreRoutedToCorrectExecutor() throws InterruptedException, ExecutionException {
116+
DefaultListableBeanFactory beanFactory = new DefaultListableBeanFactory();
117+
beanFactory.registerBeanDefinition("e1", new RootBeanDefinition(ThreadPoolTaskExecutor.class));
118+
AnnotationAsyncExecutionAspect.aspectOf().setBeanFactory(beanFactory);
119+
120+
ClassWithQualifiedAsyncMethods obj = new ClassWithQualifiedAsyncMethods();
121+
122+
Future<Thread> defaultThread = obj.defaultWork();
123+
assertThat(defaultThread.get(), not(Thread.currentThread()));
124+
assertThat(defaultThread.get().getName(), not(startsWith("e1-")));
125+
126+
Future<Thread> e1Thread = obj.e1Work();
127+
assertThat(e1Thread.get().getName(), startsWith("e1-"));
128+
}
129+
107130

108131
@SuppressWarnings("serial")
109132
private static class CountingExecutor extends SimpleAsyncTaskExecutor {
@@ -180,4 +203,16 @@ public Future<Integer> incrementReturningAFuture() {
180203
}
181204
}
182205

206+
207+
static class ClassWithQualifiedAsyncMethods {
208+
@Async
209+
public Future<Thread> defaultWork() {
210+
return new AsyncResult<Thread>(Thread.currentThread());
211+
}
212+
213+
@Async("e1")
214+
public Future<Thread> e1Work() {
215+
return new AsyncResult<Thread>(Thread.currentThread());
216+
}
217+
}
183218
}

0 commit comments

Comments
 (0)