diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java index c140c0bdeb..4fe7c9c57c 100644 --- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java @@ -25,8 +25,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map.Entry; -import java.util.function.BiConsumer; import javax.net.SocketFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory; @@ -38,7 +36,9 @@ import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.util.*; +import java.util.Map.Entry; import java.util.concurrent.*; +import java.util.function.BiConsumer; import java.util.function.Predicate; import static java.util.concurrent.TimeUnit.MINUTES; @@ -209,8 +209,9 @@ public String getHost() { } /** @param host the default host to use for connections */ - public void setHost(String host) { + public ConnectionFactory setHost(String host) { this.host = host; + return this; } public static int portOrDefault(int port, boolean ssl) { @@ -228,8 +229,9 @@ public int getPort() { * Set the target port. * @param port the default port to use for connections */ - public void setPort(int port) { + public ConnectionFactory setPort(int port) { this.port = port; + return this; } /** @@ -244,11 +246,12 @@ public String getUsername() { * Set the user name. * @param username the AMQP user name to use when connecting to the broker */ - public void setUsername(String username) { + public ConnectionFactory setUsername(String username) { this.credentialsProvider = new DefaultCredentialsProvider( username, this.credentialsProvider.getPassword() ); + return this; } /** @@ -263,11 +266,12 @@ public String getPassword() { * Set the password. * @param password the password to use when connecting to the broker */ - public void setPassword(String password) { + public ConnectionFactory setPassword(String password) { this.credentialsProvider = new DefaultCredentialsProvider( this.credentialsProvider.getUsername(), password ); + return this; } /** @@ -277,8 +281,9 @@ public void setPassword(String password) { * @see com.rabbitmq.client.impl.DefaultCredentialsProvider * @since 4.5.0 */ - public void setCredentialsProvider(CredentialsProvider credentialsProvider) { + public ConnectionFactory setCredentialsProvider(CredentialsProvider credentialsProvider) { this.credentialsProvider = credentialsProvider; + return this; } /** @@ -293,8 +298,9 @@ public String getVirtualHost() { * Set the virtual host. * @param virtualHost the virtual host to use when connecting to the broker */ - public void setVirtualHost(String virtualHost) { + public ConnectionFactory setVirtualHost(String virtualHost) { this.virtualHost = virtualHost; + return this; } @@ -305,7 +311,7 @@ public void setVirtualHost(String virtualHost) { * is left unchanged. * @param uri is the AMQP URI containing the data */ - public void setUri(URI uri) + public ConnectionFactory setUri(URI uri) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException { if ("amqp".equals(uri.getScheme().toLowerCase())) { @@ -360,6 +366,7 @@ public void setUri(URI uri) if (rawQuery != null && rawQuery.length() > 0) { setQuery(rawQuery); } + return this; } /** @@ -372,10 +379,11 @@ public void setUri(URI uri) * hostname are not permitted. * @param uriString is the AMQP URI containing the data */ - public void setUri(String uriString) + public ConnectionFactory setUri(String uriString) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException { setUri(new URI(uriString)); + return this; } private static String uriDecode(String s) { @@ -426,7 +434,7 @@ private static String uriDecode(String s) { * https://www.rabbitmq.com/uri-query-parameters.html * @param rawQuery is the string containing the raw query parameters part from a URI */ - private void setQuery(String rawQuery) { + private ConnectionFactory setQuery(String rawQuery) { Map parameters = new HashMap<>(); // parsing the query parameters try { @@ -452,6 +460,7 @@ private void setQuery(String rawQuery) { processUriQueryParameter(entry.getKey(), entry.getValue()); } } + return this; } /** @@ -480,11 +489,12 @@ public int getRequestedChannelMax() { * * @param requestedChannelMax initially requested maximum channel number; zero for unlimited */ - public void setRequestedChannelMax(int requestedChannelMax) { + public ConnectionFactory setRequestedChannelMax(int requestedChannelMax) { if (requestedChannelMax < 0 || requestedChannelMax > MAX_UNSIGNED_SHORT) { throw new IllegalArgumentException("Requested channel max must be between 0 and " + MAX_UNSIGNED_SHORT); } this.requestedChannelMax = requestedChannelMax; + return this; } /** @@ -499,8 +509,9 @@ public int getRequestedFrameMax() { * Set the requested maximum frame size * @param requestedFrameMax initially requested maximum frame size, in octets; zero for unlimited */ - public void setRequestedFrameMax(int requestedFrameMax) { + public ConnectionFactory setRequestedFrameMax(int requestedFrameMax) { this.requestedFrameMax = requestedFrameMax; + return this; } /** @@ -515,11 +526,12 @@ public int getRequestedHeartbeat() { * Set the TCP connection timeout. * @param timeout connection TCP establishment timeout in milliseconds; zero for infinite */ - public void setConnectionTimeout(int timeout) { + public ConnectionFactory setConnectionTimeout(int timeout) { if(timeout < 0) { throw new IllegalArgumentException("TCP connection timeout cannot be negative"); } this.connectionTimeout = timeout; + return this; } /** @@ -542,11 +554,12 @@ public int getHandshakeTimeout() { * Set the AMQP0-9-1 protocol handshake timeout. * @param timeout the AMQP0-9-1 protocol handshake timeout, in milliseconds */ - public void setHandshakeTimeout(int timeout) { + public ConnectionFactory setHandshakeTimeout(int timeout) { if(timeout < 0) { throw new IllegalArgumentException("handshake timeout cannot be negative"); } this.handshakeTimeout = timeout; + return this; } /** @@ -557,8 +570,9 @@ public void setHandshakeTimeout(int timeout) { * the Consumer's handleShutdownSignal() invocation) will be lost. * @param shutdownTimeout shutdown timeout in milliseconds; zero for infinite; default 10000 */ - public void setShutdownTimeout(int shutdownTimeout) { + public ConnectionFactory setShutdownTimeout(int shutdownTimeout) { this.shutdownTimeout = shutdownTimeout; + return this; } /** @@ -579,11 +593,12 @@ public int getShutdownTimeout() { * @param requestedHeartbeat the initially requested heartbeat timeout, in seconds; zero for none * @see RabbitMQ Heartbeats Guide */ - public void setRequestedHeartbeat(int requestedHeartbeat) { + public ConnectionFactory setRequestedHeartbeat(int requestedHeartbeat) { if (requestedHeartbeat < 0 || requestedHeartbeat > MAX_UNSIGNED_SHORT) { throw new IllegalArgumentException("Requested heartbeat must be between 0 and " + MAX_UNSIGNED_SHORT); } this.requestedHeartbeat = requestedHeartbeat; + return this; } /** @@ -605,8 +620,9 @@ public Map getClientProperties() { * @param clientProperties the map of extra client properties * @see #getClientProperties */ - public void setClientProperties(Map clientProperties) { - _clientProperties = clientProperties; + public ConnectionFactory setClientProperties(Map clientProperties) { + this._clientProperties = clientProperties; + return this; } /** @@ -623,8 +639,9 @@ public SaslConfig getSaslConfig() { * @param saslConfig * @see com.rabbitmq.client.SaslConfig */ - public void setSaslConfig(SaslConfig saslConfig) { + public ConnectionFactory setSaslConfig(SaslConfig saslConfig) { this.saslConfig = saslConfig; + return this; } /** @@ -642,8 +659,9 @@ public SocketFactory getSocketFactory() { * NIO, as the NIO API doesn't use the SocketFactory API. * @see #useSslProtocol */ - public void setSocketFactory(SocketFactory factory) { + public ConnectionFactory setSocketFactory(SocketFactory factory) { this.socketFactory = factory; + return this; } /** @@ -662,8 +680,9 @@ public SocketConfigurator getSocketConfigurator() { * * @param socketConfigurator the configurator to use */ - public void setSocketConfigurator(SocketConfigurator socketConfigurator) { + public ConnectionFactory setSocketConfigurator(SocketConfigurator socketConfigurator) { this.socketConf = socketConfigurator; + return this; } /** @@ -677,8 +696,9 @@ public void setSocketConfigurator(SocketConfigurator socketConfigurator) { * @param executor executor service to be used for * consumer operation */ - public void setSharedExecutor(ExecutorService executor) { + public ConnectionFactory setSharedExecutor(ExecutorService executor) { this.sharedExecutor = executor; + return this; } /** @@ -691,8 +711,9 @@ public void setSharedExecutor(ExecutorService executor) { * @param executor executor service to be used for * connection shutdown */ - public void setShutdownExecutor(ExecutorService executor) { + public ConnectionFactory setShutdownExecutor(ExecutorService executor) { this.shutdownExecutor = executor; + return this; } /** @@ -704,8 +725,9 @@ public void setShutdownExecutor(ExecutorService executor) { * * @param executor executor service to be used to send heartbeat */ - public void setHeartbeatExecutor(ScheduledExecutorService executor) { + public ConnectionFactory setHeartbeatExecutor(ScheduledExecutorService executor) { this.heartbeatExecutor = executor; + return this; } /** @@ -720,8 +742,9 @@ public ThreadFactory getThreadFactory() { * Set the thread factory used to instantiate new threads. * @see ThreadFactory */ - public void setThreadFactory(ThreadFactory threadFactory) { + public ConnectionFactory setThreadFactory(ThreadFactory threadFactory) { this.threadFactory = threadFactory; + return this; } /** @@ -737,11 +760,12 @@ public ExceptionHandler getExceptionHandler() { * Set the exception handler to use for newly created connections. * @see com.rabbitmq.client.ExceptionHandler */ - public void setExceptionHandler(ExceptionHandler exceptionHandler) { + public ConnectionFactory setExceptionHandler(ExceptionHandler exceptionHandler) { if (exceptionHandler == null) { throw new IllegalArgumentException("exception handler cannot be null!"); } this.exceptionHandler = exceptionHandler; + return this; } public boolean isSSL(){ @@ -758,10 +782,10 @@ public boolean isSSL(){ * not recommended to use in production as it provides no protection * against man-in-the-middle attacks. Prefer {@link #useSslProtocol(SSLContext)}. */ - public void useSslProtocol() + public ConnectionFactory useSslProtocol() throws NoSuchAlgorithmException, KeyManagementException { - useSslProtocol(computeDefaultTlsProtocol(SSLContext.getDefault().getSupportedSSLParameters().getProtocols())); + return useSslProtocol(computeDefaultTlsProtocol(SSLContext.getDefault().getSupportedSSLParameters().getProtocols())); } /** @@ -780,10 +804,10 @@ public void useSslProtocol() * Use {@link #setSslContextFactory(SslContextFactory)} for more flexibility. * @see #setSslContextFactory(SslContextFactory) */ - public void useSslProtocol(String protocol) + public ConnectionFactory useSslProtocol(String protocol) throws NoSuchAlgorithmException, KeyManagementException { - useSslProtocol(protocol, new TrustEverythingTrustManager()); + return useSslProtocol(protocol, new TrustEverythingTrustManager()); } /** @@ -800,12 +824,12 @@ public void useSslProtocol(String protocol) * @see #setSslContextFactory(SslContextFactory) * @see #useSslProtocol(SSLContext) */ - public void useSslProtocol(String protocol, TrustManager trustManager) + public ConnectionFactory useSslProtocol(String protocol, TrustManager trustManager) throws NoSuchAlgorithmException, KeyManagementException { SSLContext c = SSLContext.getInstance(protocol); c.init(null, new TrustManager[] { trustManager }, null); - useSslProtocol(c); + return useSslProtocol(c); } /** @@ -820,9 +844,10 @@ public void useSslProtocol(String protocol, TrustManager trustManager) * @param context An initialized SSLContext * @see #setSslContextFactory(SslContextFactory) */ - public void useSslProtocol(SSLContext context) { + public ConnectionFactory useSslProtocol(SSLContext context) { this.sslContextFactory = name -> context; setSocketFactory(context.getSocketFactory()); + return this; } /** @@ -844,9 +869,10 @@ public void useSslProtocol(SSLContext context) { * @see ConnectionFactory#useSslProtocol(String, TrustManager) * @since 5.4.0 */ - public void enableHostnameVerification() { + public ConnectionFactory enableHostnameVerification() { enableHostnameVerificationForNio(); enableHostnameVerificationForBlockingIo(); + return this; } protected void enableHostnameVerificationForNio() { @@ -890,8 +916,9 @@ public boolean isAutomaticRecoveryEnabled() { * @param automaticRecovery if true, enables connection recovery * @see Automatic Recovery */ - public void setAutomaticRecoveryEnabled(boolean automaticRecovery) { + public ConnectionFactory setAutomaticRecoveryEnabled(boolean automaticRecovery) { this.automaticRecovery = automaticRecovery; + return this; } /** @@ -908,8 +935,9 @@ public boolean isTopologyRecoveryEnabled() { * @param topologyRecovery if true, enables topology recovery * @see Automatic Recovery */ - public void setTopologyRecoveryEnabled(boolean topologyRecovery) { + public ConnectionFactory setTopologyRecoveryEnabled(boolean topologyRecovery) { this.topologyRecovery = topologyRecovery; + return this; } /** @@ -929,12 +957,14 @@ public ExecutorService getTopologyRecoveryExecutor() { * @param topologyRecoveryExecutor thread pool executor * @since 4.7.0 */ - public void setTopologyRecoveryExecutor(final ExecutorService topologyRecoveryExecutor) { + public ConnectionFactory setTopologyRecoveryExecutor(final ExecutorService topologyRecoveryExecutor) { this.topologyRecoveryExecutor = topologyRecoveryExecutor; + return this; } - public void setMetricsCollector(MetricsCollector metricsCollector) { + public ConnectionFactory setMetricsCollector(MetricsCollector metricsCollector) { this.metricsCollector = metricsCollector; + return this; } public MetricsCollector getMetricsCollector() { @@ -956,8 +986,9 @@ public MetricsCollector getMetricsCollector() { * @see #setCredentialsProvider(CredentialsProvider) * @see DefaultCredentialsRefreshService */ - public void setCredentialsRefreshService(CredentialsRefreshService credentialsRefreshService) { + public ConnectionFactory setCredentialsRefreshService(CredentialsRefreshService credentialsRefreshService) { this.credentialsRefreshService = credentialsRefreshService; + return this; } protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IOException { @@ -1451,8 +1482,9 @@ public long getNetworkRecoveryInterval() { * @param networkRecoveryInterval how long will automatic recovery wait before attempting to reconnect, in ms * @see RecoveryDelayHandler */ - public void setNetworkRecoveryInterval(int networkRecoveryInterval) { + public ConnectionFactory setNetworkRecoveryInterval(int networkRecoveryInterval) { this.networkRecoveryInterval = networkRecoveryInterval; + return this; } /** @@ -1462,8 +1494,9 @@ public void setNetworkRecoveryInterval(int networkRecoveryInterval) { * @param networkRecoveryInterval how long will automatic recovery wait before attempting to reconnect, in ms * @see RecoveryDelayHandler */ - public void setNetworkRecoveryInterval(long networkRecoveryInterval) { + public ConnectionFactory setNetworkRecoveryInterval(long networkRecoveryInterval) { this.networkRecoveryInterval = networkRecoveryInterval; + return this; } /** @@ -1480,8 +1513,9 @@ public RecoveryDelayHandler getRecoveryDelayHandler() { * @param recoveryDelayHandler the recovery delay handler * @since 4.3.0 */ - public void setRecoveryDelayHandler(final RecoveryDelayHandler recoveryDelayHandler) { + public ConnectionFactory setRecoveryDelayHandler(final RecoveryDelayHandler recoveryDelayHandler) { this.recoveryDelayHandler = recoveryDelayHandler; + return this; } /** @@ -1491,8 +1525,9 @@ public void setRecoveryDelayHandler(final RecoveryDelayHandler recoveryDelayHand * @param nioParams * @see NioParams */ - public void setNioParams(NioParams nioParams) { + public ConnectionFactory setNioParams(NioParams nioParams) { this.nioParams = nioParams; + return this; } /** @@ -1519,8 +1554,9 @@ public NioParams getNioParams() { * @see java.nio.channels.SocketChannel * @see java.nio.channels.Selector */ - public void useNio() { + public ConnectionFactory useNio() { this.nio = true; + return this; } /** @@ -1528,8 +1564,9 @@ public void useNio() { * With blocking IO, each connection creates its own thread * to read data from the server. */ - public void useBlockingIo() { + public ConnectionFactory useBlockingIo() { this.nio = false; + return this; } /** @@ -1537,11 +1574,12 @@ public void useBlockingIo() { * Default is 10 minutes. 0 means no timeout. * @param channelRpcTimeout */ - public void setChannelRpcTimeout(int channelRpcTimeout) { + public ConnectionFactory setChannelRpcTimeout(int channelRpcTimeout) { if(channelRpcTimeout < 0) { throw new IllegalArgumentException("Timeout cannot be less than 0"); } this.channelRpcTimeout = channelRpcTimeout; + return this; } /** @@ -1564,8 +1602,9 @@ public int getChannelRpcTimeout() { * @see #useSslProtocol(SSLContext) * @since 5.0.0 */ - public void setSslContextFactory(SslContextFactory sslContextFactory) { + public ConnectionFactory setSslContextFactory(SslContextFactory sslContextFactory) { this.sslContextFactory = sslContextFactory; + return this; } /** @@ -1575,8 +1614,9 @@ public void setSslContextFactory(SslContextFactory sslContextFactory) { * Default is false. * @param channelShouldCheckRpcResponseType */ - public void setChannelShouldCheckRpcResponseType(boolean channelShouldCheckRpcResponseType) { + public ConnectionFactory setChannelShouldCheckRpcResponseType(boolean channelShouldCheckRpcResponseType) { this.channelShouldCheckRpcResponseType = channelShouldCheckRpcResponseType; + return this; } public boolean isChannelShouldCheckRpcResponseType() { @@ -1598,8 +1638,9 @@ public boolean isChannelShouldCheckRpcResponseType() { * @param workPoolTimeout timeout in ms * @since 4.5.0 */ - public void setWorkPoolTimeout(int workPoolTimeout) { + public ConnectionFactory setWorkPoolTimeout(int workPoolTimeout) { this.workPoolTimeout = workPoolTimeout; + return this; } public int getWorkPoolTimeout() { @@ -1615,8 +1656,9 @@ public int getWorkPoolTimeout() { * @param errorOnWriteListener the listener * @since 4.5.0 */ - public void setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener) { + public ConnectionFactory setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener) { this.errorOnWriteListener = errorOnWriteListener; + return this; } /** @@ -1624,8 +1666,9 @@ public void setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener) { * * @since 4.8.0 */ - public void setTopologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFilter) { + public ConnectionFactory setTopologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFilter) { this.topologyRecoveryFilter = topologyRecoveryFilter; + return this; } /** @@ -1634,8 +1677,9 @@ public void setTopologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFil * * @param connectionRecoveryTriggeringCondition */ - public void setConnectionRecoveryTriggeringCondition(Predicate connectionRecoveryTriggeringCondition) { + public ConnectionFactory setConnectionRecoveryTriggeringCondition(Predicate connectionRecoveryTriggeringCondition) { this.connectionRecoveryTriggeringCondition = connectionRecoveryTriggeringCondition; + return this; } /** @@ -1645,8 +1689,9 @@ public void setConnectionRecoveryTriggeringCondition(Predicate call, boolean expectException) { } + @Test + public void shouldBeConfigurableUsingFluentAPI() throws Exception { + /* GIVEN */ + Map clientProperties = new HashMap<>(); + SaslConfig saslConfig = mock(SaslConfig.class); + ConnectionFactory connectionFactory = new ConnectionFactory(); + SocketFactory socketFactory = mock(SocketFactory.class); + SocketConfigurator socketConfigurator = mock(SocketConfigurator.class); + ExecutorService executorService = mock(ExecutorService.class); + ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class); + ThreadFactory threadFactory = mock(ThreadFactory.class); + ExceptionHandler exceptionHandler = mock(ExceptionHandler.class); + MetricsCollector metricsCollector = mock(MetricsCollector.class); + CredentialsRefreshService credentialsRefreshService = mock(CredentialsRefreshService.class); + RecoveryDelayHandler recoveryDelayHandler = mock(RecoveryDelayHandler.class); + NioParams nioParams = mock(NioParams.class); + SslContextFactory sslContextFactory = mock(SslContextFactory.class); + TopologyRecoveryFilter topologyRecoveryFilter = mock(TopologyRecoveryFilter.class); + Predicate connectionRecoveryTriggeringCondition = (ShutdownSignalException) -> true; + RetryHandler retryHandler = mock(RetryHandler.class); + RecoveredQueueNameSupplier recoveredQueueNameSupplier = mock(RecoveredQueueNameSupplier.class); + + /* WHEN */ + connectionFactory + .setHost("rabbitmq") + .setPort(5672) + .setUsername("guest") + .setPassword("guest") + .setVirtualHost("/") + .setRequestedChannelMax(1) + .setRequestedFrameMax(2) + .setRequestedHeartbeat(3) + .setConnectionTimeout(4) + .setHandshakeTimeout(5) + .setShutdownTimeout(6) + .setClientProperties(clientProperties) + .setSaslConfig(saslConfig) + .setSocketFactory(socketFactory) + .setSocketConfigurator(socketConfigurator) + .setSharedExecutor(executorService) + .setShutdownExecutor(executorService) + .setHeartbeatExecutor(scheduledExecutorService) + .setThreadFactory(threadFactory) + .setExceptionHandler(exceptionHandler) + .setAutomaticRecoveryEnabled(true) + .setTopologyRecoveryEnabled(true) + .setTopologyRecoveryExecutor(executorService) + .setMetricsCollector(metricsCollector) + .setCredentialsRefreshService(credentialsRefreshService) + .setNetworkRecoveryInterval(7) + .setRecoveryDelayHandler(recoveryDelayHandler) + .setNioParams(nioParams) + .useNio() + .useBlockingIo() + .setChannelRpcTimeout(8) + .setSslContextFactory(sslContextFactory) + .setChannelShouldCheckRpcResponseType(true) + .setWorkPoolTimeout(9) + .setTopologyRecoveryFilter(topologyRecoveryFilter) + .setConnectionRecoveryTriggeringCondition(connectionRecoveryTriggeringCondition) + .setTopologyRecoveryRetryHandler(retryHandler) + .setRecoveredQueueNameSupplier(recoveredQueueNameSupplier); + + /* THEN */ + assertThat(connectionFactory.getHost()).isEqualTo("rabbitmq"); + assertThat(connectionFactory.getPort()).isEqualTo(5672); + assertThat(connectionFactory.getUsername()).isEqualTo("guest"); + assertThat(connectionFactory.getPassword()).isEqualTo("guest"); + assertThat(connectionFactory.getVirtualHost()).isEqualTo("/"); + assertThat(connectionFactory.getRequestedChannelMax()).isEqualTo(1); + assertThat(connectionFactory.getRequestedFrameMax()).isEqualTo(2); + assertThat(connectionFactory.getRequestedHeartbeat()).isEqualTo(3); + assertThat(connectionFactory.getConnectionTimeout()).isEqualTo(4); + assertThat(connectionFactory.getHandshakeTimeout()).isEqualTo(5); + assertThat(connectionFactory.getShutdownTimeout()).isEqualTo(6); + assertThat(connectionFactory.getClientProperties()).isEqualTo(clientProperties); + assertThat(connectionFactory.getSaslConfig()).isEqualTo(saslConfig); + assertThat(connectionFactory.getSocketFactory()).isEqualTo(socketFactory); + assertThat(connectionFactory.getSocketConfigurator()).isEqualTo(socketConfigurator); + assertThat(connectionFactory.isAutomaticRecoveryEnabled()).isEqualTo(true); + assertThat(connectionFactory.isTopologyRecoveryEnabled()).isEqualTo(true); + assertThat(connectionFactory.getMetricsCollector()).isEqualTo(metricsCollector); + assertThat(connectionFactory.getNetworkRecoveryInterval()).isEqualTo(7); + assertThat(connectionFactory.getRecoveryDelayHandler()).isEqualTo(recoveryDelayHandler); + assertThat(connectionFactory.getNioParams()).isEqualTo(nioParams); + assertThat(connectionFactory.getChannelRpcTimeout()).isEqualTo(8); + assertThat(connectionFactory.isChannelShouldCheckRpcResponseType()).isEqualTo(true); + assertThat(connectionFactory.getWorkPoolTimeout()).isEqualTo(9); + assertThat(connectionFactory.isSSL()).isEqualTo(true); + + /* Now test cross-cutting setters that override properties set by other setters */ + CredentialsProvider credentialsProvider = mock(CredentialsProvider.class); + when(credentialsProvider.getUsername()).thenReturn("admin"); + when(credentialsProvider.getPassword()).thenReturn("admin"); + connectionFactory + .setCredentialsProvider(credentialsProvider) + .setUri("amqp://host:5671") + .useSslProtocol("TLSv1.2"); + assertThat(connectionFactory.getHost()).isEqualTo("host"); + assertThat(connectionFactory.getPort()).isEqualTo(5671); + assertThat(connectionFactory.getUsername()).isEqualTo("admin"); + assertThat(connectionFactory.getPassword()).isEqualTo("admin"); + assertThat(connectionFactory.isSSL()).isEqualTo(true); + } + } diff --git a/src/test/java/com/rabbitmq/client/test/JavaNioTest.java b/src/test/java/com/rabbitmq/client/test/JavaNioTest.java index 925c31ed55..cdc095e40c 100644 --- a/src/test/java/com/rabbitmq/client/test/JavaNioTest.java +++ b/src/test/java/com/rabbitmq/client/test/JavaNioTest.java @@ -30,8 +30,8 @@ public class JavaNioTest { @BeforeEach public void init() throws Exception { - ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.useNio(); + ConnectionFactory connectionFactory = new ConnectionFactory() + .useNio(); testConnection = connectionFactory.newConnection(); } @@ -46,8 +46,8 @@ public void tearDown() throws Exception { @Test public void connection() throws Exception { CountDownLatch latch = new CountDownLatch(1); - ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.useNio(); + ConnectionFactory connectionFactory = new ConnectionFactory() + .useNio(); Connection connection = null; try { connection = basicGetBasicConsume(connectionFactory, "nio.queue", latch); @@ -61,9 +61,9 @@ public void connection() throws Exception { @Test public void twoConnections() throws IOException, TimeoutException, InterruptedException { CountDownLatch latch = new CountDownLatch(2); - ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.useNio(); - connectionFactory.setNioParams(new NioParams().setNbIoThreads(4)); + ConnectionFactory connectionFactory = new ConnectionFactory() + .useNio() + .setNioParams(new NioParams().setNbIoThreads(4)); Connection connection1 = null; Connection connection2 = null; try { @@ -82,8 +82,8 @@ public void twoConnections() throws IOException, TimeoutException, InterruptedEx public void twoConnectionsWithNioExecutor() throws IOException, TimeoutException, InterruptedException { CountDownLatch latch = new CountDownLatch(2); ExecutorService nioExecutor = Executors.newFixedThreadPool(5); - ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.useNio(); + ConnectionFactory connectionFactory = new ConnectionFactory() + .useNio(); Connection connection1 = null; Connection connection2 = null; try { @@ -101,8 +101,8 @@ public void twoConnectionsWithNioExecutor() throws IOException, TimeoutException @Test public void shutdownListenerCalled() throws IOException, TimeoutException, InterruptedException { - ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.useNio(); + ConnectionFactory connectionFactory = new ConnectionFactory() + .useNio(); Connection connection = connectionFactory.newConnection(); try { final CountDownLatch latch = new CountDownLatch(1); @@ -122,8 +122,8 @@ public void shutdownCompleted(ShutdownSignalException cause) { @Test public void nioLoopCleaning() throws Exception { - ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.useNio(); + ConnectionFactory connectionFactory = new ConnectionFactory() + .useNio(); for (int i = 0; i < 10; i++) { Connection connection = connectionFactory.newConnection(); connection.abort(); @@ -139,20 +139,20 @@ public void messageSize() throws Exception { @Test public void byteBufferFactory() throws Exception { - ConnectionFactory cf = new ConnectionFactory(); - cf.useNio(); + ConnectionFactory connectionFactory = new ConnectionFactory() + .useNio(); int baseCapacity = 32768; NioParams nioParams = new NioParams(); nioParams.setReadByteBufferSize(baseCapacity / 2); nioParams.setWriteByteBufferSize(baseCapacity / 4); List byteBuffers = new CopyOnWriteArrayList<>(); - cf.setNioParams(nioParams.setByteBufferFactory(new DefaultByteBufferFactory(capacity -> { + connectionFactory.setNioParams(nioParams.setByteBufferFactory(new DefaultByteBufferFactory(capacity -> { ByteBuffer bb = ByteBuffer.allocate(capacity); byteBuffers.add(bb); return bb; }))); - try (Connection c = cf.newConnection()) { + try (Connection c = connectionFactory.newConnection()) { sendAndVerifyMessage(c, 100); } @@ -165,27 +165,27 @@ public void byteBufferFactory() throws Exception { @Test public void directByteBuffers() throws Exception { - ConnectionFactory cf = new ConnectionFactory(); - cf.useNio(); - cf.setNioParams(new NioParams().setByteBufferFactory(new DefaultByteBufferFactory(capacity -> ByteBuffer.allocateDirect(capacity)))); - try (Connection c = cf.newConnection()) { + ConnectionFactory connectionFactory = new ConnectionFactory() + .useNio() + .setNioParams(new NioParams().setByteBufferFactory(new DefaultByteBufferFactory(capacity -> ByteBuffer.allocateDirect(capacity)))); + try (Connection c = connectionFactory.newConnection()) { sendAndVerifyMessage(c, 100); } } @Test public void customWriteQueue() throws Exception { - ConnectionFactory cf = new ConnectionFactory(); - cf.useNio(); AtomicInteger count = new AtomicInteger(0); - cf.setNioParams(new NioParams().setWriteQueueFactory(ctx -> { - count.incrementAndGet(); - return new BlockingQueueNioQueue( - new LinkedBlockingQueue<>(ctx.getNioParams().getWriteQueueCapacity()), - ctx.getNioParams().getWriteEnqueuingTimeoutInMs() - ); - })); - try (Connection c = cf.newConnection()) { + ConnectionFactory connectionFactory = new ConnectionFactory() + .useNio() + .setNioParams(new NioParams().setWriteQueueFactory(ctx -> { + count.incrementAndGet(); + return new BlockingQueueNioQueue( + new LinkedBlockingQueue<>(ctx.getNioParams().getWriteQueueCapacity()), + ctx.getNioParams().getWriteEnqueuingTimeoutInMs() + ); + })); + try (Connection c = connectionFactory.newConnection()) { sendAndVerifyMessage(c, 100); } assertEquals(1, count.get()); diff --git a/src/test/java/com/rabbitmq/client/test/PropertyFileInitialisationTest.java b/src/test/java/com/rabbitmq/client/test/PropertyFileInitialisationTest.java index 24d3f808d8..d0d52ab556 100644 --- a/src/test/java/com/rabbitmq/client/test/PropertyFileInitialisationTest.java +++ b/src/test/java/com/rabbitmq/client/test/PropertyFileInitialisationTest.java @@ -205,9 +205,9 @@ public void tlsSslContextSetIfTlsEnabled() { AtomicBoolean sslProtocolSet = new AtomicBoolean(false); ConnectionFactory connectionFactory = new ConnectionFactory() { @Override - public void useSslProtocol(SSLContext context) { + public ConnectionFactory useSslProtocol(SSLContext context) { sslProtocolSet.set(true); - super.useSslProtocol(context); + return super.useSslProtocol(context); } }; ConnectionFactoryConfigurator.load( diff --git a/src/test/java/com/rabbitmq/client/test/SslContextFactoryTest.java b/src/test/java/com/rabbitmq/client/test/SslContextFactoryTest.java index 69a3eee1e8..af4e206977 100644 --- a/src/test/java/com/rabbitmq/client/test/SslContextFactoryTest.java +++ b/src/test/java/com/rabbitmq/client/test/SslContextFactoryTest.java @@ -40,30 +40,22 @@ public class SslContextFactoryTest { @Test public void setSslContextFactory() throws Exception { - doTestSetSslContextFactory(() -> { - ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.useBlockingIo(); - connectionFactory.setAutomaticRecoveryEnabled(true); - return connectionFactory; - }); - doTestSetSslContextFactory(() -> { - ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.useNio(); - connectionFactory.setAutomaticRecoveryEnabled(true); - return connectionFactory; - }); - doTestSetSslContextFactory(() -> { - ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.useBlockingIo(); - connectionFactory.setAutomaticRecoveryEnabled(false); - return connectionFactory; - }); - doTestSetSslContextFactory(() -> { - ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.useNio(); - connectionFactory.setAutomaticRecoveryEnabled(false); - return connectionFactory; - }); + doTestSetSslContextFactory(() -> new ConnectionFactory() + .useBlockingIo() + .setAutomaticRecoveryEnabled(true) + ); + doTestSetSslContextFactory(() -> new ConnectionFactory() + .useNio() + .setAutomaticRecoveryEnabled(true) + ); + doTestSetSslContextFactory(() -> new ConnectionFactory() + .useBlockingIo() + .setAutomaticRecoveryEnabled(false) + ); + doTestSetSslContextFactory(() -> new ConnectionFactory() + .useNio() + .setAutomaticRecoveryEnabled(false) + ); } private void doTestSetSslContextFactory(Supplier supplier) throws Exception { @@ -82,31 +74,27 @@ private void doTestSetSslContextFactory(Supplier supplier) th } @Test public void socketFactoryTakesPrecedenceOverSslContextFactoryWithBlockingIo() throws Exception { - doTestSocketFactoryTakesPrecedenceOverSslContextFactoryWithBlockingIo(() -> { - ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.useBlockingIo(); - connectionFactory.setAutomaticRecoveryEnabled(true); - return connectionFactory; - }); - doTestSocketFactoryTakesPrecedenceOverSslContextFactoryWithBlockingIo(() -> { - ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.useBlockingIo(); - connectionFactory.setAutomaticRecoveryEnabled(false); - return connectionFactory; - }); + doTestSocketFactoryTakesPrecedenceOverSslContextFactoryWithBlockingIo(() -> new ConnectionFactory() + .useBlockingIo() + .setAutomaticRecoveryEnabled(true) + ); + doTestSocketFactoryTakesPrecedenceOverSslContextFactoryWithBlockingIo(() -> new ConnectionFactory() + .useBlockingIo() + .setAutomaticRecoveryEnabled(false) + ); } private void doTestSocketFactoryTakesPrecedenceOverSslContextFactoryWithBlockingIo( Supplier supplier ) throws Exception { - ConnectionFactory connectionFactory = supplier.get(); - connectionFactory.useBlockingIo(); SslContextFactory sslContextFactory = sslContextFactory(); - connectionFactory.setSslContextFactory(sslContextFactory); - SSLContext contextAcceptAll = sslContextFactory.create("connection01"); - connectionFactory.setSocketFactory(contextAcceptAll.getSocketFactory()); - + ConnectionFactory connectionFactory = supplier.get(); + connectionFactory + .useBlockingIo() + .setSslContextFactory(sslContextFactory) + .setSocketFactory(contextAcceptAll.getSocketFactory()); + Connection connection = connectionFactory.newConnection("connection01"); TestUtils.close(connection); connection = connectionFactory.newConnection("connection02");