diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index 37c3967fa9..bb8b5bc635 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -303,12 +303,7 @@ public void initChannel(SocketChannel ch) { }); ChannelFuture f; - String clientConnectionName = - parameters.clientProperties == null - ? "" - : (parameters.clientProperties.containsKey("connection_name") - ? parameters.clientProperties.get("connection_name") - : ""); + String clientConnectionName = parameters.clientProperties.getOrDefault("connection_name", ""); try { LOGGER.debug( "Trying to create stream connection to {}:{}, with client connection name '{}'", @@ -1505,6 +1500,10 @@ String connectionName() { return builder.append(serverAddress()).toString(); } + String clientConnectionName() { + return this.clientConnectionName; + } + private String serverAddress() { SocketAddress remoteAddress = remoteAddress(); if (remoteAddress instanceof InetSocketAddress) { diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java index d252c777a5..b62dad93d6 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java @@ -481,6 +481,7 @@ public void close() { } void closeFromEnvironment() { + this.maybeNotifyActiveToInactiveSac(); LOGGER.debug("Calling consumer {} closing callback (stream {})", this.id, this.stream); this.closingCallback.run(); closed.set(true); @@ -490,6 +491,7 @@ void closeFromEnvironment() { void closeAfterStreamDeletion() { if (closed.compareAndSet(false, true)) { + this.maybeNotifyActiveToInactiveSac(); this.environment.removeConsumer(this); this.status = Status.CLOSED; } @@ -506,11 +508,23 @@ void setTrackingClient(Client client) { void setSubscriptionClient(Client client) { this.subscriptionClient = client; if (client == null && this.isSac()) { + maybeNotifyActiveToInactiveSac(); // we lost the connection this.sacActive = false; } } + private void maybeNotifyActiveToInactiveSac() { + if (this.isSac() && this.sacActive) { + LOGGER.debug( + "Single active consumer {} from stream {} with name {} is unavailable, calling consumer update listener", + this.id, + this.stream, + this.name); + this.consumerUpdate(false); + } + } + synchronized void unavailable() { this.status = Status.NOT_AVAILABLE; this.trackingClient = null; @@ -623,4 +637,13 @@ private void checkNotClosed() { long id() { return this.id; } + + String subscriptionConnectionName() { + Client client = this.subscriptionClient; + if (client == null) { + return ""; + } else { + return client.clientConnectionName(); + } + } } diff --git a/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java index ce417b6b91..dac6a2e9eb 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java +++ b/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java @@ -187,6 +187,10 @@ public void store(long offset) { "Consumer#store(long) does not work for super streams, use MessageHandler.Context#storeOffset() instead"); } + Consumer consumer(String partition) { + return this.consumers.get(partition); + } + @Override public long storedOffset() { throw new UnsupportedOperationException( diff --git a/src/test/java/com/rabbitmq/stream/Host.java b/src/test/java/com/rabbitmq/stream/Host.java index 28b63de828..f1267075ed 100644 --- a/src/test/java/com/rabbitmq/stream/Host.java +++ b/src/test/java/com/rabbitmq/stream/Host.java @@ -47,30 +47,34 @@ public static String capture(InputStream is) throws IOException { return buff.toString(); } - private static Process executeCommand(String command) throws IOException { + private static Process executeCommand(String command) { return executeCommand(command, false); } - private static Process executeCommand(String command, boolean ignoreError) throws IOException { - Process pr = executeCommandProcess(command); - - int ev = waitForExitValue(pr); - if (ev != 0 && !ignoreError) { - String stdout = capture(pr.getInputStream()); - String stderr = capture(pr.getErrorStream()); - throw new IOException( - "unexpected command exit value: " - + ev - + "\ncommand: " - + command - + "\n" - + "\nstdout:\n" - + stdout - + "\nstderr:\n" - + stderr - + "\n"); + private static Process executeCommand(String command, boolean ignoreError) { + try { + Process pr = executeCommandProcess(command); + + int ev = waitForExitValue(pr); + if (ev != 0 && !ignoreError) { + String stdout = capture(pr.getInputStream()); + String stderr = capture(pr.getErrorStream()); + throw new IOException( + "unexpected command exit value: " + + ev + + "\ncommand: " + + command + + "\n" + + "\nstdout:\n" + + stdout + + "\nstderr:\n" + + stderr + + "\n"); + } + return pr; + } catch (IOException e) { + throw new RuntimeException(e); } - return pr; } public static String hostname() throws IOException { @@ -110,6 +114,10 @@ public static Process rabbitmqctl(String command) throws IOException { return executeCommand(rabbitmqctlCommand() + " " + command); } + static Process rabbitmqStreams(String command) { + return executeCommand(rabbitmqStreamsCommand() + " " + command); + } + public static Process rabbitmqctlIgnoreError(String command) throws IOException { return executeCommand(rabbitmqctlCommand() + " " + command, true); } @@ -189,11 +197,19 @@ static List toConnectionInfoList(String json) { return GSON.fromJson(json, new TypeToken>() {}.getType()); } - public static Process killStreamLeaderProcess(String stream) throws IOException { - return rabbitmqctl( - "eval 'case rabbit_stream_manager:lookup_leader(<<\"/\">>, <<\"" - + stream - + "\">>) of {ok, Pid} -> exit(Pid, kill); Pid -> exit(Pid, kill) end.'"); + public static void restartStream(String stream) { + rabbitmqStreams(" restart_stream " + stream); + } + + public static Process killStreamLeaderProcess(String stream) { + try { + return rabbitmqctl( + "eval 'case rabbit_stream_manager:lookup_leader(<<\"/\">>, <<\"" + + stream + + "\">>) of {ok, Pid} -> exit(Pid, kill); Pid -> exit(Pid, kill) end.'"); + } catch (IOException e) { + throw new RuntimeException(e); + } } public static void addUser(String username, String password) throws IOException { @@ -243,7 +259,7 @@ public static void setEnv(String parameter, String value) throws IOException { public static String rabbitmqctlCommand() { String rabbitmqCtl = System.getProperty("rabbitmqctl.bin"); if (rabbitmqCtl == null) { - throw new IllegalStateException("Please define the rabbitmqctl.bin system property"); + rabbitmqCtl = DOCKER_PREFIX + "rabbitmq"; } if (rabbitmqCtl.startsWith(DOCKER_PREFIX)) { String containerId = rabbitmqCtl.split(":")[1]; @@ -253,6 +269,15 @@ public static String rabbitmqctlCommand() { } } + private static String rabbitmqStreamsCommand() { + String rabbitmqctl = rabbitmqctlCommand(); + int lastIndex = rabbitmqctl.lastIndexOf("rabbitmqctl"); + if (lastIndex == -1) { + throw new IllegalArgumentException("Not a valid rabbitqmctl command: " + rabbitmqctl); + } + return rabbitmqctl.substring(0, lastIndex) + "rabbitmq-streams"; + } + public static AutoCloseable diskAlarm() throws Exception { return new CallableAutoCloseable( () -> { diff --git a/src/test/java/com/rabbitmq/stream/impl/Assertions.java b/src/test/java/com/rabbitmq/stream/impl/Assertions.java new file mode 100644 index 0000000000..b6f884d622 --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/impl/Assertions.java @@ -0,0 +1,62 @@ +// Copyright (c) 2024 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import static org.assertj.core.api.Assertions.fail; + +import java.time.Duration; +import org.assertj.core.api.AbstractObjectAssert; + +final class Assertions { + + private Assertions() {} + + static SyncAssert assertThat(TestUtils.Sync sync) { + return new SyncAssert(sync); + } + + static class SyncAssert extends AbstractObjectAssert { + + private SyncAssert(TestUtils.Sync sync) { + super(sync, SyncAssert.class); + } + + SyncAssert completes() { + return this.completes(TestUtils.DEFAULT_CONDITION_TIMEOUT); + } + + SyncAssert completes(Duration timeout) { + boolean completed = actual.await(timeout); + if (!completed) { + fail("Sync timed out after %d ms", timeout.toMillis()); + } + return this; + } + + SyncAssert hasCompleted() { + if (!this.actual.hasCompleted()) { + fail("Sync should have completed but has not"); + } + return this; + } + + SyncAssert hasNotCompleted() { + if (this.actual.hasCompleted()) { + fail("Sync should have not completed"); + } + return this; + } + } +} diff --git a/src/test/java/com/rabbitmq/stream/impl/SacStreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/SacStreamConsumerTest.java index af108a9659..698cbc5ede 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SacStreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SacStreamConsumerTest.java @@ -14,24 +14,24 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import static com.rabbitmq.stream.impl.TestUtils.publishAndWaitForConfirms; -import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; +import static com.rabbitmq.stream.impl.Assertions.assertThat; +import static com.rabbitmq.stream.impl.TestUtils.*; import static org.assertj.core.api.Assertions.assertThat; -import com.rabbitmq.stream.Consumer; -import com.rabbitmq.stream.Environment; -import com.rabbitmq.stream.EnvironmentBuilder; -import com.rabbitmq.stream.OffsetSpecification; +import com.rabbitmq.stream.*; import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast311Condition; import io.netty.channel.EventLoopGroup; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; @ExtendWith({ TestUtils.StreamTestInfrastructureExtension.class, @@ -237,4 +237,72 @@ void externalTrackingSecondConsumerShouldTakeOverWhereTheFirstOneLeftOff() throw // nothing stored on the server side assertThat(cf.get().queryOffset(consumerName, stream).getOffset()).isZero(); } + + public static Stream> + activeConsumerShouldGetUpdateNotificationAfterDisruption() { + return Stream.of( + namedConsumer(consumer -> Host.killConnection(connectionName(consumer)), "kill connection"), + namedConsumer(consumer -> Host.restartStream(stream(consumer)), "restart stream"), + namedConsumer(Consumer::close, "close consumer")); + } + + @ParameterizedTest + @MethodSource + @TestUtils.DisabledIfRabbitMqCtlNotSet + void activeConsumerShouldGetUpdateNotificationAfterDisruption( + java.util.function.Consumer disruption) { + String consumerName = "foo"; + Sync consumer1Active = sync(); + Sync consumer1Inactive = sync(); + Consumer consumer1 = + environment.consumerBuilder().stream(stream) + .name(consumerName) + .noTrackingStrategy() + .singleActiveConsumer() + .consumerUpdateListener( + context -> { + if (context.isActive()) { + consumer1Active.down(); + } else { + consumer1Inactive.down(); + } + return OffsetSpecification.next(); + }) + .messageHandler((context, message) -> {}) + .build(); + + Sync consumer2Active = sync(); + Sync consumer2Inactive = sync(); + environment.consumerBuilder().stream(stream) + .name(consumerName) + .noTrackingStrategy() + .singleActiveConsumer() + .consumerUpdateListener( + context -> { + if (!context.isActive()) { + consumer2Inactive.down(); + } + return OffsetSpecification.next(); + }) + .messageHandler((context, message) -> {}) + .build(); + + assertThat(consumer1Active).completes(); + assertThat(consumer2Inactive).hasNotCompleted(); + assertThat(consumer1Inactive).hasNotCompleted(); + assertThat(consumer2Active).hasNotCompleted(); + + disruption.accept(consumer1); + + assertThat(consumer2Inactive).hasNotCompleted(); + assertThat(consumer1Inactive).completes(); + } + + private static String connectionName(Consumer consumer) { + return ((StreamConsumer) consumer).subscriptionConnectionName(); + } + + private static String stream(Consumer consumer) { + return ((StreamConsumer) consumer).stream(); + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java index afb8e86131..3180bbbea6 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java @@ -14,18 +14,12 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import static com.rabbitmq.stream.impl.TestUtils.declareSuperStreamTopology; -import static com.rabbitmq.stream.impl.TestUtils.deleteSuperStreamTopology; -import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; +import static com.rabbitmq.stream.impl.Assertions.assertThat; +import static com.rabbitmq.stream.impl.TestUtils.*; import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; -import com.rabbitmq.stream.Consumer; -import com.rabbitmq.stream.ConsumerUpdateListener; -import com.rabbitmq.stream.Environment; -import com.rabbitmq.stream.EnvironmentBuilder; -import com.rabbitmq.stream.NoOffsetException; -import com.rabbitmq.stream.OffsetSpecification; +import com.rabbitmq.stream.*; import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast311Condition; import com.rabbitmq.stream.impl.TestUtils.CallableBooleanSupplier; import com.rabbitmq.stream.impl.TestUtils.SingleActiveConsumer; @@ -39,11 +33,14 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.IntStream; +import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; @ExtendWith({ TestUtils.StreamTestInfrastructureExtension.class, @@ -287,7 +284,8 @@ void sacAutoOffsetTrackingShouldStoreOnRelanbancing() throws Exception { && consumerStates.get("1" + partitions.get(2))); assertThat(consumerStates) - .containsEntry("0" + partitions.get(0), true) // not changed after closing + .containsEntry( + "0" + partitions.get(0), false) // client library notifies the listener on closing .containsEntry("0" + partitions.get(1), false) // not changed after closing .containsEntry("0" + partitions.get(2), false) // not changed after closing .containsEntry("1" + partitions.get(0), true) // now active @@ -314,12 +312,15 @@ void sacAutoOffsetTrackingShouldStoreOnRelanbancing() throws Exception { && consumerStates.get("2" + partitions.get(2))); assertThat(consumerStates) - .containsEntry("0" + partitions.get(0), true) // not changed after closing + .containsEntry( + "0" + partitions.get(0), false) // client library notifies the listener on closing .containsEntry("0" + partitions.get(1), false) // not changed after closing .containsEntry("0" + partitions.get(2), false) // not changed after closing - .containsEntry("1" + partitions.get(0), true) // not changed after closing + .containsEntry( + "1" + partitions.get(0), false) // client library notifies the listener on closing .containsEntry("1" + partitions.get(1), false) // not changed after closing - .containsEntry("1" + partitions.get(2), true) // not changed after closing + .containsEntry( + "1" + partitions.get(2), false) // client library notifies the listener on closing .containsEntry("2" + partitions.get(0), true) // now active .containsEntry("2" + partitions.get(1), true) // now active .containsEntry("2" + partitions.get(2), true); // now active @@ -809,6 +810,85 @@ void consumerGroupsOnSameSuperStreamShouldBeIndependent() { }); } + public static Stream> + activeConsumerShouldGetUpdateNotificationAfterDisruption() { + return Stream.of( + namedBiConsumer((s, c) -> Host.killConnection(connectionName(s, c)), "kill connection"), + namedBiConsumer((s, c) -> Host.restartStream(s), "restart stream"), + namedBiConsumer((s, c) -> c.close(), "close consumer")); + } + + @ParameterizedTest + @MethodSource + @TestUtils.DisabledIfRabbitMqCtlNotSet + void activeConsumerShouldGetUpdateNotificationAfterDisruption( + java.util.function.BiConsumer disruption) { + declareSuperStreamTopology(configurationClient, superStream, partitionCount); + String partition = superStream + "-0"; + + String consumerName = "foo"; + Function, ConsumerUpdateListener> + filteringListener = + action -> + (ConsumerUpdateListener) + context -> { + if (partition.equals(context.stream())) { + action.accept(context); + } + return OffsetSpecification.next(); + }; + + Sync consumer1Active = sync(); + Sync consumer1Inactive = sync(); + + Consumer consumer1 = + environment + .consumerBuilder() + .singleActiveConsumer() + .superStream(superStream) + .name(consumerName) + .noTrackingStrategy() + .consumerUpdateListener( + filteringListener.apply( + context -> { + if (context.isActive()) { + consumer1Active.down(); + } else { + consumer1Inactive.down(); + } + })) + .messageHandler((context, message) -> {}) + .build(); + + Sync consumer2Active = sync(); + Sync consumer2Inactive = sync(); + environment + .consumerBuilder() + .singleActiveConsumer() + .superStream(superStream) + .name(consumerName) + .noTrackingStrategy() + .consumerUpdateListener( + filteringListener.apply( + context -> { + if (!context.isActive()) { + consumer2Inactive.down(); + } + })) + .messageHandler((context, message) -> {}) + .build(); + + assertThat(consumer1Active).completes(); + assertThat(consumer2Inactive).hasNotCompleted(); + assertThat(consumer1Inactive).hasNotCompleted(); + assertThat(consumer2Active).hasNotCompleted(); + + disruption.accept(partition, consumer1); + + assertThat(consumer2Inactive).hasNotCompleted(); + assertThat(consumer1Inactive).completes(); + } + private static void waitUntil(CallableBooleanSupplier action) { try { waitAtMost(action); @@ -816,4 +896,9 @@ private static void waitUntil(CallableBooleanSupplier action) { throw new RuntimeException(e); } } + + private static String connectionName(String partition, Consumer consumer) { + return ((StreamConsumer) ((SuperStreamConsumer) consumer).consumer(partition)) + .subscriptionConnectionName(); + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index 56e862d7ec..9ab50aadf7 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -83,7 +83,7 @@ public final class TestUtils { private static final Logger LOGGER = LoggerFactory.getLogger(TestUtils.class); - private static final Duration DEFAULT_CONDITION_TIMEOUT = Duration.ofSeconds(10); + static final Duration DEFAULT_CONDITION_TIMEOUT = Duration.ofSeconds(10); private static final ConnectionFactory AMQP_CF = new ConnectionFactory(); @@ -257,6 +257,20 @@ public String toString() { }; } + static BiConsumer namedBiConsumer(BiConsumer delegate, String description) { + return new BiConsumer() { + @Override + public void accept(T t, U s) { + delegate.accept(t, s); + } + + @Override + public String toString() { + return description; + } + }; + } + static Answer answer(Runnable task) { return invocationOnMock -> { task.run(); @@ -1103,4 +1117,46 @@ static void repeatIfFailure(RunnableWithException test) throws Exception { private static Connection connection() throws IOException, TimeoutException { return AMQP_CF.newConnection(); } + + static Sync sync() { + return sync(1); + } + + static Sync sync(int count) { + return new Sync(count); + } + + static class Sync { + + private final AtomicReference latch = new AtomicReference<>(); + + private Sync(int count) { + this.latch.set(new CountDownLatch(count)); + } + + void down() { + this.latch.get().countDown(); + } + + boolean await(Duration timeout) { + try { + return this.latch.get().await(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ie); + } + } + + void reset(int count) { + this.latch.set(new CountDownLatch(count)); + } + + void reset() { + this.reset(1); + } + + boolean hasCompleted() { + return this.latch.get().getCount() == 0; + } + } }