diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc
index f7edd8fffd..30af7553cd 100644
--- a/src/docs/asciidoc/api.adoc
+++ b/src/docs/asciidoc/api.adoc
@@ -220,6 +220,11 @@ The client retries 5 times before falling back to the stream leader node.
Set to `true` only for clustered environments, not for 1-node environments, where only the stream leader is available.
|`false`
+|`forceLeaderForProducers`
+|Force connecting to a stream leader for producers.
+Set to `false` if it acceptable to stay connected to a stream replica when a load balancer is in use.
+|`true`
+
|`id`
|Informational ID for the environment instance.
Used as a prefix for connection names.
diff --git a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
index 57fe269ea5..64010d4d51 100644
--- a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
+// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -354,7 +354,7 @@ EnvironmentBuilder topologyUpdateBackOffDelayPolicy(
*
Do not set this flag to true
when streams have only 1 member (the leader),
* e.g. for local development.
*
- *
Default is false.
+ *
Default is false
.
*
* @param forceReplica whether to force the connection to a replica or not
* @return this builder instance
@@ -364,6 +364,37 @@ EnvironmentBuilder topologyUpdateBackOffDelayPolicy(
*/
EnvironmentBuilder forceReplicaForConsumers(boolean forceReplica);
+ /**
+ * Flag to force the connection to the stream leader for producers.
+ *
+ *
The library prefers to connect to a node that hosts a stream leader for producers (default
+ * behavior).
+ *
+ *
When using a load balancer, the library does not know in advance the node it connects to. It
+ * may have to retry to connect to the appropriate node.
+ *
+ *
It will retry until it connects to the appropriate node (flag set to true
, the
+ * default). This provides the best data locality, but may require several attempts, delaying the
+ * creation or the recovery of producers. This usually suits high-throughput use cases.
+ *
+ *
The library will accept the connection to a stream replica if the flag is set to false
+ *
. This will speed up the creation/recovery of producers, but at the cost of network hops
+ * between cluster nodes when publishing messages because only a stream leader accepts writes.
+ * This is usually acceptable for low-throughput use cases.
+ *
+ *
Changing the default value should only benefit systems where a load balancer sits between
+ * the client applications and the cluster nodes.
+ *
+ *
Default is true
.
+ *
+ * @param forceLeader whether to force the connection to the leader or not
+ * @return this builder instance
+ * @see #recoveryBackOffDelayPolicy(BackOffDelayPolicy)
+ * @see #topologyUpdateBackOffDelayPolicy(BackOffDelayPolicy)
+ * @since 0.21.0
+ */
+ EnvironmentBuilder forceLeaderForProducers(boolean forceLeader);
+
/**
* Create the {@link Environment} instance.
*
diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java
index 5a169b2044..011b784d4d 100644
--- a/src/main/java/com/rabbitmq/stream/impl/Client.java
+++ b/src/main/java/com/rabbitmq/stream/impl/Client.java
@@ -2245,7 +2245,10 @@ public StreamMetadata(String stream, short responseCode, Broker leader, List getReplicas() {
- return replicas;
+ return this.replicas.isEmpty() ? Collections.emptyList() : new ArrayList<>(this.replicas);
+ }
+
+ boolean hasReplicas() {
+ return !this.replicas.isEmpty();
}
public String getStream() {
diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java
index 14a1f3adc0..0cd9038209 100644
--- a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java
+++ b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java
@@ -15,6 +15,7 @@
package com.rabbitmq.stream.impl;
import static com.rabbitmq.stream.impl.Utils.*;
+import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import com.rabbitmq.stream.BackOffDelayPolicy;
@@ -49,7 +50,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class ProducersCoordinator {
+final class ProducersCoordinator implements AutoCloseable {
static final int MAX_PRODUCERS_PER_CLIENT = 256;
static final int MAX_TRACKING_CONSUMERS_PER_CLIENT = 50;
@@ -67,18 +68,21 @@ class ProducersCoordinator {
new DefaultExecutorServiceFactory(
Runtime.getRuntime().availableProcessors(), 10, "rabbitmq-stream-producer-connection-");
private final Lock coordinatorLock = new ReentrantLock();
+ private final boolean forceLeader;
ProducersCoordinator(
StreamEnvironment environment,
int maxProducersByClient,
int maxTrackingConsumersByClient,
Function connectionNamingStrategy,
- ClientFactory clientFactory) {
+ ClientFactory clientFactory,
+ boolean forceLeader) {
this.environment = environment;
this.clientFactory = clientFactory;
this.maxProducersByClient = maxProducersByClient;
this.maxTrackingConsumersByClient = maxTrackingConsumersByClient;
this.connectionNamingStrategy = connectionNamingStrategy;
+ this.forceLeader = forceLeader;
}
Runnable registerProducer(StreamProducer producer, String reference, String stream) {
@@ -105,9 +109,10 @@ Runnable registerTrackingConsumer(StreamConsumer consumer) {
}
private Runnable registerAgentTracker(AgentTracker tracker, String stream) {
- Client.Broker broker = getBrokerForProducer(stream);
+ List candidates = findCandidateNodes(stream, this.forceLeader);
+ Broker broker = pickBroker(candidates);
- addToManager(broker, tracker);
+ addToManager(broker, candidates, tracker);
if (DEBUG) {
return () -> {
@@ -125,7 +130,7 @@ private Runnable registerAgentTracker(AgentTracker tracker, String stream) {
}
}
- private void addToManager(Broker node, AgentTracker tracker) {
+ private void addToManager(Broker node, List candidates, AgentTracker tracker) {
ClientParameters clientParameters =
environment
.clientParametersCopy()
@@ -153,7 +158,8 @@ private void addToManager(Broker node, AgentTracker tracker) {
if (pickedManager == null) {
String name = keyForNode(node);
LOGGER.debug("Trying to create producer manager on {}", name);
- pickedManager = new ClientProducersManager(node, this.clientFactory, clientParameters);
+ pickedManager =
+ new ClientProducersManager(node, candidates, this.clientFactory, clientParameters);
LOGGER.debug("Created producer manager on {}, id {}", name, pickedManager.id);
}
try {
@@ -192,11 +198,12 @@ private void addToManager(Broker node, AgentTracker tracker) {
}
}
- private Client.Broker getBrokerForProducer(String stream) {
+ // package protected for testing
+ List findCandidateNodes(String stream, boolean forceLeader) {
Map metadata =
this.environment.locatorOperation(
namedFunction(c -> c.metadata(stream), "Candidate lookup to publish to '%s'", stream));
- if (metadata.size() == 0 || metadata.get(stream) == null) {
+ if (metadata.isEmpty() || metadata.get(stream) == null) {
throw new StreamDoesNotExistException(stream);
}
@@ -210,17 +217,34 @@ private Client.Broker getBrokerForProducer(String stream) {
}
}
+ List candidates = new ArrayList<>();
Client.Broker leader = streamMetadata.getLeader();
- if (leader == null) {
+ if (leader == null && forceLeader) {
throw new IllegalStateException("Not leader available for stream " + stream);
}
- LOGGER.debug(
- "Using client on {}:{} to publish to {}", leader.getHost(), leader.getPort(), stream);
+ candidates.add(new BrokerWrapper(leader, true));
- return leader;
+ if (!forceLeader && streamMetadata.hasReplicas()) {
+ candidates.addAll(
+ streamMetadata.getReplicas().stream()
+ .map(b -> new BrokerWrapper(b, false))
+ .collect(toList()));
+ }
+
+ LOGGER.debug("Candidates to publish to {}: {}", stream, candidates);
+
+ return Collections.unmodifiableList(candidates);
+ }
+
+ static Broker pickBroker(List candidates) {
+ return candidates.stream()
+ .filter(BrokerWrapper::isLeader)
+ .findFirst()
+ .map(BrokerWrapper::broker)
+ .orElseThrow(() -> new IllegalStateException("Not leader available"));
}
- void close() {
+ public void close() {
Iterator iterator = this.managers.iterator();
while (iterator.hasNext()) {
ClientProducersManager manager = iterator.next();
@@ -568,7 +592,10 @@ private class ClientProducersManager implements Comparable candidates,
+ ClientFactory cf,
+ Client.ClientParameters clientParameters) {
this.id = managerIdSequence.getAndIncrement();
AtomicReference nameReference = new AtomicReference<>();
AtomicReference ref = new AtomicReference<>();
@@ -682,7 +709,7 @@ private ClientProducersManager(
.metadataListener(metadataListener)
.clientProperty("connection_name", connectionName),
keyForNode(targetNode),
- Collections.emptyList());
+ candidates.stream().map(BrokerWrapper::broker).collect(toList()));
this.client = cf.client(connectionFactoryContext);
this.node = Utils.brokerFromClient(this.client);
this.name = keyForNode(this.node);
@@ -694,18 +721,19 @@ private ClientProducersManager(
private void assignProducersToNewManagers(
Collection trackers, String stream, BackOffDelayPolicy delayPolicy) {
- AsyncRetry.asyncRetry(() -> getBrokerForProducer(stream))
+ AsyncRetry.asyncRetry(() -> findCandidateNodes(stream, forceLeader))
.description("Candidate lookup to publish to " + stream)
.scheduler(environment.scheduledExecutorService())
.retry(ex -> !(ex instanceof StreamDoesNotExistException))
.delayPolicy(delayPolicy)
.build()
.thenAccept(
- broker -> {
+ candidates -> {
+ Broker broker = pickBroker(candidates);
String key = keyForNode(broker);
LOGGER.debug(
"Assigning {} producer(s) and consumer tracker(s) to {}", trackers.size(), key);
- trackers.forEach(tracker -> maybeRecoverAgent(broker, tracker));
+ trackers.forEach(tracker -> maybeRecoverAgent(broker, candidates, tracker));
})
.exceptionally(
ex -> {
@@ -730,10 +758,11 @@ private void assignProducersToNewManagers(
});
}
- private void maybeRecoverAgent(Broker broker, AgentTracker tracker) {
+ private void maybeRecoverAgent(
+ Broker broker, List candidates, AgentTracker tracker) {
if (tracker.markRecoveryInProgress()) {
try {
- recoverAgent(broker, tracker);
+ recoverAgent(broker, candidates, tracker);
} catch (Exception e) {
LOGGER.warn(
"Error while recovering {} tracker {} (stream '{}'). Reason: {}",
@@ -750,14 +779,14 @@ private void maybeRecoverAgent(Broker broker, AgentTracker tracker) {
}
}
- private void recoverAgent(Broker node, AgentTracker tracker) {
+ private void recoverAgent(Broker node, List candidates, AgentTracker tracker) {
boolean reassignmentCompleted = false;
while (!reassignmentCompleted) {
try {
if (tracker.isOpen()) {
LOGGER.debug(
"Using {} to resume {} to {}", node.label(), tracker.type(), tracker.stream());
- addToManager(node, tracker);
+ addToManager(node, candidates, tracker);
tracker.running();
} else {
LOGGER.debug(
@@ -776,14 +805,15 @@ private void recoverAgent(Broker node, AgentTracker tracker) {
tracker.identifiable() ? tracker.id() : "N/A",
tracker.stream());
// maybe not a good candidate, let's refresh and retry for this one
- node =
+ candidates =
Utils.callAndMaybeRetry(
- () -> getBrokerForProducer(tracker.stream()),
+ () -> findCandidateNodes(tracker.stream(), forceLeader),
ex -> !(ex instanceof StreamDoesNotExistException),
environment.recoveryBackOffDelayPolicy(),
"Candidate lookup for %s on stream '%s'",
tracker.type(),
tracker.stream());
+ node = pickBroker(candidates);
} catch (Exception e) {
LOGGER.warn(
"Error while re-assigning {} (stream '{}')", tracker.type(), tracker.stream(), e);
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
index 3b66e0d3ca..3f7642cbec 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
@@ -102,7 +102,10 @@ class StreamEnvironment implements Environment {
Function connectionNamingStrategy,
Function clientFactory,
ObservationCollector> observationCollector,
- boolean forceReplicaForConsumers) {
+ boolean forceReplicaForConsumers,
+ boolean forceLeaderForProducers,
+ Duration producerNodeRetryDelay,
+ Duration consumerNodeRetryDelay) {
this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy;
this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy;
this.byteBufAllocator = byteBufAllocator;
@@ -212,13 +215,14 @@ class StreamEnvironment implements Environment {
maxProducersByConnection,
maxTrackingConsumersByConnection,
connectionNamingStrategy,
- Utils.coordinatorClientFactory(this));
+ Utils.coordinatorClientFactory(this, producerNodeRetryDelay),
+ forceLeaderForProducers);
this.consumersCoordinator =
new ConsumersCoordinator(
this,
maxConsumersByConnection,
connectionNamingStrategy,
- Utils.coordinatorClientFactory(this),
+ Utils.coordinatorClientFactory(this, consumerNodeRetryDelay),
forceReplicaForConsumers,
Utils.brokerPicker());
this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this);
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java
index 3fc919aaba..0ec0a9dca2 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
+// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -65,8 +65,11 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder {
private CompressionCodecFactory compressionCodecFactory;
private boolean lazyInit = false;
private boolean forceReplicaForConsumers = false;
+ private boolean forceLeaderForProducers = true;
private Function clientFactory = Client::new;
private ObservationCollector> observationCollector = ObservationCollector.NO_OP;
+ private Duration producerNodeRetryDelay = Duration.ofMillis(500);
+ private Duration consumerNodeRetryDelay = Duration.ofMillis(1000);
public StreamEnvironmentBuilder() {}
@@ -274,6 +277,12 @@ public EnvironmentBuilder forceReplicaForConsumers(boolean forceReplica) {
return this;
}
+ @Override
+ public EnvironmentBuilder forceLeaderForProducers(boolean forceLeader) {
+ this.forceLeaderForProducers = forceLeader;
+ return this;
+ }
+
@Override
public TlsConfiguration tls() {
this.tls.enable();
@@ -296,6 +305,16 @@ public EnvironmentBuilder observationCollector(ObservationCollector> observati
return this;
}
+ StreamEnvironmentBuilder producerNodeRetryDelay(Duration producerNodeRetryDelay) {
+ this.producerNodeRetryDelay = producerNodeRetryDelay;
+ return this;
+ }
+
+ StreamEnvironmentBuilder consumerNodeRetryDelay(Duration consumerNodeRetryDelay) {
+ this.consumerNodeRetryDelay = consumerNodeRetryDelay;
+ return this;
+ }
+
@Override
public Environment build() {
if (this.compressionCodecFactory == null) {
@@ -327,7 +346,10 @@ public Environment build() {
connectionNamingStrategy,
this.clientFactory,
this.observationCollector,
- this.forceReplicaForConsumers);
+ this.forceReplicaForConsumers,
+ this.forceLeaderForProducers,
+ this.producerNodeRetryDelay,
+ this.consumerNodeRetryDelay);
}
static final class DefaultTlsConfiguration implements TlsConfiguration {
diff --git a/src/main/java/com/rabbitmq/stream/impl/Utils.java b/src/main/java/com/rabbitmq/stream/impl/Utils.java
index 558d2a347b..f63ac5792b 100644
--- a/src/main/java/com/rabbitmq/stream/impl/Utils.java
+++ b/src/main/java/com/rabbitmq/stream/impl/Utils.java
@@ -135,10 +135,6 @@ static short encodeResponseCode(Short code) {
return (short) (code | 0B1000_0000_0000_0000);
}
- static ClientFactory coordinatorClientFactory(StreamEnvironment environment) {
- return coordinatorClientFactory(environment, ConditionalClientFactory.RETRY_INTERVAL);
- }
-
static ClientFactory coordinatorClientFactory(
StreamEnvironment environment, Duration retryInterval) {
String messageFormat =
diff --git a/src/test/java/com/rabbitmq/stream/impl/LoadBalancerClusterTest.java b/src/test/java/com/rabbitmq/stream/impl/LoadBalancerClusterTest.java
index 727ccc58e4..9d63f42fdc 100644
--- a/src/test/java/com/rabbitmq/stream/impl/LoadBalancerClusterTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/LoadBalancerClusterTest.java
@@ -14,26 +14,28 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;
+import static com.rabbitmq.stream.impl.Assertions.assertThat;
import static java.lang.Integer.parseInt;
import static java.util.stream.Collectors.toSet;
+import static java.util.stream.IntStream.range;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
-import com.rabbitmq.stream.Address;
-import com.rabbitmq.stream.ConsumerFlowStrategy;
-import com.rabbitmq.stream.OffsetSpecification;
-import com.rabbitmq.stream.SubscriptionListener;
+import com.rabbitmq.stream.*;
+import com.rabbitmq.stream.impl.Client.Broker;
import com.rabbitmq.stream.impl.MonitoringTestUtils.ConsumerCoordinatorInfo;
+import com.rabbitmq.stream.impl.MonitoringTestUtils.EnvironmentInfo;
+import com.rabbitmq.stream.impl.MonitoringTestUtils.ProducersCoordinatorInfo;
import com.rabbitmq.stream.impl.TestUtils.DisabledIfNotCluster;
import io.netty.channel.EventLoopGroup;
import java.time.Duration;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.stream.IntStream;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
@@ -49,11 +51,13 @@ public class LoadBalancerClusterTest {
@Mock StreamEnvironment environment;
@Mock StreamConsumer consumer;
+ @Mock StreamProducer producer;
AutoCloseable mocks;
TestUtils.ClientFactory cf;
String stream;
EventLoopGroup eventLoopGroup;
Client locator;
+ Address loadBalancerAddress = new Address("localhost", LB_PORT);
@BeforeEach
void init() {
@@ -62,7 +66,6 @@ void init() {
when(environment.locator()).thenReturn(locator);
when(environment.clientParametersCopy())
.thenReturn(new Client.ClientParameters().eventLoopGroup(eventLoopGroup).port(LB_PORT));
- Address loadBalancerAddress = new Address("localhost", LB_PORT);
when(environment.addressResolver()).thenReturn(address -> loadBalancerAddress);
when(environment.locatorOperation(any())).thenCallRealMethod();
}
@@ -86,7 +89,7 @@ void pickConsumersAmongCandidates(boolean forceReplica) {
forceReplica,
Utils.brokerPicker())) {
- IntStream.range(0, subscriptionCount)
+ range(0, subscriptionCount)
.forEach(
ignored -> {
c.subscribe(
@@ -102,22 +105,139 @@ void pickConsumersAmongCandidates(boolean forceReplica) {
});
Client.StreamMetadata metadata = locator.metadata(stream).get(stream);
- Set allowedNodes = new HashSet<>(metadata.getReplicas());
+ Set allowedNodes = new HashSet<>(metadata.getReplicas());
if (!forceReplica) {
allowedNodes.add(metadata.getLeader());
}
ConsumerCoordinatorInfo info = MonitoringTestUtils.extract(c);
assertThat(info.consumerCount()).isEqualTo(subscriptionCount);
- Set usedNodes =
+ Set usedNodes =
info.clients().stream()
.map(m -> m.node().split(":"))
- .map(np -> new Client.Broker(np[0], parseInt(np[1])))
+ .map(np -> new Broker(np[0], parseInt(np[1])))
.collect(toSet());
assertThat(usedNodes).hasSameSizeAs(allowedNodes).containsAll(allowedNodes);
}
}
+ @Test
+ void pickProducersAmongCandidatesIfInstructed() {
+ boolean forceLeader = true;
+ when(consumer.stream()).thenReturn(stream);
+ int maxAgentPerClient = 2;
+ int agentCount = maxAgentPerClient * 10;
+ try (ProducersCoordinator c =
+ new ProducersCoordinator(
+ environment,
+ maxAgentPerClient,
+ maxAgentPerClient,
+ type -> "producer-connection",
+ Utils.coordinatorClientFactory(this.environment, Duration.ofMillis(10)),
+ forceLeader)) {
+
+ range(0, agentCount)
+ .forEach(
+ ignored -> {
+ c.registerProducer(producer, null, stream);
+ c.registerTrackingConsumer(consumer);
+ });
+
+ Client.StreamMetadata metadata = locator.metadata(stream).get(stream);
+ Set allowedNodes = new HashSet<>(Collections.singleton(metadata.getLeader()));
+ if (!forceLeader) {
+ allowedNodes.addAll(metadata.getReplicas());
+ }
+
+ ProducersCoordinatorInfo info = MonitoringTestUtils.extract(c);
+ assertThat(info.producerCount()).isEqualTo(agentCount);
+ assertThat(info.trackingConsumerCount()).isEqualTo(agentCount);
+ Set usedNodes =
+ info.nodesConnected().stream()
+ .map(n -> n.split(":"))
+ .map(np -> new Broker(np[0], parseInt(np[1])))
+ .collect(toSet());
+ assertThat(usedNodes).hasSameSizeAs(allowedNodes).containsAll(allowedNodes);
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void producersConsumersShouldSpreadAccordingToDataLocalitySettings(boolean forceLocality) {
+ int maxPerConnection = 2;
+ int agentCount = maxPerConnection * 20;
+ StreamEnvironmentBuilder builder = (StreamEnvironmentBuilder) Environment.builder();
+ builder
+ .producerNodeRetryDelay(Duration.ofMillis(10))
+ .consumerNodeRetryDelay(Duration.ofMillis(10));
+ try (Environment env =
+ builder
+ .port(LB_PORT)
+ .forceReplicaForConsumers(forceLocality)
+ .forceReplicaForConsumers(forceLocality)
+ .addressResolver(addr -> loadBalancerAddress)
+ .maxProducersByConnection(maxPerConnection)
+ .maxConsumersByConnection(maxPerConnection)
+ .forceLeaderForProducers(forceLocality)
+ .netty()
+ .eventLoopGroup(eventLoopGroup)
+ .environmentBuilder()
+ .build()) {
+ TestUtils.Sync consumeSync = TestUtils.sync(agentCount * agentCount);
+ Set consumersThatReceived = ConcurrentHashMap.newKeySet(agentCount);
+ List producers = new ArrayList<>();
+ range(0, agentCount)
+ .forEach(
+ index -> {
+ producers.add(env.producerBuilder().stream(stream).build());
+ env.consumerBuilder().stream(stream)
+ .messageHandler(
+ (ctx, msg) -> {
+ consumersThatReceived.add(index);
+ consumeSync.down();
+ })
+ .offset(OffsetSpecification.first())
+ .build();
+ });
+ producers.forEach(p -> p.send(p.messageBuilder().build(), ctx -> {}));
+ assertThat(consumeSync).completes();
+ assertThat(consumersThatReceived).containsAll(range(0, agentCount).boxed().collect(toSet()));
+
+ EnvironmentInfo info = MonitoringTestUtils.extract(env);
+ ProducersCoordinatorInfo producerInfo = info.getProducers();
+ ConsumerCoordinatorInfo consumerInfo = info.getConsumers();
+
+ assertThat(producerInfo.producerCount()).isEqualTo(agentCount);
+ assertThat(consumerInfo.consumerCount()).isEqualTo(agentCount);
+
+ Client.StreamMetadata metadata = locator.metadata(stream).get(stream);
+
+ Function, Set> toBrokers =
+ nodes ->
+ nodes.stream()
+ .map(n -> n.split(":"))
+ .map(n -> new Broker(n[0], parseInt(n[1])))
+ .collect(toSet());
+ Set usedNodes = toBrokers.apply(producerInfo.nodesConnected());
+ assertThat(usedNodes).contains(metadata.getLeader());
+ if (forceLocality) {
+ assertThat(usedNodes).hasSize(1);
+ } else {
+ assertThat(usedNodes).hasSize(metadata.getReplicas().size() + 1);
+ assertThat(usedNodes).containsAll(metadata.getReplicas());
+ }
+
+ usedNodes = toBrokers.apply(consumerInfo.nodesConnected());
+ assertThat(usedNodes).containsAll(metadata.getReplicas());
+ if (forceLocality) {
+ assertThat(usedNodes).hasSameSizeAs(metadata.getReplicas());
+ } else {
+ assertThat(usedNodes).hasSize(metadata.getReplicas().size() + 1);
+ assertThat(usedNodes).contains(metadata.getLeader());
+ }
+ }
+ }
+
private static ConsumerFlowStrategy flowStrategy() {
return ConsumerFlowStrategy.creditOnChunkArrival(10);
}
diff --git a/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java
index dc89e5ec4c..176f5dbb13 100644
--- a/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java
@@ -19,6 +19,7 @@
import static com.rabbitmq.stream.impl.TestUtils.CountDownLatchConditions.completed;
import static com.rabbitmq.stream.impl.TestUtils.answer;
import static com.rabbitmq.stream.impl.TestUtils.metadata;
+import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
@@ -79,6 +80,10 @@ static Client.Broker leader() {
return new Client.Broker("leader", 5552);
}
+ static Utils.BrokerWrapper leaderWrapper() {
+ return new Utils.BrokerWrapper(leader(), true);
+ }
+
static Client.Broker leader1() {
return new Client.Broker("leader-1", 5552);
}
@@ -91,6 +96,10 @@ static List replicas() {
return Arrays.asList(new Client.Broker("replica1", 5552), new Client.Broker("replica2", 5552));
}
+ static List replicaWrappers() {
+ return replicas().stream().map(b -> new Utils.BrokerWrapper(b, false)).collect(toList());
+ }
+
@BeforeEach
void init() {
Client.ClientParameters clientParameters =
@@ -125,7 +134,8 @@ public Client.ClientParameters metadataListener(
ProducersCoordinator.MAX_PRODUCERS_PER_CLIENT,
ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT,
type -> "producer-connection",
- clientFactory);
+ clientFactory,
+ true);
when(client.isOpen()).thenReturn(true);
when(client.deletePublisher(anyByte())).thenReturn(new Response(Constants.RESPONSE_CODE_OK));
}
@@ -194,7 +204,8 @@ void registerShouldAllowPublishing() {
ProducersCoordinator.MAX_PRODUCERS_PER_CLIENT,
ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT,
type -> "producer-connection",
- cf);
+ cf,
+ true);
when(locator.metadata("stream")).thenReturn(metadata(leader(), replicas()));
when(clientFactory.client(any())).thenReturn(client);
@@ -221,7 +232,8 @@ void shouldGetExactNodeImmediatelyWithAdvertisedHostNameClientFactoryAndExactNod
ProducersCoordinator.MAX_PRODUCERS_PER_CLIENT,
ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT,
type -> "producer-connection",
- cf);
+ cf,
+ true);
when(locator.metadata("stream")).thenReturn(metadata(leader(), replicas()));
when(clientFactory.client(any())).thenReturn(client);
@@ -563,7 +575,8 @@ void growShrinkResourcesBasedOnProducersAndTrackingConsumersCount(int maxProduce
maxProducersByClient,
ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT,
type -> "producer-connection",
- clientFactory);
+ clientFactory,
+ true);
class ProducerInfo {
StreamProducer producer;
@@ -721,6 +734,28 @@ void pickSlotTest() {
assertThat(pickSlot(map, "257", sequence)).isEqualTo(5);
}
+ @Test
+ void findCandidateNodesShouldReturnOnlyLeaderWhenForceLeaderIsTrue() {
+ when(locator.metadata("stream")).thenReturn(metadata(leader(), replicas()));
+ assertThat(coordinator.findCandidateNodes("stream", true)).containsOnly(leaderWrapper());
+ }
+
+ @Test
+ void findCandidateNodesShouldReturnLeaderAndReplicasWhenForceLeaderIsFalse() {
+ when(locator.metadata("stream")).thenReturn(metadata(leader(), replicas()));
+ assertThat(coordinator.findCandidateNodes("stream", false))
+ .hasSize(3)
+ .contains(leaderWrapper())
+ .containsAll(replicaWrappers());
+ }
+
+ @Test
+ void findCandidateNodesShouldThrowIfThereIsNoLeader() {
+ when(locator.metadata("stream")).thenReturn(metadata(null, replicas()));
+ assertThatThrownBy(() -> coordinator.findCandidateNodes("stream", true))
+ .isInstanceOf(IllegalStateException.class);
+ }
+
private static ScheduledExecutorService createScheduledExecutorService() {
return new ScheduledExecutorServiceWrapper(Executors.newSingleThreadScheduledExecutor());
}
diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java
index c422d454a3..5724e0f6f9 100644
--- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
+// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -98,7 +98,10 @@ Client.ClientParameters duplicate() {
type -> "locator-connection",
cf,
ObservationCollector.NO_OP,
- false);
+ false,
+ true,
+ Duration.ofMillis(100),
+ Duration.ofMillis(100));
}
@AfterEach
@@ -163,7 +166,10 @@ void shouldTryUrisOnInitializationFailure() throws Exception {
type -> "locator-connection",
cf,
ObservationCollector.NO_OP,
- false);
+ false,
+ true,
+ Duration.ofMillis(100),
+ Duration.ofMillis(100));
verify(cf, times(3)).apply(any(Client.ClientParameters.class));
}
@@ -191,7 +197,10 @@ void shouldNotOpenConnectionWhenLazyInitIsEnabled(
type -> "locator-connection",
cf,
ObservationCollector.NO_OP,
- false);
+ false,
+ true,
+ Duration.ofMillis(100),
+ Duration.ofMillis(100));
verify(cf, times(expectedConnectionCreation)).apply(any(Client.ClientParameters.class));
}