diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index e9744f19f7..4345da5caa 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -66,6 +66,7 @@ class StreamEnvironment implements Environment { private final EventLoopGroup eventLoopGroup; private final ScheduledExecutorService scheduledExecutorService; + private final ScheduledExecutorService locatorReconnectionScheduledExecutorService; private final boolean privateScheduleExecutorService; private final Client.ClientParameters clientParametersPrototype; private final List
addresses; @@ -235,17 +236,22 @@ class StreamEnvironment implements Environment { maxProducersByConnection, maxTrackingConsumersByConnection, connectionNamingStrategy, - Utils.coordinatorClientFactory(this, producerNodeRetryDelay), + coordinatorClientFactory(this, producerNodeRetryDelay), forceLeaderForProducers); this.consumersCoordinator = new ConsumersCoordinator( this, maxConsumersByConnection, connectionNamingStrategy, - Utils.coordinatorClientFactory(this, consumerNodeRetryDelay), + coordinatorClientFactory(this, consumerNodeRetryDelay), forceReplicaForConsumers, Utils.brokerPicker()); this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this); + + ThreadFactory threadFactory = threadFactory("rabbitmq-stream-environment-locator-scheduler-"); + this.locatorReconnectionScheduledExecutorService = + Executors.newScheduledThreadPool(this.locators.size(), threadFactory); + ClientParameters clientParametersForInit = locatorParametersCopy(); Runnable locatorInitSequence = () -> { @@ -291,7 +297,7 @@ class StreamEnvironment implements Environment { l, connectionNamingStrategy, clientFactory, - this.scheduledExecutorService, + this.locatorReconnectionScheduledExecutorService, this.recoveryBackOffDelayPolicy, l.label()); } @@ -338,7 +344,7 @@ private ShutdownListener shutdownListener( locator, connectionNamingStrategy, clientFactory, - this.scheduledExecutorService, + this.locatorReconnectionScheduledExecutorService, delayPolicy, label); } else { @@ -683,6 +689,9 @@ public void close() { if (privateScheduleExecutorService) { this.scheduledExecutorService.shutdownNow(); } + if (this.locatorReconnectionScheduledExecutorService != null) { + this.locatorReconnectionScheduledExecutorService.shutdownNow(); + } try { if (this.eventLoopGroup != null && (!this.eventLoopGroup.isShuttingDown() || !this.eventLoopGroup.isShutdown())) { @@ -904,13 +913,7 @@ TrackingConsumerRegistration registerTrackingConsumer( @Override public String toString() { return "{ \"locators\" : [" - + this.locators.stream() - .map( - l -> { - Client c = l.nullableClient(); - return c == null ? "null" : ("\"" + l.label() + "\""); - }) - .collect(Collectors.joining(",")) + + this.locators.stream().map(l -> quote(l.label())).collect(Collectors.joining(",")) + "], " + Utils.jsonField("producer_client_count", this.producersCoordinator.clientCount()) + "," diff --git a/src/test/java/com/rabbitmq/stream/Cli.java b/src/test/java/com/rabbitmq/stream/Cli.java index e532cd508b..b02f9b8349 100644 --- a/src/test/java/com/rabbitmq/stream/Cli.java +++ b/src/test/java/com/rabbitmq/stream/Cli.java @@ -252,6 +252,15 @@ public static String rabbitmqctlCommand() { } } + private static String dockerContainer() { + if (rabbitmqctlCommand().startsWith("docker")) { + String rabbitmqCtl = System.getProperty("rabbitmqctl.bin"); + return rabbitmqCtl.split(":")[1]; + } else { + throw new IllegalStateException("Broker does not run on broker"); + } + } + private static String rabbitmqStreamsCommand() { String rabbitmqctl = rabbitmqctlCommand(); int lastIndex = rabbitmqctl.lastIndexOf("rabbitmqctl"); @@ -354,6 +363,12 @@ public static void restartNode(String node) { executeCommand(dockerCommand + "rabbitmqctl status"); } + public static void restartBrokerContainer() { + String container = dockerContainer(); + executeCommand("docker stop " + container); + executeCommand("docker start " + container); + } + public static void rebalance() { rabbitmqQueues("rebalance all"); } diff --git a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java index d67787a610..753d5d229d 100644 --- a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java @@ -72,12 +72,9 @@ public class RecoveryClusterTest { static List logLevels; static List> logClasses = List.of( - ProducersCoordinator.class, - ConsumersCoordinator.class, - StreamEnvironment.class, - AsyncRetry.class, - StreamEnvironment.class, - ScheduledExecutorServiceWrapper.class); + // ProducersCoordinator.class, + // ConsumersCoordinator.class, + AsyncRetry.class, StreamEnvironment.class, ScheduledExecutorServiceWrapper.class); ScheduledExecutorService scheduledExecutorService; @BeforeAll diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java index 3b147ed458..2e99dc438f 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java @@ -14,13 +14,9 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; +import static com.rabbitmq.stream.impl.TestUtils.*; import static com.rabbitmq.stream.impl.TestUtils.CountDownLatchConditions.completed; import static com.rabbitmq.stream.impl.TestUtils.ExceptionConditions.responseCode; -import static com.rabbitmq.stream.impl.TestUtils.latchAssert; -import static com.rabbitmq.stream.impl.TestUtils.localhost; -import static com.rabbitmq.stream.impl.TestUtils.localhostTls; -import static com.rabbitmq.stream.impl.TestUtils.streamName; -import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; import static java.util.stream.IntStream.range; @@ -57,7 +53,6 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollSocketChannel; -import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.ssl.SslHandler; import java.net.ConnectException; import java.nio.charset.StandardCharsets; @@ -92,22 +87,11 @@ @ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) public class StreamEnvironmentTest { - static EventLoopGroup eventLoopGroup; - EnvironmentBuilder environmentBuilder; String stream; TestUtils.ClientFactory cf; - - @BeforeAll - static void initAll() { - eventLoopGroup = new NioEventLoopGroup(); - } - - @AfterAll - static void afterAll() throws Exception { - eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS); - } + EventLoopGroup eventLoopGroup; @BeforeEach void init() { diff --git a/src/test/java/com/rabbitmq/stream/impl/TlsTest.java b/src/test/java/com/rabbitmq/stream/impl/TlsTest.java index 04964069d6..78e4b4e8de 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TlsTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/TlsTest.java @@ -56,8 +56,6 @@ import javax.net.ssl.SSLException; import javax.net.ssl.SSLHandshakeException; import javax.net.ssl.SSLParameters; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -65,26 +63,6 @@ @ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) public class TlsTest { - static boolean isJava13() { - String javaVersion = System.getProperty("java.version"); - return javaVersion != null && javaVersion.startsWith("13."); - } - - @BeforeEach - public void init() { - if (isJava13()) { - // for Java 13.0.7, see https://github.com/bcgit/bc-java/issues/941 - System.setProperty("keystore.pkcs12.keyProtectionAlgorithm", "PBEWithHmacSHA256AndAES_256"); - } - } - - @AfterEach - public void tearDown() throws Exception { - if (isJava13()) { - System.setProperty("keystore.pkcs12.keyProtectionAlgorithm", ""); - } - } - String stream; TestUtils.ClientFactory cf;