Skip to content

Commit ad12fd8

Browse files
committed
Improve recovery
Extra debug logging, adapt recovery delay for cluster tests.
1 parent b8e6d81 commit ad12fd8

19 files changed

+546
-344
lines changed

ci/start-broker.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ wait_for_message() {
1414

1515
make -C "${PWD}"/tls-gen/basic
1616

17+
rm -rf rabbitmq-configuration
1718
mkdir -p rabbitmq-configuration/tls
1819
cp -R "${PWD}"/tls-gen/basic/result/* rabbitmq-configuration/tls
1920
chmod o+r rabbitmq-configuration/tls/*

pom.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
<maven.gpg.plugin.version>3.2.7</maven.gpg.plugin.version>
7979
<buildnumber.plugin.version>3.2.1</buildnumber.plugin.version>
8080
<maven-resources-plugin.version>3.3.1</maven-resources-plugin.version>
81+
<maven-clean-plugin.version>3.4.0</maven-clean-plugin.version>
8182
<maven-source-plugin.version>3.3.1</maven-source-plugin.version>
8283
<maven-javadoc-plugin.version>3.11.2</maven-javadoc-plugin.version>
8384
<maven.jar.plugin.version>3.4.2</maven.jar.plugin.version>
@@ -100,7 +101,6 @@
100101
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
101102
<maven.packagecloud.wagon.version>0.0.6</maven.packagecloud.wagon.version>
102103
<nexus-staging-maven-plugin.version>1.7.0</nexus-staging-maven-plugin.version>
103-
104104
<maven.javadoc.skip>true</maven.javadoc.skip>
105105
<gpg.skip>true</gpg.skip>
106106
</properties>
@@ -356,6 +356,12 @@
356356
</configuration>
357357
</plugin>
358358

359+
<plugin>
360+
<groupId>org.apache.maven.plugins</groupId>
361+
<artifactId>maven-clean-plugin</artifactId>
362+
<version>${maven-clean-plugin.version}</version>
363+
</plugin>
364+
359365
<plugin>
360366
<groupId>org.apache.maven.plugins</groupId>
361367
<artifactId>maven-dependency-plugin</artifactId>

src/main/java/com/rabbitmq/stream/impl/AsyncRetry.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
1+
// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
22
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
33
//
44
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -14,6 +14,7 @@
1414
// info@rabbitmq.com.
1515
package com.rabbitmq.stream.impl;
1616

17+
import static com.rabbitmq.stream.impl.ThreadUtils.isVirtual;
1718
import static com.rabbitmq.stream.impl.Utils.namedRunnable;
1819

1920
import com.rabbitmq.stream.BackOffDelayPolicy;
@@ -53,11 +54,19 @@ private AsyncRetry(
5354
return;
5455
}
5556
try {
57+
LOGGER.debug(
58+
"Running task '{}' (virtual threads: {})",
59+
description,
60+
isVirtual(Thread.currentThread()));
5661
V result = task.call();
5762
LOGGER.debug("Task '{}' succeeded, completing future", description);
5863
completableFuture.complete(result);
5964
} catch (Exception e) {
6065
int attemptCount = attempts.getAndIncrement();
66+
LOGGER.debug(
67+
"Attempt {} for task '{}' failed, checking retry policy",
68+
attemptCount,
69+
description);
6170
if (retry.test(e)) {
6271
if (delayPolicy.delay(attemptCount).equals(BackOffDelayPolicy.TIMEOUT)) {
6372
LOGGER.debug(

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package com.rabbitmq.stream.impl;
1616

1717
import static com.rabbitmq.stream.Constants.*;
18+
import static com.rabbitmq.stream.impl.ThreadUtils.threadFactory;
1819
import static com.rabbitmq.stream.impl.Utils.DEFAULT_USERNAME;
1920
import static com.rabbitmq.stream.impl.Utils.encodeRequestCode;
2021
import static com.rabbitmq.stream.impl.Utils.encodeResponseCode;
@@ -45,7 +46,6 @@
4546
import com.rabbitmq.stream.impl.Client.ShutdownContext.ShutdownReason;
4647
import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandler;
4748
import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandlerInfo;
48-
import com.rabbitmq.stream.impl.Utils.NamedThreadFactory;
4949
import com.rabbitmq.stream.metrics.MetricsCollector;
5050
import com.rabbitmq.stream.metrics.NoOpMetricsCollector;
5151
import com.rabbitmq.stream.sasl.CredentialsProvider;
@@ -164,7 +164,7 @@ public class Client implements AutoCloseable {
164164
final CompressionCodecFactory compressionCodecFactory;
165165
private final Consumer<ShutdownContext.ShutdownReason> shutdownListenerCallback;
166166
private final ToLongFunction<Object> publishSequenceFunction =
167-
new ToLongFunction<Object>() {
167+
new ToLongFunction<>() {
168168
private final AtomicLong publishSequence = new AtomicLong(0);
169169

170170
@Override
@@ -302,6 +302,7 @@ public void initChannel(SocketChannel ch) {
302302
}
303303
});
304304

305+
this.nettyClosing = Utils.makeIdempotent(this::closeNetty);
305306
ChannelFuture f;
306307
String clientConnectionName = parameters.clientProperties.getOrDefault("connection_name", "");
307308
try {
@@ -326,13 +327,11 @@ public void initChannel(SocketChannel ch) {
326327
throw new StreamException(message, e);
327328
}
328329
}
329-
330330
this.channel = f.channel();
331-
this.nettyClosing = Utils.makeIdempotent(this::closeNetty);
332331
ExecutorServiceFactory executorServiceFactory = parameters.executorServiceFactory;
333332
if (executorServiceFactory == null) {
334333
this.executorService =
335-
Executors.newSingleThreadExecutor(new NamedThreadFactory(clientConnectionName + "-"));
334+
Executors.newSingleThreadExecutor(threadFactory(clientConnectionName + "-"));
336335
} else {
337336
this.executorService = executorServiceFactory.get();
338337
}
@@ -341,7 +340,7 @@ public void initChannel(SocketChannel ch) {
341340
if (dispatchingExecutorServiceFactory == null) {
342341
this.dispatchingExecutorService =
343342
Executors.newSingleThreadExecutor(
344-
new NamedThreadFactory("dispatching-" + clientConnectionName + "-"));
343+
threadFactory("dispatching-" + clientConnectionName + "-"));
345344
} else {
346345
this.dispatchingExecutorService = dispatchingExecutorServiceFactory.get();
347346
}
@@ -1443,7 +1442,7 @@ void closingSequence(ShutdownContext.ShutdownReason reason) {
14431442

14441443
private void closeNetty() {
14451444
try {
1446-
if (this.channel.isOpen()) {
1445+
if (this.channel != null && this.channel.isOpen()) {
14471446
LOGGER.debug("Closing Netty channel");
14481447
this.channel.close().get(10, TimeUnit.SECONDS);
14491448
}

0 commit comments

Comments
 (0)