Skip to content

Commit 16ffff6

Browse files
committed
Notify SAC when it is no longer active
A single active consumer may not always receive a notification from the broker when it gets inactive. An obvious reason is the consumer connection goes down. It is still possible to call the consumer update listener from the library, which can help applications take an appropriate action when a consumer goes from active to inactive. This commit implements the call to the listener under such circumstances (connection closed, stream unavailable because restarted, normal consumer closing).
1 parent bd3cec6 commit 16ffff6

File tree

8 files changed

+374
-52
lines changed

8 files changed

+374
-52
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -303,12 +303,7 @@ public void initChannel(SocketChannel ch) {
303303
});
304304

305305
ChannelFuture f;
306-
String clientConnectionName =
307-
parameters.clientProperties == null
308-
? ""
309-
: (parameters.clientProperties.containsKey("connection_name")
310-
? parameters.clientProperties.get("connection_name")
311-
: "");
306+
String clientConnectionName = parameters.clientProperties.getOrDefault("connection_name", "");
312307
try {
313308
LOGGER.debug(
314309
"Trying to create stream connection to {}:{}, with client connection name '{}'",
@@ -1505,6 +1500,10 @@ String connectionName() {
15051500
return builder.append(serverAddress()).toString();
15061501
}
15071502

1503+
String clientConnectionName() {
1504+
return this.clientConnectionName;
1505+
}
1506+
15081507
private String serverAddress() {
15091508
SocketAddress remoteAddress = remoteAddress();
15101509
if (remoteAddress instanceof InetSocketAddress) {

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,7 @@ public void close() {
481481
}
482482

483483
void closeFromEnvironment() {
484+
this.maybeNotifyActiveToInactiveSac();
484485
LOGGER.debug("Calling consumer {} closing callback (stream {})", this.id, this.stream);
485486
this.closingCallback.run();
486487
closed.set(true);
@@ -490,6 +491,7 @@ void closeFromEnvironment() {
490491

491492
void closeAfterStreamDeletion() {
492493
if (closed.compareAndSet(false, true)) {
494+
this.maybeNotifyActiveToInactiveSac();
493495
this.environment.removeConsumer(this);
494496
this.status = Status.CLOSED;
495497
}
@@ -506,11 +508,23 @@ void setTrackingClient(Client client) {
506508
void setSubscriptionClient(Client client) {
507509
this.subscriptionClient = client;
508510
if (client == null && this.isSac()) {
511+
maybeNotifyActiveToInactiveSac();
509512
// we lost the connection
510513
this.sacActive = false;
511514
}
512515
}
513516

517+
private void maybeNotifyActiveToInactiveSac() {
518+
if (this.isSac() && this.sacActive) {
519+
LOGGER.debug(
520+
"Single active consumer {} from stream {} with name {} is unavailable, calling consumer update listener",
521+
this.id,
522+
this.stream,
523+
this.name);
524+
this.consumerUpdate(false);
525+
}
526+
}
527+
514528
synchronized void unavailable() {
515529
this.status = Status.NOT_AVAILABLE;
516530
this.trackingClient = null;
@@ -623,4 +637,13 @@ private void checkNotClosed() {
623637
long id() {
624638
return this.id;
625639
}
640+
641+
String subscriptionConnectionName() {
642+
Client client = this.subscriptionClient;
643+
if (client == null) {
644+
return "<no-connection>";
645+
} else {
646+
return client.clientConnectionName();
647+
}
648+
}
626649
}

src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,10 @@ public void store(long offset) {
187187
"Consumer#store(long) does not work for super streams, use MessageHandler.Context#storeOffset() instead");
188188
}
189189

190+
Consumer consumer(String partition) {
191+
return this.consumers.get(partition);
192+
}
193+
190194
@Override
191195
public long storedOffset() {
192196
throw new UnsupportedOperationException(

src/test/java/com/rabbitmq/stream/Host.java

Lines changed: 51 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -47,30 +47,34 @@ public static String capture(InputStream is) throws IOException {
4747
return buff.toString();
4848
}
4949

50-
private static Process executeCommand(String command) throws IOException {
50+
private static Process executeCommand(String command) {
5151
return executeCommand(command, false);
5252
}
5353

54-
private static Process executeCommand(String command, boolean ignoreError) throws IOException {
55-
Process pr = executeCommandProcess(command);
56-
57-
int ev = waitForExitValue(pr);
58-
if (ev != 0 && !ignoreError) {
59-
String stdout = capture(pr.getInputStream());
60-
String stderr = capture(pr.getErrorStream());
61-
throw new IOException(
62-
"unexpected command exit value: "
63-
+ ev
64-
+ "\ncommand: "
65-
+ command
66-
+ "\n"
67-
+ "\nstdout:\n"
68-
+ stdout
69-
+ "\nstderr:\n"
70-
+ stderr
71-
+ "\n");
54+
private static Process executeCommand(String command, boolean ignoreError) {
55+
try {
56+
Process pr = executeCommandProcess(command);
57+
58+
int ev = waitForExitValue(pr);
59+
if (ev != 0 && !ignoreError) {
60+
String stdout = capture(pr.getInputStream());
61+
String stderr = capture(pr.getErrorStream());
62+
throw new IOException(
63+
"unexpected command exit value: "
64+
+ ev
65+
+ "\ncommand: "
66+
+ command
67+
+ "\n"
68+
+ "\nstdout:\n"
69+
+ stdout
70+
+ "\nstderr:\n"
71+
+ stderr
72+
+ "\n");
73+
}
74+
return pr;
75+
} catch (IOException e) {
76+
throw new RuntimeException(e);
7277
}
73-
return pr;
7478
}
7579

7680
public static String hostname() throws IOException {
@@ -110,6 +114,10 @@ public static Process rabbitmqctl(String command) throws IOException {
110114
return executeCommand(rabbitmqctlCommand() + " " + command);
111115
}
112116

117+
static Process rabbitmqStreams(String command) {
118+
return executeCommand(rabbitmqStreamsCommand() + " " + command);
119+
}
120+
113121
public static Process rabbitmqctlIgnoreError(String command) throws IOException {
114122
return executeCommand(rabbitmqctlCommand() + " " + command, true);
115123
}
@@ -189,11 +197,19 @@ static List<ConnectionInfo> toConnectionInfoList(String json) {
189197
return GSON.fromJson(json, new TypeToken<List<ConnectionInfo>>() {}.getType());
190198
}
191199

192-
public static Process killStreamLeaderProcess(String stream) throws IOException {
193-
return rabbitmqctl(
194-
"eval 'case rabbit_stream_manager:lookup_leader(<<\"/\">>, <<\""
195-
+ stream
196-
+ "\">>) of {ok, Pid} -> exit(Pid, kill); Pid -> exit(Pid, kill) end.'");
200+
public static void restartStream(String stream) {
201+
rabbitmqStreams(" restart_stream " + stream);
202+
}
203+
204+
public static Process killStreamLeaderProcess(String stream) {
205+
try {
206+
return rabbitmqctl(
207+
"eval 'case rabbit_stream_manager:lookup_leader(<<\"/\">>, <<\""
208+
+ stream
209+
+ "\">>) of {ok, Pid} -> exit(Pid, kill); Pid -> exit(Pid, kill) end.'");
210+
} catch (IOException e) {
211+
throw new RuntimeException(e);
212+
}
197213
}
198214

199215
public static void addUser(String username, String password) throws IOException {
@@ -243,7 +259,7 @@ public static void setEnv(String parameter, String value) throws IOException {
243259
public static String rabbitmqctlCommand() {
244260
String rabbitmqCtl = System.getProperty("rabbitmqctl.bin");
245261
if (rabbitmqCtl == null) {
246-
throw new IllegalStateException("Please define the rabbitmqctl.bin system property");
262+
rabbitmqCtl = DOCKER_PREFIX + "rabbitmq";
247263
}
248264
if (rabbitmqCtl.startsWith(DOCKER_PREFIX)) {
249265
String containerId = rabbitmqCtl.split(":")[1];
@@ -253,6 +269,15 @@ public static String rabbitmqctlCommand() {
253269
}
254270
}
255271

272+
private static String rabbitmqStreamsCommand() {
273+
String rabbitmqctl = rabbitmqctlCommand();
274+
int lastIndex = rabbitmqctl.lastIndexOf("rabbitmqctl");
275+
if (lastIndex == -1) {
276+
throw new IllegalArgumentException("Not a valid rabbitqmctl command: " + rabbitmqctl);
277+
}
278+
return rabbitmqctl.substring(0, lastIndex) + "rabbitmq-streams";
279+
}
280+
256281
public static AutoCloseable diskAlarm() throws Exception {
257282
return new CallableAutoCloseable(
258283
() -> {
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
6+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
package com.rabbitmq.stream.impl;
16+
17+
import static org.assertj.core.api.Assertions.fail;
18+
19+
import java.time.Duration;
20+
import org.assertj.core.api.AbstractObjectAssert;
21+
22+
final class Assertions {
23+
24+
private Assertions() {}
25+
26+
static SyncAssert assertThat(TestUtils.Sync sync) {
27+
return new SyncAssert(sync);
28+
}
29+
30+
static class SyncAssert extends AbstractObjectAssert<SyncAssert, TestUtils.Sync> {
31+
32+
private SyncAssert(TestUtils.Sync sync) {
33+
super(sync, SyncAssert.class);
34+
}
35+
36+
SyncAssert completes() {
37+
return this.completes(TestUtils.DEFAULT_CONDITION_TIMEOUT);
38+
}
39+
40+
SyncAssert completes(Duration timeout) {
41+
boolean completed = actual.await(timeout);
42+
if (!completed) {
43+
fail("Sync timed out after %d ms", timeout.toMillis());
44+
}
45+
return this;
46+
}
47+
48+
SyncAssert hasCompleted() {
49+
if (!this.actual.hasCompleted()) {
50+
fail("Sync should have completed but has not");
51+
}
52+
return this;
53+
}
54+
55+
SyncAssert hasNotCompleted() {
56+
if (this.actual.hasCompleted()) {
57+
fail("Sync should have not completed");
58+
}
59+
return this;
60+
}
61+
}
62+
}

src/test/java/com/rabbitmq/stream/impl/SacStreamConsumerTest.java

Lines changed: 74 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,24 +14,24 @@
1414
// info@rabbitmq.com.
1515
package com.rabbitmq.stream.impl;
1616

17-
import static com.rabbitmq.stream.impl.TestUtils.publishAndWaitForConfirms;
18-
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
17+
import static com.rabbitmq.stream.impl.Assertions.assertThat;
18+
import static com.rabbitmq.stream.impl.TestUtils.*;
1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21-
import com.rabbitmq.stream.Consumer;
22-
import com.rabbitmq.stream.Environment;
23-
import com.rabbitmq.stream.EnvironmentBuilder;
24-
import com.rabbitmq.stream.OffsetSpecification;
21+
import com.rabbitmq.stream.*;
2522
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast311Condition;
2623
import io.netty.channel.EventLoopGroup;
2724
import java.util.Map;
2825
import java.util.concurrent.ConcurrentHashMap;
2926
import java.util.concurrent.atomic.AtomicInteger;
3027
import java.util.concurrent.atomic.AtomicLong;
28+
import java.util.stream.Stream;
3129
import org.junit.jupiter.api.AfterEach;
3230
import org.junit.jupiter.api.BeforeEach;
3331
import org.junit.jupiter.api.Test;
3432
import org.junit.jupiter.api.extension.ExtendWith;
33+
import org.junit.jupiter.params.ParameterizedTest;
34+
import org.junit.jupiter.params.provider.MethodSource;
3535

3636
@ExtendWith({
3737
TestUtils.StreamTestInfrastructureExtension.class,
@@ -237,4 +237,72 @@ void externalTrackingSecondConsumerShouldTakeOverWhereTheFirstOneLeftOff() throw
237237
// nothing stored on the server side
238238
assertThat(cf.get().queryOffset(consumerName, stream).getOffset()).isZero();
239239
}
240+
241+
public static Stream<java.util.function.Consumer<Consumer>>
242+
activeConsumerShouldGetUpdateNotificationAfterDisruption() {
243+
return Stream.of(
244+
namedConsumer(consumer -> Host.killConnection(connectionName(consumer)), "kill connection"),
245+
namedConsumer(consumer -> Host.restartStream(stream(consumer)), "restart stream"),
246+
namedConsumer(Consumer::close, "close consumer"));
247+
}
248+
249+
@ParameterizedTest
250+
@MethodSource
251+
@TestUtils.DisabledIfRabbitMqCtlNotSet
252+
void activeConsumerShouldGetUpdateNotificationAfterDisruption(
253+
java.util.function.Consumer<Consumer> disruption) {
254+
String consumerName = "foo";
255+
Sync consumer1Active = sync();
256+
Sync consumer1Inactive = sync();
257+
Consumer consumer1 =
258+
environment.consumerBuilder().stream(stream)
259+
.name(consumerName)
260+
.noTrackingStrategy()
261+
.singleActiveConsumer()
262+
.consumerUpdateListener(
263+
context -> {
264+
if (context.isActive()) {
265+
consumer1Active.down();
266+
} else {
267+
consumer1Inactive.down();
268+
}
269+
return OffsetSpecification.next();
270+
})
271+
.messageHandler((context, message) -> {})
272+
.build();
273+
274+
Sync consumer2Active = sync();
275+
Sync consumer2Inactive = sync();
276+
environment.consumerBuilder().stream(stream)
277+
.name(consumerName)
278+
.noTrackingStrategy()
279+
.singleActiveConsumer()
280+
.consumerUpdateListener(
281+
context -> {
282+
if (!context.isActive()) {
283+
consumer2Inactive.down();
284+
}
285+
return OffsetSpecification.next();
286+
})
287+
.messageHandler((context, message) -> {})
288+
.build();
289+
290+
assertThat(consumer1Active).completes();
291+
assertThat(consumer2Inactive).hasNotCompleted();
292+
assertThat(consumer1Inactive).hasNotCompleted();
293+
assertThat(consumer2Active).hasNotCompleted();
294+
295+
disruption.accept(consumer1);
296+
297+
assertThat(consumer2Inactive).hasNotCompleted();
298+
assertThat(consumer1Inactive).completes();
299+
}
300+
301+
private static String connectionName(Consumer consumer) {
302+
return ((StreamConsumer) consumer).subscriptionConnectionName();
303+
}
304+
305+
private static String stream(Consumer consumer) {
306+
return ((StreamConsumer) consumer).stream();
307+
}
240308
}

0 commit comments

Comments
 (0)