From 19e350cdf3dab98372cead979ccf603d098dc4d4 Mon Sep 17 00:00:00 2001 From: abilan Date: Fri, 16 Jun 2023 13:13:54 -0400 Subject: [PATCH 1/3] GH-8626: Provide cleaner `transform()` DSL Fixes https://github.com/spring-projects/spring-integration/issues/8626 * Add missed `transform(String beanName, @Nullable String methodName)` API * Introduce a `TransformerSpec` to expose a strict API to configure transformer variants. * Introduce `transformWith(Consumer)` as a single point of all possible transformer and its endpoint options * Deprecate those `IntegrationFlowDefinition.transform()` variants which are harder to configure as several lambda arguments This change will make Kotlin & Groovy DSL more readable and straightforward --- .../dsl/BaseIntegrationFlowDefinition.java | 76 ++++++- .../dsl/IntegrationFlowDefinition.java | 21 +- .../integration/dsl/TransformerSpec.java | 188 ++++++++++++++++++ .../MessageTransformingHandler.java | 25 ++- .../dsl/KotlinIntegrationFlowDefinition.kt | 5 + .../dsl/flows/IntegrationFlowTests.java | 14 +- .../dsl/manualflow/ManualFlowTests.java | 5 +- .../reactivestreams/ReactiveStreamsTests.java | 8 +- .../dsl/transformers/TransformerTests.java | 9 +- 9 files changed, 323 insertions(+), 28 deletions(-) create mode 100644 spring-integration-core/src/main/java/org/springframework/integration/dsl/TransformerSpec.java diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java index 86ffacafaa7..ee863ce49c3 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java @@ -538,12 +538,18 @@ public B controlBus(@Nullable Consumer + * {@code + * .transformWith((transformerSpec) -> transformerSpec.expression(expression)) + * } + * * @param expression the {@code Transformer} {@link Expression}. * @return the current {@link BaseIntegrationFlowDefinition}. * @see ExpressionEvaluatingTransformer */ public B transform(String expression) { - return transform(expression, (Consumer>) null); + return transformWith((transformerSpec) -> transformerSpec.expression(expression)); } /** @@ -552,8 +558,10 @@ public B transform(String expression) { * @param expression the {@code Transformer} {@link Expression}. * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. * @return the current {@link BaseIntegrationFlowDefinition}. + * @deprecated since 6.2 in favor of {@link #transformWith(Consumer)}. * @see ExpressionEvaluatingTransformer */ + @Deprecated(since = "6.2", forRemoval = true) public B transform(String expression, @Nullable Consumer> endpointConfigurer) { @@ -566,9 +574,15 @@ public B transform(String expression, /** * Populate the {@code MessageTransformingHandler} for the {@link MethodInvokingTransformer} * to invoke the discovered service method at runtime. + * Shortcut for: + *
+	 * {@code
+	 *  .transformWith((transformerSpec) -> transformerSpec.ref(service))
+	 * }
+	 * 
* @param service the service to use. * @return the current {@link BaseIntegrationFlowDefinition}. - * @see ExpressionEvaluatingTransformer + * @see MethodInvokingTransformer */ public B transform(Object service) { return transform(service, null); @@ -577,13 +591,36 @@ public B transform(Object service) { /** * Populate the {@code MessageTransformingHandler} for the {@link MethodInvokingTransformer} * to invoke the service method at runtime. + *
+	 * {@code
+	 *  .transformWith((transformerSpec) -> transformerSpec.ref(service).method(methodName))
+	 * }
+	 * 
* @param service the service to use. * @param methodName the method to invoke. * @return the current {@link BaseIntegrationFlowDefinition}. * @see MethodInvokingTransformer */ public B transform(Object service, @Nullable String methodName) { - return transform(service, methodName, null); + return transformWith((transformerSpec) -> transformerSpec.ref(service).method(methodName)); + } + + /** + * Populate the {@code MessageTransformingHandler} for the {@link MethodInvokingTransformer} + * to invoke the bean method at runtime. + *
+	 * {@code
+	 *  .transformWith((transformerSpec) -> transformerSpec.refName(beanName).method(methodName))
+	 * }
+	 * 
+ * @param beanName the name for bean to resolve lazily. + * @param methodName the method to invoke. + * @return the current {@link BaseIntegrationFlowDefinition}. + * @since 6.2 + * @see MethodInvokingTransformer + */ + public B transform(String beanName, @Nullable String methodName) { + return transformWith((transformerSpec) -> transformerSpec.refName(beanName).method(methodName)); } /** @@ -593,8 +630,10 @@ public B transform(Object service, @Nullable String methodName) { * @param methodName the method to invoke. * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. * @return the current {@link BaseIntegrationFlowDefinition}. - * @see ExpressionEvaluatingTransformer + * @deprecated since 6.2 in favor of {@link #transformWith(Consumer)}. + * @see MethodInvokingTransformer */ + @Deprecated(since = "6.2", forRemoval = true) public B transform(Object service, @Nullable String methodName, @Nullable Consumer> endpointConfigurer) { @@ -617,12 +656,18 @@ public B transform(Object service, @Nullable String methodName, * .transform(Scripts.script("classpath:myScript.py").variable("foo", bar())) * } * + * Shortcut for: + *
+	 * {@code
+	 *  .transformWith((transformerSpec) -> transformerSpec.processor(messageProcessorSpec))
+	 * }
+	 * 
* @param messageProcessorSpec the {@link MessageProcessorSpec} to use. * @return the current {@link BaseIntegrationFlowDefinition}. * @see MethodInvokingTransformer */ public B transform(MessageProcessorSpec messageProcessorSpec) { - return transform(messageProcessorSpec, (Consumer>) null); + return transformWith((transformerSpec) -> transformerSpec.processor(messageProcessorSpec)); } /** @@ -638,8 +683,10 @@ public B transform(MessageProcessorSpec messageProcessorSpec) { * @param messageProcessorSpec the {@link MessageProcessorSpec} to use. * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. * @return the current {@link BaseIntegrationFlowDefinition}. + * @deprecated since 6.2 in favor of {@link #transformWith(Consumer)}. * @see MethodInvokingTransformer */ + @Deprecated(since = "6.2", forRemoval = true) public B transform(MessageProcessorSpec messageProcessorSpec, @Nullable Consumer> endpointConfigurer) { @@ -679,7 +726,8 @@ public

B convert(Class

payloadType) { * @see LambdaMessageProcessor */ public B transform(@Nullable Class

expectedType, GenericTransformer genericTransformer) { - return transform(expectedType, genericTransformer, null); + return transformWith((transformerSpec) -> + transformerSpec.transformer(genericTransformer).expectedType(expectedType)); } /** @@ -714,10 +762,12 @@ public

B convert(Class

payloadType, * @param

the payload type - 'transform from', or {@code Message.class}. * @param the target type - 'transform to'. * @return the current {@link BaseIntegrationFlowDefinition}. + * @deprecated since 6.2 in favor of {@link #transformWith(Consumer)} * @see MethodInvokingTransformer * @see LambdaMessageProcessor * @see GenericEndpointSpec */ + @Deprecated(since = "6.2", forRemoval = true) public B transform(@Nullable Class

expectedType, GenericTransformer genericTransformer, @Nullable Consumer> endpointConfigurer) { @@ -730,6 +780,18 @@ public B transform(@Nullable Class

expectedType, GenericTransformer the payload type - 'transform from', or {@code Message.class}. + * @param the target type - 'transform to'. + * @return the current {@link BaseIntegrationFlowDefinition}. + * @since 6.2 + */ + public B transformWith(Consumer transformerConfigurer) { + return register(new TransformerSpec(), transformerConfigurer); + } + /** * Populate a {@link MessageFilter} with {@link MessageSelector} for the provided SpEL expression. * @param expression the SpEL expression. @@ -2787,7 +2849,7 @@ protected Publisher> toReactivePublisher(boolean autoStartOnSubsc } protected > B register(S endpointSpec, - @Nullable Consumer endpointConfigurer) { + @Nullable Consumer endpointConfigurer) { if (endpointConfigurer != null) { endpointConfigurer.accept(endpointSpec); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java index de2ce2ca78b..8f3f6e3bfd5 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java @@ -50,6 +50,12 @@ public abstract class IntegrationFlowDefinition + * {@code + * .transformWith((transformerSpec) -> transformerSpec.function(genericTransformer)) + * } + * * @param genericTransformer the {@link GenericTransformer} to populate. * @param the source type - 'transform from'. * @param the target type - 'transform to'. @@ -58,7 +64,7 @@ public abstract class IntegrationFlowDefinition B transform(GenericTransformer genericTransformer) { - return transform(null, genericTransformer); + return transformWith((transformerSpec) -> transformerSpec.transformer(genericTransformer)); } @@ -66,18 +72,21 @@ public B transform(GenericTransformer genericTransformer) { * Populate the {@link MessageTransformingHandler} instance for the provided * {@link GenericTransformer}. In addition, accept options for the integration endpoint * using {@link GenericEndpointSpec}. Use - * {@link #transform(Class, GenericTransformer, Consumer)} if you need to access the - * entire message. + * {@code .transform((transformerSpec) -> transformerSpec.function(genericTransformer).expectedType(Message.class))} + * if you need to access the entire message. * @param genericTransformer the {@link GenericTransformer} to populate. * @param endpointConfigurer the {@link Consumer} to provide integration endpoint * options. * @param the source type - 'transform from'. * @param the target type - 'transform to'. * @return the current {@link IntegrationFlowDefinition}. + * @deprecated since 6.2 in favor of {@link #transformWith(Consumer)} * @see org.springframework.integration.transformer.MethodInvokingTransformer * @see org.springframework.integration.handler.LambdaMessageProcessor * @see GenericEndpointSpec */ + @Deprecated(since = "6.2", forRemoval = true) + @SuppressWarnings("removal") public B transform(GenericTransformer genericTransformer, Consumer> endpointConfigurer) { @@ -108,7 +117,7 @@ public

B filter(GenericSelector

genericSelector) { * Populate a {@link org.springframework.integration.filter.MessageFilter} * with {@link org.springframework.integration.filter.MethodInvokingSelector} * for the provided {@link GenericSelector}. - * In addition accept options for the integration endpoint using {@link FilterEndpointSpec}. + * In addition, accept options for the integration endpoint using {@link FilterEndpointSpec}. * Typically used with a Java 8 Lambda expression: *

 	 * {@code
@@ -152,7 +161,7 @@ public 

B handle(GenericHandler

handler) { * Populate a {@link ServiceActivatingHandler} for the * {@link org.springframework.integration.handler.MethodInvokingMessageProcessor} * to invoke the provided {@link GenericHandler} at runtime. - * In addition accept options for the integration endpoint using {@link GenericEndpointSpec}. + * In addition, accept options for the integration endpoint using {@link GenericEndpointSpec}. * Typically used with a Java 8 Lambda expression: *

 	 * {@code
@@ -177,7 +186,7 @@ public 

B handle(GenericHandler

handler, /** * Populate the {@link MethodInvokingSplitter} to evaluate the provided * {@link Function} at runtime. - * In addition accept options for the integration endpoint using {@link GenericEndpointSpec}. + * In addition, accept options for the integration endpoint using {@link GenericEndpointSpec}. * Typically used with a Java 8 Lambda expression: *

 	 * {@code
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/TransformerSpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/TransformerSpec.java
new file mode 100644
index 00000000000..f4b34b2970f
--- /dev/null
+++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/TransformerSpec.java
@@ -0,0 +1,188 @@
+/*
+ * Copyright 2023-2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.dsl;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.springframework.expression.Expression;
+import org.springframework.integration.core.GenericTransformer;
+import org.springframework.integration.handler.BeanNameMessageProcessor;
+import org.springframework.integration.handler.LambdaMessageProcessor;
+import org.springframework.integration.handler.MessageProcessor;
+import org.springframework.integration.transformer.ExpressionEvaluatingTransformer;
+import org.springframework.integration.transformer.MessageTransformingHandler;
+import org.springframework.integration.transformer.MethodInvokingTransformer;
+import org.springframework.integration.transformer.Transformer;
+import org.springframework.integration.util.ClassUtils;
+import org.springframework.lang.Nullable;
+import org.springframework.util.Assert;
+
+/**
+ * A {@link ConsumerEndpointSpec} for a {@link MessageTransformingHandler} options.
+ * One of the {@link #expression(String)}, {@link #ref(Object)}, {@link #refName(String)},
+ * {@link #processor(MessageProcessorSpec)} or {@link #transformer(GenericTransformer)} must be provided.
+ *
+ * @author Artem Bilan
+ *
+ * @since 6.2
+ */
+public class TransformerSpec extends ConsumerEndpointSpec {
+
+	private final AtomicBoolean transformerSet = new AtomicBoolean();
+
+	private Expression expression;
+
+	private Object ref;
+
+	private String refName;
+
+	@Nullable
+	private String method;
+
+	private GenericTransformer transformer;
+
+	@Nullable
+	private Class expectedType;
+
+	private MessageProcessorSpec processor;
+
+	protected TransformerSpec() {
+		super(new MessageTransformingHandler());
+	}
+
+	public TransformerSpec expression(String expression) {
+		return expression(PARSER.parseExpression(expression));
+	}
+
+	public TransformerSpec expression(Expression expression) {
+		assertTransformerSet();
+		this.expression = expression;
+		return this;
+	}
+
+	public TransformerSpec ref(Object ref) {
+		assertTransformerSet();
+		this.ref = ref;
+		return this;
+	}
+
+	public TransformerSpec refName(String refName) {
+		assertTransformerSet();
+		this.refName = refName;
+		return this;
+	}
+
+	public TransformerSpec method(@Nullable String method) {
+		this.method = method;
+		return this;
+	}
+
+	public  TransformerSpec transformer(GenericTransformer transformer) {
+		assertTransformerSet();
+		this.transformer = transformer;
+		return this;
+	}
+
+	/**
+	 * Set a {@link GenericTransformer} input argument type.
+	 * Can be a {@link org.springframework.messaging.Message}.
+	 * Ignored for all other transformers, but {@link #transformer(GenericTransformer)}.
+	 * @param expectedType the {@link GenericTransformer} input argument type.
+	 * @param 

the type ot expect. + * @return the spec. + */ + public

TransformerSpec expectedType(@Nullable Class

expectedType) { + this.expectedType = expectedType; + return this; + } + + public TransformerSpec processor(MessageProcessorSpec processor) { + assertTransformerSet(); + this.processor = processor; + return this; + } + + private void assertTransformerSet() { + Assert.isTrue(this.transformerSet.compareAndSet(false, true), this::assertMessage); + } + + private String assertMessage() { + String currentTransformerValue = null; + if (this.expression != null) { + currentTransformerValue = "'expression'=" + this.expression; + } + else if (this.ref != null) { + currentTransformerValue = "'ref'=" + this.ref; + } + else if (this.refName != null) { + currentTransformerValue = "'refName'=" + this.refName; + } + else if (this.transformer != null) { + currentTransformerValue = "'transformer'=" + this.transformer; + } + else if (this.processor != null) { + currentTransformerValue = "'processor'=" + this.processor; + } + return "Only one of the 'expression', 'ref', 'refName', 'processor' or 'transformer' can be set. " + + "Current one is " + currentTransformerValue; + } + + @Override + public Map getComponentsToRegister() { + Transformer transformer; + if (this.expression != null) { + transformer = new ExpressionEvaluatingTransformer(this.expression); + } + else if (this.ref != null) { + if (this.method != null) { + transformer = new MethodInvokingTransformer(this.ref, this.method); + } + else { + transformer = new MethodInvokingTransformer(this.ref); + } + } + else if (this.refName != null) { + transformer = new MethodInvokingTransformer(new BeanNameMessageProcessor<>(this.refName, this.method)); + } + else if (this.processor != null) { + MessageProcessor targetProcessor = this.processor.getObject(); + this.componentsToRegister.put(targetProcessor, null); + transformer = new MethodInvokingTransformer(targetProcessor); + } + else if (this.transformer != null) { + transformer = wrapToTransformerIfAny(); + } + else { + throw new IllegalStateException( + "One of the 'expression', 'ref', 'refName', 'processor' or 'transformer' must be provided."); + } + + this.handler.setTransformer(transformer); + + this.componentsToRegister.put(transformer, null); + return super.getComponentsToRegister(); + } + + private Transformer wrapToTransformerIfAny() { + return this.transformer instanceof Transformer ? (Transformer) this.transformer : + (ClassUtils.isLambda(this.transformer.getClass()) + ? new MethodInvokingTransformer(new LambdaMessageProcessor(this.transformer, this.expectedType)) + : new MethodInvokingTransformer(this.transformer, ClassUtils.TRANSFORMER_TRANSFORM_METHOD)); + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/transformer/MessageTransformingHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/transformer/MessageTransformingHandler.java index 4948e8230be..96987516b97 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/transformer/MessageTransformingHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/transformer/MessageTransformingHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,7 +41,16 @@ */ public class MessageTransformingHandler extends AbstractReplyProducingMessageHandler implements ManageableLifecycle { - private final Transformer transformer; + private Transformer transformer; + + /** + * Create a {@link MessageTransformingHandler} instance. + * The target delegate {@link Transformer} must be provided then via setter. + * @since 6.2 + */ + public MessageTransformingHandler() { + setRequiresReply(true); + } /** * Create a {@link MessageTransformingHandler} instance that delegates to @@ -49,11 +58,20 @@ public class MessageTransformingHandler extends AbstractReplyProducingMessageHan * @param transformer The transformer. */ public MessageTransformingHandler(Transformer transformer) { + this(); Assert.notNull(transformer, "transformer must not be null"); this.transformer = transformer; - this.setRequiresReply(true); } + /** + * Provide a {@link Transformer} delegate. + * @param transformer the {@link Transformer} to use. + * @since 6.2 + */ + public void setTransformer(Transformer transformer) { + Assert.notNull(transformer, "transformer must not be null"); + this.transformer = transformer; + } @Override public String getComponentType() { @@ -77,6 +95,7 @@ public void addNotPropagatedHeaders(String... headers) { @Override protected void doInit() { + Assert.notNull(this.transformer, "transformer must not be null"); BeanFactory beanFactory = getBeanFactory(); if (beanFactory != null && this.transformer instanceof BeanFactoryAware) { ((BeanFactoryAware) this.transformer).setBeanFactory(beanFactory); diff --git a/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt index 1dab770f789..24ac93b4560 100644 --- a/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt +++ b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt @@ -95,6 +95,7 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ * Inline function for [IntegrationFlowDefinition.transform] providing a `transform()` variant * with reified generic type. */ + @Suppress("DEPRECATION") inline fun transform( crossinline function: (P) -> Any, crossinline configurer: GenericEndpointSpec.() -> Unit @@ -305,6 +306,7 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ * for the provided `Transformer` instance. * @since 5.3.1 */ + @Suppress("DEPRECATION") fun transform( transformer: Transformer, endpointConfigurer: GenericEndpointSpec.() -> Unit = {} @@ -317,6 +319,7 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ * Populate the [Transformer] EI Pattern specific [MessageHandler] implementation * for the SpEL [Expression]. */ + @Suppress("DEPRECATION") fun transform( expression: String, endpointConfigurer: GenericEndpointSpec.() -> Unit = {} @@ -337,6 +340,7 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ * Populate the [MessageTransformingHandler] for the [MethodInvokingTransformer] * to invoke the service method at runtime. */ + @Suppress("DEPRECATION") fun transform( service: Any, methodName: String?, endpointConfigurer: GenericEndpointSpec.() -> Unit @@ -350,6 +354,7 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ * [org.springframework.integration.handler.MessageProcessor] from provided [MessageProcessorSpec]. * In addition, accept options for the integration endpoint using [GenericEndpointSpec]. */ + @Suppress("DEPRECATION") fun transform( messageProcessorSpec: MessageProcessorSpec<*>, endpointConfigurer: GenericEndpointSpec.() -> Unit = {} diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java index 79bb09afe96..0b88061e2dc 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java @@ -64,6 +64,7 @@ import org.springframework.integration.dsl.PollerSpec; import org.springframework.integration.dsl.Pollers; import org.springframework.integration.dsl.QueueChannelSpec; +import org.springframework.integration.dsl.TransformerSpec; import org.springframework.integration.dsl.Transformers; import org.springframework.integration.endpoint.AbstractEndpoint; import org.springframework.integration.endpoint.EventDrivenConsumer; @@ -671,17 +672,24 @@ public IntegrationFlow flow2() { .fixedSubscriberChannel() .transform(Integer::parseInt) .transform(Foo::new) - .transform(new PayloadSerializingTransformer(), - c -> c.autoStartup(false).id("payloadSerializingTransformer")) + .transformWith(this::payloadSerializingTransformer) .channel(MessageChannels.queue(new SimpleMessageStore(), "fooQueue")) .transform(Transformers.deserializer(Foo.class.getName())) .transform(f -> f.value) .filter("true", e -> e.id("expressionFilter")) .channel(publishSubscribeChannel()) - .transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice())) + .transformWith(t -> t + .transformer((Integer p) -> p * 2) + .advice(expressionAdvice())) .get(); } + private void payloadSerializingTransformer(TransformerSpec spec) { + spec.transformer(new PayloadSerializingTransformer()) + .autoStartup(false) + .id("payloadSerializingTransformer"); + } + @Bean public MessageChannel publishSubscribeChannel() { return new PublishSubscribeChannel(); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/manualflow/ManualFlowTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/manualflow/ManualFlowTests.java index 694d62de371..646eb01ffc1 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/manualflow/ManualFlowTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/manualflow/ManualFlowTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -190,7 +190,8 @@ public void testManualFlowRegistration() throws InterruptedException { IntegrationFlow myFlow = f -> f .transform(String::toUpperCase) .channel(MessageChannels.queue()) - .transform("Hello, "::concat, e -> e + .transformWith(t -> t + .transformer("Hello, "::concat) .poller(p -> p .fixedDelay(10) .maxMessagesPerPoll(1) diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java index 4af374c7110..ef32cf87af6 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -262,8 +262,10 @@ public Publisher> pollableReactiveFlow() { return IntegrationFlow .from("inputChannel") .split(s -> s.delimiters(",")) - .transform(Integer::parseInt, - e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())).id("reactiveTransformer")) + .transformWith(t -> t + .transformer(Integer::parseInt) + .reactive(flux -> flux.publishOn(Schedulers.parallel())) + .id("reactiveTransformer")) .channel(MessageChannels.queue()) .log() .toReactivePublisher(); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/transformers/TransformerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/transformers/TransformerTests.java index 4bdaeb1401a..5b86c9e3c21 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/transformers/TransformerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/transformers/TransformerTests.java @@ -460,10 +460,11 @@ public ExpressionEvaluatingRequestHandlerAdvice expressionAdvice() { @Bean public IntegrationFlow transformFlowWithError() { return f -> f - .transform(p -> { - throw new RuntimeException("intentional"); - }, - e -> e.advice(expressionAdvice())) + .transformWith((t) -> + t.transformer(p -> { + throw new RuntimeException("intentional"); + }) + .advice(expressionAdvice())) .log(); } From 62c21aea4fc226ec81df2428552af3b88d7266fd Mon Sep 17 00:00:00 2001 From: abilan Date: Tue, 20 Jun 2023 10:12:23 -0400 Subject: [PATCH 2/3] * Use new `transformWith()` in tests where a deprecated API still used * Add JavaDocs to `TransformerSpec` * Fix generic types for `BaseIntegrationFlowDefinition.transformWith()` - they make sense exactly on the `TransformerSpec.transformer()` only * Apply `transformWith()` for Groovy DSL * Introduce a new `ClassUtils.isLambda(Object candidate)` and add a check for Groovy `Closure` * Fix `GroovyIntegrationFlowDefinition.createConfigurerIfAny()` to propagate a `Consumer` argument down to the `Closure` --- .../dsl/BaseIntegrationFlowDefinition.java | 2 - .../integration/dsl/TransformerSpec.java | 41 ++++++++++++++++++- .../integration/util/ClassUtils.java | 16 +++++++- .../integration/file/dsl/FileTests.java | 5 ++- .../GroovyIntegrationFlowDefinition.groovy | 31 +++++++++++++- .../groovy/dsl/test/GroovyDslTests.groovy | 36 ++++++++++------ .../WebFluxObservationPropagationTests.java | 4 +- 7 files changed, 115 insertions(+), 20 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java index ee863ce49c3..803aca6db12 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java @@ -783,8 +783,6 @@ public B transform(@Nullable Class

expectedType, GenericTransformer the payload type - 'transform from', or {@code Message.class}. - * @param the target type - 'transform to'. * @return the current {@link BaseIntegrationFlowDefinition}. * @since 6.2 */ diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/TransformerSpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/TransformerSpec.java index f4b34b2970f..cf5377b387a 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/TransformerSpec.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/TransformerSpec.java @@ -65,33 +65,67 @@ protected TransformerSpec() { super(new MessageTransformingHandler()); } + /** + * Provide an expression to use an {@link ExpressionEvaluatingTransformer} for the target handler. + * @param expression the SpEL expression to use. + * @return the TransformerSpec + */ public TransformerSpec expression(String expression) { return expression(PARSER.parseExpression(expression)); } + /** + * Provide an expression to use an {@link ExpressionEvaluatingTransformer} for the target handler. + * @param expression the SpEL expression to use. + * @return the TransformerSpec + */ public TransformerSpec expression(Expression expression) { assertTransformerSet(); this.expression = expression; return this; } + /** + * Provide a service to use a {@link MethodInvokingTransformer} for the target handler. + * @param ref the service to call as a transformer POJO. + * @return the TransformerSpec + */ public TransformerSpec ref(Object ref) { assertTransformerSet(); this.ref = ref; return this; } + /** + * Provide a bean name to use a {@link MethodInvokingTransformer} + * (based on {@link BeanNameMessageProcessor})for the target handler. + * @param refName the bean name for service to call as a transformer POJO. + * @return the TransformerSpec + */ public TransformerSpec refName(String refName) { assertTransformerSet(); this.refName = refName; return this; } + /** + * Provide a service method name to call. Optional. + * Use only together with {@link #ref(Object)} or {@link #refName(String)}. + * @param method the service method name to call. + * @return the TransformerSpec + */ public TransformerSpec method(@Nullable String method) { this.method = method; return this; } + /** + * Provide a {@link GenericTransformer} as a direct delegate for {@link MessageTransformingHandler}. + * @param transformer the {@link GenericTransformer} instance to use. + * @param

the input type. + * @param the output type. + * @return the TransformerSpec + */ public TransformerSpec transformer(GenericTransformer transformer) { assertTransformerSet(); this.transformer = transformer; @@ -111,6 +145,11 @@ public

TransformerSpec expectedType(@Nullable Class

expectedType) { return this; } + /** + * Provide a {@link MessageProcessorSpec} as a factory for {@link MethodInvokingTransformer} delegate. + * @param processor the {@link MessageProcessorSpec} to use. + * @return the TransformerSpec + */ public TransformerSpec processor(MessageProcessorSpec processor) { assertTransformerSet(); this.processor = processor; @@ -180,7 +219,7 @@ else if (this.transformer != null) { private Transformer wrapToTransformerIfAny() { return this.transformer instanceof Transformer ? (Transformer) this.transformer : - (ClassUtils.isLambda(this.transformer.getClass()) + (ClassUtils.isLambda(this.transformer) ? new MethodInvokingTransformer(new LambdaMessageProcessor(this.transformer, this.expectedType)) : new MethodInvokingTransformer(this.transformer, ClassUtils.TRANSFORMER_TRANSFORM_METHOD)); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/util/ClassUtils.java b/spring-integration-core/src/main/java/org/springframework/integration/util/ClassUtils.java index c42bf1f904a..947c565fd82 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/util/ClassUtils.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/util/ClassUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.integration.util; import java.lang.reflect.Method; +import java.lang.reflect.Proxy; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -223,6 +224,19 @@ public static Class resolvePrimitiveType(Class clazz) { return PRIMITIVE_WRAPPER_TYPE_MAP.get(clazz); } + /** + * Check if object is Java, Kotlin or Groovy lambda. + * @param candidate the {@link Object} to check. + * @return true if object is a Java, Kotlin or Groovy lambda. + * @since 6.2 + */ + public static boolean isLambda(Object candidate) { + Class aClass = candidate.getClass(); + return isLambda(aClass) || + (Proxy.isProxyClass(aClass) // Groovy Closure is a Lambda in Java terms + && Proxy.getInvocationHandler(candidate).getClass().getSimpleName().equals("ConvertedClosure")); + } + /** * Check if class is Java or Kotlin lambda. * @param aClass the {@link Class} to check. diff --git a/spring-integration-file/src/test/java/org/springframework/integration/file/dsl/FileTests.java b/spring-integration-file/src/test/java/org/springframework/integration/file/dsl/FileTests.java index ae24e92c88e..8bfc11d62ac 100644 --- a/spring-integration-file/src/test/java/org/springframework/integration/file/dsl/FileTests.java +++ b/spring-integration-file/src/test/java/org/springframework/integration/file/dsl/FileTests.java @@ -448,8 +448,9 @@ void pollDirectories(File... directories) { .from(Files.inboundAdapter(directory).recursive(true), e -> e.poller(p -> p.fixedDelay(1000)) .id(directory.getName() + ".adapter")) - .transform(Files.toStringTransformer(), - e -> e.id(directory.getName() + ".transformer")) + .transformWith(t -> t + .transformer(Files.toStringTransformer()) + .id(directory.getName() + ".transformer")) .channel(this.dynamicAdaptersResult) .get(); this.beanFactory.initializeBean(integrationFlow, directory.getName()); diff --git a/spring-integration-groovy/src/main/groovy/org/springframework/integration/groovy/dsl/GroovyIntegrationFlowDefinition.groovy b/spring-integration-groovy/src/main/groovy/org/springframework/integration/groovy/dsl/GroovyIntegrationFlowDefinition.groovy index a6c0a48ed89..5d6515bf7a0 100644 --- a/spring-integration-groovy/src/main/groovy/org/springframework/integration/groovy/dsl/GroovyIntegrationFlowDefinition.groovy +++ b/spring-integration-groovy/src/main/groovy/org/springframework/integration/groovy/dsl/GroovyIntegrationFlowDefinition.groovy @@ -47,6 +47,7 @@ import org.springframework.integration.dsl.ResequencerSpec import org.springframework.integration.dsl.RouterSpec import org.springframework.integration.dsl.ScatterGatherSpec import org.springframework.integration.dsl.SplitterEndpointSpec +import org.springframework.integration.dsl.TransformerSpec import org.springframework.integration.dsl.WireTapSpec import org.springframework.integration.filter.MethodInvokingSelector import org.springframework.integration.handler.BridgeHandler @@ -289,8 +290,10 @@ class GroovyIntegrationFlowDefinition { * for the SpEL {@link org.springframework.expression.Expression}. * @param expression the {@code Transformer} {@link org.springframework.expression.Expression}. * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. * @see org.springframework.integration.transformer.ExpressionEvaluatingTransformer */ + @Deprecated(since = '6.2', forRemoval = true) GroovyIntegrationFlowDefinition transform( String expression, @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) @@ -308,8 +311,10 @@ class GroovyIntegrationFlowDefinition { * @param service the service to use. * @param methodName the method to invoke. * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @deprecated since 6.2 in favor of {@link #transform(Closure)} * @see ExpressionEvaluatingTransformer */ + @Deprecated(since = '6.2', forRemoval = true) GroovyIntegrationFlowDefinition transform( Object service, String methodName = null, @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) @@ -326,8 +331,10 @@ class GroovyIntegrationFlowDefinition { * In addition accept options for the integration endpoint using {@link GenericEndpointSpec}. * @param messageProcessorSpec the {@link MessageProcessorSpec} to use. * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @deprecated since 6.2 in favor of {@link #transform(Closure)} * @see MethodInvokingTransformer */ + @Deprecated(since = '6.2', forRemoval = true) GroovyIntegrationFlowDefinition transform( MessageProcessorSpec messageProcessorSpec, @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) @@ -338,6 +345,24 @@ class GroovyIntegrationFlowDefinition { this } + /** + * Populate the {@link MessageTransformingHandler} instance for the + * {@link org.springframework.integration.handler.MessageProcessor} from provided {@link MessageProcessorSpec}. + * In addition accept options for the integration endpoint using {@link GenericEndpointSpec}. + * @param messageProcessorSpec the {@link MessageProcessorSpec} to use. + * @param transformerConfigurer the {@link Consumer} to provide integration endpoint options. + * @see MethodInvokingTransformer + * @since 6.2 + */ + GroovyIntegrationFlowDefinition transform( + @DelegatesTo(value = TransformerSpec, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType.class, options = 'org.springframework.integration.dsl.TransformerSpec') + Closure transformerConfigurer) { + + this.delegate.transformWith createConfigurerIfAny(transformerConfigurer) + this + } + /** * Populate the {@link MessageTransformingHandler} instance * for the provided {@code payloadType} to convert at runtime. @@ -367,7 +392,9 @@ class GroovyIntegrationFlowDefinition { * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. * @param < P > the payload type - 'transform from', or {@code Message.class}. * @param < T > the target type - 'transform to'. + * @deprecated since 6.2 in favor of {@link #transform(Closure)} */ + @Deprecated(since = '6.2', forRemoval = true) GroovyIntegrationFlowDefinition transform( GenericTransformer genericTransformer, @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) @@ -389,7 +416,9 @@ class GroovyIntegrationFlowDefinition { * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. * @param < P > the payload type - 'transform from', or {@code Message.class}. * @param < T > the target type - 'transform to'. + * @deprecated since 6.2 in favor of {@link #transform(Closure)} */ + @Deprecated(since = '6.2', forRemoval = true) GroovyIntegrationFlowDefinition transform( Class

expectedType, GenericTransformer genericTransformer, @@ -1336,7 +1365,7 @@ class GroovyIntegrationFlowDefinition { return { closure.delegate = it closure.resolveStrategy = Closure.DELEGATE_FIRST - closure() + closure(it) } as Consumer } null diff --git a/spring-integration-groovy/src/test/groovy/org/springframework/integration/groovy/dsl/test/GroovyDslTests.groovy b/spring-integration-groovy/src/test/groovy/org/springframework/integration/groovy/dsl/test/GroovyDslTests.groovy index 9b3c671c581..76399d3fbb5 100644 --- a/spring-integration-groovy/src/test/groovy/org/springframework/integration/groovy/dsl/test/GroovyDslTests.groovy +++ b/spring-integration-groovy/src/test/groovy/org/springframework/integration/groovy/dsl/test/GroovyDslTests.groovy @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -130,11 +130,14 @@ class GroovyDslTests { def publisher = Flux.just(2, 3).map { new GenericMessage<>(it) } def integrationFlow = - integrationFlow(publisher) - { - transform Message, { it.payload * 2 }, { id 'foo' } - channel fluxChannel - } + integrationFlow(publisher) { + transform { + it., Integer>transformer { it.payload * 2 } + expectedType Message + id 'foo' + } + channel fluxChannel + } def registration = this.integrationFlowContext.registration(integrationFlow).register() @@ -217,7 +220,7 @@ class GroovyDslTests { assert groovyTestService.result.get() == 'TEST' } - @Configuration + @Configuration(proxyBeanMethods = false) @EnableIntegration static class Config { @@ -240,7 +243,9 @@ class GroovyDslTests { requestReplyFlow() { integrationFlow { fluxTransform { it.map { it } } - transform String, { it.toUpperCase() } + transform { + transformer { it.toUpperCase() } + } } } @@ -257,8 +262,13 @@ class GroovyDslTests { integrationFlow Function, { beanName 'functionGateway' }, { - transform Transformers.objectToString(), { id 'objectToStringTransformer' } - transform String, { it.toUpperCase() } + transform { + transformer Transformers.objectToString() + id 'objectToStringTransformer' + } + transform { + transformer { it.toUpperCase() } + } split Message, { it.payload } split Object, { it }, { id 'splitterEndpoint' } resequence() @@ -314,11 +324,13 @@ class GroovyDslTests { wireTap integrationFlow { channel { queue 'wireTapChannel' } } - delay { + delay { messageGroupId 'delayGroup' defaultDelay 100 } - transform String, { it.toUpperCase() } + transform { + transformer { it.toUpperCase() } + } } } diff --git a/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/observation/WebFluxObservationPropagationTests.java b/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/observation/WebFluxObservationPropagationTests.java index 7588c62c6c0..fcfa5edaa21 100644 --- a/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/observation/WebFluxObservationPropagationTests.java +++ b/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/observation/WebFluxObservationPropagationTests.java @@ -212,7 +212,9 @@ IntegrationFlow webFluxRequestReplyFlow( .payloadExpression("#requestParams.name[0]") .requestChannel(webFluxRequestChannel) .id("webFluxGateway")) - .transform(String::toLowerCase, e -> e.id("testTransformer")) + .transformWith(t -> t + .transformer(String::toLowerCase) + .id("testTransformer")) .get(); } From aa219bf2e6ca28b964dc5df951152abbd6958608 Mon Sep 17 00:00:00 2001 From: abilan Date: Fri, 23 Jun 2023 14:56:12 -0400 Subject: [PATCH 3/3] * Rename `TransformerSpec -> TransformerEndpointSpec` for better context meaning of the class * Introduce `KotlinTransformerEndpointSpec` as an extension of the `TransformerEndpointSpec` to have an `inline fun transformer(crossinline function: (P) -> Any)` for Kotlin style * Add `KotlinIntegrationFlowDefinition.transformWith(KotlinTransformerEndpointSpec)` * Deprecate Kotlin methods which are covered by the mentioned `transformWith()` * Fix tests to use new API * Mentioned the change in the doc --- .../dsl/BaseIntegrationFlowDefinition.java | 6 +- ...Spec.java => TransformerEndpointSpec.java} | 22 ++--- .../dsl/KotlinIntegrationFlowDefinition.kt | 83 +++++++++++++++++-- .../dsl/KotlinTransformerEndpointSpec.kt | 42 ++++++++++ .../dsl/flows/IntegrationFlowTests.java | 4 +- .../integration/dsl/KotlinDslTests.kt | 15 +++- .../GroovyIntegrationFlowDefinition.groovy | 6 +- src/reference/asciidoc/dsl.adoc | 22 +++-- src/reference/asciidoc/groovy-dsl.adoc | 15 +++- src/reference/asciidoc/kotlin-dsl.adoc | 4 +- src/reference/asciidoc/whats-new.adoc | 3 + 11 files changed, 182 insertions(+), 40 deletions(-) rename spring-integration-core/src/main/java/org/springframework/integration/dsl/{TransformerSpec.java => TransformerEndpointSpec.java} (89%) create mode 100644 spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinTransformerEndpointSpec.kt diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java index 803aca6db12..9447f575928 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java @@ -781,13 +781,13 @@ public B transform(@Nullable Class

expectedType, GenericTransformer transformerConfigurer) { - return register(new TransformerSpec(), transformerConfigurer); + public B transformWith(Consumer transformerConfigurer) { + return register(new TransformerEndpointSpec(), transformerConfigurer); } /** diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/TransformerSpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/TransformerEndpointSpec.java similarity index 89% rename from spring-integration-core/src/main/java/org/springframework/integration/dsl/TransformerSpec.java rename to spring-integration-core/src/main/java/org/springframework/integration/dsl/TransformerEndpointSpec.java index cf5377b387a..8ccb8fc1d52 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/TransformerSpec.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/TransformerEndpointSpec.java @@ -41,7 +41,7 @@ * * @since 6.2 */ -public class TransformerSpec extends ConsumerEndpointSpec { +public class TransformerEndpointSpec extends ConsumerEndpointSpec { private final AtomicBoolean transformerSet = new AtomicBoolean(); @@ -61,7 +61,7 @@ public class TransformerSpec extends ConsumerEndpointSpec processor; - protected TransformerSpec() { + protected TransformerEndpointSpec() { super(new MessageTransformingHandler()); } @@ -70,7 +70,7 @@ protected TransformerSpec() { * @param expression the SpEL expression to use. * @return the TransformerSpec */ - public TransformerSpec expression(String expression) { + public TransformerEndpointSpec expression(String expression) { return expression(PARSER.parseExpression(expression)); } @@ -79,7 +79,7 @@ public TransformerSpec expression(String expression) { * @param expression the SpEL expression to use. * @return the TransformerSpec */ - public TransformerSpec expression(Expression expression) { + public TransformerEndpointSpec expression(Expression expression) { assertTransformerSet(); this.expression = expression; return this; @@ -90,7 +90,7 @@ public TransformerSpec expression(Expression expression) { * @param ref the service to call as a transformer POJO. * @return the TransformerSpec */ - public TransformerSpec ref(Object ref) { + public TransformerEndpointSpec ref(Object ref) { assertTransformerSet(); this.ref = ref; return this; @@ -98,11 +98,11 @@ public TransformerSpec ref(Object ref) { /** * Provide a bean name to use a {@link MethodInvokingTransformer} - * (based on {@link BeanNameMessageProcessor})for the target handler. + * (based on {@link BeanNameMessageProcessor}) for the target handler. * @param refName the bean name for service to call as a transformer POJO. * @return the TransformerSpec */ - public TransformerSpec refName(String refName) { + public TransformerEndpointSpec refName(String refName) { assertTransformerSet(); this.refName = refName; return this; @@ -114,7 +114,7 @@ public TransformerSpec refName(String refName) { * @param method the service method name to call. * @return the TransformerSpec */ - public TransformerSpec method(@Nullable String method) { + public TransformerEndpointSpec method(@Nullable String method) { this.method = method; return this; } @@ -126,7 +126,7 @@ public TransformerSpec method(@Nullable String method) { * @param the output type. * @return the TransformerSpec */ - public TransformerSpec transformer(GenericTransformer transformer) { + public TransformerEndpointSpec transformer(GenericTransformer transformer) { assertTransformerSet(); this.transformer = transformer; return this; @@ -140,7 +140,7 @@ public TransformerSpec transformer(GenericTransformer transformer) * @param

the type ot expect. * @return the spec. */ - public

TransformerSpec expectedType(@Nullable Class

expectedType) { + public

TransformerEndpointSpec expectedType(@Nullable Class

expectedType) { this.expectedType = expectedType; return this; } @@ -150,7 +150,7 @@ public

TransformerSpec expectedType(@Nullable Class

expectedType) { * @param processor the {@link MessageProcessorSpec} to use. * @return the TransformerSpec */ - public TransformerSpec processor(MessageProcessorSpec processor) { + public TransformerEndpointSpec processor(MessageProcessorSpec processor) { assertTransformerSet(); this.processor = processor; return this; diff --git a/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt index 24ac93b4560..4d81bac3216 100644 --- a/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt +++ b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt @@ -92,10 +92,16 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ } /** - * Inline function for [IntegrationFlowDefinition.transform] providing a `transform()` variant + * Inline function for [IntegrationFlowDefinition.transform] providing a `transform()` variant * with reified generic type. */ - @Suppress("DEPRECATION") + @Deprecated("since 6.2", + ReplaceWith(""" + transformWith { + transformer { } + id("value") + }""")) + @Suppress("DEPRECATION", "REMOVAL") inline fun transform( crossinline function: (P) -> Any, crossinline configurer: GenericEndpointSpec.() -> Unit @@ -104,6 +110,16 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ this.delegate.transform(P::class.java, { function(it) }) { configurer(it) } } + /** + * Inline function for [IntegrationFlowDefinition.transformWith] + * providing a `transform()` variant + * with reified generic type. + * @since 6.2 + */ + fun transformWith(configurer: KotlinTransformerEndpointSpec.() -> Unit) { + this.delegate.register(KotlinTransformerEndpointSpec(), configurer) + } + /** * Inline function for [IntegrationFlowDefinition.split] providing a `split()` variant * with reified generic type. @@ -306,7 +322,13 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ * for the provided `Transformer` instance. * @since 5.3.1 */ - @Suppress("DEPRECATION") + @Deprecated("since 6.2", + ReplaceWith(""" + transformWith { + transformer(transformer) + id("value") + }""")) + @Suppress("DEPRECATION", "REMOVAL") fun transform( transformer: Transformer, endpointConfigurer: GenericEndpointSpec.() -> Unit = {} @@ -315,11 +337,26 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ this.delegate.transform(transformer) { endpointConfigurer(it) } } + /** + * Populate the [Transformer] EI Pattern specific [MessageHandler] implementation + * for the provided [Transformer] instance. + * @since 6.2 + */ + fun transform(transformer: Transformer) { + this.delegate.transform(transformer) + } + /** * Populate the [Transformer] EI Pattern specific [MessageHandler] implementation * for the SpEL [Expression]. */ - @Suppress("DEPRECATION") + @Deprecated("since 6.2", + ReplaceWith(""" + transformWith { + expression("value") + id("value") + }""")) + @Suppress("DEPRECATION", "REMOVAL") fun transform( expression: String, endpointConfigurer: GenericEndpointSpec.() -> Unit = {} @@ -328,6 +365,16 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ this.delegate.transform(expression, endpointConfigurer) } + + /** + * Populate the [Transformer] EI Pattern specific [MessageHandler] implementation + * for the SpEL [Expression]. + * @since 6.2 + */ + fun transform(expression: String) { + this.delegate.transform(expression) + } + /** * Populate the [MessageTransformingHandler] for the [MethodInvokingTransformer] * to invoke the service method at runtime. @@ -340,7 +387,14 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ * Populate the [MessageTransformingHandler] for the [MethodInvokingTransformer] * to invoke the service method at runtime. */ - @Suppress("DEPRECATION") + @Deprecated("since 6.2", + ReplaceWith(""" + transformWith { + ref("value") + method("value") + id("value") + }""")) + @Suppress("DEPRECATION", "REMOVAL") fun transform( service: Any, methodName: String?, endpointConfigurer: GenericEndpointSpec.() -> Unit @@ -354,7 +408,13 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ * [org.springframework.integration.handler.MessageProcessor] from provided [MessageProcessorSpec]. * In addition, accept options for the integration endpoint using [GenericEndpointSpec]. */ - @Suppress("DEPRECATION") + @Deprecated("since 6.2", + ReplaceWith(""" + transformWith { + processor("value") + id("value") + }""")) + @Suppress("DEPRECATION", "REMOVAL") fun transform( messageProcessorSpec: MessageProcessorSpec<*>, endpointConfigurer: GenericEndpointSpec.() -> Unit = {} @@ -363,6 +423,15 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ this.delegate.transform(messageProcessorSpec, endpointConfigurer) } + /** + * Populate the [MessageTransformingHandler] instance for the + * [org.springframework.integration.handler.MessageProcessor] from provided [MessageProcessorSpec]. + * @since 6.2 + */ + fun transform(messageProcessorSpec: MessageProcessorSpec<*>) { + this.delegate.transform(messageProcessorSpec) + } + /** * Populate a [MessageFilter] with [MessageSelector] for the provided SpEL expression. * In addition, accept options for the integration endpoint using [KotlinFilterEndpointSpec]: @@ -571,7 +640,7 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ delay { messageGroupId(groupId) }""")) - @Suppress("DEPRECATION") + @Suppress("DEPRECATION", "REMOVAL") fun delay(groupId: String, endpointConfigurer: DelayerEndpointSpec.() -> Unit = {}) { this.delegate.delay(groupId, endpointConfigurer) } diff --git a/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinTransformerEndpointSpec.kt b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinTransformerEndpointSpec.kt new file mode 100644 index 00000000000..c997a59020e --- /dev/null +++ b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinTransformerEndpointSpec.kt @@ -0,0 +1,42 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dsl + +import org.springframework.integration.transformer.MessageTransformingHandler + +/** + * A [TransformerEndpointSpec] wrapped for Kotlin DSL. + * + * @property delegate the [TransformerEndpointSpec] this instance is delegating to. + * + * @author Artem Bilan + * + * @since 6.2 + */ +class KotlinTransformerEndpointSpec : TransformerEndpointSpec() { + + /** + * Provide a Kotlin function as a direct delegate for [MessageTransformingHandler]. + * @param function the function instance to use. + * @param

the input type. + */ + inline fun transformer(crossinline function: (P) -> Any) { + expectedType(P::class.java) + transformer { function(it) } + } + +} diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java index 0b88061e2dc..e0f4547e25d 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java @@ -64,7 +64,7 @@ import org.springframework.integration.dsl.PollerSpec; import org.springframework.integration.dsl.Pollers; import org.springframework.integration.dsl.QueueChannelSpec; -import org.springframework.integration.dsl.TransformerSpec; +import org.springframework.integration.dsl.TransformerEndpointSpec; import org.springframework.integration.dsl.Transformers; import org.springframework.integration.endpoint.AbstractEndpoint; import org.springframework.integration.endpoint.EventDrivenConsumer; @@ -684,7 +684,7 @@ public IntegrationFlow flow2() { .get(); } - private void payloadSerializingTransformer(TransformerSpec spec) { + private void payloadSerializingTransformer(TransformerEndpointSpec spec) { spec.transformer(new PayloadSerializingTransformer()) .autoStartup(false) .id("payloadSerializingTransformer"); diff --git a/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt b/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt index 94dc725e6c5..83fce807f10 100644 --- a/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt +++ b/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt @@ -161,7 +161,10 @@ class KotlinDslTests { val integrationFlow = integrationFlow(publisher) { - transform>({ it.payload * 2 }) { id("foo") } + transformWith { + transformer> { it.payload * 2 } + id("foo") + } channel(fluxChannel) } @@ -249,7 +252,10 @@ class KotlinDslTests { @Bean fun functionFlow() = integrationFlow>({ beanName("functionGateway") }) { - transform(Transformers.objectToString()) { id("objectToStringTransformer") } + transformWith { + transformer(Transformers.objectToString()) + id("objectToStringTransformer") + } transform { it.uppercase() } split> { it.payload } split({ it }) { id("splitterEndpoint") } @@ -292,7 +298,10 @@ class KotlinDslTests { fun fixedSubscriberFlow() = integrationFlow("fixedSubscriberInput", true) { log(LoggingHandler.Level.WARN) { it.payload } - transform("payload") { id("spelTransformer") } + transformWith { + expression("payload") + id("spelTransformer") + } } @Bean diff --git a/spring-integration-groovy/src/main/groovy/org/springframework/integration/groovy/dsl/GroovyIntegrationFlowDefinition.groovy b/spring-integration-groovy/src/main/groovy/org/springframework/integration/groovy/dsl/GroovyIntegrationFlowDefinition.groovy index 5d6515bf7a0..401f13f0951 100644 --- a/spring-integration-groovy/src/main/groovy/org/springframework/integration/groovy/dsl/GroovyIntegrationFlowDefinition.groovy +++ b/spring-integration-groovy/src/main/groovy/org/springframework/integration/groovy/dsl/GroovyIntegrationFlowDefinition.groovy @@ -47,7 +47,7 @@ import org.springframework.integration.dsl.ResequencerSpec import org.springframework.integration.dsl.RouterSpec import org.springframework.integration.dsl.ScatterGatherSpec import org.springframework.integration.dsl.SplitterEndpointSpec -import org.springframework.integration.dsl.TransformerSpec +import org.springframework.integration.dsl.TransformerEndpointSpec import org.springframework.integration.dsl.WireTapSpec import org.springframework.integration.filter.MethodInvokingSelector import org.springframework.integration.handler.BridgeHandler @@ -355,8 +355,8 @@ class GroovyIntegrationFlowDefinition { * @since 6.2 */ GroovyIntegrationFlowDefinition transform( - @DelegatesTo(value = TransformerSpec, strategy = Closure.DELEGATE_FIRST) - @ClosureParams(value = SimpleType.class, options = 'org.springframework.integration.dsl.TransformerSpec') + @DelegatesTo(value = TransformerEndpointSpec, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType.class, options = 'org.springframework.integration.dsl.TransformerEndpointSpec') Closure transformerConfigurer) { this.delegate.transformWith createConfigurerIfAny(transformerConfigurer) diff --git a/src/reference/asciidoc/dsl.adoc b/src/reference/asciidoc/dsl.adoc index afa066b53a0..7afc3ffc48f 100644 --- a/src/reference/asciidoc/dsl.adoc +++ b/src/reference/asciidoc/dsl.adoc @@ -283,8 +283,10 @@ The following example demonstrates how to change the publishing thread from the public IntegrationFlow reactiveEndpointFlow() { return IntegrationFlow .from("inputChannel") - .transform(Integer::parseInt, - e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel()))) + .transformWith(t -> t + .transformer(Integer::parseInt) + .reactive(flux -> flux.publishOn(Schedulers.parallel())) + ) .get(); } ---- @@ -304,9 +306,13 @@ Each of them has generic arguments, so it lets you configure an endpoint and eve @Bean public IntegrationFlow flow2() { return IntegrationFlow.from(this.inputChannel) - .transform(new PayloadSerializingTransformer(), - c -> c.autoStartup(false).id("payloadSerializingTransformer")) - .transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice())) + .transformWith(t -> t + .transformer(new PayloadSerializingTransformer()) + .autoStartup(false) + .id("payloadSerializingTransformer")) + .transformWith(t -> t + .transformer((Integer p) -> p * 2) + .advice(expressionAdvice())) .get(); } ---- @@ -361,6 +367,10 @@ Nevertheless, the DSL parser takes care of bean declarations for inline objects, See https://docs.spring.io/spring-integration/api/org/springframework/integration/dsl/Transformers.html[Transformers] in the Javadoc for more information and supported factory methods. +Starting with version 6.2, a `transformWith(Consumer)` variant has been introduced to have all the transformer and its endpoint options to be configured via single builder argument. +This style gives DSL more readability and increases developer experience while modifying code. +This also make Groovy and Kotlin DSLs more straightforward. + Also see <>. [[java-dsl-inbound-adapters]] @@ -1428,4 +1438,4 @@ IntegrationFlow otherFlow() { The composition in the middle of the flow is simply achievable with an existing `gateway(IntegrationFlow)` EIP-method. This way we can build flows with any complexity by composing them from simpler, reusable logical blocks. -For example, you may add a library of `IntegrationFlow` beans as a dependency and it is just enough to have their configuration classes imported to the final project and autowired for your `IntegrationFlow` definitions. +For example, you may add a library of `IntegrationFlow` beans as a dependency, and it is just enough to have their configuration classes imported to the final project and autowired for your `IntegrationFlow` definitions. diff --git a/src/reference/asciidoc/groovy-dsl.adoc b/src/reference/asciidoc/groovy-dsl.adoc index c9a9b57d28e..cc3ba66af4e 100644 --- a/src/reference/asciidoc/groovy-dsl.adoc +++ b/src/reference/asciidoc/groovy-dsl.adoc @@ -39,7 +39,10 @@ flowLambda() { messageGroupId 'delayGroup' defaultDelay 100 } - transform String, { it.toUpperCase() } + transform { + transformer { it.toUpperCase() } + expectedType String + } } } ---- @@ -78,8 +81,14 @@ functionFlow() { integrationFlow Function, { beanName 'functionGateway' }, { - transform Transformers.objectToString(), { id 'objectToStringTransformer' } - transform String, { it.toUpperCase() } + transform { + transformer Transformers.objectToString() + id 'objectToStringTransformer' + } + transform { + transformer { it.toUpperCase() } + expectedType String + } split Message, { it.payload } split Object, { it }, { id 'splitterEndpoint' } resequence() diff --git a/src/reference/asciidoc/kotlin-dsl.adoc b/src/reference/asciidoc/kotlin-dsl.adoc index f08881dbb53..e78fd690844 100644 --- a/src/reference/asciidoc/kotlin-dsl.adoc +++ b/src/reference/asciidoc/kotlin-dsl.adoc @@ -32,7 +32,7 @@ fun flowLambda() = wireTap { handle { println(it.payload) } } - transform { it.toUpperCase() } + transform { it.toUpperCase() } } ---- ==== @@ -67,7 +67,7 @@ For example: @Bean fun functionFlow() = integrationFlow>({ beanName("functionGateway") }) { - transform { it.toUpperCase() } + transform { it.toUpperCase() } } @Bean diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 84149fd0903..17c6dbb83c4 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -28,3 +28,6 @@ See <<./debezium.adoc#debezium, Debezium Support>> for more information. - The XML configuration for `` and `@Poller` annotation now support ISO 8601 duration format for `fixed-delay`, `fixed-rate` and `initial-delay` options. See <<./endpoint.adoc#endpoint-pollingconsumer, Polling Consumer>> for more information. + +- Java, Groovy and Kotlin DSLs have now context-specific methods in the `IntegationFlowDefinition` with a single `Consumer` argument to configure an endpoint and its handler with one builder and readable options. +See, for example, `transformWith()` in <<./dsl.adoc#java-dsl, Java DSL Chapter>>.