From cb7d4d49b29583e813b3bbb963d740c9b49eaed9 Mon Sep 17 00:00:00 2001 From: abilan Date: Mon, 26 Jun 2023 14:04:56 -0400 Subject: [PATCH] Fix Kotlin DSL delegation The `ConsumerEndpointSpec` extensions for Kotlin don't delegate to the provided `endpointFactoryBean` * Introduce `KotlinConsumerEndpointSpec` extension for `ConsumerEndpointSpec` with the proper delegation to the provided spec * Use `KotlinConsumerEndpointSpec` in the Kotlin-specific `Spec` classes **Cherry-pick to `6.1.x`, `6.0.x` & `5.5.x`** --- .../dsl/AbstractKotlinRouterSpec.kt | 5 +- .../dsl/KotlinConsumerEndpointSpec.kt | 115 ++++++++++++++++++ .../integration/dsl/KotlinEnricherSpec.kt | 4 +- .../dsl/KotlinFilterEndpointSpec.kt | 4 +- .../dsl/KotlinSplitterEndpointSpec.kt | 4 +- .../integration/dsl/KotlinDslTests.kt | 10 +- 6 files changed, 131 insertions(+), 11 deletions(-) create mode 100644 spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinConsumerEndpointSpec.kt diff --git a/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/AbstractKotlinRouterSpec.kt b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/AbstractKotlinRouterSpec.kt index 69987c6df66..626e0d7896e 100644 --- a/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/AbstractKotlinRouterSpec.kt +++ b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/AbstractKotlinRouterSpec.kt @@ -28,9 +28,8 @@ import org.springframework.messaging.MessageChannel * * @since 5.3 */ -abstract class AbstractKotlinRouterSpec, R : AbstractMessageRouter>( - open val delegate: AbstractRouterSpec) - : ConsumerEndpointSpec(delegate.handler) { +abstract class AbstractKotlinRouterSpec, R : AbstractMessageRouter>(override val delegate: S) + : KotlinConsumerEndpointSpec(delegate) { fun ignoreSendFailures(ignoreSendFailures: Boolean) { this.delegate.ignoreSendFailures(ignoreSendFailures) diff --git a/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinConsumerEndpointSpec.kt b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinConsumerEndpointSpec.kt new file mode 100644 index 00000000000..c023f184523 --- /dev/null +++ b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinConsumerEndpointSpec.kt @@ -0,0 +1,115 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dsl + +import org.aopalliance.aop.Advice +import org.aopalliance.intercept.MethodInterceptor +import org.reactivestreams.Publisher +import org.springframework.integration.scheduling.PollerMetadata +import org.springframework.messaging.Message +import org.springframework.messaging.MessageHandler +import org.springframework.scheduling.TaskScheduler +import org.springframework.transaction.TransactionManager +import org.springframework.transaction.interceptor.TransactionInterceptor +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono + +/** + * A [ConsumerEndpointSpec] wrapped for Kotlin DSL. + * + * @property delegate the [ConsumerEndpointSpec] this instance is delegating to. + * + * @author Artem Bilan + * + * @since 5.5.19 + */ +abstract class KotlinConsumerEndpointSpec, H : MessageHandler>(open val delegate: S) + : ConsumerEndpointSpec(delegate.handler) { + + override fun phase(phase: Int): S { + return this.delegate.phase(phase) + } + + override fun autoStartup(autoStartup: Boolean): S { + return this.delegate.autoStartup(autoStartup) + } + + override fun poller(pollerMetadata: PollerMetadata): S { + return this.delegate.poller(pollerMetadata) + } + + override fun reactive(): S { + return this.delegate.reactive() + } + + fun reactive(reactiveCustomizer: (Flux>) -> Publisher>) { + this.delegate.reactive(reactiveCustomizer) + } + + override fun role(role: String): S { + return this.delegate.role(role) + } + + override fun taskScheduler(taskScheduler: TaskScheduler): S { + return this.delegate.taskScheduler(taskScheduler) + } + + override fun handleMessageAdvice(vararg interceptors: MethodInterceptor?): S { + return this.delegate.handleMessageAdvice(*interceptors) + } + + override fun advice(vararg advice: Advice?): S { + return this.delegate.advice(*advice) + } + + override fun transactional(transactionManager: TransactionManager): S { + return this.delegate.transactional(transactionManager) + } + + override fun transactional(transactionManager: TransactionManager, handleMessageAdvice: Boolean): S { + return this.delegate.transactional(transactionManager, handleMessageAdvice) + } + + override fun transactional(transactionInterceptor: TransactionInterceptor): S { + return this.delegate.transactional(transactionInterceptor) + } + + override fun transactional(): S { + return this.delegate.transactional() + } + + override fun transactional(handleMessageAdvice: Boolean): S { + return this.delegate.transactional(handleMessageAdvice) + } + + fun customizeMonoReply(replyCustomizer: (Message<*>, Mono) -> Publisher) { + this.delegate.customizeMonoReply(replyCustomizer) + } + + override fun id(id: String?): S { + return this.delegate.id(id) + } + + override fun poller(pollerMetadataSpec: PollerSpec): S { + return this.delegate.poller(pollerMetadataSpec) + } + + fun poller(pollers: (PollerFactory) -> PollerSpec) { + this.delegate.poller(pollers) + } + +} diff --git a/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinEnricherSpec.kt b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinEnricherSpec.kt index 7c3b83899fd..a2bcb75a43e 100644 --- a/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinEnricherSpec.kt +++ b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinEnricherSpec.kt @@ -30,8 +30,8 @@ import org.springframework.messaging.MessageChannel * * @since 5.3 */ -class KotlinEnricherSpec(val delegate: EnricherSpec) - : ConsumerEndpointSpec(delegate.handler) { +class KotlinEnricherSpec(override val delegate: EnricherSpec) + : KotlinConsumerEndpointSpec(delegate) { fun requestChannel(requestChannel: MessageChannel) { this.delegate.requestChannel(requestChannel) diff --git a/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinFilterEndpointSpec.kt b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinFilterEndpointSpec.kt index 228defe1c7c..7115998ec83 100644 --- a/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinFilterEndpointSpec.kt +++ b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinFilterEndpointSpec.kt @@ -28,8 +28,8 @@ import org.springframework.messaging.MessageChannel * * @since 5.3 */ -class KotlinFilterEndpointSpec(val delegate: FilterEndpointSpec) - : ConsumerEndpointSpec(delegate.handler) { +class KotlinFilterEndpointSpec(override val delegate: FilterEndpointSpec) + : KotlinConsumerEndpointSpec(delegate) { fun throwExceptionOnRejection(throwExceptionOnRejection: Boolean) { this.delegate.throwExceptionOnRejection(throwExceptionOnRejection) diff --git a/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinSplitterEndpointSpec.kt b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinSplitterEndpointSpec.kt index b860f410a2e..8cf449be1fb 100644 --- a/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinSplitterEndpointSpec.kt +++ b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinSplitterEndpointSpec.kt @@ -28,8 +28,8 @@ import org.springframework.messaging.MessageChannel * * @since 5.3 */ -class KotlinSplitterEndpointSpec(val delegate: SplitterEndpointSpec) - : ConsumerEndpointSpec, H>(delegate.handler) { +class KotlinSplitterEndpointSpec(override val delegate: SplitterEndpointSpec) + : KotlinConsumerEndpointSpec, H>(delegate) { fun applySequence(applySequence: Boolean) { this.delegate.applySequence(applySequence) diff --git a/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt b/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt index 83fce807f10..6bb5072998f 100644 --- a/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt +++ b/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt @@ -30,6 +30,7 @@ import org.springframework.integration.channel.QueueChannel import org.springframework.integration.config.EnableIntegration import org.springframework.integration.core.MessagingTemplate import org.springframework.integration.dsl.context.IntegrationFlowContext +import org.springframework.integration.endpoint.AbstractEndpoint import org.springframework.integration.endpoint.MessageProcessorMessageSource import org.springframework.integration.handler.LoggingHandler import org.springframework.integration.scheduling.PollerMetadata @@ -69,7 +70,7 @@ class KotlinDslTests { @Test fun `convert extension`() { - assertThat(this.beanFactory.containsBean("kotlinConverter")) + assertThat(this.beanFactory.containsBean("kotlinConverter")).isTrue() val replyChannel = QueueChannel() val date = Date() @@ -93,6 +94,8 @@ class KotlinDslTests { @Test fun `uppercase function`() { assertThat(beanFactory.containsBean("objectToStringTransformer")).isTrue() + assertThat(this.beanFactory.containsBean("splitterEndpoint")).isTrue() + assertThat(this.beanFactory.getBean("splitterEndpoint", AbstractEndpoint::class.java).phase).isEqualTo(257) assertThat(this.upperCaseFunction.apply("test".toByteArray())).isEqualTo("TEST") } @@ -258,7 +261,10 @@ class KotlinDslTests { } transform { it.uppercase() } split> { it.payload } - split({ it }) { id("splitterEndpoint") } + split({ it }) { + id("splitterEndpoint") + phase(257) + } resequence() aggregate { id("aggregator")