diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index e9744f19f7..4345da5caa 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -66,6 +66,7 @@ class StreamEnvironment implements Environment { private final EventLoopGroup eventLoopGroup; private final ScheduledExecutorService scheduledExecutorService; + private final ScheduledExecutorService locatorReconnectionScheduledExecutorService; private final boolean privateScheduleExecutorService; private final Client.ClientParameters clientParametersPrototype; private final List
addresses; @@ -235,17 +236,22 @@ class StreamEnvironment implements Environment { maxProducersByConnection, maxTrackingConsumersByConnection, connectionNamingStrategy, - Utils.coordinatorClientFactory(this, producerNodeRetryDelay), + coordinatorClientFactory(this, producerNodeRetryDelay), forceLeaderForProducers); this.consumersCoordinator = new ConsumersCoordinator( this, maxConsumersByConnection, connectionNamingStrategy, - Utils.coordinatorClientFactory(this, consumerNodeRetryDelay), + coordinatorClientFactory(this, consumerNodeRetryDelay), forceReplicaForConsumers, Utils.brokerPicker()); this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this); + + ThreadFactory threadFactory = threadFactory("rabbitmq-stream-environment-locator-scheduler-"); + this.locatorReconnectionScheduledExecutorService = + Executors.newScheduledThreadPool(this.locators.size(), threadFactory); + ClientParameters clientParametersForInit = locatorParametersCopy(); Runnable locatorInitSequence = () -> { @@ -291,7 +297,7 @@ class StreamEnvironment implements Environment { l, connectionNamingStrategy, clientFactory, - this.scheduledExecutorService, + this.locatorReconnectionScheduledExecutorService, this.recoveryBackOffDelayPolicy, l.label()); } @@ -338,7 +344,7 @@ private ShutdownListener shutdownListener( locator, connectionNamingStrategy, clientFactory, - this.scheduledExecutorService, + this.locatorReconnectionScheduledExecutorService, delayPolicy, label); } else { @@ -683,6 +689,9 @@ public void close() { if (privateScheduleExecutorService) { this.scheduledExecutorService.shutdownNow(); } + if (this.locatorReconnectionScheduledExecutorService != null) { + this.locatorReconnectionScheduledExecutorService.shutdownNow(); + } try { if (this.eventLoopGroup != null && (!this.eventLoopGroup.isShuttingDown() || !this.eventLoopGroup.isShutdown())) { @@ -904,13 +913,7 @@ TrackingConsumerRegistration registerTrackingConsumer( @Override public String toString() { return "{ \"locators\" : [" - + this.locators.stream() - .map( - l -> { - Client c = l.nullableClient(); - return c == null ? "null" : ("\"" + l.label() + "\""); - }) - .collect(Collectors.joining(",")) + + this.locators.stream().map(l -> quote(l.label())).collect(Collectors.joining(",")) + "], " + Utils.jsonField("producer_client_count", this.producersCoordinator.clientCount()) + "," diff --git a/src/test/java/com/rabbitmq/stream/Cli.java b/src/test/java/com/rabbitmq/stream/Cli.java index e532cd508b..b02f9b8349 100644 --- a/src/test/java/com/rabbitmq/stream/Cli.java +++ b/src/test/java/com/rabbitmq/stream/Cli.java @@ -252,6 +252,15 @@ public static String rabbitmqctlCommand() { } } + private static String dockerContainer() { + if (rabbitmqctlCommand().startsWith("docker")) { + String rabbitmqCtl = System.getProperty("rabbitmqctl.bin"); + return rabbitmqCtl.split(":")[1]; + } else { + throw new IllegalStateException("Broker does not run on broker"); + } + } + private static String rabbitmqStreamsCommand() { String rabbitmqctl = rabbitmqctlCommand(); int lastIndex = rabbitmqctl.lastIndexOf("rabbitmqctl"); @@ -354,6 +363,12 @@ public static void restartNode(String node) { executeCommand(dockerCommand + "rabbitmqctl status"); } + public static void restartBrokerContainer() { + String container = dockerContainer(); + executeCommand("docker stop " + container); + executeCommand("docker start " + container); + } + public static void rebalance() { rabbitmqQueues("rebalance all"); } diff --git a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java index d67787a610..753d5d229d 100644 --- a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java @@ -72,12 +72,9 @@ public class RecoveryClusterTest { static List