From 34e23189bb28f80eed5e69b7e01823af5b295ebc Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Sat, 5 Nov 2016 14:28:05 -0400 Subject: [PATCH 1/8] INT-4162: TCP/UDP DSL JIRA: https://jira.spring.io/browse/INT-4162 INT-4162: IP DSL - Phase I Connection Factories Phase 2 - Adapters/Gateways Phase 3 - UDP --- .../TcpConnectionFactoryFactoryBean.java | 33 +-- .../ip/dsl/AbstractConnectionFactorySpec.java | 232 ++++++++++++++++++ .../integration/ip/dsl/Tcp.java | 115 +++++++++ .../dsl/TcpClientConnectionFactorySpec.java | 46 ++++ .../ip/dsl/TcpInboundChannelAdapterSpec.java | 69 ++++++ .../ip/dsl/TcpInboundGatewaySpec.java | 69 ++++++ .../ip/dsl/TcpOutboundChannelAdapterSpec.java | 69 ++++++ .../ip/dsl/TcpOutboundGatewaySpec.java | 42 ++++ .../dsl/TcpServerConnectionFactorySpec.java | 66 +++++ .../integration/ip/dsl/Udp.java | 50 ++++ .../ip/dsl/UdpInboundChannelAdapterSpec.java | 155 ++++++++++++ ...dpMulticastOutboundChannelAdapterSpec.java | 45 ++++ .../ip/dsl/UdpOutboundChannelAdapterSpec.java | 121 +++++++++ .../integration/ip/dsl/package-info.java | 4 + .../ip/tcp/TcpSendingMessageHandler.java | 10 +- .../connection/AbstractConnectionFactory.java | 43 ++-- .../CachingClientConnectionFactory.java | 15 -- .../ip/tcp/connection/ConnectionFactory.java | 4 +- .../udp/MulticastSendingMessageHandler.java | 25 ++ .../udp/UnicastReceivingChannelAdapter.java | 9 + .../ip/udp/UnicastSendingMessageHandler.java | 25 +- .../integration/ip/util/TestingUtilities.java | 32 +++ .../ip/dsl/ConnectionFacforyTests.java | 69 ++++++ .../ip/dsl/IpIntegrationTests.java | 178 ++++++++++++++ .../integration/ip/tcp/AutoStartTests.java | 3 +- 25 files changed, 1444 insertions(+), 85 deletions(-) create mode 100644 spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/AbstractConnectionFactorySpec.java create mode 100644 spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Tcp.java create mode 100644 spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpClientConnectionFactorySpec.java create mode 100644 spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundChannelAdapterSpec.java create mode 100644 spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundGatewaySpec.java create mode 100644 spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundChannelAdapterSpec.java create mode 100644 spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java create mode 100644 spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpServerConnectionFactorySpec.java create mode 100644 spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Udp.java create mode 100644 spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpInboundChannelAdapterSpec.java create mode 100644 spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpMulticastOutboundChannelAdapterSpec.java create mode 100644 spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpOutboundChannelAdapterSpec.java create mode 100644 spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/package-info.java create mode 100644 spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/ConnectionFacforyTests.java create mode 100644 spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/config/TcpConnectionFactoryFactoryBean.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/config/TcpConnectionFactoryFactoryBean.java index c5c251eae57..f8bec67fdaa 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/config/TcpConnectionFactoryFactoryBean.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/config/TcpConnectionFactoryFactoryBean.java @@ -24,7 +24,7 @@ import org.springframework.beans.factory.config.AbstractFactoryBean; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; -import org.springframework.context.SmartLifecycle; +import org.springframework.context.Lifecycle; import org.springframework.core.serializer.Deserializer; import org.springframework.core.serializer.Serializer; import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; @@ -55,8 +55,8 @@ * @author Gary Russell * @since 2.0.5 */ -public class TcpConnectionFactoryFactoryBean extends AbstractFactoryBean implements SmartLifecycle, BeanNameAware, - BeanFactoryAware, ApplicationEventPublisherAware { +public class TcpConnectionFactoryFactoryBean extends AbstractFactoryBean + implements Lifecycle, BeanNameAware, BeanFactoryAware, ApplicationEventPublisherAware { private volatile AbstractConnectionFactory connectionFactory; @@ -446,33 +446,6 @@ public void stop() { this.connectionFactory.stop(); } - /** - * @return phase - * @see org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory#getPhase() - */ - @Override - public int getPhase() { - return this.connectionFactory.getPhase(); - } - - /** - * @return isAutoStartup - * @see org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory#isAutoStartup() - */ - @Override - public boolean isAutoStartup() { - return this.connectionFactory.isAutoStartup(); - } - - /** - * @param callback The Runnable to invoke. - * @see org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory#stop(java.lang.Runnable) - */ - @Override - public void stop(Runnable callback) { - this.connectionFactory.stop(callback); - } - @Override public boolean isRunning() { return this.connectionFactory.isRunning(); diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/AbstractConnectionFactorySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/AbstractConnectionFactorySpec.java new file mode 100644 index 00000000000..dfcf9400b8d --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/AbstractConnectionFactorySpec.java @@ -0,0 +1,232 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import java.util.concurrent.Executor; + +import org.springframework.core.serializer.Deserializer; +import org.springframework.core.serializer.Serializer; +import org.springframework.integration.dsl.IntegrationComponentSpec; +import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory; +import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactoryChain; +import org.springframework.integration.ip.tcp.connection.TcpMessageMapper; +import org.springframework.integration.ip.tcp.connection.TcpSocketSupport; + +/** + * An {@link IntegrationComponentSpec} for {@link AbstractConnectionFactory}s. + * @param the target {@link AbstractConnectionFactorySpec} implementation type. + * @param the target {@link AbstractConnectionFactory} implementation type. + * + * @author Gary Russell + * @since 5.0 + * + */ +public abstract class AbstractConnectionFactorySpec + , C extends AbstractConnectionFactory> + extends IntegrationComponentSpec { + + protected C target; + + public AbstractConnectionFactorySpec(C connectionFactory) { + this.target = connectionFactory; + } + + @Override + public S id(String id) { + return super.id(id); + } + + /** + * @param soTimeout the socket timeout option. + * @return the spec. + * @see AbstractConnectionFactory#setSoTimeout(int) + */ + public S soTimeout(int soTimeout) { + this.target.setSoTimeout(soTimeout); + return _this(); + } + + /** + * @param soReceiveBufferSize the socket receive buffer size option. + * @return the spec. + * @see AbstractConnectionFactory#setSoReceiveBufferSize(int) + */ + public S soReceiveBufferSize(int soReceiveBufferSize) { + this.target.setSoReceiveBufferSize(soReceiveBufferSize); + return _this(); + } + + /** + * @param soSendBufferSize the socket send buffer size option. + * @return the spec. + * @see AbstractConnectionFactory#setSoSendBufferSize(int) + */ + public S soSendBufferSize(int soSendBufferSize) { + this.target.setSoSendBufferSize(soSendBufferSize); + return _this(); + } + + /** + * @param soTcpNoDelay the socket TCP no delay option (disable Nagle's algorithm). + * @return the spec. + * @see AbstractConnectionFactory#setSoTcpNoDelay(boolean) + */ + public S soTcpNoDelay(boolean soTcpNoDelay) { + this.target.setSoTcpNoDelay(soTcpNoDelay); + return _this(); + } + + /** + * @param soLinger the socket linger option. + * @return the spec. + * @see AbstractConnectionFactory#setSoLinger(int) + */ + public S soLinger(int soLinger) { + this.target.setSoLinger(soLinger); + return _this(); + } + + /** + * @param soKeepAlive the socket keepalive option. + * @return the spec. + * @see AbstractConnectionFactory#setSoKeepAlive(boolean) + */ + public S soKeepAlive(boolean soKeepAlive) { + this.target.setSoKeepAlive(soKeepAlive); + return _this(); + } + + /** + * @param soTrafficClass the socket traffic class option. + * @return the spec. + * @see AbstractConnectionFactory#setSoTrafficClass(int) + */ + public S soTrafficClass(int soTrafficClass) { + this.target.setSoTrafficClass(soTrafficClass); + return _this(); + } + + /** + * @param taskExecutor the task excecutor. + * @return the spec. + * @see AbstractConnectionFactory#setTaskExecutor(Executor) + */ + public S taskExecutor(Executor taskExecutor) { + this.target.setTaskExecutor(taskExecutor); + return _this(); + } + + /** + * @param deserializer the deserializer. + * @return the spec. + * @see AbstractConnectionFactory#setDeserializer(Deserializer) + */ + public S deserializer(Deserializer deserializer) { + this.target.setDeserializer(deserializer); + return _this(); + } + + /** + * @param serializer the serializer. + * @return the spec. + * @see AbstractConnectionFactory#setSerializer(Serializer) + */ + public S serializer(Serializer serializer) { + this.target.setSerializer(serializer); + return _this(); + } + + /** + * @param mapper the message mapper. + * @return the spec. + * @see AbstractConnectionFactory#setMapper(TcpMessageMapper) + */ + public S mapper(TcpMessageMapper mapper) { + this.target.setMapper(mapper); + return _this(); + } + + /** + * @param keepOpen true to keep the socket open for additional messages; inverse + * of {@link AbstractConnectionFactory#setSingleUse(boolean)}. + * @return the spec. + * @see AbstractConnectionFactory#setSingleUse(boolean) + */ + public S keepOpen(boolean keepOpen) { + this.target.setSingleUse(!keepOpen); + return _this(); + } + + /** + * @param interceptorFactoryChain the interceptor factory chain. + * @return the spec. + * @see AbstractConnectionFactory#setInterceptorFactoryChain(TcpConnectionInterceptorFactoryChain) + */ + public S interceptorFactoryChain(TcpConnectionInterceptorFactoryChain interceptorFactoryChain) { + this.target.setInterceptorFactoryChain(interceptorFactoryChain); + return _this(); + } + + /** + * @param lookupHost true to reverse lookup the host. + * @return the spec. + * @see AbstractConnectionFactory#setLookupHost(boolean) + */ + public S lookupHost(boolean lookupHost) { + this.target.setLookupHost(lookupHost); + return _this(); + } + + /** + * @param nioHarvestInterval the harvest interval when using NIO. + * @return the spec. + * @see AbstractConnectionFactory#setNioHarvestInterval(int) + */ + public S nioHarvestInterval(int nioHarvestInterval) { + this.target.setNioHarvestInterval(nioHarvestInterval); + return _this(); + } + + /** + * @param readDelay the read delay. + * @return the spec. + * @see AbstractConnectionFactory#setReadDelay(long) + */ + public S readDelay(long readDelay) { + this.target.setReadDelay(readDelay); + return _this(); + } + + /** + * @param tcpSocketSupport the {@link TcpSocketSupport}. + * @return the spec. + * @see AbstractConnectionFactory#setTcpSocketSupport(TcpSocketSupport) + */ + public S tcpSocketSupport(TcpSocketSupport tcpSocketSupport) { + this.target.setTcpSocketSupport(tcpSocketSupport); + return _this(); + } + + @Override + protected C doGet() { + if (getId() != null) { + this.target.setBeanName(getId()); + } + return this.target; + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Tcp.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Tcp.java new file mode 100644 index 00000000000..62a7472be02 --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Tcp.java @@ -0,0 +1,115 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; +import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory; +import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory; + +/** + * Factory methods for TCP. + * + * @author Gary Russell + * @since 5.0 + * + */ +public final class Tcp { + + /** + * Boolean indicating the connection factory should use NIO. + */ + public static final boolean NIO = true; + + /** + * Boolean indicating the connection factory should not use NIO + * (default). + */ + public static final boolean NET = true; + + private Tcp() { + super(); + } + + /** + * Create a server spec that uses NIO. + * @param port the port to listen on. + * @param the spec type. + * @param the connection factrory type. + * @return the spec. + */ + public static , C extends AbstractServerConnectionFactory> + TcpServerConnectionFactorySpec nioServer(int port) { + return new TcpServerConnectionFactorySpec<>(port, NIO); + } + + /** + * Create a server spec that does not use NIO. + * @param port the port to listen on. + * @param the spec type. + * @param the connection factrory type. + * @return the spec. + */ + public static , C extends AbstractServerConnectionFactory> + TcpServerConnectionFactorySpec netServer(int port) { + return new TcpServerConnectionFactorySpec<>(port, NET); + } + + /** + * Create a client spec that uses NIO. + * @param host the host to connect to. + * @param port the port to connect to. + * @param the spec type. + * @param the connection factrory type. + * @return the spec. + */ + public static , C extends AbstractClientConnectionFactory> + TcpClientConnectionFactorySpec nioClient(String host, int port) { + return new TcpClientConnectionFactorySpec<>(host, port, NIO); + } + + /** + * Create a client spec that does not use NIO. + * @param host the host to connect to. + * @param port the port to connect to. + * @param the spec type. + * @param the connection factrory type. + * @return the spec. + */ + public static , C extends AbstractClientConnectionFactory> + TcpClientConnectionFactorySpec netClient(String host, int port) { + return new TcpClientConnectionFactorySpec<>(host, port, NET); + } + + public static > TcpInboundGatewaySpec inboundGateway( + AbstractConnectionFactory connectionFactory) { + return new TcpInboundGatewaySpec<>(connectionFactory); + } + + public static > TcpInboundChannelAdapterSpec inboundAdapter( + AbstractConnectionFactory connectionFactory) { + return new TcpInboundChannelAdapterSpec<>(connectionFactory); + } + + public static TcpOutboundGatewaySpec outboundGateway(AbstractClientConnectionFactory connectionFactory) { + return new TcpOutboundGatewaySpec(connectionFactory); + } + + public static TcpOutboundChannelAdapterSpec outboundAdapter(AbstractConnectionFactory connectionFactory) { + return new TcpOutboundChannelAdapterSpec(connectionFactory); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpClientConnectionFactorySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpClientConnectionFactorySpec.java new file mode 100644 index 00000000000..116b2e79d49 --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpClientConnectionFactorySpec.java @@ -0,0 +1,46 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import org.springframework.integration.dsl.IntegrationComponentSpec; +import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; +import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory; +import org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory; + +/** + * An {@link IntegrationComponentSpec} for {@link AbstractClientConnectionFactory}s. + * @author Gary Russell + * + * @param the target {@link TcpClientConnectionFactorySpec} implementation type. + * @param the target {@link AbstractClientConnectionFactory} implementation type. + * + * @since 5.0 + * + */ +public class TcpClientConnectionFactorySpec + , C extends AbstractClientConnectionFactory> + extends AbstractConnectionFactorySpec { + + TcpClientConnectionFactorySpec(String host, int port) { + this(host, port, false); + } + + public TcpClientConnectionFactorySpec(String host, int port, boolean nio) { + super(nio ? new TcpNioClientConnectionFactory(host, port) : new TcpNetClientConnectionFactory(host, port)); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundChannelAdapterSpec.java new file mode 100644 index 00000000000..45551a6f771 --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundChannelAdapterSpec.java @@ -0,0 +1,69 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import org.springframework.integration.dsl.MessageProducerSpec; +import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter; +import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory; +import org.springframework.scheduling.TaskScheduler; + +/** + * A {@link MessageProducerSpec} for {@link TcpReceivingChannelAdapter}s. + * + * @author Gary Russell + * @since 5.0 + * + */ +public class TcpInboundChannelAdapterSpec> + extends MessageProducerSpec { + + TcpInboundChannelAdapterSpec(AbstractConnectionFactory connectionFactory) { + super(new TcpReceivingChannelAdapter()); + this.target.setConnectionFactory(connectionFactory); + } + + /** + * @param isClientMode true to connect in client mode + * @return the spec. + * @see TcpReceivingChannelAdapter#setClientMode(boolean) + */ + public S clientMode(boolean isClientMode) { + this.target.setClientMode(isClientMode); + return _this(); + } + + /** + * @param retryInterval the client mode retry interval to set. + * @return the spec. + * @see TcpReceivingChannelAdapter#setRetryInterval(long) + */ + public S retryInterval(long retryInterval) { + this.target.setRetryInterval(retryInterval); + return _this(); + } + + /** + * @param taskScheduler the scheduler for connecting in client mode. + * @return the spec. + * @see TcpReceivingChannelAdapter#setTaskScheduler(TaskScheduler) + */ + public S taskScheduler(TaskScheduler taskScheduler) { + this.target.setTaskScheduler(taskScheduler); + return _this(); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundGatewaySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundGatewaySpec.java new file mode 100644 index 00000000000..d9428b6430c --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundGatewaySpec.java @@ -0,0 +1,69 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import org.springframework.integration.dsl.MessagingGatewaySpec; +import org.springframework.integration.ip.tcp.TcpInboundGateway; +import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory; +import org.springframework.scheduling.TaskScheduler; + +/** + * A {@link MessagingGatewaySpec} for {@link TcpInboundGateway}s. + * + * @author Gary Russell + * @since 5.0 + * + */ +public class TcpInboundGatewaySpec> + extends MessagingGatewaySpec { + + TcpInboundGatewaySpec(AbstractConnectionFactory connectionFactory) { + super(new TcpInboundGateway()); + this.target.setConnectionFactory(connectionFactory); + } + + /** + * @param isClientMode true to connect in client mode + * @return the spec. + * @see TcpInboundGateway#setClientMode(boolean) + */ + public S clientMode(boolean isClientMode) { + this.target.setClientMode(isClientMode); + return _this(); + } + + /** + * @param retryInterval the client mode retry interval to set. + * @return the spec. + * @see TcpInboundGateway#setRetryInterval(long) + */ + public S retryInterval(long retryInterval) { + this.target.setRetryInterval(retryInterval); + return _this(); + } + + /** + * @param taskScheduler the scheduler for connecting in client mode. + * @return the spec. + * @see TcpInboundGateway#setTaskScheduler(TaskScheduler) + */ + public S taskScheduler(TaskScheduler taskScheduler) { + this.target.setTaskScheduler(taskScheduler); + return _this(); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundChannelAdapterSpec.java new file mode 100644 index 00000000000..beb98074ada --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundChannelAdapterSpec.java @@ -0,0 +1,69 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import org.springframework.integration.dsl.MessageHandlerSpec; +import org.springframework.integration.ip.tcp.TcpSendingMessageHandler; +import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory; +import org.springframework.scheduling.TaskScheduler; + +/** + * A {@link MessageHandlerSpec} for {@link TcpSendingMessageHandler}s. + * + * @author Gary Russell + * @since 5.0 + * + */ +public class TcpOutboundChannelAdapterSpec + extends MessageHandlerSpec { + + TcpOutboundChannelAdapterSpec(AbstractConnectionFactory connectionFactory) { + this.target = new TcpSendingMessageHandler(); + this.target.setConnectionFactory(connectionFactory); + } + + /** + * @param isClientMode true to connect in client mode + * @return the spec. + * @see TcpSendingMessageHandler#setClientMode(boolean) + */ + public TcpOutboundChannelAdapterSpec clientMode(boolean isClientMode) { + this.target.setClientMode(isClientMode); + return _this(); + } + + /** + * @param retryInterval the client mode retry interval to set. + * @return the spec. + * @see TcpSendingMessageHandler#setRetryInterval(long) + */ + public TcpOutboundChannelAdapterSpec retryInterval(long retryInterval) { + this.target.setRetryInterval(retryInterval); + return _this(); + } + + /** + * @param taskScheduler the scheduler for connecting in client mode. + * @return the spec. + * @see TcpSendingMessageHandler#setTaskScheduler(TaskScheduler) + */ + public TcpOutboundChannelAdapterSpec taskScheduler(TaskScheduler taskScheduler) { + this.target.setTaskScheduler(taskScheduler); + return _this(); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java new file mode 100644 index 00000000000..4f9ff079599 --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java @@ -0,0 +1,42 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import org.springframework.integration.dsl.MessageHandlerSpec; +import org.springframework.integration.ip.tcp.TcpOutboundGateway; +import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; + +/** + * A {@link MessageHandlerSpec} for {@link TcpOutboundGateway}s. + * + * @author Gary Russell + * @since 5.0 + * + */ +public class TcpOutboundGatewaySpec extends MessageHandlerSpec { + + public TcpOutboundGatewaySpec(AbstractClientConnectionFactory connectionFactory) { + this.target = new TcpOutboundGateway(); + this.target.setConnectionFactory(connectionFactory); + } + + public TcpOutboundGatewaySpec remoteTimeout(long remoteTimeout) { + this.target.setRemoteTimeout(remoteTimeout); + return _this(); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpServerConnectionFactorySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpServerConnectionFactorySpec.java new file mode 100644 index 00000000000..0799894df28 --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpServerConnectionFactorySpec.java @@ -0,0 +1,66 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import org.springframework.integration.dsl.IntegrationComponentSpec; +import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory; +import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory; +import org.springframework.integration.ip.tcp.connection.TcpNioServerConnectionFactory; + +/** + * An {@link IntegrationComponentSpec} for {@link AbstractServerConnectionFactory}s. + * @author Gary Russell + * + * @param the target {@link TcpServerConnectionFactorySpec} implementation type. + * @param the target {@link AbstractServerConnectionFactory} implementation type. + * + * @since 5.0 + * + */ +public class TcpServerConnectionFactorySpec + , C extends AbstractServerConnectionFactory> + extends AbstractConnectionFactorySpec { + + TcpServerConnectionFactorySpec(int port) { + this(port, false); + } + + public TcpServerConnectionFactorySpec(int port, boolean nio) { + super(nio ? new TcpNioServerConnectionFactory(port) : new TcpNetServerConnectionFactory(port)); + } + + /** + * @param localAddress the local address. + * @return the spec. + * @see AbstractServerConnectionFactory#setLocalAddress(String) + */ + public S localAddress(String localAddress) { + this.target.setLocalAddress(localAddress); + return _this(); + } + + /** + * @param backlog the backlog. + * @return the spec. + * @see AbstractServerConnectionFactory#setBacklog(int) + */ + public S backlog(int backlog) { + this.target.setBacklog(backlog); + return _this(); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Udp.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Udp.java new file mode 100644 index 00000000000..f84ff4cf89b --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Udp.java @@ -0,0 +1,50 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +/** + * Factory methods for UDP. + * + * @author Gary Russell + * @since 5.0 + * + */ +public final class Udp { + + private Udp() { + super(); + } + + public static > UdpInboundChannelAdapterSpec inboundAdapter( + int port) { + return new UdpInboundChannelAdapterSpec<>(port); + } + + public static > UdpInboundChannelAdapterSpec inboundMulticastAdapter( + int port, String multicastGroup) { + return new UdpInboundChannelAdapterSpec<>(port, multicastGroup); + } + + public static UdpOutboundChannelAdapterSpec outboundAdapter(String destinationExpression) { + return new UdpOutboundChannelAdapterSpec(destinationExpression); + } + + public static UdpOutboundChannelAdapterSpec outboundMultcastAdapter(String destinationExpression) { + return new UdpOutboundChannelAdapterSpec(destinationExpression); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpInboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpInboundChannelAdapterSpec.java new file mode 100644 index 00000000000..66d8e407085 --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpInboundChannelAdapterSpec.java @@ -0,0 +1,155 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import java.net.DatagramSocket; +import java.util.concurrent.Executor; + +import org.springframework.integration.dsl.MessageProducerSpec; +import org.springframework.integration.ip.udp.MulticastReceivingChannelAdapter; +import org.springframework.integration.ip.udp.UnicastReceivingChannelAdapter; +import org.springframework.scheduling.TaskScheduler; + +/** + * A {@link MessageProducerSpec} for {@link UnicastReceivingChannelAdapter}s. + * + * @author Gary Russell + * @since 5.0 + * + */ +public class UdpInboundChannelAdapterSpec> + extends MessageProducerSpec { + + UdpInboundChannelAdapterSpec(int port) { + super(new UnicastReceivingChannelAdapter(port)); + } + + UdpInboundChannelAdapterSpec(int port, String multicastGroup) { + super(new MulticastReceivingChannelAdapter(multicastGroup, port)); + } + + /** + * @param soTimeout set the timeout socket option. + * @return the spec. + * @see UnicastReceivingChannelAdapter#setSoTimeout(int) + */ + public S soTimeout(int soTimeout) { + this.target.setSoTimeout(soTimeout); + return _this(); + } + + /** + * @param taskScheduler set the task scheduler. + * @return the spec. + * @see UnicastReceivingChannelAdapter#setTaskScheduler(TaskScheduler) + */ + public S taskScheduler(TaskScheduler taskScheduler) { + this.target.setTaskScheduler(taskScheduler); + return _this(); + } + + /** + * @param soReceiveBufferSize set the receive buffer size socket option. + * @return the spec. + * @see UnicastReceivingChannelAdapter#setSoReceiveBufferSize(int) + */ + public S soReceiveBufferSize(int soReceiveBufferSize) { + this.target.setSoReceiveBufferSize(soReceiveBufferSize); + return _this(); + } + + /** + * @param receiveBufferSize set the receive buffer size. + * @return the spec. + * @see UnicastReceivingChannelAdapter#setReceiveBufferSize(int) + */ + public S receiveBufferSize(int receiveBufferSize) { + this.target.setReceiveBufferSize(receiveBufferSize); + return _this(); + } + + /** + * @param lengthCheck set the length check boolean. + * @return the spec. + * @see UnicastReceivingChannelAdapter#setLengthCheck(boolean) + */ + public S lengthCheck(boolean lengthCheck) { + this.target.setLengthCheck(lengthCheck); + return _this(); + } + + /** + * @param localAddress set the local address. + * @return the spec. + * @see UnicastReceivingChannelAdapter#setLocalAddress(String) + */ + public S localAddress(String localAddress) { + this.target.setLocalAddress(localAddress); + return _this(); + } + + /** + * @param poolSize set the pool size. + * @return the spec. + * @see UnicastReceivingChannelAdapter#setPoolSize(int) + */ + public S poolSize(int poolSize) { + this.target.setPoolSize(poolSize); + return _this(); + } + + /** + * @param taskExecutor set the task executor. + * @return the spec. + * @see UnicastReceivingChannelAdapter#setTaskExecutor(Executor) + */ + public S TaskExecutor(Executor taskExecutor) { + this.target.setTaskExecutor(taskExecutor); + return _this(); + } + + /** + * @param socket set the socket. + * @return the spec. + * @see UnicastReceivingChannelAdapter#setSocket(DatagramSocket) + */ + public S socket(DatagramSocket socket) { + this.target.setSocket(socket); + return _this(); + } + + /** + * @param soSendBufferSize set the send bufffer size socket option. + * @return the spec. + * @see UnicastReceivingChannelAdapter#setSoSendBufferSize(int) + */ + public S soSendBufferSize(int soSendBufferSize) { + this.target.setSoSendBufferSize(soSendBufferSize); + return _this(); + } + + /** + * @param lookupHost set true to reverse lookup the host. + * @return the spec. + * @see UnicastReceivingChannelAdapter#setLookupHost(boolean) + */ + public S LookupHost(boolean lookupHost) { + this.target.setLookupHost(lookupHost); + return _this(); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpMulticastOutboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpMulticastOutboundChannelAdapterSpec.java new file mode 100644 index 00000000000..3d40e41af78 --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpMulticastOutboundChannelAdapterSpec.java @@ -0,0 +1,45 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import org.springframework.integration.dsl.MessageHandlerSpec; +import org.springframework.integration.ip.udp.MulticastSendingMessageHandler; + +/** + * A {@link MessageHandlerSpec} for {@link MulticastSendingMessageHandler}s. + * + * @author Gary Russell + * @since 5.0 + * + */ +public class UdpMulticastOutboundChannelAdapterSpec extends UdpOutboundChannelAdapterSpec { + + UdpMulticastOutboundChannelAdapterSpec(String destinationExpression) { + this.target = new MulticastSendingMessageHandler(destinationExpression); + } + + /** + * @param timeToLive the timeToLive. + * @return the spec. + * @see MulticastSendingMessageHandler#setTimeToLive(int) + */ + public UdpOutboundChannelAdapterSpec TimeToLive(int timeToLive) { + ((MulticastSendingMessageHandler) this.target).setTimeToLive(timeToLive); + return _this(); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpOutboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpOutboundChannelAdapterSpec.java new file mode 100644 index 00000000000..fcc1262d401 --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpOutboundChannelAdapterSpec.java @@ -0,0 +1,121 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import org.springframework.expression.Expression; +import org.springframework.integration.dsl.MessageHandlerSpec; +import org.springframework.integration.ip.udp.UnicastSendingMessageHandler; + +/** + * A {@link MessageHandlerSpec} for {@link UnicastSendingMessageHandler}s. + * + * @author Gary Russell + * @since 5.0 + * + */ +public class UdpOutboundChannelAdapterSpec + extends MessageHandlerSpec { + + protected UdpOutboundChannelAdapterSpec() { + super(); + } + + UdpOutboundChannelAdapterSpec(String destinationExpression) { + this.target = new UnicastSendingMessageHandler(destinationExpression); + } + + /** + * @param timeout the timeout socket option. + * @return the spec. + * @see UnicastSendingMessageHandler#setSoTimeout(int) + */ + public UdpOutboundChannelAdapterSpec soTimeout(int timeout) { + this.target.setSoTimeout(timeout); + return _this(); + } + + /** + * @param size the send buffer size socket option. + * @return the spec. + * @see UnicastSendingMessageHandler#setSoSendBufferSize(int) + */ + public UdpOutboundChannelAdapterSpec soSendBufferSize(int size) { + this.target.setSoSendBufferSize(size); + return _this(); + } + + /** + * @param localAddress the local address. + * @return the spec. + * @see UnicastSendingMessageHandler#setLocalAddress(String) + */ + public UdpOutboundChannelAdapterSpec localAddress(String localAddress) { + this.target.setLocalAddress(localAddress); + return _this(); + } + + /** + * @param lengthCheck the length check boolean. + * @return the spec. + * @see UnicastSendingMessageHandler#setLengthCheck(boolean) + */ + public UdpOutboundChannelAdapterSpec lengthCheck(boolean lengthCheck) { + this.target.setLengthCheck(lengthCheck); + return _this(); + } + + /** + * @param size the receive buffer size socket option. + * @return the spec. + * @see UnicastSendingMessageHandler#setSoReceiveBufferSize(int) + */ + public UdpOutboundChannelAdapterSpec soReceiveBufferSize(int size) { + this.target.setSoReceiveBufferSize(size); + return _this(); + } + + /** + * @param ackCounter the ack counter. + * @return the spec. + * @see UnicastSendingMessageHandler#setAckCounter(int) + */ + public UdpOutboundChannelAdapterSpec ackCounter(int ackCounter) { + this.target.setAckCounter(ackCounter); + return _this(); + } + + /** + * @param socketExpression the socket expression. + * @return the spec. + * @see UnicastSendingMessageHandler#setSocketExpression(Expression) + */ + public UdpOutboundChannelAdapterSpec SocketExpression(Expression socketExpression) { + this.target.setSocketExpression(socketExpression); + return _this(); + } + + /** + * @param socketExpression the socket expression. + * @return the spec. + * @see UnicastSendingMessageHandler#setSocketExpressionString(String) + */ + public UdpOutboundChannelAdapterSpec SocketExpression(String socketExpression) { + this.target.setSocketExpressionString(socketExpression); + return _this(); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/package-info.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/package-info.java new file mode 100644 index 00000000000..8844ee096c4 --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/package-info.java @@ -0,0 +1,4 @@ +/** + * Provides TCP/UDP Component support for the Java DSL. + */ +package org.springframework.integration.ip.dsl; \ No newline at end of file diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandler.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandler.java index 595d74b1ea7..f3c75b11736 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandler.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandler.java @@ -291,10 +291,10 @@ public void stop(Runnable callback) { } this.clientModeConnectionManager = null; if (this.clientConnectionFactory != null) { - this.clientConnectionFactory.stop(callback); + this.clientConnectionFactory.stop(); } if (this.serverConnectionFactory != null) { - this.serverConnectionFactory.stop(callback); + this.serverConnectionFactory.stop(); } } } @@ -330,8 +330,7 @@ public boolean isClientMode() { } /** - * @param isClientMode - * the isClientMode to set + * @param isClientMode the isClientMode to set */ public void setClientMode(boolean isClientMode) { this.isClientMode = isClientMode; @@ -350,8 +349,7 @@ public long getRetryInterval() { } /** - * @param retryInterval - * the retryInterval to set + * @param retryInterval the retryInterval to set */ public void setRetryInterval(long retryInterval) { this.retryInterval = retryInterval; diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java index 3d5c4d4a0e7..6c1e4ce7abe 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java @@ -44,7 +44,6 @@ import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; -import org.springframework.context.SmartLifecycle; import org.springframework.core.serializer.Deserializer; import org.springframework.core.serializer.Serializer; import org.springframework.integration.context.IntegrationObjectSupport; @@ -60,7 +59,7 @@ * */ public abstract class AbstractConnectionFactory extends IntegrationObjectSupport - implements ConnectionFactory, SmartLifecycle, ApplicationEventPublisherAware { + implements ConnectionFactory, ApplicationEventPublisherAware { protected static final int DEFAULT_REPLY_TIMEOUT = 10000; @@ -275,6 +274,16 @@ public void setSoTrafficClass(int soTrafficClass) { this.soTrafficClass = soTrafficClass; } + /** + * Set the host; requires the factory to be stopped. + * @param host the host. + * @since 5.0 + */ + public void setHost(String host) { + Assert.state(!isRunning(), "Cannot change the host while running"); + this.host = host; + } + /** * @return the host */ @@ -282,6 +291,16 @@ public String getHost() { return this.host; } + /** + * Set the port; requires the factory to be stopped. + * @param port the port. + * @since 5.0 + */ + public void setPort(int port) { + Assert.state(!isRunning(), "Cannot change the host while running"); + this.port = port; + } + /** * @return the port */ @@ -752,26 +771,6 @@ protected void doAccept(final Selector selector, ServerSocketChannel server, lon throw new UnsupportedOperationException("Nio server factory must override this method"); } - @Override - public int getPhase() { - return 0; - } - - /** - * We are controlled by the startup options of - * the bound endpoint. - */ - @Override - public boolean isAutoStartup() { - return false; - } - - @Override - public void stop(Runnable callback) { - stop(); - callback.run(); - } - protected void addConnection(TcpConnectionSupport connection) { synchronized (this.connections) { if (!this.active) { diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactory.java index ba87ca838ab..949cf6d3e6a 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactory.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactory.java @@ -388,21 +388,6 @@ public synchronized void stop() { this.pool.removeAllIdleItems(); } - @Override - public int getPhase() { - return this.targetConnectionFactory.getPhase(); - } - - @Override - public boolean isAutoStartup() { - return this.targetConnectionFactory.isAutoStartup(); - } - - @Override - public void stop(Runnable callback) { - this.targetConnectionFactory.stop(callback); - } - private final class CachedConnection extends TcpConnectionInterceptorSupport { private final AtomicBoolean released = new AtomicBoolean(); diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/ConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/ConnectionFactory.java index ca74042ef49..b506dc168e2 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/ConnectionFactory.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/ConnectionFactory.java @@ -16,7 +16,7 @@ package org.springframework.integration.ip.tcp.connection; -import org.springframework.context.SmartLifecycle; +import org.springframework.context.Lifecycle; @@ -27,7 +27,7 @@ * @since 2.0 * */ -public interface ConnectionFactory extends SmartLifecycle { +public interface ConnectionFactory extends Lifecycle { TcpConnection getConnection() throws Exception; diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/MulticastSendingMessageHandler.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/MulticastSendingMessageHandler.java index b07633d4c58..06be6d94ff0 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/MulticastSendingMessageHandler.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/MulticastSendingMessageHandler.java @@ -21,7 +21,10 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.MulticastSocket; +import java.net.SocketAddress; +import java.net.URI; +import org.springframework.expression.Expression; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; @@ -99,6 +102,28 @@ public MulticastSendingMessageHandler(String address, int port, super(address, port, lengthCheck, acknowledge, ackHost, ackPort, ackTimeout); } + /** + * Construct UnicastSendingMessageHandler based on the destination SpEL expression to + * determine the target destination at runtime against requestMessage. + * @param destinationExpression the SpEL expression to evaluate the target destination + * at runtime. Must evaluate to {@link String}, {@link URI} or {@link SocketAddress}. + * @since 5.0 + */ + public MulticastSendingMessageHandler(Expression destinationExpression) { + super(destinationExpression); + } + + /** + * Construct UnicastSendingMessageHandler based on the destination SpEL expression to + * determine the target destination at runtime against requestMessage. + * @param destinationExpression the SpEL expression to evaluate the target destination + * at runtime. Must evaluate to {@link String}, {@link URI} or {@link SocketAddress}. + * @since 5.0 + */ + public MulticastSendingMessageHandler(String destinationExpression) { + super(destinationExpression); + } + @Override protected DatagramSocket getSocket() throws IOException { if (this.getTheSocket() == null) { diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/UnicastReceivingChannelAdapter.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/UnicastReceivingChannelAdapter.java index 3af9ba2bf89..9d777303799 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/UnicastReceivingChannelAdapter.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/UnicastReceivingChannelAdapter.java @@ -74,6 +74,15 @@ public UnicastReceivingChannelAdapter(int port, boolean lengthCheck) { this.mapper.setLengthCheck(lengthCheck); } + /** + * @param lengthCheck if true, the incoming packet is expected to have a four + * byte binary length header. + * @since 5.0 + */ + public void setLengthCheck(boolean lengthCheck) { + this.mapper.setLengthCheck(lengthCheck); + } + @Override public boolean isLongLived() { return true; diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/UnicastSendingMessageHandler.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/UnicastSendingMessageHandler.java index 9d62a3df3af..bac767bb98a 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/UnicastSendingMessageHandler.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/UnicastSendingMessageHandler.java @@ -114,10 +114,10 @@ public UnicastSendingMessageHandler(String host, int port) { } /** - * Construct UnicastSendingMessageHandler based on the destination SpEL expression to determine - * the target destination at runtime against requestMessage. - * @param destinationExpression the SpEL expression to evaluate the target destination at runtime. - * Must evaluate to {@link String}, {@link URI} or {@link SocketAddress}. + * Construct UnicastSendingMessageHandler based on the destination SpEL expression to + * determine the target destination at runtime against requestMessage. + * @param destinationExpression the SpEL expression to evaluate the target destination + * at runtime. Must evaluate to {@link String}, {@link URI} or {@link SocketAddress}. * @since 4.3 */ public UnicastSendingMessageHandler(String destinationExpression) { @@ -129,10 +129,10 @@ public UnicastSendingMessageHandler(String destinationExpression) { } /** - * Construct UnicastSendingMessageHandler based on the destination SpEL expression to determine - * the target destination at runtime against requestMessage. - * @param destinationExpression the SpEL expression to evaluate the target destination at runtime. - * Must evaluate to {@link String}, {@link URI} or {@link SocketAddress}. + * Construct UnicastSendingMessageHandler based on the destination SpEL expression to + * determine the target destination at runtime against requestMessage. + * @param destinationExpression the SpEL expression to evaluate the target destination + * at runtime. Must evaluate to {@link String}, {@link URI} or {@link SocketAddress}. * @since 4.3 */ public UnicastSendingMessageHandler(Expression destinationExpression) { @@ -217,6 +217,15 @@ protected final void setReliabilityAttributes(boolean lengthCheck, } } + /** + * @param lengthCheck if true, a four byte binary length header is added to the + * packet, allowing the receiver to check for data truncation. + * @since 5.0 + */ + public void setLengthCheck(boolean lengthCheck) { + this.mapper.setLengthCheck(lengthCheck); + } + @Override public void doStart() { if (this.acknowledge) { diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/util/TestingUtilities.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/util/TestingUtilities.java index a4a24a816fa..d2c8e694191 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/util/TestingUtilities.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/util/TestingUtilities.java @@ -18,6 +18,7 @@ import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory; import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory; +import org.springframework.integration.ip.udp.UnicastReceivingChannelAdapter; /** * Convenience class providing methods for testing IP components. @@ -65,6 +66,37 @@ public static void waitListening(AbstractServerConnectionFactory serverConnectio } } + /** + * Wait for a server connection factory to actually start listening before + * starting a test. Waits for up to 10 seconds by default. + * @param adapter The server connection factory. + * @param delay How long to wait in milliseconds; default 10000 (10 seconds) if null. + * @throws IllegalStateException If the server does not start listening in time. + */ + public static void waitListening(UnicastReceivingChannelAdapter adapter, Long delay) + throws IllegalStateException { + if (delay == null) { + delay = 100L; + } + else { + delay = delay / 100; + } + int n = 0; + while (!adapter.isListening()) { + try { + Thread.sleep(100); + } + catch (InterruptedException e1) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e1); + } + + if (n++ > delay) { + throw new IllegalStateException("Server didn't start listening."); + } + } + } + /** * Wait for a server connection factory to stop listening. * Waits for up to 10 seconds by default. diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/ConnectionFacforyTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/ConnectionFacforyTests.java new file mode 100644 index 00000000000..b343b0988da --- /dev/null +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/ConnectionFacforyTests.java @@ -0,0 +1,69 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; + +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; +import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory; +import org.springframework.integration.ip.util.TestingUtilities; +import org.springframework.integration.transformer.ObjectToStringTransformer; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; + +/** + * @author Gary Russell + * @since 5.0 + * + */ +public class ConnectionFacforyTests { + + @Test + public void test() throws Exception { + ApplicationEventPublisher publisher = e -> { }; + AbstractServerConnectionFactory server = Tcp.netServer(0).backlog(2).soTimeout(5000).get(); + final AtomicReference> received = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + server.registerListener(m -> { + received.set(new ObjectToStringTransformer().transform(m)); + latch.countDown(); + return false; + }); + server.setApplicationEventPublisher(publisher); + server.afterPropertiesSet(); + server.start(); + TestingUtilities.waitListening(server, null); + AbstractClientConnectionFactory client = Tcp.netClient("localhost", server.getPort()).get(); + client.setApplicationEventPublisher(publisher); + client.afterPropertiesSet(); + client.start(); + client.getConnection().send(new GenericMessage<>("foo")); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertEquals("foo", received.get().getPayload()); + client.stop(); + server.stop(); + } + +} diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java new file mode 100644 index 00000000000..9ed324de1bf --- /dev/null +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java @@ -0,0 +1,178 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.integration.dsl.context.IntegrationFlowContext; +import org.springframework.integration.dsl.context.IntegrationFlowRegistration; +import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter; +import org.springframework.integration.ip.tcp.TcpSendingMessageHandler; +import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; +import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory; +import org.springframework.integration.ip.tcp.serializer.TcpCodecs; +import org.springframework.integration.ip.udp.UnicastReceivingChannelAdapter; +import org.springframework.integration.ip.util.TestingUtilities; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.integration.transformer.ObjectToStringTransformer; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringRunner; + +/** + * @author Gary Russell + * @since 5.0 + * + */ +@RunWith(SpringRunner.class) +@DirtiesContext +public class IpIntegrationTests { + + @Autowired + private AbstractServerConnectionFactory server1; + + @Autowired + private AbstractClientConnectionFactory client1; + + @Autowired + private IntegrationFlowContext flowContext; + + @Autowired + @Qualifier("outUdpAdapter.input") + private MessageChannel udpOut; + + @Autowired + private UnicastReceivingChannelAdapter udpInbound; + + @Autowired + private QueueChannel udpIn; + + @Test + public void testTcpAdapters() throws Exception { + ApplicationEventPublisher publisher = e -> { }; + AbstractServerConnectionFactory server = Tcp.netServer(0).backlog(2).soTimeout(5000).id("server").get(); + assertEquals("server", server.getComponentName()); + server.setApplicationEventPublisher(publisher); + server.afterPropertiesSet(); + TcpReceivingChannelAdapter inbound = Tcp.inboundAdapter(server).get(); + QueueChannel received = new QueueChannel(); + inbound.setOutputChannel(received); + inbound.afterPropertiesSet(); + inbound.start(); + TestingUtilities.waitListening(server, null); + AbstractClientConnectionFactory client = Tcp.netClient("localhost", server.getPort()).id("client").get(); + assertEquals("client", client.getComponentName()); + client.setApplicationEventPublisher(publisher); + client.afterPropertiesSet(); + TcpSendingMessageHandler handler = Tcp.outboundAdapter(client).get(); + handler.start(); + handler.handleMessage(new GenericMessage<>("foo")); + Message receivedMessage = received.receive(10000); + assertNotNull(receivedMessage); + assertEquals("foo", new ObjectToStringTransformer().transform(receivedMessage).getPayload()); + client.stop(); + server.stop(); + } + + @Test + public void testTcpGateways() { + TestingUtilities.waitListening(this.server1, null); + this.client1.setPort(this.server1.getPort()); + IntegrationFlow flow = f -> f + .handle(Tcp.outboundGateway(this.client1)) + .transform(new ObjectToStringTransformer()); + IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register(); + assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO")); + } + + @Test + public void testUdp() { + TestingUtilities.waitListening(this.udpInbound, null); + Message outMessage = MessageBuilder.withPayload("foo") + .setHeader("udp_dest", "udp://localhost:" + this.udpInbound.getPort()) + .build(); + this.udpOut.send(outMessage); + Message received = this.udpIn.receive(10000); + assertNotNull(received); + assertEquals("foo", new ObjectToStringTransformer().transform(received).getPayload()); + } + + @Configuration + @EnableIntegration + public static class Config { + + @Bean + public AbstractServerConnectionFactory server1() { + return Tcp.netServer(0) + .serializer(TcpCodecs.lengthHeader1()) + .deserializer(TcpCodecs.crlf()) + .get(); + } + + @Bean + public AbstractClientConnectionFactory client1() { + return Tcp.netClient("localhost", 0) + .serializer(TcpCodecs.crlf()) + .deserializer(TcpCodecs.lengthHeader1()) + .get(); + } + + @Bean + public IntegrationFlow inTcpGateway() { + return IntegrationFlows.from(Tcp.inboundGateway(server1())) + .transform(new ObjectToStringTransformer()) + .transform(String::toUpperCase) + .get(); + } + + @Bean + public IntegrationFlow inUdpAdapter() { + return IntegrationFlows.from(Udp.inboundAdapter(0)) + .channel(udpIn()) + .get(); + } + + @Bean + public QueueChannel udpIn() { + return new QueueChannel(); + } + + @Bean + public IntegrationFlow outUdpAdapter() { + return f -> f.handle(Udp.outboundAdapter("headers['udp_dest']")); + } + + } + +} diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/AutoStartTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/AutoStartTests.java index 9c549569ed6..85ade45c710 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/AutoStartTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/AutoStartTests.java @@ -16,11 +16,11 @@ package org.springframework.integration.ip.tcp; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import org.junit.Test; import org.junit.runner.RunWith; + import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory; @@ -45,7 +45,6 @@ public class AutoStartTests { @Test public void testNetIn() throws Exception { - assertFalse(cfS1.isAutoStartup()); DirectFieldAccessor dfa = new DirectFieldAccessor(cfS1); assertNull(dfa.getPropertyValue("serverSocket")); startAndStop(); From 2e095a1377395104fd932b94602cdfbd48764795 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Tue, 8 Nov 2016 09:28:56 -0500 Subject: [PATCH 2/8] Polishing - PR Comments Remove unnecessary generics. --- .../ip/dsl/AbstractConnectionFactorySpec.java | 15 +---- .../integration/ip/dsl/Tcp.java | 59 ++++++++++--------- .../dsl/TcpClientConnectionFactorySpec.java | 8 +-- .../ip/dsl/TcpInboundChannelAdapterSpec.java | 10 ++-- .../ip/dsl/TcpInboundGatewaySpec.java | 9 ++- .../dsl/TcpServerConnectionFactorySpec.java | 12 ++-- .../integration/ip/dsl/Udp.java | 14 ++--- .../ip/dsl/UdpInboundChannelAdapterSpec.java | 28 ++++----- ...dpMulticastOutboundChannelAdapterSpec.java | 2 +- .../ip/dsl/UdpOutboundChannelAdapterSpec.java | 4 +- .../integration/ip/dsl/package-info.java | 2 +- .../ip/tcp/TcpSendingMessageHandler.java | 19 +----- .../ip/dsl/IpIntegrationTests.java | 10 ++-- 13 files changed, 80 insertions(+), 112 deletions(-) diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/AbstractConnectionFactorySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/AbstractConnectionFactorySpec.java index dfcf9400b8d..f06590da046 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/AbstractConnectionFactorySpec.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/AbstractConnectionFactorySpec.java @@ -39,15 +39,14 @@ public abstract class AbstractConnectionFactorySpec , C extends AbstractConnectionFactory> extends IntegrationComponentSpec { - protected C target; - - public AbstractConnectionFactorySpec(C connectionFactory) { + AbstractConnectionFactorySpec(C connectionFactory) { this.target = connectionFactory; } @Override public S id(String id) { - return super.id(id); + this.target.setBeanName(id); + return _this(); } /** @@ -221,12 +220,4 @@ public S tcpSocketSupport(TcpSocketSupport tcpSocketSupport) { return _this(); } - @Override - protected C doGet() { - if (getId() != null) { - this.target.setBeanName(getId()); - } - return this.target; - } - } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Tcp.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Tcp.java index 62a7472be02..635783b58e6 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Tcp.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Tcp.java @@ -18,7 +18,6 @@ import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory; -import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory; /** * Factory methods for TCP. @@ -47,67 +46,73 @@ private Tcp() { /** * Create a server spec that uses NIO. * @param port the port to listen on. - * @param the spec type. - * @param the connection factrory type. * @return the spec. */ - public static , C extends AbstractServerConnectionFactory> - TcpServerConnectionFactorySpec nioServer(int port) { - return new TcpServerConnectionFactorySpec<>(port, NIO); + public static TcpServerConnectionFactorySpec nioServer(int port) { + return new TcpServerConnectionFactorySpec(port, NIO); } /** * Create a server spec that does not use NIO. * @param port the port to listen on. - * @param the spec type. - * @param the connection factrory type. * @return the spec. */ - public static , C extends AbstractServerConnectionFactory> - TcpServerConnectionFactorySpec netServer(int port) { - return new TcpServerConnectionFactorySpec<>(port, NET); + public static TcpServerConnectionFactorySpec netServer(int port) { + return new TcpServerConnectionFactorySpec(port, NET); } /** * Create a client spec that uses NIO. * @param host the host to connect to. * @param port the port to connect to. - * @param the spec type. - * @param the connection factrory type. * @return the spec. */ - public static , C extends AbstractClientConnectionFactory> - TcpClientConnectionFactorySpec nioClient(String host, int port) { - return new TcpClientConnectionFactorySpec<>(host, port, NIO); + public static TcpClientConnectionFactorySpec nioClient(String host, int port) { + return new TcpClientConnectionFactorySpec(host, port, NIO); } /** * Create a client spec that does not use NIO. * @param host the host to connect to. * @param port the port to connect to. - * @param the spec type. - * @param the connection factrory type. * @return the spec. */ - public static , C extends AbstractClientConnectionFactory> - TcpClientConnectionFactorySpec netClient(String host, int port) { - return new TcpClientConnectionFactorySpec<>(host, port, NET); + public static TcpClientConnectionFactorySpec netClient(String host, int port) { + return new TcpClientConnectionFactorySpec(host, port, NET); } - public static > TcpInboundGatewaySpec inboundGateway( - AbstractConnectionFactory connectionFactory) { - return new TcpInboundGatewaySpec<>(connectionFactory); + /** + * Create an inbound gateway using the supplied connection factory. + * @param connectionFactory the connection factory. + * @return the spec. + */ + public static TcpInboundGatewaySpec inboundGateway(AbstractConnectionFactory connectionFactory) { + return new TcpInboundGatewaySpec(connectionFactory); } - public static > TcpInboundChannelAdapterSpec inboundAdapter( - AbstractConnectionFactory connectionFactory) { - return new TcpInboundChannelAdapterSpec<>(connectionFactory); + /** + * Create an inbound channel adapter using the supplied connection factory. + * @param connectionFactory the connection factory. + * @return the spec. + */ + public static TcpInboundChannelAdapterSpec inboundAdapter(AbstractConnectionFactory connectionFactory) { + return new TcpInboundChannelAdapterSpec(connectionFactory); } + /** + * Create an outbound gateway using the supplied client connection factory. + * @param connectionFactory the connection factory. + * @return the spec. + */ public static TcpOutboundGatewaySpec outboundGateway(AbstractClientConnectionFactory connectionFactory) { return new TcpOutboundGatewaySpec(connectionFactory); } + /** + * Create an outbound gateway using the supplied connection factory. + * @param connectionFactory the connection factory. + * @return the spec. + */ public static TcpOutboundChannelAdapterSpec outboundAdapter(AbstractConnectionFactory connectionFactory) { return new TcpOutboundChannelAdapterSpec(connectionFactory); } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpClientConnectionFactorySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpClientConnectionFactorySpec.java index 116b2e79d49..ab27049d3b9 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpClientConnectionFactorySpec.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpClientConnectionFactorySpec.java @@ -25,21 +25,17 @@ * An {@link IntegrationComponentSpec} for {@link AbstractClientConnectionFactory}s. * @author Gary Russell * - * @param the target {@link TcpClientConnectionFactorySpec} implementation type. - * @param the target {@link AbstractClientConnectionFactory} implementation type. - * * @since 5.0 * */ public class TcpClientConnectionFactorySpec - , C extends AbstractClientConnectionFactory> - extends AbstractConnectionFactorySpec { + extends AbstractConnectionFactorySpec { TcpClientConnectionFactorySpec(String host, int port) { this(host, port, false); } - public TcpClientConnectionFactorySpec(String host, int port, boolean nio) { + TcpClientConnectionFactorySpec(String host, int port, boolean nio) { super(nio ? new TcpNioClientConnectionFactory(host, port) : new TcpNetClientConnectionFactory(host, port)); } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundChannelAdapterSpec.java index 45551a6f771..3c4adc87a2f 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundChannelAdapterSpec.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundChannelAdapterSpec.java @@ -28,8 +28,8 @@ * @since 5.0 * */ -public class TcpInboundChannelAdapterSpec> - extends MessageProducerSpec { +public class TcpInboundChannelAdapterSpec + extends MessageProducerSpec { TcpInboundChannelAdapterSpec(AbstractConnectionFactory connectionFactory) { super(new TcpReceivingChannelAdapter()); @@ -41,7 +41,7 @@ public class TcpInboundChannelAdapterSpec> - extends MessagingGatewaySpec { +public class TcpInboundGatewaySpec extends MessagingGatewaySpec { TcpInboundGatewaySpec(AbstractConnectionFactory connectionFactory) { super(new TcpInboundGateway()); @@ -41,7 +40,7 @@ public class TcpInboundGatewaySpec> * @return the spec. * @see TcpInboundGateway#setClientMode(boolean) */ - public S clientMode(boolean isClientMode) { + public TcpInboundGatewaySpec clientMode(boolean isClientMode) { this.target.setClientMode(isClientMode); return _this(); } @@ -51,7 +50,7 @@ public S clientMode(boolean isClientMode) { * @return the spec. * @see TcpInboundGateway#setRetryInterval(long) */ - public S retryInterval(long retryInterval) { + public TcpInboundGatewaySpec retryInterval(long retryInterval) { this.target.setRetryInterval(retryInterval); return _this(); } @@ -61,7 +60,7 @@ public S retryInterval(long retryInterval) { * @return the spec. * @see TcpInboundGateway#setTaskScheduler(TaskScheduler) */ - public S taskScheduler(TaskScheduler taskScheduler) { + public TcpInboundGatewaySpec taskScheduler(TaskScheduler taskScheduler) { this.target.setTaskScheduler(taskScheduler); return _this(); } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpServerConnectionFactorySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpServerConnectionFactorySpec.java index 0799894df28..82085d93542 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpServerConnectionFactorySpec.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpServerConnectionFactorySpec.java @@ -25,21 +25,17 @@ * An {@link IntegrationComponentSpec} for {@link AbstractServerConnectionFactory}s. * @author Gary Russell * - * @param the target {@link TcpServerConnectionFactorySpec} implementation type. - * @param the target {@link AbstractServerConnectionFactory} implementation type. - * * @since 5.0 * */ public class TcpServerConnectionFactorySpec - , C extends AbstractServerConnectionFactory> - extends AbstractConnectionFactorySpec { + extends AbstractConnectionFactorySpec { TcpServerConnectionFactorySpec(int port) { this(port, false); } - public TcpServerConnectionFactorySpec(int port, boolean nio) { + TcpServerConnectionFactorySpec(int port, boolean nio) { super(nio ? new TcpNioServerConnectionFactory(port) : new TcpNetServerConnectionFactory(port)); } @@ -48,7 +44,7 @@ public TcpServerConnectionFactorySpec(int port, boolean nio) { * @return the spec. * @see AbstractServerConnectionFactory#setLocalAddress(String) */ - public S localAddress(String localAddress) { + public TcpServerConnectionFactorySpec localAddress(String localAddress) { this.target.setLocalAddress(localAddress); return _this(); } @@ -58,7 +54,7 @@ public S localAddress(String localAddress) { * @return the spec. * @see AbstractServerConnectionFactory#setBacklog(int) */ - public S backlog(int backlog) { + public TcpServerConnectionFactorySpec backlog(int backlog) { this.target.setBacklog(backlog); return _this(); } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Udp.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Udp.java index f84ff4cf89b..03f9540a22d 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Udp.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Udp.java @@ -29,22 +29,20 @@ private Udp() { super(); } - public static > UdpInboundChannelAdapterSpec inboundAdapter( - int port) { - return new UdpInboundChannelAdapterSpec<>(port); + public static UdpInboundChannelAdapterSpec inboundAdapter(int port) { + return new UdpInboundChannelAdapterSpec(port); } - public static > UdpInboundChannelAdapterSpec inboundMulticastAdapter( - int port, String multicastGroup) { - return new UdpInboundChannelAdapterSpec<>(port, multicastGroup); + public static UdpInboundChannelAdapterSpec inboundMulticastAdapter(int port, String multicastGroup) { + return new UdpInboundChannelAdapterSpec(port, multicastGroup); } public static UdpOutboundChannelAdapterSpec outboundAdapter(String destinationExpression) { return new UdpOutboundChannelAdapterSpec(destinationExpression); } - public static UdpOutboundChannelAdapterSpec outboundMultcastAdapter(String destinationExpression) { - return new UdpOutboundChannelAdapterSpec(destinationExpression); + public static UdpOutboundChannelAdapterSpec outboundMulticastAdapter(String destinationExpression) { + return new UdpMulticastOutboundChannelAdapterSpec(destinationExpression); } } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpInboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpInboundChannelAdapterSpec.java index 66d8e407085..acde49046d6 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpInboundChannelAdapterSpec.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpInboundChannelAdapterSpec.java @@ -25,14 +25,14 @@ import org.springframework.scheduling.TaskScheduler; /** - * A {@link MessageProducerSpec} for {@link UnicastReceivingChannelAdapter}s. + * A {@link MessageProducerSpec} for {@link UnicastReceivingChannelAdapter}UdpInboundChannelAdapterSpec. * * @author Gary Russell * @since 5.0 * */ -public class UdpInboundChannelAdapterSpec> - extends MessageProducerSpec { +public class UdpInboundChannelAdapterSpec + extends MessageProducerSpec { UdpInboundChannelAdapterSpec(int port) { super(new UnicastReceivingChannelAdapter(port)); @@ -47,7 +47,7 @@ public class UdpInboundChannelAdapterSpec("foo")); Message receivedMessage = received.receive(10000); assertNotNull(receivedMessage); - assertEquals("foo", new ObjectToStringTransformer().transform(receivedMessage).getPayload()); + assertEquals("foo", Transformers.objectToString().transform(receivedMessage).getPayload()); client.stop(); server.stop(); } @@ -111,7 +111,7 @@ public void testTcpGateways() { this.client1.setPort(this.server1.getPort()); IntegrationFlow flow = f -> f .handle(Tcp.outboundGateway(this.client1)) - .transform(new ObjectToStringTransformer()); + .transform(Transformers.objectToString()); IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register(); assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO")); } @@ -125,7 +125,7 @@ public void testUdp() { this.udpOut.send(outMessage); Message received = this.udpIn.receive(10000); assertNotNull(received); - assertEquals("foo", new ObjectToStringTransformer().transform(received).getPayload()); + assertEquals("foo", Transformers.objectToString().transform(received).getPayload()); } @Configuration @@ -151,7 +151,7 @@ public AbstractClientConnectionFactory client1() { @Bean public IntegrationFlow inTcpGateway() { return IntegrationFlows.from(Tcp.inboundGateway(server1())) - .transform(new ObjectToStringTransformer()) + .transform(Transformers.objectToString()) .transform(String::toUpperCase) .get(); } From 8f658cd510c630474d2100a3415db9b5ddc466dc Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 9 Nov 2016 10:37:03 -0500 Subject: [PATCH 3/8] More Polishing - PR Comments --- .../ip/dsl/AbstractConnectionFactorySpec.java | 25 ++++++------ .../dsl/TcpClientConnectionFactorySpec.java | 3 +- .../ip/dsl/TcpInboundChannelAdapterSpec.java | 15 ++++++- .../ip/dsl/TcpInboundGatewaySpec.java | 15 ++++++- .../ip/dsl/TcpOutboundChannelAdapterSpec.java | 15 ++++++- .../ip/dsl/TcpOutboundGatewaySpec.java | 40 ++++++++++++++++++- .../dsl/TcpServerConnectionFactorySpec.java | 3 +- .../integration/ip/dsl/Udp.java | 26 +++++++++++- .../ip/dsl/UdpInboundChannelAdapterSpec.java | 4 +- .../ip/dsl/UdpOutboundChannelAdapterSpec.java | 8 +++- .../connection/AbstractConnectionFactory.java | 9 +++++ .../udp/MulticastSendingMessageHandler.java | 4 +- .../ip/dsl/IpIntegrationTests.java | 19 +++------ .../jms/dsl/JmsOutboundGatewaySpec.java | 2 +- 14 files changed, 146 insertions(+), 42 deletions(-) diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/AbstractConnectionFactorySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/AbstractConnectionFactorySpec.java index f06590da046..8380ee333c4 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/AbstractConnectionFactorySpec.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/AbstractConnectionFactorySpec.java @@ -50,7 +50,7 @@ public S id(String id) { } /** - * @param soTimeout the socket timeout option. + * @param soTimeout the timeout socket option. * @return the spec. * @see AbstractConnectionFactory#setSoTimeout(int) */ @@ -60,7 +60,7 @@ public S soTimeout(int soTimeout) { } /** - * @param soReceiveBufferSize the socket receive buffer size option. + * @param soReceiveBufferSize the receive buffer size socket option. * @return the spec. * @see AbstractConnectionFactory#setSoReceiveBufferSize(int) */ @@ -70,7 +70,7 @@ public S soReceiveBufferSize(int soReceiveBufferSize) { } /** - * @param soSendBufferSize the socket send buffer size option. + * @param soSendBufferSize the send buffer size socket option. * @return the spec. * @see AbstractConnectionFactory#setSoSendBufferSize(int) */ @@ -80,7 +80,7 @@ public S soSendBufferSize(int soSendBufferSize) { } /** - * @param soTcpNoDelay the socket TCP no delay option (disable Nagle's algorithm). + * @param soTcpNoDelay the TCP no delay socket option (disable Nagle's algorithm). * @return the spec. * @see AbstractConnectionFactory#setSoTcpNoDelay(boolean) */ @@ -90,7 +90,7 @@ public S soTcpNoDelay(boolean soTcpNoDelay) { } /** - * @param soLinger the socket linger option. + * @param soLinger the linger socket option. * @return the spec. * @see AbstractConnectionFactory#setSoLinger(int) */ @@ -100,7 +100,7 @@ public S soLinger(int soLinger) { } /** - * @param soKeepAlive the socket keepalive option. + * @param soKeepAlive the keep alive socket option. * @return the spec. * @see AbstractConnectionFactory#setSoKeepAlive(boolean) */ @@ -110,7 +110,7 @@ public S soKeepAlive(boolean soKeepAlive) { } /** - * @param soTrafficClass the socket traffic class option. + * @param soTrafficClass the traffic class socket option. * @return the spec. * @see AbstractConnectionFactory#setSoTrafficClass(int) */ @@ -120,7 +120,7 @@ public S soTrafficClass(int soTrafficClass) { } /** - * @param taskExecutor the task excecutor. + * @param taskExecutor the task executor. * @return the spec. * @see AbstractConnectionFactory#setTaskExecutor(Executor) */ @@ -160,13 +160,12 @@ public S mapper(TcpMessageMapper mapper) { } /** - * @param keepOpen true to keep the socket open for additional messages; inverse - * of {@link AbstractConnectionFactory#setSingleUse(boolean)}. + * @param leaveOpen true to leave the socket open for additional messages. * @return the spec. - * @see AbstractConnectionFactory#setSingleUse(boolean) + * @see AbstractConnectionFactory#setLeaveOpen(boolean) */ - public S keepOpen(boolean keepOpen) { - this.target.setSingleUse(!keepOpen); + public S leaveOpen(boolean leaveOpen) { + this.target.setLeaveOpen(leaveOpen); return _this(); } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpClientConnectionFactorySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpClientConnectionFactorySpec.java index ab27049d3b9..7b29328698a 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpClientConnectionFactorySpec.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpClientConnectionFactorySpec.java @@ -16,13 +16,12 @@ package org.springframework.integration.ip.dsl; -import org.springframework.integration.dsl.IntegrationComponentSpec; import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory; import org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory; /** - * An {@link IntegrationComponentSpec} for {@link AbstractClientConnectionFactory}s. + * An {@link AbstractConnectionFactorySpec} for {@link AbstractClientConnectionFactory}s. * @author Gary Russell * * @since 5.0 diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundChannelAdapterSpec.java index 3c4adc87a2f..3633d98a51d 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundChannelAdapterSpec.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundChannelAdapterSpec.java @@ -16,6 +16,10 @@ package org.springframework.integration.ip.dsl; +import java.util.Collection; +import java.util.Collections; + +import org.springframework.integration.dsl.ComponentsRegistration; import org.springframework.integration.dsl.MessageProducerSpec; import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter; import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory; @@ -29,10 +33,14 @@ * */ public class TcpInboundChannelAdapterSpec - extends MessageProducerSpec { + extends MessageProducerSpec + implements ComponentsRegistration { + + private final AbstractConnectionFactory connectionFactory; TcpInboundChannelAdapterSpec(AbstractConnectionFactory connectionFactory) { super(new TcpReceivingChannelAdapter()); + this.connectionFactory = connectionFactory; this.target.setConnectionFactory(connectionFactory); } @@ -66,4 +74,9 @@ public TcpInboundChannelAdapterSpec taskScheduler(TaskScheduler taskScheduler) { return _this(); } + @Override + public Collection getComponentsToRegister() { + return Collections.singletonList(this.connectionFactory); + } + } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundGatewaySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundGatewaySpec.java index 817470f852e..057ccc37cb2 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundGatewaySpec.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundGatewaySpec.java @@ -16,6 +16,10 @@ package org.springframework.integration.ip.dsl; +import java.util.Collection; +import java.util.Collections; + +import org.springframework.integration.dsl.ComponentsRegistration; import org.springframework.integration.dsl.MessagingGatewaySpec; import org.springframework.integration.ip.tcp.TcpInboundGateway; import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory; @@ -28,10 +32,14 @@ * @since 5.0 * */ -public class TcpInboundGatewaySpec extends MessagingGatewaySpec { +public class TcpInboundGatewaySpec extends MessagingGatewaySpec + implements ComponentsRegistration { + + private final AbstractConnectionFactory connectionFactory; TcpInboundGatewaySpec(AbstractConnectionFactory connectionFactory) { super(new TcpInboundGateway()); + this.connectionFactory = connectionFactory; this.target.setConnectionFactory(connectionFactory); } @@ -65,4 +73,9 @@ public TcpInboundGatewaySpec taskScheduler(TaskScheduler taskScheduler) { return _this(); } + @Override + public Collection getComponentsToRegister() { + return Collections.singletonList(this.connectionFactory); + } + } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundChannelAdapterSpec.java index beb98074ada..c66425af2a6 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundChannelAdapterSpec.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundChannelAdapterSpec.java @@ -16,6 +16,10 @@ package org.springframework.integration.ip.dsl; +import java.util.Collection; +import java.util.Collections; + +import org.springframework.integration.dsl.ComponentsRegistration; import org.springframework.integration.dsl.MessageHandlerSpec; import org.springframework.integration.ip.tcp.TcpSendingMessageHandler; import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory; @@ -29,10 +33,14 @@ * */ public class TcpOutboundChannelAdapterSpec - extends MessageHandlerSpec { + extends MessageHandlerSpec + implements ComponentsRegistration { + + private final AbstractConnectionFactory connectionFactory; TcpOutboundChannelAdapterSpec(AbstractConnectionFactory connectionFactory) { this.target = new TcpSendingMessageHandler(); + this.connectionFactory = connectionFactory; this.target.setConnectionFactory(connectionFactory); } @@ -66,4 +74,9 @@ public TcpOutboundChannelAdapterSpec taskScheduler(TaskScheduler taskScheduler) return _this(); } + @Override + public Collection getComponentsToRegister() { + return Collections.singletonList(this.connectionFactory); + } + } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java index 4f9ff079599..ef2309bfb42 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java @@ -16,9 +16,16 @@ package org.springframework.integration.ip.dsl; +import java.util.Collection; +import java.util.Collections; +import java.util.function.Function; + +import org.springframework.integration.dsl.ComponentsRegistration; import org.springframework.integration.dsl.MessageHandlerSpec; +import org.springframework.integration.expression.FunctionExpression; import org.springframework.integration.ip.tcp.TcpOutboundGateway; import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; +import org.springframework.messaging.Message; /** * A {@link MessageHandlerSpec} for {@link TcpOutboundGateway}s. @@ -27,16 +34,47 @@ * @since 5.0 * */ -public class TcpOutboundGatewaySpec extends MessageHandlerSpec { +public class TcpOutboundGatewaySpec extends MessageHandlerSpec + implements ComponentsRegistration { + + private final AbstractClientConnectionFactory connectionFactory; public TcpOutboundGatewaySpec(AbstractClientConnectionFactory connectionFactory) { this.target = new TcpOutboundGateway(); + this.connectionFactory = connectionFactory; this.target.setConnectionFactory(connectionFactory); } + /** + * @param remoteTimeout the remote timeout to set. + * @return the spec. + * @see TcpOutboundGateway#setRemoteTimeout(long) + */ public TcpOutboundGatewaySpec remoteTimeout(long remoteTimeout) { this.target.setRemoteTimeout(remoteTimeout); return _this(); } + /** + * Configure a {@link Function} that will be invoked at runtime to determine the destination to + * which a message will be sent. Typically used with a Java 8 Lambda expression: + *
+	 * {@code
+	 * .remoteTimeout(m -> m.getHeaders().get('rto'))
+	 * }
+	 * 
+ * @param remoteTimeoutFunction the function. + * @return the spec. + * @see TcpOutboundGateway#setRemoteTimeoutExpression(org.springframework.expression.Expression) + */ + public

TcpOutboundGatewaySpec remoteTimeout(Function, ?> remoteTimeoutFunction) { + this.target.setRemoteTimeoutExpression(new FunctionExpression<>(remoteTimeoutFunction)); + return _this(); + } + + @Override + public Collection getComponentsToRegister() { + return Collections.singletonList(this.connectionFactory); + } + } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpServerConnectionFactorySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpServerConnectionFactorySpec.java index 82085d93542..36a030331cb 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpServerConnectionFactorySpec.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpServerConnectionFactorySpec.java @@ -16,13 +16,12 @@ package org.springframework.integration.ip.dsl; -import org.springframework.integration.dsl.IntegrationComponentSpec; import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory; import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory; import org.springframework.integration.ip.tcp.connection.TcpNioServerConnectionFactory; /** - * An {@link IntegrationComponentSpec} for {@link AbstractServerConnectionFactory}s. + * An {@link AbstractConnectionFactorySpec} for {@link AbstractServerConnectionFactory}s. * @author Gary Russell * * @since 5.0 diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Udp.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Udp.java index 03f9540a22d..fb56ade3ca5 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Udp.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Udp.java @@ -29,19 +29,43 @@ private Udp() { super(); } + /** + * Create an inbound unicast channel adapter using the supplied port. + * @param port the port. + * @return the spec. + */ public static UdpInboundChannelAdapterSpec inboundAdapter(int port) { return new UdpInboundChannelAdapterSpec(port); } + /** + * Create an inbound multicast channel adapter using the supplied port and + * group. + * @param port the port. + * @param multicastGroup the group. + * @return the spec. + */ public static UdpInboundChannelAdapterSpec inboundMulticastAdapter(int port, String multicastGroup) { return new UdpInboundChannelAdapterSpec(port, multicastGroup); } + /** + * Create an inbound unicast channel adapter using the supplied destination + * expression. + * @param destinationExpression destination expression. + * @return the spec. + */ public static UdpOutboundChannelAdapterSpec outboundAdapter(String destinationExpression) { return new UdpOutboundChannelAdapterSpec(destinationExpression); } - public static UdpOutboundChannelAdapterSpec outboundMulticastAdapter(String destinationExpression) { + /** + * Create an inbound multicast channel adapter using the supplied destination + * expression. + * @param destinationExpression destination expression. + * @return the spec. + */ + public static UdpMulticastOutboundChannelAdapterSpec outboundMulticastAdapter(String destinationExpression) { return new UdpMulticastOutboundChannelAdapterSpec(destinationExpression); } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpInboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpInboundChannelAdapterSpec.java index acde49046d6..537a2d0ebfd 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpInboundChannelAdapterSpec.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpInboundChannelAdapterSpec.java @@ -25,7 +25,7 @@ import org.springframework.scheduling.TaskScheduler; /** - * A {@link MessageProducerSpec} for {@link UnicastReceivingChannelAdapter}UdpInboundChannelAdapterSpec. + * A {@link MessageProducerSpec} for {@link UnicastReceivingChannelAdapter}s. * * @author Gary Russell * @since 5.0 @@ -133,7 +133,7 @@ public UdpInboundChannelAdapterSpec socket(DatagramSocket socket) { } /** - * @param soSendBufferSize set the send bufffer size socket option. + * @param soSendBufferSize set the send buffer size socket option. * @return the spec. * @see UnicastReceivingChannelAdapter#setSoSendBufferSize(int) */ diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpOutboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpOutboundChannelAdapterSpec.java index 17605cbe6b7..47c48a44729 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpOutboundChannelAdapterSpec.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpOutboundChannelAdapterSpec.java @@ -16,9 +16,13 @@ package org.springframework.integration.ip.dsl; +import java.util.function.Function; + import org.springframework.expression.Expression; import org.springframework.integration.dsl.MessageHandlerSpec; +import org.springframework.integration.expression.FunctionExpression; import org.springframework.integration.ip.udp.UnicastSendingMessageHandler; +import org.springframework.messaging.Message; /** * A {@link MessageHandlerSpec} for {@link UnicastSendingMessageHandler}s. @@ -103,8 +107,8 @@ public UdpOutboundChannelAdapterSpec ackCounter(int ackCounter) { * @return the spec. * @see UnicastSendingMessageHandler#setSocketExpression(Expression) */ - public UdpOutboundChannelAdapterSpec socketExpression(Expression socketExpression) { - this.target.setSocketExpression(socketExpression); + public UdpOutboundChannelAdapterSpec socketExpression(Function, ?> socketExpressionFunction) { + this.target.setSocketExpression(new FunctionExpression<>(socketExpressionFunction)); return _this(); } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java index 6c1e4ce7abe..acb3156a0cd 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java @@ -414,6 +414,15 @@ public void setSingleUse(boolean singleUse) { this.singleUse = singleUse; } + /** + * If true, sockets created by this factory will be reused. + * Inverse of {@link #setSingleUse(boolean)}. + * @param leaveOpen The keepOpen to set. + * @since 5.0 + */ + public void setLeaveOpen(boolean leaveOpen) { + this.singleUse = !leaveOpen; + } public void setInterceptorFactoryChain(TcpConnectionInterceptorFactoryChain interceptorFactoryChain) { this.interceptorFactoryChain = interceptorFactoryChain; diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/MulticastSendingMessageHandler.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/MulticastSendingMessageHandler.java index 06be6d94ff0..36c92f137ac 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/MulticastSendingMessageHandler.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/MulticastSendingMessageHandler.java @@ -103,7 +103,7 @@ public MulticastSendingMessageHandler(String address, int port, } /** - * Construct UnicastSendingMessageHandler based on the destination SpEL expression to + * Construct MulticastSendingMessageHandler based on the destination SpEL expression to * determine the target destination at runtime against requestMessage. * @param destinationExpression the SpEL expression to evaluate the target destination * at runtime. Must evaluate to {@link String}, {@link URI} or {@link SocketAddress}. @@ -114,7 +114,7 @@ public MulticastSendingMessageHandler(Expression destinationExpression) { } /** - * Construct UnicastSendingMessageHandler based on the destination SpEL expression to + * Construct MulticastSendingMessageHandler based on the destination SpEL expression to * determine the target destination at runtime against requestMessage. * @param destinationExpression the SpEL expression to evaluate the target destination * at runtime. Must evaluate to {@link String}, {@link URI} or {@link SocketAddress}. diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java index dc0ecf1b358..817a5102908 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java @@ -62,9 +62,6 @@ public class IpIntegrationTests { @Autowired private AbstractServerConnectionFactory server1; - @Autowired - private AbstractClientConnectionFactory client1; - @Autowired private IntegrationFlowContext flowContext; @@ -108,9 +105,13 @@ public void testTcpAdapters() throws Exception { @Test public void testTcpGateways() { TestingUtilities.waitListening(this.server1, null); - this.client1.setPort(this.server1.getPort()); IntegrationFlow flow = f -> f - .handle(Tcp.outboundGateway(this.client1)) + .handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort()) + .serializer(TcpCodecs.crlf()) + .deserializer(TcpCodecs.lengthHeader1()) + .id("client1") + .get()) + .remoteTimeout(m -> 5000)) .transform(Transformers.objectToString()); IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register(); assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO")); @@ -140,14 +141,6 @@ public AbstractServerConnectionFactory server1() { .get(); } - @Bean - public AbstractClientConnectionFactory client1() { - return Tcp.netClient("localhost", 0) - .serializer(TcpCodecs.crlf()) - .deserializer(TcpCodecs.lengthHeader1()) - .get(); - } - @Bean public IntegrationFlow inTcpGateway() { return IntegrationFlows.from(Tcp.inboundGateway(server1())) diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsOutboundGatewaySpec.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsOutboundGatewaySpec.java index 8c25f36e417..a0810b608e2 100644 --- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsOutboundGatewaySpec.java +++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsOutboundGatewaySpec.java @@ -108,7 +108,7 @@ public JmsOutboundGatewaySpec requestDestinationExpression(String destination) { } /** - * Configure a {@link Function} that will be invoked at run time to determine the destination to + * Configure a {@link Function} that will be invoked at runtime to determine the destination to * which a message will be sent. Typically used with a Java 8 Lambda expression: *
 	 * {@code

From e546eab073bb363bd2b8173015ba1391e2b7402a Mon Sep 17 00:00:00 2001
From: Gary Russell 
Date: Wed, 9 Nov 2016 12:59:11 -0500
Subject: [PATCH 4/8] Checkstyle/Javadoc Fixes

---
 .../integration/ip/dsl/TcpOutboundGatewaySpec.java           | 1 +
 .../integration/ip/dsl/UdpOutboundChannelAdapterSpec.java    | 5 ++---
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java
index ef2309bfb42..0b2c67db7df 100644
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java
@@ -64,6 +64,7 @@ public TcpOutboundGatewaySpec remoteTimeout(long remoteTimeout) {
 	 * }
 	 * 
* @param remoteTimeoutFunction the function. + * @param

the message payload type. * @return the spec. * @see TcpOutboundGateway#setRemoteTimeoutExpression(org.springframework.expression.Expression) */ diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpOutboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpOutboundChannelAdapterSpec.java index 47c48a44729..5a280c1334c 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpOutboundChannelAdapterSpec.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpOutboundChannelAdapterSpec.java @@ -18,7 +18,6 @@ import java.util.function.Function; -import org.springframework.expression.Expression; import org.springframework.integration.dsl.MessageHandlerSpec; import org.springframework.integration.expression.FunctionExpression; import org.springframework.integration.ip.udp.UnicastSendingMessageHandler; @@ -103,9 +102,9 @@ public UdpOutboundChannelAdapterSpec ackCounter(int ackCounter) { } /** - * @param socketExpression the socket expression. + * @param socketExpressionFunction the socket expression. * @return the spec. - * @see UnicastSendingMessageHandler#setSocketExpression(Expression) + * @see UnicastSendingMessageHandler#setSocketExpression(org.springframework.expression.Expression) */ public UdpOutboundChannelAdapterSpec socketExpression(Function, ?> socketExpressionFunction) { this.target.setSocketExpression(new FunctionExpression<>(socketExpressionFunction)); From b018c0b3ece689605cf048c49c96c67bea09fcfe Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 9 Nov 2016 15:07:39 -0500 Subject: [PATCH 5/8] ComponentsRegistration Improvements Add ...Spec CTORs to avoid the issue described here: https://github.com/spring-projects/spring-integration-java-dsl/issues/137 Also other PR comments. --- .../integration/ip/dsl/Tcp.java | 50 +++++++++++++++++-- .../ip/dsl/TcpInboundChannelAdapterSpec.java | 23 +++++++-- .../ip/dsl/TcpInboundGatewaySpec.java | 23 +++++++-- .../ip/dsl/TcpOutboundChannelAdapterSpec.java | 23 +++++++-- .../ip/dsl/TcpOutboundGatewaySpec.java | 23 +++++++-- .../ip/dsl/UdpOutboundChannelAdapterSpec.java | 11 ++-- .../ip/dsl/IpIntegrationTests.java | 3 +- 7 files changed, 131 insertions(+), 25 deletions(-) diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Tcp.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Tcp.java index 635783b58e6..e28de38537c 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Tcp.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Tcp.java @@ -83,38 +83,80 @@ public static TcpClientConnectionFactorySpec netClient(String host, int port) { /** * Create an inbound gateway using the supplied connection factory. - * @param connectionFactory the connection factory. + * @param connectionFactory the connection factory - must be an existing bean - it + * will not be initialized. * @return the spec. */ public static TcpInboundGatewaySpec inboundGateway(AbstractConnectionFactory connectionFactory) { return new TcpInboundGatewaySpec(connectionFactory); } + /** + * Create an inbound gateway using the supplied connection factory. + * @param connectionFactorySpec the connection factory spec. + * @return the spec. + */ + public static TcpInboundGatewaySpec inboundGateway(AbstractConnectionFactorySpec connectionFactorySpec) { + return new TcpInboundGatewaySpec(connectionFactorySpec); + } + /** * Create an inbound channel adapter using the supplied connection factory. - * @param connectionFactory the connection factory. + * @param connectionFactory the connection factory - must be an existing bean - it + * will not be initialized. * @return the spec. */ public static TcpInboundChannelAdapterSpec inboundAdapter(AbstractConnectionFactory connectionFactory) { return new TcpInboundChannelAdapterSpec(connectionFactory); } + /** + * Create an inbound channel adapter using the supplied connection factory. + * @param connectionFactorySpec the connection factory spec. + * @return the spec. + */ + public static TcpInboundChannelAdapterSpec inboundAdapter( + AbstractConnectionFactorySpec connectionFactorySpec) { + return new TcpInboundChannelAdapterSpec(connectionFactorySpec); + } + /** * Create an outbound gateway using the supplied client connection factory. - * @param connectionFactory the connection factory. + * @param connectionFactory the connection factory - must be an existing bean - it + * will not be initialized. * @return the spec. */ public static TcpOutboundGatewaySpec outboundGateway(AbstractClientConnectionFactory connectionFactory) { return new TcpOutboundGatewaySpec(connectionFactory); } + /** + * Create an outbound gateway using the supplied client connection factory. + * @param connectionFactory the connection factory spec. + * @return the spec. + */ + public static TcpOutboundGatewaySpec outboundGateway(TcpClientConnectionFactorySpec connectionFactory) { + return new TcpOutboundGatewaySpec(connectionFactory); + } + /** * Create an outbound gateway using the supplied connection factory. - * @param connectionFactory the connection factory. + * @param connectionFactory the connection factory - must be an existing bean - it + * will not be initialized. * @return the spec. */ public static TcpOutboundChannelAdapterSpec outboundAdapter(AbstractConnectionFactory connectionFactory) { return new TcpOutboundChannelAdapterSpec(connectionFactory); } + /** + * Create an outbound gateway using the supplied connection factory. + * @param connectionFactorySpec the connection factory. + * @return the spec. + */ + public static TcpOutboundChannelAdapterSpec outboundAdapter( + AbstractConnectionFactorySpec connectionFactorySpec) { + return new TcpOutboundChannelAdapterSpec(connectionFactorySpec); + } + } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundChannelAdapterSpec.java index 3633d98a51d..bb7d9283504 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundChannelAdapterSpec.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundChannelAdapterSpec.java @@ -38,10 +38,24 @@ public class TcpInboundChannelAdapterSpec private final AbstractConnectionFactory connectionFactory; - TcpInboundChannelAdapterSpec(AbstractConnectionFactory connectionFactory) { + /** + * Construct an instance using an existing spring-managed connection factory. + * @param connectionFactoryBean the spring-managed bean. + */ + TcpInboundChannelAdapterSpec(AbstractConnectionFactory connectionFactoryBean) { + super(new TcpReceivingChannelAdapter()); + this.connectionFactory = null; + this.target.setConnectionFactory(connectionFactoryBean); + } + + /** + * Construct an instance using the provided connection factory spec. + * @param connectionFactorySpec the spec. + */ + TcpInboundChannelAdapterSpec(AbstractConnectionFactorySpec connectionFactorySpec) { super(new TcpReceivingChannelAdapter()); - this.connectionFactory = connectionFactory; - this.target.setConnectionFactory(connectionFactory); + this.connectionFactory = connectionFactorySpec.get(); + this.target.setConnectionFactory(this.connectionFactory); } /** @@ -76,7 +90,8 @@ public TcpInboundChannelAdapterSpec taskScheduler(TaskScheduler taskScheduler) { @Override public Collection getComponentsToRegister() { - return Collections.singletonList(this.connectionFactory); + return this.connectionFactory == null ? Collections.emptyList() + : Collections.singletonList(this.connectionFactory); } } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundGatewaySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundGatewaySpec.java index 057ccc37cb2..f632352c97d 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundGatewaySpec.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundGatewaySpec.java @@ -37,10 +37,24 @@ public class TcpInboundGatewaySpec extends MessagingGatewaySpec connectionFactorySpec) { super(new TcpInboundGateway()); - this.connectionFactory = connectionFactory; - this.target.setConnectionFactory(connectionFactory); + this.connectionFactory = connectionFactorySpec.get(); + this.target.setConnectionFactory(this.connectionFactory); } /** @@ -75,7 +89,8 @@ public TcpInboundGatewaySpec taskScheduler(TaskScheduler taskScheduler) { @Override public Collection getComponentsToRegister() { - return Collections.singletonList(this.connectionFactory); + return this.connectionFactory == null ? Collections.emptyList() + : Collections.singletonList(this.connectionFactory); } } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundChannelAdapterSpec.java index c66425af2a6..17df57766a7 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundChannelAdapterSpec.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundChannelAdapterSpec.java @@ -38,10 +38,24 @@ public class TcpOutboundChannelAdapterSpec private final AbstractConnectionFactory connectionFactory; - TcpOutboundChannelAdapterSpec(AbstractConnectionFactory connectionFactory) { + /** + * Construct an instance using an existing spring-managed connection factory. + * @param connectionFactoryBean the spring-managed bean. + */ + TcpOutboundChannelAdapterSpec(AbstractConnectionFactory connectionFactoryBean) { + this.target = new TcpSendingMessageHandler(); + this.connectionFactory = null; + this.target.setConnectionFactory(connectionFactoryBean); + } + + /** + * Construct an instance using the provided connection factory spec. + * @param connectionFactorySpec the spec. + */ + TcpOutboundChannelAdapterSpec(AbstractConnectionFactorySpec connectionFactorySpec) { this.target = new TcpSendingMessageHandler(); - this.connectionFactory = connectionFactory; - this.target.setConnectionFactory(connectionFactory); + this.connectionFactory = connectionFactorySpec.get(); + this.target.setConnectionFactory(this.connectionFactory); } /** @@ -76,7 +90,8 @@ public TcpOutboundChannelAdapterSpec taskScheduler(TaskScheduler taskScheduler) @Override public Collection getComponentsToRegister() { - return Collections.singletonList(this.connectionFactory); + return this.connectionFactory == null ? Collections.emptyList() + : Collections.singletonList(this.connectionFactory); } } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java index 0b2c67db7df..bbb5d6c2e91 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java @@ -39,10 +39,24 @@ public class TcpOutboundGatewaySpec extends MessageHandlerSpec TcpOutboundGatewaySpec remoteTimeout(Function, ?> remoteTi @Override public Collection getComponentsToRegister() { - return Collections.singletonList(this.connectionFactory); + return this.connectionFactory == null ? Collections.emptyList() + : Collections.singletonList(this.connectionFactory); } } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpOutboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpOutboundChannelAdapterSpec.java index 5a280c1334c..a1a451bc05d 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpOutboundChannelAdapterSpec.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpOutboundChannelAdapterSpec.java @@ -16,6 +16,7 @@ package org.springframework.integration.ip.dsl; +import java.net.DatagramSocket; import java.util.function.Function; import org.springframework.integration.dsl.MessageHandlerSpec; @@ -41,6 +42,10 @@ protected UdpOutboundChannelAdapterSpec() { this.target = new UnicastSendingMessageHandler(destinationExpression); } + UdpOutboundChannelAdapterSpec(Function, ?> destinationFunction) { + this.target = new UnicastSendingMessageHandler(new FunctionExpression<>(destinationFunction)); + } + /** * @param timeout the timeout socket option. * @return the spec. @@ -102,12 +107,12 @@ public UdpOutboundChannelAdapterSpec ackCounter(int ackCounter) { } /** - * @param socketExpressionFunction the socket expression. + * @param socketFunction the socket function. * @return the spec. * @see UnicastSendingMessageHandler#setSocketExpression(org.springframework.expression.Expression) */ - public UdpOutboundChannelAdapterSpec socketExpression(Function, ?> socketExpressionFunction) { - this.target.setSocketExpression(new FunctionExpression<>(socketExpressionFunction)); + public UdpOutboundChannelAdapterSpec socketExpression(Function, DatagramSocket> socketFunction) { + this.target.setSocketExpression(new FunctionExpression<>(socketFunction)); return _this(); } diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java index 817a5102908..de7496ad702 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java @@ -109,8 +109,7 @@ public void testTcpGateways() { .handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort()) .serializer(TcpCodecs.crlf()) .deserializer(TcpCodecs.lengthHeader1()) - .id("client1") - .get()) + .id("client1")) .remoteTimeout(m -> 5000)) .transform(Transformers.objectToString()); IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register(); From 3a05789ccedd51acd8d90c6ec7c61dbfa42fe57c Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 9 Nov 2016 16:19:45 -0500 Subject: [PATCH 6/8] Fix UDP Spec Inheritance --- ...bstractUdpOutboundChannelAdapterSpec.java} | 29 +++++++------ .../integration/ip/dsl/Udp.java | 29 ++++++++++++- ...dpMulticastOutboundChannelAdapterSpec.java | 13 +++++- .../UdpUnicastOutboundChannelAdapterSpec.java | 43 +++++++++++++++++++ .../ip/dsl/IpIntegrationTests.java | 7 +++ 5 files changed, 103 insertions(+), 18 deletions(-) rename spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/{UdpOutboundChannelAdapterSpec.java => AbstractUdpOutboundChannelAdapterSpec.java} (74%) create mode 100644 spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpUnicastOutboundChannelAdapterSpec.java diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpOutboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/AbstractUdpOutboundChannelAdapterSpec.java similarity index 74% rename from spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpOutboundChannelAdapterSpec.java rename to spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/AbstractUdpOutboundChannelAdapterSpec.java index a1a451bc05d..bda5006b6ec 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpOutboundChannelAdapterSpec.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/AbstractUdpOutboundChannelAdapterSpec.java @@ -25,24 +25,25 @@ import org.springframework.messaging.Message; /** - * A {@link MessageHandlerSpec} for {@link UnicastSendingMessageHandler}s. + * A {@link MessageHandlerSpec} for UDB {@link org.springframework.messaging.MessageHandler}s. * + * @param the target {@link UdpUnicastOutboundChannelAdapterSpec} implementation type. * @author Gary Russell * @since 5.0 * */ -public class UdpOutboundChannelAdapterSpec - extends MessageHandlerSpec { +public abstract class AbstractUdpOutboundChannelAdapterSpec> + extends MessageHandlerSpec { - protected UdpOutboundChannelAdapterSpec() { + protected AbstractUdpOutboundChannelAdapterSpec() { super(); } - UdpOutboundChannelAdapterSpec(String destinationExpression) { + AbstractUdpOutboundChannelAdapterSpec(String destinationExpression) { this.target = new UnicastSendingMessageHandler(destinationExpression); } - UdpOutboundChannelAdapterSpec(Function, ?> destinationFunction) { + AbstractUdpOutboundChannelAdapterSpec(Function, ?> destinationFunction) { this.target = new UnicastSendingMessageHandler(new FunctionExpression<>(destinationFunction)); } @@ -51,7 +52,7 @@ protected UdpOutboundChannelAdapterSpec() { * @return the spec. * @see UnicastSendingMessageHandler#setSoTimeout(int) */ - public UdpOutboundChannelAdapterSpec soTimeout(int timeout) { + public S soTimeout(int timeout) { this.target.setSoTimeout(timeout); return _this(); } @@ -61,7 +62,7 @@ public UdpOutboundChannelAdapterSpec soTimeout(int timeout) { * @return the spec. * @see UnicastSendingMessageHandler#setSoSendBufferSize(int) */ - public UdpOutboundChannelAdapterSpec soSendBufferSize(int size) { + public S soSendBufferSize(int size) { this.target.setSoSendBufferSize(size); return _this(); } @@ -71,7 +72,7 @@ public UdpOutboundChannelAdapterSpec soSendBufferSize(int size) { * @return the spec. * @see UnicastSendingMessageHandler#setLocalAddress(String) */ - public UdpOutboundChannelAdapterSpec localAddress(String localAddress) { + public S localAddress(String localAddress) { this.target.setLocalAddress(localAddress); return _this(); } @@ -81,7 +82,7 @@ public UdpOutboundChannelAdapterSpec localAddress(String localAddress) { * @return the spec. * @see UnicastSendingMessageHandler#setLengthCheck(boolean) */ - public UdpOutboundChannelAdapterSpec lengthCheck(boolean lengthCheck) { + public S lengthCheck(boolean lengthCheck) { this.target.setLengthCheck(lengthCheck); return _this(); } @@ -91,7 +92,7 @@ public UdpOutboundChannelAdapterSpec lengthCheck(boolean lengthCheck) { * @return the spec. * @see UnicastSendingMessageHandler#setSoReceiveBufferSize(int) */ - public UdpOutboundChannelAdapterSpec soReceiveBufferSize(int size) { + public S soReceiveBufferSize(int size) { this.target.setSoReceiveBufferSize(size); return _this(); } @@ -101,7 +102,7 @@ public UdpOutboundChannelAdapterSpec soReceiveBufferSize(int size) { * @return the spec. * @see UnicastSendingMessageHandler#setAckCounter(int) */ - public UdpOutboundChannelAdapterSpec ackCounter(int ackCounter) { + public S ackCounter(int ackCounter) { this.target.setAckCounter(ackCounter); return _this(); } @@ -111,7 +112,7 @@ public UdpOutboundChannelAdapterSpec ackCounter(int ackCounter) { * @return the spec. * @see UnicastSendingMessageHandler#setSocketExpression(org.springframework.expression.Expression) */ - public UdpOutboundChannelAdapterSpec socketExpression(Function, DatagramSocket> socketFunction) { + public S socketFunction(Function, DatagramSocket> socketFunction) { this.target.setSocketExpression(new FunctionExpression<>(socketFunction)); return _this(); } @@ -121,7 +122,7 @@ public UdpOutboundChannelAdapterSpec socketExpression(Function, Datag * @return the spec. * @see UnicastSendingMessageHandler#setSocketExpressionString(String) */ - public UdpOutboundChannelAdapterSpec socketExpression(String socketExpression) { + public S socketExpression(String socketExpression) { this.target.setSocketExpressionString(socketExpression); return _this(); } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Udp.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Udp.java index fb56ade3ca5..8b2263537f8 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Udp.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Udp.java @@ -16,6 +16,10 @@ package org.springframework.integration.ip.dsl; +import java.util.function.Function; + +import org.springframework.messaging.Message; + /** * Factory methods for UDP. * @@ -55,8 +59,18 @@ public static UdpInboundChannelAdapterSpec inboundMulticastAdapter(int port, Str * @param destinationExpression destination expression. * @return the spec. */ - public static UdpOutboundChannelAdapterSpec outboundAdapter(String destinationExpression) { - return new UdpOutboundChannelAdapterSpec(destinationExpression); + public static UdpUnicastOutboundChannelAdapterSpec outboundAdapter(String destinationExpression) { + return new UdpUnicastOutboundChannelAdapterSpec(destinationExpression); + } + + /** + * Create an inbound unicast channel adapter using the supplied destination + * expression. + * @param destinationFunction function that will provide the destination based on the message. + * @return the spec. + */ + public static UdpUnicastOutboundChannelAdapterSpec outboundAdapter(Function, ?> destinationFunction) { + return new UdpUnicastOutboundChannelAdapterSpec(destinationFunction); } /** @@ -69,4 +83,15 @@ public static UdpMulticastOutboundChannelAdapterSpec outboundMulticastAdapter(St return new UdpMulticastOutboundChannelAdapterSpec(destinationExpression); } + /** + * Create an inbound multicast channel adapter using the supplied destination + * expression. + * @param destinationFunction function that will provide the destination based on the message. + * @return the spec. + */ + public static UdpMulticastOutboundChannelAdapterSpec outboundMulticastAdapter(Function, ?> + destinationFunction) { + return new UdpMulticastOutboundChannelAdapterSpec(destinationFunction); + } + } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpMulticastOutboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpMulticastOutboundChannelAdapterSpec.java index b5f3f4901d1..c8ad0b2f022 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpMulticastOutboundChannelAdapterSpec.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpMulticastOutboundChannelAdapterSpec.java @@ -16,8 +16,12 @@ package org.springframework.integration.ip.dsl; +import java.util.function.Function; + import org.springframework.integration.dsl.MessageHandlerSpec; +import org.springframework.integration.expression.FunctionExpression; import org.springframework.integration.ip.udp.MulticastSendingMessageHandler; +import org.springframework.messaging.Message; /** * A {@link MessageHandlerSpec} for {@link MulticastSendingMessageHandler}s. @@ -26,18 +30,23 @@ * @since 5.0 * */ -public class UdpMulticastOutboundChannelAdapterSpec extends UdpOutboundChannelAdapterSpec { +public class UdpMulticastOutboundChannelAdapterSpec + extends AbstractUdpOutboundChannelAdapterSpec { UdpMulticastOutboundChannelAdapterSpec(String destinationExpression) { this.target = new MulticastSendingMessageHandler(destinationExpression); } + UdpMulticastOutboundChannelAdapterSpec(Function, ?> destinationFunction) { + this.target = new MulticastSendingMessageHandler(new FunctionExpression<>(destinationFunction)); + } + /** * @param timeToLive the timeToLive. * @return the spec. * @see MulticastSendingMessageHandler#setTimeToLive(int) */ - public UdpOutboundChannelAdapterSpec timeToLive(int timeToLive) { + public UdpMulticastOutboundChannelAdapterSpec timeToLive(int timeToLive) { ((MulticastSendingMessageHandler) this.target).setTimeToLive(timeToLive); return _this(); } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpUnicastOutboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpUnicastOutboundChannelAdapterSpec.java new file mode 100644 index 00000000000..5988d7dad8a --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpUnicastOutboundChannelAdapterSpec.java @@ -0,0 +1,43 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import java.util.function.Function; + +import org.springframework.integration.dsl.MessageHandlerSpec; +import org.springframework.messaging.Message; + +/** + * A {@link MessageHandlerSpec} for + * {@link org.springframework.integration.ip.udp.UnicastSendingMessageHandler}s. + * + * @author Gary Russell + * @since 5.0 + * + */ +public class UdpUnicastOutboundChannelAdapterSpec + extends AbstractUdpOutboundChannelAdapterSpec { + + UdpUnicastOutboundChannelAdapterSpec(Function, ?> destinationFunction) { + super(destinationFunction); + } + + UdpUnicastOutboundChannelAdapterSpec(String destinationExpression) { + super(destinationExpression); + } + +} diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java index de7496ad702..2b39c79bd82 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java @@ -128,6 +128,13 @@ public void testUdp() { assertEquals("foo", Transformers.objectToString().transform(received).getPayload()); } + @Test + public void testUdpInheritance() { + Udp.outboundMulticastAdapter("headers['udp_dest']") + .lengthCheck(true) + .timeToLive(10); + } + @Configuration @EnableIntegration public static class Config { From f4e876f6fb56deb8c283569fab81022cfa7e595c Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 9 Nov 2016 16:24:48 -0500 Subject: [PATCH 7/8] Jdbc Lock Test Log Adjuster --- .../leader/JdbcLockRegistryLeaderInitiatorTests.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/leader/JdbcLockRegistryLeaderInitiatorTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/leader/JdbcLockRegistryLeaderInitiatorTests.java index 68304f142ff..450b4902997 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/leader/JdbcLockRegistryLeaderInitiatorTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/leader/JdbcLockRegistryLeaderInitiatorTests.java @@ -25,8 +25,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.log4j.Level; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import org.springframework.integration.jdbc.lock.DefaultLockRepository; @@ -35,6 +37,7 @@ import org.springframework.integration.leader.DefaultCandidate; import org.springframework.integration.leader.event.LeaderEventPublisher; import org.springframework.integration.support.leader.LockRegistryLeaderInitiator; +import org.springframework.integration.test.rule.Log4jLevelAdjuster; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; @@ -45,6 +48,9 @@ */ public class JdbcLockRegistryLeaderInitiatorTests { + @Rule + public Log4jLevelAdjuster logAdjuster = new Log4jLevelAdjuster(Level.DEBUG, "org.springframework.integration"); + public static EmbeddedDatabase dataSource; @BeforeClass @@ -162,9 +168,9 @@ public void publishOnRevoked(Object source, Context context, String role) { private static class CountingPublisher implements LeaderEventPublisher { - private CountDownLatch granted; + private final CountDownLatch granted; - private CountDownLatch revoked; + private final CountDownLatch revoked; private volatile LockRegistryLeaderInitiator initiator; From 106d585b8e83eca3c4a276e90ea1809c0e28187b Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 9 Nov 2016 17:13:43 -0500 Subject: [PATCH 8/8] Fix Mongo Test Race Conditions --- .../MongoDbInboundChannelAdapterIntegrationTests.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/config/MongoDbInboundChannelAdapterIntegrationTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/config/MongoDbInboundChannelAdapterIntegrationTests.java index 70a37d5351d..9d2b67abc0d 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/config/MongoDbInboundChannelAdapterIntegrationTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/config/MongoDbInboundChannelAdapterIntegrationTests.java @@ -35,12 +35,12 @@ import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; import org.springframework.integration.aop.AbstractMessageSourceAdvice; +import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.core.MessageSource; import org.springframework.integration.endpoint.SourcePollingChannelAdapter; import org.springframework.integration.mongodb.rules.MongoDbAvailable; import org.springframework.integration.mongodb.rules.MongoDbAvailableTests; import org.springframework.messaging.Message; -import org.springframework.messaging.PollableChannel; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @@ -62,7 +62,7 @@ public class MongoDbInboundChannelAdapterIntegrationTests extends MongoDbAvailab private MongoTemplate mongoTemplate; @Autowired - private PollableChannel replyChannel; + private QueueChannel replyChannel; @Autowired @Qualifier("mongoInboundAdapter") @@ -106,6 +106,7 @@ public void testWithDefaultMongoFactory() throws Exception { assertNotNull(this.replyChannel.receive(10000)); this.mongoInboundAdapter.stop(); + this.replyChannel.purge(null); } @Test @@ -121,6 +122,7 @@ public void testWithNamedMongoFactory() throws Exception { assertEquals("Bob", message.getPayload().get(0).get("name")); this.mongoInboundAdapterNamedFactory.stop(); + this.replyChannel.purge(null); } @Test @@ -136,6 +138,7 @@ public void testWithMongoTemplate() throws Exception { assertEquals("Bob", message.getPayload().getName()); this.mongoInboundAdapterWithTemplate.stop(); + this.replyChannel.purge(null); } @Test @@ -151,6 +154,7 @@ public void testWithNamedCollection() throws Exception { assertEquals("Bob", message.getPayload().get(0).getName()); this.mongoInboundAdapterWithNamedCollection.stop(); + this.replyChannel.purge(null); } @Test @@ -166,6 +170,7 @@ public void testWithNamedCollectionExpression() throws Exception { assertEquals("Bob", message.getPayload().get(0).getName()); this.mongoInboundAdapterWithNamedCollectionExpression.stop(); + this.replyChannel.purge(null); } @Test @@ -181,6 +186,7 @@ public void testWithOnSuccessDisposition() throws Exception { this.inboundAdapterWithOnSuccessDisposition.stop(); assertNull(this.mongoTemplate.findOne(new Query(Criteria.where("name").is("Bob")), Person.class, "data")); + this.replyChannel.purge(null); } @Test @@ -197,6 +203,7 @@ public void testWithMongoConverter() throws Exception { assertNotNull(replyChannel.receive(10000)); this.mongoInboundAdapterWithConverter.stop(); + this.replyChannel.purge(null); } @Test(expected = BeanDefinitionParsingException.class)