From 0b92a794e4efc49f7c4dbe5f53c61ea4ec6d0192 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Thu, 12 Dec 2024 15:15:41 +0100 Subject: [PATCH 1/2] Run only recovery tests --- .github/workflows/test-pr.yml | 48 +++++++++---------- .../stream/impl/StreamEnvironment.java | 6 +-- .../stream/impl/RecoveryClusterTest.java | 9 ++-- 3 files changed, 29 insertions(+), 34 deletions(-) diff --git a/.github/workflows/test-pr.yml b/.github/workflows/test-pr.yml index 380db0602c..f0e0b72e09 100644 --- a/.github/workflows/test-pr.yml +++ b/.github/workflows/test-pr.yml @@ -11,38 +11,38 @@ jobs: steps: - uses: actions/checkout@v4 - - name: Checkout tls-gen - uses: actions/checkout@v4 - with: - repository: rabbitmq/tls-gen - path: './tls-gen' +# - name: Checkout tls-gen +# uses: actions/checkout@v4 +# with: +# repository: rabbitmq/tls-gen +# path: './tls-gen' - name: Set up JDK uses: actions/setup-java@v4 with: distribution: 'temurin' java-version: '21' cache: 'maven' - - name: Start broker - run: ci/start-broker.sh - - name: Test (no dynamic-batch publishing) - run: | - ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ - -Drabbitmq.stream.producer.dynamic.batch=false \ - -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ - -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ - -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem - - name: Test (dynamic-batch publishing) - run: | - ./mvnw test -Drabbitmqctl.bin=DOCKER:rabbitmq \ - -Drabbitmq.stream.producer.dynamic.batch=true \ - -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ - -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ - -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem - - name: Stop broker - run: docker stop rabbitmq && docker rm rabbitmq +# - name: Start broker +# run: ci/start-broker.sh +# - name: Test (no dynamic-batch publishing) +# run: | +# ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ +# -Drabbitmq.stream.producer.dynamic.batch=false \ +# -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ +# -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ +# -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem +# - name: Test (dynamic-batch publishing) +# run: | +# ./mvnw test -Drabbitmqctl.bin=DOCKER:rabbitmq \ +# -Drabbitmq.stream.producer.dynamic.batch=true \ +# -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ +# -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ +# -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem +# - name: Stop broker +# run: docker stop rabbitmq && docker rm rabbitmq - name: Start cluster run: ci/start-cluster.sh - name: Test against cluster - run: ./mvnw test -Dtest="*ClusterTest" -Drabbitmqctl.bin=DOCKER:rabbitmq0 + run: ./mvnw test -Dtest="RecoveryClusterTest" -Drabbitmqctl.bin=DOCKER:rabbitmq0 - name: Stop cluster run: docker compose --file ci/cluster/docker-compose.yml down diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index e9744f19f7..a4c6eaaba4 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -905,11 +905,7 @@ TrackingConsumerRegistration registerTrackingConsumer( public String toString() { return "{ \"locators\" : [" + this.locators.stream() - .map( - l -> { - Client c = l.nullableClient(); - return c == null ? "null" : ("\"" + l.label() + "\""); - }) + .map(l -> quote(l.label())) .collect(Collectors.joining(",")) + "], " + Utils.jsonField("producer_client_count", this.producersCoordinator.clientCount()) diff --git a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java index d67787a610..f714e5ff6e 100644 --- a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java @@ -72,9 +72,8 @@ public class RecoveryClusterTest { static List logLevels; static List> logClasses = List.of( - ProducersCoordinator.class, - ConsumersCoordinator.class, - StreamEnvironment.class, +// ProducersCoordinator.class, +// ConsumersCoordinator.class, AsyncRetry.class, StreamEnvironment.class, ScheduledExecutorServiceWrapper.class); @@ -126,9 +125,9 @@ static void tearDownAll() { @ParameterizedTest @CsvSource({ - "false,false", +// "false,false", "true,true", - "true,false", +// "true,false", }) void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws InterruptedException { LOGGER.info( From bedf745bb540f3b453eb2c20feb1871ae658be60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Thu, 12 Dec 2024 17:00:54 +0100 Subject: [PATCH 2/2] Use dedicated scheduler for locator recovery --- .github/workflows/test-pr.yml | 48 +++++++++---------- .../stream/impl/StreamEnvironment.java | 21 +++++--- src/test/java/com/rabbitmq/stream/Cli.java | 15 ++++++ .../stream/impl/RecoveryClusterTest.java | 12 ++--- .../stream/impl/StreamEnvironmentTest.java | 20 +------- .../com/rabbitmq/stream/impl/TlsTest.java | 22 --------- 6 files changed, 60 insertions(+), 78 deletions(-) diff --git a/.github/workflows/test-pr.yml b/.github/workflows/test-pr.yml index f0e0b72e09..380db0602c 100644 --- a/.github/workflows/test-pr.yml +++ b/.github/workflows/test-pr.yml @@ -11,38 +11,38 @@ jobs: steps: - uses: actions/checkout@v4 -# - name: Checkout tls-gen -# uses: actions/checkout@v4 -# with: -# repository: rabbitmq/tls-gen -# path: './tls-gen' + - name: Checkout tls-gen + uses: actions/checkout@v4 + with: + repository: rabbitmq/tls-gen + path: './tls-gen' - name: Set up JDK uses: actions/setup-java@v4 with: distribution: 'temurin' java-version: '21' cache: 'maven' -# - name: Start broker -# run: ci/start-broker.sh -# - name: Test (no dynamic-batch publishing) -# run: | -# ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ -# -Drabbitmq.stream.producer.dynamic.batch=false \ -# -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ -# -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ -# -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem -# - name: Test (dynamic-batch publishing) -# run: | -# ./mvnw test -Drabbitmqctl.bin=DOCKER:rabbitmq \ -# -Drabbitmq.stream.producer.dynamic.batch=true \ -# -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ -# -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ -# -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem -# - name: Stop broker -# run: docker stop rabbitmq && docker rm rabbitmq + - name: Start broker + run: ci/start-broker.sh + - name: Test (no dynamic-batch publishing) + run: | + ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ + -Drabbitmq.stream.producer.dynamic.batch=false \ + -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ + -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ + -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem + - name: Test (dynamic-batch publishing) + run: | + ./mvnw test -Drabbitmqctl.bin=DOCKER:rabbitmq \ + -Drabbitmq.stream.producer.dynamic.batch=true \ + -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ + -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ + -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem + - name: Stop broker + run: docker stop rabbitmq && docker rm rabbitmq - name: Start cluster run: ci/start-cluster.sh - name: Test against cluster - run: ./mvnw test -Dtest="RecoveryClusterTest" -Drabbitmqctl.bin=DOCKER:rabbitmq0 + run: ./mvnw test -Dtest="*ClusterTest" -Drabbitmqctl.bin=DOCKER:rabbitmq0 - name: Stop cluster run: docker compose --file ci/cluster/docker-compose.yml down diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index a4c6eaaba4..4345da5caa 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -66,6 +66,7 @@ class StreamEnvironment implements Environment { private final EventLoopGroup eventLoopGroup; private final ScheduledExecutorService scheduledExecutorService; + private final ScheduledExecutorService locatorReconnectionScheduledExecutorService; private final boolean privateScheduleExecutorService; private final Client.ClientParameters clientParametersPrototype; private final List
addresses; @@ -235,17 +236,22 @@ class StreamEnvironment implements Environment { maxProducersByConnection, maxTrackingConsumersByConnection, connectionNamingStrategy, - Utils.coordinatorClientFactory(this, producerNodeRetryDelay), + coordinatorClientFactory(this, producerNodeRetryDelay), forceLeaderForProducers); this.consumersCoordinator = new ConsumersCoordinator( this, maxConsumersByConnection, connectionNamingStrategy, - Utils.coordinatorClientFactory(this, consumerNodeRetryDelay), + coordinatorClientFactory(this, consumerNodeRetryDelay), forceReplicaForConsumers, Utils.brokerPicker()); this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this); + + ThreadFactory threadFactory = threadFactory("rabbitmq-stream-environment-locator-scheduler-"); + this.locatorReconnectionScheduledExecutorService = + Executors.newScheduledThreadPool(this.locators.size(), threadFactory); + ClientParameters clientParametersForInit = locatorParametersCopy(); Runnable locatorInitSequence = () -> { @@ -291,7 +297,7 @@ class StreamEnvironment implements Environment { l, connectionNamingStrategy, clientFactory, - this.scheduledExecutorService, + this.locatorReconnectionScheduledExecutorService, this.recoveryBackOffDelayPolicy, l.label()); } @@ -338,7 +344,7 @@ private ShutdownListener shutdownListener( locator, connectionNamingStrategy, clientFactory, - this.scheduledExecutorService, + this.locatorReconnectionScheduledExecutorService, delayPolicy, label); } else { @@ -683,6 +689,9 @@ public void close() { if (privateScheduleExecutorService) { this.scheduledExecutorService.shutdownNow(); } + if (this.locatorReconnectionScheduledExecutorService != null) { + this.locatorReconnectionScheduledExecutorService.shutdownNow(); + } try { if (this.eventLoopGroup != null && (!this.eventLoopGroup.isShuttingDown() || !this.eventLoopGroup.isShutdown())) { @@ -904,9 +913,7 @@ TrackingConsumerRegistration registerTrackingConsumer( @Override public String toString() { return "{ \"locators\" : [" - + this.locators.stream() - .map(l -> quote(l.label())) - .collect(Collectors.joining(",")) + + this.locators.stream().map(l -> quote(l.label())).collect(Collectors.joining(",")) + "], " + Utils.jsonField("producer_client_count", this.producersCoordinator.clientCount()) + "," diff --git a/src/test/java/com/rabbitmq/stream/Cli.java b/src/test/java/com/rabbitmq/stream/Cli.java index e532cd508b..b02f9b8349 100644 --- a/src/test/java/com/rabbitmq/stream/Cli.java +++ b/src/test/java/com/rabbitmq/stream/Cli.java @@ -252,6 +252,15 @@ public static String rabbitmqctlCommand() { } } + private static String dockerContainer() { + if (rabbitmqctlCommand().startsWith("docker")) { + String rabbitmqCtl = System.getProperty("rabbitmqctl.bin"); + return rabbitmqCtl.split(":")[1]; + } else { + throw new IllegalStateException("Broker does not run on broker"); + } + } + private static String rabbitmqStreamsCommand() { String rabbitmqctl = rabbitmqctlCommand(); int lastIndex = rabbitmqctl.lastIndexOf("rabbitmqctl"); @@ -354,6 +363,12 @@ public static void restartNode(String node) { executeCommand(dockerCommand + "rabbitmqctl status"); } + public static void restartBrokerContainer() { + String container = dockerContainer(); + executeCommand("docker stop " + container); + executeCommand("docker start " + container); + } + public static void rebalance() { rabbitmqQueues("rebalance all"); } diff --git a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java index f714e5ff6e..753d5d229d 100644 --- a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java @@ -72,11 +72,9 @@ public class RecoveryClusterTest { static List logLevels; static List> logClasses = List.of( -// ProducersCoordinator.class, -// ConsumersCoordinator.class, - AsyncRetry.class, - StreamEnvironment.class, - ScheduledExecutorServiceWrapper.class); + // ProducersCoordinator.class, + // ConsumersCoordinator.class, + AsyncRetry.class, StreamEnvironment.class, ScheduledExecutorServiceWrapper.class); ScheduledExecutorService scheduledExecutorService; @BeforeAll @@ -125,9 +123,9 @@ static void tearDownAll() { @ParameterizedTest @CsvSource({ -// "false,false", + "false,false", "true,true", -// "true,false", + "true,false", }) void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws InterruptedException { LOGGER.info( diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java index 3b147ed458..2e99dc438f 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java @@ -14,13 +14,9 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; +import static com.rabbitmq.stream.impl.TestUtils.*; import static com.rabbitmq.stream.impl.TestUtils.CountDownLatchConditions.completed; import static com.rabbitmq.stream.impl.TestUtils.ExceptionConditions.responseCode; -import static com.rabbitmq.stream.impl.TestUtils.latchAssert; -import static com.rabbitmq.stream.impl.TestUtils.localhost; -import static com.rabbitmq.stream.impl.TestUtils.localhostTls; -import static com.rabbitmq.stream.impl.TestUtils.streamName; -import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; import static java.util.stream.IntStream.range; @@ -57,7 +53,6 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollSocketChannel; -import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.ssl.SslHandler; import java.net.ConnectException; import java.nio.charset.StandardCharsets; @@ -92,22 +87,11 @@ @ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) public class StreamEnvironmentTest { - static EventLoopGroup eventLoopGroup; - EnvironmentBuilder environmentBuilder; String stream; TestUtils.ClientFactory cf; - - @BeforeAll - static void initAll() { - eventLoopGroup = new NioEventLoopGroup(); - } - - @AfterAll - static void afterAll() throws Exception { - eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS); - } + EventLoopGroup eventLoopGroup; @BeforeEach void init() { diff --git a/src/test/java/com/rabbitmq/stream/impl/TlsTest.java b/src/test/java/com/rabbitmq/stream/impl/TlsTest.java index 04964069d6..78e4b4e8de 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TlsTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/TlsTest.java @@ -56,8 +56,6 @@ import javax.net.ssl.SSLException; import javax.net.ssl.SSLHandshakeException; import javax.net.ssl.SSLParameters; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -65,26 +63,6 @@ @ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) public class TlsTest { - static boolean isJava13() { - String javaVersion = System.getProperty("java.version"); - return javaVersion != null && javaVersion.startsWith("13."); - } - - @BeforeEach - public void init() { - if (isJava13()) { - // for Java 13.0.7, see https://github.com/bcgit/bc-java/issues/941 - System.setProperty("keystore.pkcs12.keyProtectionAlgorithm", "PBEWithHmacSHA256AndAES_256"); - } - } - - @AfterEach - public void tearDown() throws Exception { - if (isJava13()) { - System.setProperty("keystore.pkcs12.keyProtectionAlgorithm", ""); - } - } - String stream; TestUtils.ClientFactory cf;