diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/config/TcpConnectionFactoryFactoryBean.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/config/TcpConnectionFactoryFactoryBean.java index c5c251eae57..f8bec67fdaa 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/config/TcpConnectionFactoryFactoryBean.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/config/TcpConnectionFactoryFactoryBean.java @@ -24,7 +24,7 @@ import org.springframework.beans.factory.config.AbstractFactoryBean; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; -import org.springframework.context.SmartLifecycle; +import org.springframework.context.Lifecycle; import org.springframework.core.serializer.Deserializer; import org.springframework.core.serializer.Serializer; import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; @@ -55,8 +55,8 @@ * @author Gary Russell * @since 2.0.5 */ -public class TcpConnectionFactoryFactoryBean extends AbstractFactoryBean implements SmartLifecycle, BeanNameAware, - BeanFactoryAware, ApplicationEventPublisherAware { +public class TcpConnectionFactoryFactoryBean extends AbstractFactoryBean + implements Lifecycle, BeanNameAware, BeanFactoryAware, ApplicationEventPublisherAware { private volatile AbstractConnectionFactory connectionFactory; @@ -446,33 +446,6 @@ public void stop() { this.connectionFactory.stop(); } - /** - * @return phase - * @see org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory#getPhase() - */ - @Override - public int getPhase() { - return this.connectionFactory.getPhase(); - } - - /** - * @return isAutoStartup - * @see org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory#isAutoStartup() - */ - @Override - public boolean isAutoStartup() { - return this.connectionFactory.isAutoStartup(); - } - - /** - * @param callback The Runnable to invoke. - * @see org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory#stop(java.lang.Runnable) - */ - @Override - public void stop(Runnable callback) { - this.connectionFactory.stop(callback); - } - @Override public boolean isRunning() { return this.connectionFactory.isRunning(); diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/AbstractConnectionFactorySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/AbstractConnectionFactorySpec.java new file mode 100644 index 00000000000..8380ee333c4 --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/AbstractConnectionFactorySpec.java @@ -0,0 +1,222 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import java.util.concurrent.Executor; + +import org.springframework.core.serializer.Deserializer; +import org.springframework.core.serializer.Serializer; +import org.springframework.integration.dsl.IntegrationComponentSpec; +import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory; +import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactoryChain; +import org.springframework.integration.ip.tcp.connection.TcpMessageMapper; +import org.springframework.integration.ip.tcp.connection.TcpSocketSupport; + +/** + * An {@link IntegrationComponentSpec} for {@link AbstractConnectionFactory}s. + * @param the target {@link AbstractConnectionFactorySpec} implementation type. + * @param the target {@link AbstractConnectionFactory} implementation type. + * + * @author Gary Russell + * @since 5.0 + * + */ +public abstract class AbstractConnectionFactorySpec + , C extends AbstractConnectionFactory> + extends IntegrationComponentSpec { + + AbstractConnectionFactorySpec(C connectionFactory) { + this.target = connectionFactory; + } + + @Override + public S id(String id) { + this.target.setBeanName(id); + return _this(); + } + + /** + * @param soTimeout the timeout socket option. + * @return the spec. + * @see AbstractConnectionFactory#setSoTimeout(int) + */ + public S soTimeout(int soTimeout) { + this.target.setSoTimeout(soTimeout); + return _this(); + } + + /** + * @param soReceiveBufferSize the receive buffer size socket option. + * @return the spec. + * @see AbstractConnectionFactory#setSoReceiveBufferSize(int) + */ + public S soReceiveBufferSize(int soReceiveBufferSize) { + this.target.setSoReceiveBufferSize(soReceiveBufferSize); + return _this(); + } + + /** + * @param soSendBufferSize the send buffer size socket option. + * @return the spec. + * @see AbstractConnectionFactory#setSoSendBufferSize(int) + */ + public S soSendBufferSize(int soSendBufferSize) { + this.target.setSoSendBufferSize(soSendBufferSize); + return _this(); + } + + /** + * @param soTcpNoDelay the TCP no delay socket option (disable Nagle's algorithm). + * @return the spec. + * @see AbstractConnectionFactory#setSoTcpNoDelay(boolean) + */ + public S soTcpNoDelay(boolean soTcpNoDelay) { + this.target.setSoTcpNoDelay(soTcpNoDelay); + return _this(); + } + + /** + * @param soLinger the linger socket option. + * @return the spec. + * @see AbstractConnectionFactory#setSoLinger(int) + */ + public S soLinger(int soLinger) { + this.target.setSoLinger(soLinger); + return _this(); + } + + /** + * @param soKeepAlive the keep alive socket option. + * @return the spec. + * @see AbstractConnectionFactory#setSoKeepAlive(boolean) + */ + public S soKeepAlive(boolean soKeepAlive) { + this.target.setSoKeepAlive(soKeepAlive); + return _this(); + } + + /** + * @param soTrafficClass the traffic class socket option. + * @return the spec. + * @see AbstractConnectionFactory#setSoTrafficClass(int) + */ + public S soTrafficClass(int soTrafficClass) { + this.target.setSoTrafficClass(soTrafficClass); + return _this(); + } + + /** + * @param taskExecutor the task executor. + * @return the spec. + * @see AbstractConnectionFactory#setTaskExecutor(Executor) + */ + public S taskExecutor(Executor taskExecutor) { + this.target.setTaskExecutor(taskExecutor); + return _this(); + } + + /** + * @param deserializer the deserializer. + * @return the spec. + * @see AbstractConnectionFactory#setDeserializer(Deserializer) + */ + public S deserializer(Deserializer deserializer) { + this.target.setDeserializer(deserializer); + return _this(); + } + + /** + * @param serializer the serializer. + * @return the spec. + * @see AbstractConnectionFactory#setSerializer(Serializer) + */ + public S serializer(Serializer serializer) { + this.target.setSerializer(serializer); + return _this(); + } + + /** + * @param mapper the message mapper. + * @return the spec. + * @see AbstractConnectionFactory#setMapper(TcpMessageMapper) + */ + public S mapper(TcpMessageMapper mapper) { + this.target.setMapper(mapper); + return _this(); + } + + /** + * @param leaveOpen true to leave the socket open for additional messages. + * @return the spec. + * @see AbstractConnectionFactory#setLeaveOpen(boolean) + */ + public S leaveOpen(boolean leaveOpen) { + this.target.setLeaveOpen(leaveOpen); + return _this(); + } + + /** + * @param interceptorFactoryChain the interceptor factory chain. + * @return the spec. + * @see AbstractConnectionFactory#setInterceptorFactoryChain(TcpConnectionInterceptorFactoryChain) + */ + public S interceptorFactoryChain(TcpConnectionInterceptorFactoryChain interceptorFactoryChain) { + this.target.setInterceptorFactoryChain(interceptorFactoryChain); + return _this(); + } + + /** + * @param lookupHost true to reverse lookup the host. + * @return the spec. + * @see AbstractConnectionFactory#setLookupHost(boolean) + */ + public S lookupHost(boolean lookupHost) { + this.target.setLookupHost(lookupHost); + return _this(); + } + + /** + * @param nioHarvestInterval the harvest interval when using NIO. + * @return the spec. + * @see AbstractConnectionFactory#setNioHarvestInterval(int) + */ + public S nioHarvestInterval(int nioHarvestInterval) { + this.target.setNioHarvestInterval(nioHarvestInterval); + return _this(); + } + + /** + * @param readDelay the read delay. + * @return the spec. + * @see AbstractConnectionFactory#setReadDelay(long) + */ + public S readDelay(long readDelay) { + this.target.setReadDelay(readDelay); + return _this(); + } + + /** + * @param tcpSocketSupport the {@link TcpSocketSupport}. + * @return the spec. + * @see AbstractConnectionFactory#setTcpSocketSupport(TcpSocketSupport) + */ + public S tcpSocketSupport(TcpSocketSupport tcpSocketSupport) { + this.target.setTcpSocketSupport(tcpSocketSupport); + return _this(); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/AbstractUdpOutboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/AbstractUdpOutboundChannelAdapterSpec.java new file mode 100644 index 00000000000..bda5006b6ec --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/AbstractUdpOutboundChannelAdapterSpec.java @@ -0,0 +1,130 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import java.net.DatagramSocket; +import java.util.function.Function; + +import org.springframework.integration.dsl.MessageHandlerSpec; +import org.springframework.integration.expression.FunctionExpression; +import org.springframework.integration.ip.udp.UnicastSendingMessageHandler; +import org.springframework.messaging.Message; + +/** + * A {@link MessageHandlerSpec} for UDB {@link org.springframework.messaging.MessageHandler}s. + * + * @param the target {@link UdpUnicastOutboundChannelAdapterSpec} implementation type. + * @author Gary Russell + * @since 5.0 + * + */ +public abstract class AbstractUdpOutboundChannelAdapterSpec> + extends MessageHandlerSpec { + + protected AbstractUdpOutboundChannelAdapterSpec() { + super(); + } + + AbstractUdpOutboundChannelAdapterSpec(String destinationExpression) { + this.target = new UnicastSendingMessageHandler(destinationExpression); + } + + AbstractUdpOutboundChannelAdapterSpec(Function, ?> destinationFunction) { + this.target = new UnicastSendingMessageHandler(new FunctionExpression<>(destinationFunction)); + } + + /** + * @param timeout the timeout socket option. + * @return the spec. + * @see UnicastSendingMessageHandler#setSoTimeout(int) + */ + public S soTimeout(int timeout) { + this.target.setSoTimeout(timeout); + return _this(); + } + + /** + * @param size the send buffer size socket option. + * @return the spec. + * @see UnicastSendingMessageHandler#setSoSendBufferSize(int) + */ + public S soSendBufferSize(int size) { + this.target.setSoSendBufferSize(size); + return _this(); + } + + /** + * @param localAddress the local address. + * @return the spec. + * @see UnicastSendingMessageHandler#setLocalAddress(String) + */ + public S localAddress(String localAddress) { + this.target.setLocalAddress(localAddress); + return _this(); + } + + /** + * @param lengthCheck the length check boolean. + * @return the spec. + * @see UnicastSendingMessageHandler#setLengthCheck(boolean) + */ + public S lengthCheck(boolean lengthCheck) { + this.target.setLengthCheck(lengthCheck); + return _this(); + } + + /** + * @param size the receive buffer size socket option. + * @return the spec. + * @see UnicastSendingMessageHandler#setSoReceiveBufferSize(int) + */ + public S soReceiveBufferSize(int size) { + this.target.setSoReceiveBufferSize(size); + return _this(); + } + + /** + * @param ackCounter the ack counter. + * @return the spec. + * @see UnicastSendingMessageHandler#setAckCounter(int) + */ + public S ackCounter(int ackCounter) { + this.target.setAckCounter(ackCounter); + return _this(); + } + + /** + * @param socketFunction the socket function. + * @return the spec. + * @see UnicastSendingMessageHandler#setSocketExpression(org.springframework.expression.Expression) + */ + public S socketFunction(Function, DatagramSocket> socketFunction) { + this.target.setSocketExpression(new FunctionExpression<>(socketFunction)); + return _this(); + } + + /** + * @param socketExpression the socket expression. + * @return the spec. + * @see UnicastSendingMessageHandler#setSocketExpressionString(String) + */ + public S socketExpression(String socketExpression) { + this.target.setSocketExpressionString(socketExpression); + return _this(); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Tcp.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Tcp.java new file mode 100644 index 00000000000..e28de38537c --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Tcp.java @@ -0,0 +1,162 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; +import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory; + +/** + * Factory methods for TCP. + * + * @author Gary Russell + * @since 5.0 + * + */ +public final class Tcp { + + /** + * Boolean indicating the connection factory should use NIO. + */ + public static final boolean NIO = true; + + /** + * Boolean indicating the connection factory should not use NIO + * (default). + */ + public static final boolean NET = true; + + private Tcp() { + super(); + } + + /** + * Create a server spec that uses NIO. + * @param port the port to listen on. + * @return the spec. + */ + public static TcpServerConnectionFactorySpec nioServer(int port) { + return new TcpServerConnectionFactorySpec(port, NIO); + } + + /** + * Create a server spec that does not use NIO. + * @param port the port to listen on. + * @return the spec. + */ + public static TcpServerConnectionFactorySpec netServer(int port) { + return new TcpServerConnectionFactorySpec(port, NET); + } + + /** + * Create a client spec that uses NIO. + * @param host the host to connect to. + * @param port the port to connect to. + * @return the spec. + */ + public static TcpClientConnectionFactorySpec nioClient(String host, int port) { + return new TcpClientConnectionFactorySpec(host, port, NIO); + } + + /** + * Create a client spec that does not use NIO. + * @param host the host to connect to. + * @param port the port to connect to. + * @return the spec. + */ + public static TcpClientConnectionFactorySpec netClient(String host, int port) { + return new TcpClientConnectionFactorySpec(host, port, NET); + } + + /** + * Create an inbound gateway using the supplied connection factory. + * @param connectionFactory the connection factory - must be an existing bean - it + * will not be initialized. + * @return the spec. + */ + public static TcpInboundGatewaySpec inboundGateway(AbstractConnectionFactory connectionFactory) { + return new TcpInboundGatewaySpec(connectionFactory); + } + + /** + * Create an inbound gateway using the supplied connection factory. + * @param connectionFactorySpec the connection factory spec. + * @return the spec. + */ + public static TcpInboundGatewaySpec inboundGateway(AbstractConnectionFactorySpec connectionFactorySpec) { + return new TcpInboundGatewaySpec(connectionFactorySpec); + } + + /** + * Create an inbound channel adapter using the supplied connection factory. + * @param connectionFactory the connection factory - must be an existing bean - it + * will not be initialized. + * @return the spec. + */ + public static TcpInboundChannelAdapterSpec inboundAdapter(AbstractConnectionFactory connectionFactory) { + return new TcpInboundChannelAdapterSpec(connectionFactory); + } + + /** + * Create an inbound channel adapter using the supplied connection factory. + * @param connectionFactorySpec the connection factory spec. + * @return the spec. + */ + public static TcpInboundChannelAdapterSpec inboundAdapter( + AbstractConnectionFactorySpec connectionFactorySpec) { + return new TcpInboundChannelAdapterSpec(connectionFactorySpec); + } + + /** + * Create an outbound gateway using the supplied client connection factory. + * @param connectionFactory the connection factory - must be an existing bean - it + * will not be initialized. + * @return the spec. + */ + public static TcpOutboundGatewaySpec outboundGateway(AbstractClientConnectionFactory connectionFactory) { + return new TcpOutboundGatewaySpec(connectionFactory); + } + + /** + * Create an outbound gateway using the supplied client connection factory. + * @param connectionFactory the connection factory spec. + * @return the spec. + */ + public static TcpOutboundGatewaySpec outboundGateway(TcpClientConnectionFactorySpec connectionFactory) { + return new TcpOutboundGatewaySpec(connectionFactory); + } + + /** + * Create an outbound gateway using the supplied connection factory. + * @param connectionFactory the connection factory - must be an existing bean - it + * will not be initialized. + * @return the spec. + */ + public static TcpOutboundChannelAdapterSpec outboundAdapter(AbstractConnectionFactory connectionFactory) { + return new TcpOutboundChannelAdapterSpec(connectionFactory); + } + + /** + * Create an outbound gateway using the supplied connection factory. + * @param connectionFactorySpec the connection factory. + * @return the spec. + */ + public static TcpOutboundChannelAdapterSpec outboundAdapter( + AbstractConnectionFactorySpec connectionFactorySpec) { + return new TcpOutboundChannelAdapterSpec(connectionFactorySpec); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpClientConnectionFactorySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpClientConnectionFactorySpec.java new file mode 100644 index 00000000000..7b29328698a --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpClientConnectionFactorySpec.java @@ -0,0 +1,41 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; +import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory; +import org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory; + +/** + * An {@link AbstractConnectionFactorySpec} for {@link AbstractClientConnectionFactory}s. + * @author Gary Russell + * + * @since 5.0 + * + */ +public class TcpClientConnectionFactorySpec + extends AbstractConnectionFactorySpec { + + TcpClientConnectionFactorySpec(String host, int port) { + this(host, port, false); + } + + TcpClientConnectionFactorySpec(String host, int port, boolean nio) { + super(nio ? new TcpNioClientConnectionFactory(host, port) : new TcpNetClientConnectionFactory(host, port)); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundChannelAdapterSpec.java new file mode 100644 index 00000000000..bb7d9283504 --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundChannelAdapterSpec.java @@ -0,0 +1,97 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import java.util.Collection; +import java.util.Collections; + +import org.springframework.integration.dsl.ComponentsRegistration; +import org.springframework.integration.dsl.MessageProducerSpec; +import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter; +import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory; +import org.springframework.scheduling.TaskScheduler; + +/** + * A {@link MessageProducerSpec} for {@link TcpReceivingChannelAdapter}s. + * + * @author Gary Russell + * @since 5.0 + * + */ +public class TcpInboundChannelAdapterSpec + extends MessageProducerSpec + implements ComponentsRegistration { + + private final AbstractConnectionFactory connectionFactory; + + /** + * Construct an instance using an existing spring-managed connection factory. + * @param connectionFactoryBean the spring-managed bean. + */ + TcpInboundChannelAdapterSpec(AbstractConnectionFactory connectionFactoryBean) { + super(new TcpReceivingChannelAdapter()); + this.connectionFactory = null; + this.target.setConnectionFactory(connectionFactoryBean); + } + + /** + * Construct an instance using the provided connection factory spec. + * @param connectionFactorySpec the spec. + */ + TcpInboundChannelAdapterSpec(AbstractConnectionFactorySpec connectionFactorySpec) { + super(new TcpReceivingChannelAdapter()); + this.connectionFactory = connectionFactorySpec.get(); + this.target.setConnectionFactory(this.connectionFactory); + } + + /** + * @param isClientMode true to connect in client mode + * @return the spec. + * @see TcpReceivingChannelAdapter#setClientMode(boolean) + */ + public TcpInboundChannelAdapterSpec clientMode(boolean isClientMode) { + this.target.setClientMode(isClientMode); + return _this(); + } + + /** + * @param retryInterval the client mode retry interval to set. + * @return the spec. + * @see TcpReceivingChannelAdapter#setRetryInterval(long) + */ + public TcpInboundChannelAdapterSpec retryInterval(long retryInterval) { + this.target.setRetryInterval(retryInterval); + return _this(); + } + + /** + * @param taskScheduler the scheduler for connecting in client mode. + * @return the spec. + * @see TcpReceivingChannelAdapter#setTaskScheduler(TaskScheduler) + */ + public TcpInboundChannelAdapterSpec taskScheduler(TaskScheduler taskScheduler) { + this.target.setTaskScheduler(taskScheduler); + return _this(); + } + + @Override + public Collection getComponentsToRegister() { + return this.connectionFactory == null ? Collections.emptyList() + : Collections.singletonList(this.connectionFactory); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundGatewaySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundGatewaySpec.java new file mode 100644 index 00000000000..f632352c97d --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundGatewaySpec.java @@ -0,0 +1,96 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import java.util.Collection; +import java.util.Collections; + +import org.springframework.integration.dsl.ComponentsRegistration; +import org.springframework.integration.dsl.MessagingGatewaySpec; +import org.springframework.integration.ip.tcp.TcpInboundGateway; +import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory; +import org.springframework.scheduling.TaskScheduler; + +/** + * A {@link MessagingGatewaySpec} for {@link TcpInboundGateway}s. + * + * @author Gary Russell + * @since 5.0 + * + */ +public class TcpInboundGatewaySpec extends MessagingGatewaySpec + implements ComponentsRegistration { + + private final AbstractConnectionFactory connectionFactory; + + /** + * Construct an instance using an existing spring-managed connection factory. + * @param connectionFactoryBean the spring-managed bean. + */ + TcpInboundGatewaySpec(AbstractConnectionFactory connectionFactoryBean) { + super(new TcpInboundGateway()); + this.connectionFactory = null; + this.target.setConnectionFactory(connectionFactoryBean); + } + + /** + * Construct an instance using a connection factory spec. + * @param connectionFactorySpec the spec. + */ + TcpInboundGatewaySpec(AbstractConnectionFactorySpec connectionFactorySpec) { + super(new TcpInboundGateway()); + this.connectionFactory = connectionFactorySpec.get(); + this.target.setConnectionFactory(this.connectionFactory); + } + + /** + * @param isClientMode true to connect in client mode + * @return the spec. + * @see TcpInboundGateway#setClientMode(boolean) + */ + public TcpInboundGatewaySpec clientMode(boolean isClientMode) { + this.target.setClientMode(isClientMode); + return _this(); + } + + /** + * @param retryInterval the client mode retry interval to set. + * @return the spec. + * @see TcpInboundGateway#setRetryInterval(long) + */ + public TcpInboundGatewaySpec retryInterval(long retryInterval) { + this.target.setRetryInterval(retryInterval); + return _this(); + } + + /** + * @param taskScheduler the scheduler for connecting in client mode. + * @return the spec. + * @see TcpInboundGateway#setTaskScheduler(TaskScheduler) + */ + public TcpInboundGatewaySpec taskScheduler(TaskScheduler taskScheduler) { + this.target.setTaskScheduler(taskScheduler); + return _this(); + } + + @Override + public Collection getComponentsToRegister() { + return this.connectionFactory == null ? Collections.emptyList() + : Collections.singletonList(this.connectionFactory); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundChannelAdapterSpec.java new file mode 100644 index 00000000000..17df57766a7 --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundChannelAdapterSpec.java @@ -0,0 +1,97 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import java.util.Collection; +import java.util.Collections; + +import org.springframework.integration.dsl.ComponentsRegistration; +import org.springframework.integration.dsl.MessageHandlerSpec; +import org.springframework.integration.ip.tcp.TcpSendingMessageHandler; +import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory; +import org.springframework.scheduling.TaskScheduler; + +/** + * A {@link MessageHandlerSpec} for {@link TcpSendingMessageHandler}s. + * + * @author Gary Russell + * @since 5.0 + * + */ +public class TcpOutboundChannelAdapterSpec + extends MessageHandlerSpec + implements ComponentsRegistration { + + private final AbstractConnectionFactory connectionFactory; + + /** + * Construct an instance using an existing spring-managed connection factory. + * @param connectionFactoryBean the spring-managed bean. + */ + TcpOutboundChannelAdapterSpec(AbstractConnectionFactory connectionFactoryBean) { + this.target = new TcpSendingMessageHandler(); + this.connectionFactory = null; + this.target.setConnectionFactory(connectionFactoryBean); + } + + /** + * Construct an instance using the provided connection factory spec. + * @param connectionFactorySpec the spec. + */ + TcpOutboundChannelAdapterSpec(AbstractConnectionFactorySpec connectionFactorySpec) { + this.target = new TcpSendingMessageHandler(); + this.connectionFactory = connectionFactorySpec.get(); + this.target.setConnectionFactory(this.connectionFactory); + } + + /** + * @param isClientMode true to connect in client mode + * @return the spec. + * @see TcpSendingMessageHandler#setClientMode(boolean) + */ + public TcpOutboundChannelAdapterSpec clientMode(boolean isClientMode) { + this.target.setClientMode(isClientMode); + return _this(); + } + + /** + * @param retryInterval the client mode retry interval to set. + * @return the spec. + * @see TcpSendingMessageHandler#setRetryInterval(long) + */ + public TcpOutboundChannelAdapterSpec retryInterval(long retryInterval) { + this.target.setRetryInterval(retryInterval); + return _this(); + } + + /** + * @param taskScheduler the scheduler for connecting in client mode. + * @return the spec. + * @see TcpSendingMessageHandler#setTaskScheduler(TaskScheduler) + */ + public TcpOutboundChannelAdapterSpec taskScheduler(TaskScheduler taskScheduler) { + this.target.setTaskScheduler(taskScheduler); + return _this(); + } + + @Override + public Collection getComponentsToRegister() { + return this.connectionFactory == null ? Collections.emptyList() + : Collections.singletonList(this.connectionFactory); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java new file mode 100644 index 00000000000..bbb5d6c2e91 --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java @@ -0,0 +1,96 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import java.util.Collection; +import java.util.Collections; +import java.util.function.Function; + +import org.springframework.integration.dsl.ComponentsRegistration; +import org.springframework.integration.dsl.MessageHandlerSpec; +import org.springframework.integration.expression.FunctionExpression; +import org.springframework.integration.ip.tcp.TcpOutboundGateway; +import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; +import org.springframework.messaging.Message; + +/** + * A {@link MessageHandlerSpec} for {@link TcpOutboundGateway}s. + * + * @author Gary Russell + * @since 5.0 + * + */ +public class TcpOutboundGatewaySpec extends MessageHandlerSpec + implements ComponentsRegistration { + + private final AbstractClientConnectionFactory connectionFactory; + + /** + * Construct an instance using an existing spring-managed connection factory. + * @param connectionFactoryBean the spring-managed bean. + */ + public TcpOutboundGatewaySpec(AbstractClientConnectionFactory connectionFactoryBean) { + this.target = new TcpOutboundGateway(); + this.connectionFactory = null; + this.target.setConnectionFactory(connectionFactoryBean); + } + + /** + * Construct an instance using the supplied connection factory spec. + * @param connectionFactorySpec the spec. + */ + public TcpOutboundGatewaySpec(TcpClientConnectionFactorySpec connectionFactorySpec) { + this.target = new TcpOutboundGateway(); + this.connectionFactory = connectionFactorySpec.get(); + this.target.setConnectionFactory(this.connectionFactory); + } + + /** + * @param remoteTimeout the remote timeout to set. + * @return the spec. + * @see TcpOutboundGateway#setRemoteTimeout(long) + */ + public TcpOutboundGatewaySpec remoteTimeout(long remoteTimeout) { + this.target.setRemoteTimeout(remoteTimeout); + return _this(); + } + + /** + * Configure a {@link Function} that will be invoked at runtime to determine the destination to + * which a message will be sent. Typically used with a Java 8 Lambda expression: + *
+	 * {@code
+	 * .remoteTimeout(m -> m.getHeaders().get('rto'))
+	 * }
+	 * 
+ * @param remoteTimeoutFunction the function. + * @param

the message payload type. + * @return the spec. + * @see TcpOutboundGateway#setRemoteTimeoutExpression(org.springframework.expression.Expression) + */ + public

TcpOutboundGatewaySpec remoteTimeout(Function, ?> remoteTimeoutFunction) { + this.target.setRemoteTimeoutExpression(new FunctionExpression<>(remoteTimeoutFunction)); + return _this(); + } + + @Override + public Collection getComponentsToRegister() { + return this.connectionFactory == null ? Collections.emptyList() + : Collections.singletonList(this.connectionFactory); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpServerConnectionFactorySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpServerConnectionFactorySpec.java new file mode 100644 index 00000000000..36a030331cb --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpServerConnectionFactorySpec.java @@ -0,0 +1,61 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory; +import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory; +import org.springframework.integration.ip.tcp.connection.TcpNioServerConnectionFactory; + +/** + * An {@link AbstractConnectionFactorySpec} for {@link AbstractServerConnectionFactory}s. + * @author Gary Russell + * + * @since 5.0 + * + */ +public class TcpServerConnectionFactorySpec + extends AbstractConnectionFactorySpec { + + TcpServerConnectionFactorySpec(int port) { + this(port, false); + } + + TcpServerConnectionFactorySpec(int port, boolean nio) { + super(nio ? new TcpNioServerConnectionFactory(port) : new TcpNetServerConnectionFactory(port)); + } + + /** + * @param localAddress the local address. + * @return the spec. + * @see AbstractServerConnectionFactory#setLocalAddress(String) + */ + public TcpServerConnectionFactorySpec localAddress(String localAddress) { + this.target.setLocalAddress(localAddress); + return _this(); + } + + /** + * @param backlog the backlog. + * @return the spec. + * @see AbstractServerConnectionFactory#setBacklog(int) + */ + public TcpServerConnectionFactorySpec backlog(int backlog) { + this.target.setBacklog(backlog); + return _this(); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Udp.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Udp.java new file mode 100644 index 00000000000..8b2263537f8 --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Udp.java @@ -0,0 +1,97 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import java.util.function.Function; + +import org.springframework.messaging.Message; + +/** + * Factory methods for UDP. + * + * @author Gary Russell + * @since 5.0 + * + */ +public final class Udp { + + private Udp() { + super(); + } + + /** + * Create an inbound unicast channel adapter using the supplied port. + * @param port the port. + * @return the spec. + */ + public static UdpInboundChannelAdapterSpec inboundAdapter(int port) { + return new UdpInboundChannelAdapterSpec(port); + } + + /** + * Create an inbound multicast channel adapter using the supplied port and + * group. + * @param port the port. + * @param multicastGroup the group. + * @return the spec. + */ + public static UdpInboundChannelAdapterSpec inboundMulticastAdapter(int port, String multicastGroup) { + return new UdpInboundChannelAdapterSpec(port, multicastGroup); + } + + /** + * Create an inbound unicast channel adapter using the supplied destination + * expression. + * @param destinationExpression destination expression. + * @return the spec. + */ + public static UdpUnicastOutboundChannelAdapterSpec outboundAdapter(String destinationExpression) { + return new UdpUnicastOutboundChannelAdapterSpec(destinationExpression); + } + + /** + * Create an inbound unicast channel adapter using the supplied destination + * expression. + * @param destinationFunction function that will provide the destination based on the message. + * @return the spec. + */ + public static UdpUnicastOutboundChannelAdapterSpec outboundAdapter(Function, ?> destinationFunction) { + return new UdpUnicastOutboundChannelAdapterSpec(destinationFunction); + } + + /** + * Create an inbound multicast channel adapter using the supplied destination + * expression. + * @param destinationExpression destination expression. + * @return the spec. + */ + public static UdpMulticastOutboundChannelAdapterSpec outboundMulticastAdapter(String destinationExpression) { + return new UdpMulticastOutboundChannelAdapterSpec(destinationExpression); + } + + /** + * Create an inbound multicast channel adapter using the supplied destination + * expression. + * @param destinationFunction function that will provide the destination based on the message. + * @return the spec. + */ + public static UdpMulticastOutboundChannelAdapterSpec outboundMulticastAdapter(Function, ?> + destinationFunction) { + return new UdpMulticastOutboundChannelAdapterSpec(destinationFunction); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpInboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpInboundChannelAdapterSpec.java new file mode 100644 index 00000000000..537a2d0ebfd --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpInboundChannelAdapterSpec.java @@ -0,0 +1,155 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import java.net.DatagramSocket; +import java.util.concurrent.Executor; + +import org.springframework.integration.dsl.MessageProducerSpec; +import org.springframework.integration.ip.udp.MulticastReceivingChannelAdapter; +import org.springframework.integration.ip.udp.UnicastReceivingChannelAdapter; +import org.springframework.scheduling.TaskScheduler; + +/** + * A {@link MessageProducerSpec} for {@link UnicastReceivingChannelAdapter}s. + * + * @author Gary Russell + * @since 5.0 + * + */ +public class UdpInboundChannelAdapterSpec + extends MessageProducerSpec { + + UdpInboundChannelAdapterSpec(int port) { + super(new UnicastReceivingChannelAdapter(port)); + } + + UdpInboundChannelAdapterSpec(int port, String multicastGroup) { + super(new MulticastReceivingChannelAdapter(multicastGroup, port)); + } + + /** + * @param soTimeout set the timeout socket option. + * @return the spec. + * @see UnicastReceivingChannelAdapter#setSoTimeout(int) + */ + public UdpInboundChannelAdapterSpec soTimeout(int soTimeout) { + this.target.setSoTimeout(soTimeout); + return _this(); + } + + /** + * @param taskScheduler set the task scheduler. + * @return the spec. + * @see UnicastReceivingChannelAdapter#setTaskScheduler(TaskScheduler) + */ + public UdpInboundChannelAdapterSpec taskScheduler(TaskScheduler taskScheduler) { + this.target.setTaskScheduler(taskScheduler); + return _this(); + } + + /** + * @param soReceiveBufferSize set the receive buffer size socket option. + * @return the spec. + * @see UnicastReceivingChannelAdapter#setSoReceiveBufferSize(int) + */ + public UdpInboundChannelAdapterSpec soReceiveBufferSize(int soReceiveBufferSize) { + this.target.setSoReceiveBufferSize(soReceiveBufferSize); + return _this(); + } + + /** + * @param receiveBufferSize set the receive buffer size. + * @return the spec. + * @see UnicastReceivingChannelAdapter#setReceiveBufferSize(int) + */ + public UdpInboundChannelAdapterSpec receiveBufferSize(int receiveBufferSize) { + this.target.setReceiveBufferSize(receiveBufferSize); + return _this(); + } + + /** + * @param lengthCheck set the length check boolean. + * @return the spec. + * @see UnicastReceivingChannelAdapter#setLengthCheck(boolean) + */ + public UdpInboundChannelAdapterSpec lengthCheck(boolean lengthCheck) { + this.target.setLengthCheck(lengthCheck); + return _this(); + } + + /** + * @param localAddress set the local address. + * @return the spec. + * @see UnicastReceivingChannelAdapter#setLocalAddress(String) + */ + public UdpInboundChannelAdapterSpec localAddress(String localAddress) { + this.target.setLocalAddress(localAddress); + return _this(); + } + + /** + * @param poolSize set the pool size. + * @return the spec. + * @see UnicastReceivingChannelAdapter#setPoolSize(int) + */ + public UdpInboundChannelAdapterSpec poolSize(int poolSize) { + this.target.setPoolSize(poolSize); + return _this(); + } + + /** + * @param taskExecutor set the task executor. + * @return the spec. + * @see UnicastReceivingChannelAdapter#setTaskExecutor(Executor) + */ + public UdpInboundChannelAdapterSpec taskExecutor(Executor taskExecutor) { + this.target.setTaskExecutor(taskExecutor); + return _this(); + } + + /** + * @param socket set the socket. + * @return the spec. + * @see UnicastReceivingChannelAdapter#setSocket(DatagramSocket) + */ + public UdpInboundChannelAdapterSpec socket(DatagramSocket socket) { + this.target.setSocket(socket); + return _this(); + } + + /** + * @param soSendBufferSize set the send buffer size socket option. + * @return the spec. + * @see UnicastReceivingChannelAdapter#setSoSendBufferSize(int) + */ + public UdpInboundChannelAdapterSpec soSendBufferSize(int soSendBufferSize) { + this.target.setSoSendBufferSize(soSendBufferSize); + return _this(); + } + + /** + * @param lookupHost set true to reverse lookup the host. + * @return the spec. + * @see UnicastReceivingChannelAdapter#setLookupHost(boolean) + */ + public UdpInboundChannelAdapterSpec lookupHost(boolean lookupHost) { + this.target.setLookupHost(lookupHost); + return _this(); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpMulticastOutboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpMulticastOutboundChannelAdapterSpec.java new file mode 100644 index 00000000000..c8ad0b2f022 --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpMulticastOutboundChannelAdapterSpec.java @@ -0,0 +1,54 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import java.util.function.Function; + +import org.springframework.integration.dsl.MessageHandlerSpec; +import org.springframework.integration.expression.FunctionExpression; +import org.springframework.integration.ip.udp.MulticastSendingMessageHandler; +import org.springframework.messaging.Message; + +/** + * A {@link MessageHandlerSpec} for {@link MulticastSendingMessageHandler}s. + * + * @author Gary Russell + * @since 5.0 + * + */ +public class UdpMulticastOutboundChannelAdapterSpec + extends AbstractUdpOutboundChannelAdapterSpec { + + UdpMulticastOutboundChannelAdapterSpec(String destinationExpression) { + this.target = new MulticastSendingMessageHandler(destinationExpression); + } + + UdpMulticastOutboundChannelAdapterSpec(Function, ?> destinationFunction) { + this.target = new MulticastSendingMessageHandler(new FunctionExpression<>(destinationFunction)); + } + + /** + * @param timeToLive the timeToLive. + * @return the spec. + * @see MulticastSendingMessageHandler#setTimeToLive(int) + */ + public UdpMulticastOutboundChannelAdapterSpec timeToLive(int timeToLive) { + ((MulticastSendingMessageHandler) this.target).setTimeToLive(timeToLive); + return _this(); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpUnicastOutboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpUnicastOutboundChannelAdapterSpec.java new file mode 100644 index 00000000000..5988d7dad8a --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpUnicastOutboundChannelAdapterSpec.java @@ -0,0 +1,43 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import java.util.function.Function; + +import org.springframework.integration.dsl.MessageHandlerSpec; +import org.springframework.messaging.Message; + +/** + * A {@link MessageHandlerSpec} for + * {@link org.springframework.integration.ip.udp.UnicastSendingMessageHandler}s. + * + * @author Gary Russell + * @since 5.0 + * + */ +public class UdpUnicastOutboundChannelAdapterSpec + extends AbstractUdpOutboundChannelAdapterSpec { + + UdpUnicastOutboundChannelAdapterSpec(Function, ?> destinationFunction) { + super(destinationFunction); + } + + UdpUnicastOutboundChannelAdapterSpec(String destinationExpression) { + super(destinationExpression); + } + +} diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/package-info.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/package-info.java new file mode 100644 index 00000000000..f93424236d0 --- /dev/null +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/package-info.java @@ -0,0 +1,4 @@ +/** + * Provides TCP/UDP Component support for the Java DSL. + */ +package org.springframework.integration.ip.dsl; diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandler.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandler.java index 595d74b1ea7..5858fd310d7 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandler.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandler.java @@ -267,6 +267,7 @@ public void stop() { if (this.scheduledFuture != null) { this.scheduledFuture.cancel(true); } + this.clientModeConnectionManager = null; if (this.clientConnectionFactory != null) { this.clientConnectionFactory.stop(); } @@ -282,24 +283,6 @@ public boolean isRunning() { return this.active; } - public void stop(Runnable callback) { - synchronized (this.lifecycleMonitor) { - if (this.active) { - this.active = false; - if (this.scheduledFuture != null) { - this.scheduledFuture.cancel(true); - } - this.clientModeConnectionManager = null; - if (this.clientConnectionFactory != null) { - this.clientConnectionFactory.stop(callback); - } - if (this.serverConnectionFactory != null) { - this.serverConnectionFactory.stop(callback); - } - } - } - } - /** * @return the clientConnectionFactory */ @@ -330,8 +313,7 @@ public boolean isClientMode() { } /** - * @param isClientMode - * the isClientMode to set + * @param isClientMode the isClientMode to set */ public void setClientMode(boolean isClientMode) { this.isClientMode = isClientMode; @@ -350,8 +332,7 @@ public long getRetryInterval() { } /** - * @param retryInterval - * the retryInterval to set + * @param retryInterval the retryInterval to set */ public void setRetryInterval(long retryInterval) { this.retryInterval = retryInterval; diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java index 3d5c4d4a0e7..acb3156a0cd 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java @@ -44,7 +44,6 @@ import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; -import org.springframework.context.SmartLifecycle; import org.springframework.core.serializer.Deserializer; import org.springframework.core.serializer.Serializer; import org.springframework.integration.context.IntegrationObjectSupport; @@ -60,7 +59,7 @@ * */ public abstract class AbstractConnectionFactory extends IntegrationObjectSupport - implements ConnectionFactory, SmartLifecycle, ApplicationEventPublisherAware { + implements ConnectionFactory, ApplicationEventPublisherAware { protected static final int DEFAULT_REPLY_TIMEOUT = 10000; @@ -275,6 +274,16 @@ public void setSoTrafficClass(int soTrafficClass) { this.soTrafficClass = soTrafficClass; } + /** + * Set the host; requires the factory to be stopped. + * @param host the host. + * @since 5.0 + */ + public void setHost(String host) { + Assert.state(!isRunning(), "Cannot change the host while running"); + this.host = host; + } + /** * @return the host */ @@ -282,6 +291,16 @@ public String getHost() { return this.host; } + /** + * Set the port; requires the factory to be stopped. + * @param port the port. + * @since 5.0 + */ + public void setPort(int port) { + Assert.state(!isRunning(), "Cannot change the host while running"); + this.port = port; + } + /** * @return the port */ @@ -395,6 +414,15 @@ public void setSingleUse(boolean singleUse) { this.singleUse = singleUse; } + /** + * If true, sockets created by this factory will be reused. + * Inverse of {@link #setSingleUse(boolean)}. + * @param leaveOpen The keepOpen to set. + * @since 5.0 + */ + public void setLeaveOpen(boolean leaveOpen) { + this.singleUse = !leaveOpen; + } public void setInterceptorFactoryChain(TcpConnectionInterceptorFactoryChain interceptorFactoryChain) { this.interceptorFactoryChain = interceptorFactoryChain; @@ -752,26 +780,6 @@ protected void doAccept(final Selector selector, ServerSocketChannel server, lon throw new UnsupportedOperationException("Nio server factory must override this method"); } - @Override - public int getPhase() { - return 0; - } - - /** - * We are controlled by the startup options of - * the bound endpoint. - */ - @Override - public boolean isAutoStartup() { - return false; - } - - @Override - public void stop(Runnable callback) { - stop(); - callback.run(); - } - protected void addConnection(TcpConnectionSupport connection) { synchronized (this.connections) { if (!this.active) { diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactory.java index ba87ca838ab..949cf6d3e6a 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactory.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactory.java @@ -388,21 +388,6 @@ public synchronized void stop() { this.pool.removeAllIdleItems(); } - @Override - public int getPhase() { - return this.targetConnectionFactory.getPhase(); - } - - @Override - public boolean isAutoStartup() { - return this.targetConnectionFactory.isAutoStartup(); - } - - @Override - public void stop(Runnable callback) { - this.targetConnectionFactory.stop(callback); - } - private final class CachedConnection extends TcpConnectionInterceptorSupport { private final AtomicBoolean released = new AtomicBoolean(); diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/ConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/ConnectionFactory.java index ca74042ef49..b506dc168e2 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/ConnectionFactory.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/ConnectionFactory.java @@ -16,7 +16,7 @@ package org.springframework.integration.ip.tcp.connection; -import org.springframework.context.SmartLifecycle; +import org.springframework.context.Lifecycle; @@ -27,7 +27,7 @@ * @since 2.0 * */ -public interface ConnectionFactory extends SmartLifecycle { +public interface ConnectionFactory extends Lifecycle { TcpConnection getConnection() throws Exception; diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/MulticastSendingMessageHandler.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/MulticastSendingMessageHandler.java index b07633d4c58..36c92f137ac 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/MulticastSendingMessageHandler.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/MulticastSendingMessageHandler.java @@ -21,7 +21,10 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.MulticastSocket; +import java.net.SocketAddress; +import java.net.URI; +import org.springframework.expression.Expression; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; @@ -99,6 +102,28 @@ public MulticastSendingMessageHandler(String address, int port, super(address, port, lengthCheck, acknowledge, ackHost, ackPort, ackTimeout); } + /** + * Construct MulticastSendingMessageHandler based on the destination SpEL expression to + * determine the target destination at runtime against requestMessage. + * @param destinationExpression the SpEL expression to evaluate the target destination + * at runtime. Must evaluate to {@link String}, {@link URI} or {@link SocketAddress}. + * @since 5.0 + */ + public MulticastSendingMessageHandler(Expression destinationExpression) { + super(destinationExpression); + } + + /** + * Construct MulticastSendingMessageHandler based on the destination SpEL expression to + * determine the target destination at runtime against requestMessage. + * @param destinationExpression the SpEL expression to evaluate the target destination + * at runtime. Must evaluate to {@link String}, {@link URI} or {@link SocketAddress}. + * @since 5.0 + */ + public MulticastSendingMessageHandler(String destinationExpression) { + super(destinationExpression); + } + @Override protected DatagramSocket getSocket() throws IOException { if (this.getTheSocket() == null) { diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/UnicastReceivingChannelAdapter.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/UnicastReceivingChannelAdapter.java index 3af9ba2bf89..9d777303799 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/UnicastReceivingChannelAdapter.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/UnicastReceivingChannelAdapter.java @@ -74,6 +74,15 @@ public UnicastReceivingChannelAdapter(int port, boolean lengthCheck) { this.mapper.setLengthCheck(lengthCheck); } + /** + * @param lengthCheck if true, the incoming packet is expected to have a four + * byte binary length header. + * @since 5.0 + */ + public void setLengthCheck(boolean lengthCheck) { + this.mapper.setLengthCheck(lengthCheck); + } + @Override public boolean isLongLived() { return true; diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/UnicastSendingMessageHandler.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/UnicastSendingMessageHandler.java index 9d62a3df3af..bac767bb98a 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/UnicastSendingMessageHandler.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/UnicastSendingMessageHandler.java @@ -114,10 +114,10 @@ public UnicastSendingMessageHandler(String host, int port) { } /** - * Construct UnicastSendingMessageHandler based on the destination SpEL expression to determine - * the target destination at runtime against requestMessage. - * @param destinationExpression the SpEL expression to evaluate the target destination at runtime. - * Must evaluate to {@link String}, {@link URI} or {@link SocketAddress}. + * Construct UnicastSendingMessageHandler based on the destination SpEL expression to + * determine the target destination at runtime against requestMessage. + * @param destinationExpression the SpEL expression to evaluate the target destination + * at runtime. Must evaluate to {@link String}, {@link URI} or {@link SocketAddress}. * @since 4.3 */ public UnicastSendingMessageHandler(String destinationExpression) { @@ -129,10 +129,10 @@ public UnicastSendingMessageHandler(String destinationExpression) { } /** - * Construct UnicastSendingMessageHandler based on the destination SpEL expression to determine - * the target destination at runtime against requestMessage. - * @param destinationExpression the SpEL expression to evaluate the target destination at runtime. - * Must evaluate to {@link String}, {@link URI} or {@link SocketAddress}. + * Construct UnicastSendingMessageHandler based on the destination SpEL expression to + * determine the target destination at runtime against requestMessage. + * @param destinationExpression the SpEL expression to evaluate the target destination + * at runtime. Must evaluate to {@link String}, {@link URI} or {@link SocketAddress}. * @since 4.3 */ public UnicastSendingMessageHandler(Expression destinationExpression) { @@ -217,6 +217,15 @@ protected final void setReliabilityAttributes(boolean lengthCheck, } } + /** + * @param lengthCheck if true, a four byte binary length header is added to the + * packet, allowing the receiver to check for data truncation. + * @since 5.0 + */ + public void setLengthCheck(boolean lengthCheck) { + this.mapper.setLengthCheck(lengthCheck); + } + @Override public void doStart() { if (this.acknowledge) { diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/util/TestingUtilities.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/util/TestingUtilities.java index a4a24a816fa..d2c8e694191 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/util/TestingUtilities.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/util/TestingUtilities.java @@ -18,6 +18,7 @@ import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory; import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory; +import org.springframework.integration.ip.udp.UnicastReceivingChannelAdapter; /** * Convenience class providing methods for testing IP components. @@ -65,6 +66,37 @@ public static void waitListening(AbstractServerConnectionFactory serverConnectio } } + /** + * Wait for a server connection factory to actually start listening before + * starting a test. Waits for up to 10 seconds by default. + * @param adapter The server connection factory. + * @param delay How long to wait in milliseconds; default 10000 (10 seconds) if null. + * @throws IllegalStateException If the server does not start listening in time. + */ + public static void waitListening(UnicastReceivingChannelAdapter adapter, Long delay) + throws IllegalStateException { + if (delay == null) { + delay = 100L; + } + else { + delay = delay / 100; + } + int n = 0; + while (!adapter.isListening()) { + try { + Thread.sleep(100); + } + catch (InterruptedException e1) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e1); + } + + if (n++ > delay) { + throw new IllegalStateException("Server didn't start listening."); + } + } + } + /** * Wait for a server connection factory to stop listening. * Waits for up to 10 seconds by default. diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/ConnectionFacforyTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/ConnectionFacforyTests.java new file mode 100644 index 00000000000..b343b0988da --- /dev/null +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/ConnectionFacforyTests.java @@ -0,0 +1,69 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; + +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; +import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory; +import org.springframework.integration.ip.util.TestingUtilities; +import org.springframework.integration.transformer.ObjectToStringTransformer; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; + +/** + * @author Gary Russell + * @since 5.0 + * + */ +public class ConnectionFacforyTests { + + @Test + public void test() throws Exception { + ApplicationEventPublisher publisher = e -> { }; + AbstractServerConnectionFactory server = Tcp.netServer(0).backlog(2).soTimeout(5000).get(); + final AtomicReference> received = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + server.registerListener(m -> { + received.set(new ObjectToStringTransformer().transform(m)); + latch.countDown(); + return false; + }); + server.setApplicationEventPublisher(publisher); + server.afterPropertiesSet(); + server.start(); + TestingUtilities.waitListening(server, null); + AbstractClientConnectionFactory client = Tcp.netClient("localhost", server.getPort()).get(); + client.setApplicationEventPublisher(publisher); + client.afterPropertiesSet(); + client.start(); + client.getConnection().send(new GenericMessage<>("foo")); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertEquals("foo", received.get().getPayload()); + client.stop(); + server.stop(); + } + +} diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java new file mode 100644 index 00000000000..2b39c79bd82 --- /dev/null +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java @@ -0,0 +1,177 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.ip.dsl; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.integration.dsl.Transformers; +import org.springframework.integration.dsl.context.IntegrationFlowContext; +import org.springframework.integration.dsl.context.IntegrationFlowRegistration; +import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter; +import org.springframework.integration.ip.tcp.TcpSendingMessageHandler; +import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; +import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory; +import org.springframework.integration.ip.tcp.serializer.TcpCodecs; +import org.springframework.integration.ip.udp.UnicastReceivingChannelAdapter; +import org.springframework.integration.ip.util.TestingUtilities; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringRunner; + +/** + * @author Gary Russell + * @since 5.0 + * + */ +@RunWith(SpringRunner.class) +@DirtiesContext +public class IpIntegrationTests { + + @Autowired + private AbstractServerConnectionFactory server1; + + @Autowired + private IntegrationFlowContext flowContext; + + @Autowired + @Qualifier("outUdpAdapter.input") + private MessageChannel udpOut; + + @Autowired + private UnicastReceivingChannelAdapter udpInbound; + + @Autowired + private QueueChannel udpIn; + + @Test + public void testTcpAdapters() throws Exception { + ApplicationEventPublisher publisher = e -> { }; + AbstractServerConnectionFactory server = Tcp.netServer(0).backlog(2).soTimeout(5000).id("server").get(); + assertEquals("server", server.getComponentName()); + server.setApplicationEventPublisher(publisher); + server.afterPropertiesSet(); + TcpReceivingChannelAdapter inbound = Tcp.inboundAdapter(server).get(); + QueueChannel received = new QueueChannel(); + inbound.setOutputChannel(received); + inbound.afterPropertiesSet(); + inbound.start(); + TestingUtilities.waitListening(server, null); + AbstractClientConnectionFactory client = Tcp.netClient("localhost", server.getPort()).id("client").get(); + assertEquals("client", client.getComponentName()); + client.setApplicationEventPublisher(publisher); + client.afterPropertiesSet(); + TcpSendingMessageHandler handler = Tcp.outboundAdapter(client).get(); + handler.start(); + handler.handleMessage(new GenericMessage<>("foo")); + Message receivedMessage = received.receive(10000); + assertNotNull(receivedMessage); + assertEquals("foo", Transformers.objectToString().transform(receivedMessage).getPayload()); + client.stop(); + server.stop(); + } + + @Test + public void testTcpGateways() { + TestingUtilities.waitListening(this.server1, null); + IntegrationFlow flow = f -> f + .handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort()) + .serializer(TcpCodecs.crlf()) + .deserializer(TcpCodecs.lengthHeader1()) + .id("client1")) + .remoteTimeout(m -> 5000)) + .transform(Transformers.objectToString()); + IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register(); + assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO")); + } + + @Test + public void testUdp() { + TestingUtilities.waitListening(this.udpInbound, null); + Message outMessage = MessageBuilder.withPayload("foo") + .setHeader("udp_dest", "udp://localhost:" + this.udpInbound.getPort()) + .build(); + this.udpOut.send(outMessage); + Message received = this.udpIn.receive(10000); + assertNotNull(received); + assertEquals("foo", Transformers.objectToString().transform(received).getPayload()); + } + + @Test + public void testUdpInheritance() { + Udp.outboundMulticastAdapter("headers['udp_dest']") + .lengthCheck(true) + .timeToLive(10); + } + + @Configuration + @EnableIntegration + public static class Config { + + @Bean + public AbstractServerConnectionFactory server1() { + return Tcp.netServer(0) + .serializer(TcpCodecs.lengthHeader1()) + .deserializer(TcpCodecs.crlf()) + .get(); + } + + @Bean + public IntegrationFlow inTcpGateway() { + return IntegrationFlows.from(Tcp.inboundGateway(server1())) + .transform(Transformers.objectToString()) + .transform(String::toUpperCase) + .get(); + } + + @Bean + public IntegrationFlow inUdpAdapter() { + return IntegrationFlows.from(Udp.inboundAdapter(0)) + .channel(udpIn()) + .get(); + } + + @Bean + public QueueChannel udpIn() { + return new QueueChannel(); + } + + @Bean + public IntegrationFlow outUdpAdapter() { + return f -> f.handle(Udp.outboundAdapter("headers['udp_dest']")); + } + + } + +} diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/AutoStartTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/AutoStartTests.java index 9c549569ed6..85ade45c710 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/AutoStartTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/AutoStartTests.java @@ -16,11 +16,11 @@ package org.springframework.integration.ip.tcp; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import org.junit.Test; import org.junit.runner.RunWith; + import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory; @@ -45,7 +45,6 @@ public class AutoStartTests { @Test public void testNetIn() throws Exception { - assertFalse(cfS1.isAutoStartup()); DirectFieldAccessor dfa = new DirectFieldAccessor(cfS1); assertNull(dfa.getPropertyValue("serverSocket")); startAndStop(); diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/leader/JdbcLockRegistryLeaderInitiatorTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/leader/JdbcLockRegistryLeaderInitiatorTests.java index 68304f142ff..450b4902997 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/leader/JdbcLockRegistryLeaderInitiatorTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/leader/JdbcLockRegistryLeaderInitiatorTests.java @@ -25,8 +25,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.log4j.Level; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import org.springframework.integration.jdbc.lock.DefaultLockRepository; @@ -35,6 +37,7 @@ import org.springframework.integration.leader.DefaultCandidate; import org.springframework.integration.leader.event.LeaderEventPublisher; import org.springframework.integration.support.leader.LockRegistryLeaderInitiator; +import org.springframework.integration.test.rule.Log4jLevelAdjuster; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; @@ -45,6 +48,9 @@ */ public class JdbcLockRegistryLeaderInitiatorTests { + @Rule + public Log4jLevelAdjuster logAdjuster = new Log4jLevelAdjuster(Level.DEBUG, "org.springframework.integration"); + public static EmbeddedDatabase dataSource; @BeforeClass @@ -162,9 +168,9 @@ public void publishOnRevoked(Object source, Context context, String role) { private static class CountingPublisher implements LeaderEventPublisher { - private CountDownLatch granted; + private final CountDownLatch granted; - private CountDownLatch revoked; + private final CountDownLatch revoked; private volatile LockRegistryLeaderInitiator initiator; diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsOutboundGatewaySpec.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsOutboundGatewaySpec.java index 8c25f36e417..a0810b608e2 100644 --- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsOutboundGatewaySpec.java +++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsOutboundGatewaySpec.java @@ -108,7 +108,7 @@ public JmsOutboundGatewaySpec requestDestinationExpression(String destination) { } /** - * Configure a {@link Function} that will be invoked at run time to determine the destination to + * Configure a {@link Function} that will be invoked at runtime to determine the destination to * which a message will be sent. Typically used with a Java 8 Lambda expression: *
 	 * {@code
diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/config/MongoDbInboundChannelAdapterIntegrationTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/config/MongoDbInboundChannelAdapterIntegrationTests.java
index 70a37d5351d..9d2b67abc0d 100644
--- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/config/MongoDbInboundChannelAdapterIntegrationTests.java
+++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/config/MongoDbInboundChannelAdapterIntegrationTests.java
@@ -35,12 +35,12 @@
 import org.springframework.data.mongodb.core.query.Criteria;
 import org.springframework.data.mongodb.core.query.Query;
 import org.springframework.integration.aop.AbstractMessageSourceAdvice;
+import org.springframework.integration.channel.QueueChannel;
 import org.springframework.integration.core.MessageSource;
 import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
 import org.springframework.integration.mongodb.rules.MongoDbAvailable;
 import org.springframework.integration.mongodb.rules.MongoDbAvailableTests;
 import org.springframework.messaging.Message;
-import org.springframework.messaging.PollableChannel;
 import org.springframework.test.annotation.DirtiesContext;
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@@ -62,7 +62,7 @@ public class MongoDbInboundChannelAdapterIntegrationTests extends MongoDbAvailab
 	private MongoTemplate mongoTemplate;
 
 	@Autowired
-	private PollableChannel replyChannel;
+	private QueueChannel replyChannel;
 
 	@Autowired
 	@Qualifier("mongoInboundAdapter")
@@ -106,6 +106,7 @@ public void testWithDefaultMongoFactory() throws Exception {
 		assertNotNull(this.replyChannel.receive(10000));
 
 		this.mongoInboundAdapter.stop();
+		this.replyChannel.purge(null);
 	}
 
 	@Test
@@ -121,6 +122,7 @@ public void testWithNamedMongoFactory() throws Exception {
 		assertEquals("Bob", message.getPayload().get(0).get("name"));
 
 		this.mongoInboundAdapterNamedFactory.stop();
+		this.replyChannel.purge(null);
 	}
 
 	@Test
@@ -136,6 +138,7 @@ public void testWithMongoTemplate() throws Exception {
 		assertEquals("Bob", message.getPayload().getName());
 
 		this.mongoInboundAdapterWithTemplate.stop();
+		this.replyChannel.purge(null);
 	}
 
 	@Test
@@ -151,6 +154,7 @@ public void testWithNamedCollection() throws Exception {
 		assertEquals("Bob", message.getPayload().get(0).getName());
 
 		this.mongoInboundAdapterWithNamedCollection.stop();
+		this.replyChannel.purge(null);
 	}
 
 	@Test
@@ -166,6 +170,7 @@ public void testWithNamedCollectionExpression() throws Exception {
 		assertEquals("Bob", message.getPayload().get(0).getName());
 
 		this.mongoInboundAdapterWithNamedCollectionExpression.stop();
+		this.replyChannel.purge(null);
 	}
 
 	@Test
@@ -181,6 +186,7 @@ public void testWithOnSuccessDisposition() throws Exception {
 		this.inboundAdapterWithOnSuccessDisposition.stop();
 
 		assertNull(this.mongoTemplate.findOne(new Query(Criteria.where("name").is("Bob")), Person.class, "data"));
+		this.replyChannel.purge(null);
 	}
 
 	@Test
@@ -197,6 +203,7 @@ public void testWithMongoConverter() throws Exception {
 		assertNotNull(replyChannel.receive(10000));
 
 		this.mongoInboundAdapterWithConverter.stop();
+		this.replyChannel.purge(null);
 	}
 
 	@Test(expected = BeanDefinitionParsingException.class)