Skip to content

Commit 07be29a

Browse files
authored
Merge pull request #448 from rabbitmq/super-stream-frames
Support super stream creation/deletion
2 parents 0425a4c + 57376da commit 07be29a

23 files changed

+832
-316
lines changed

.github/workflows/test-pr.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ jobs:
2424
cache: 'maven'
2525
- name: Start broker
2626
run: ci/start-broker.sh
27+
env:
28+
RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:super-stream-frames-otp-max-bazel'
2729
- name: Test
2830
run: |
2931
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \

src/docs/asciidoc/super-streams.adoc

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,51 @@ When a super stream is in use, the stream Java client queries this information t
5858
From the application code point of view, using a super stream is mostly configuration-based.
5959
Some logic must also be provided to extract routing information from messages.
6060

61-
==== Super Stream Creation
61+
==== Super Stream Creation and Deletion
6262

63-
It is possible to create the topology of a super stream with any AMQP 0.9.1 library or with the https://www.rabbitmq.com/management.html[management plugin], but the `rabbitmq-streams add_super_stream` command is a handy shortcut.
64-
Here is how to create an invoices super stream with 3 partitions:
63+
It is possible to manage super streams with
64+
65+
* the stream Java client, by using `Environment#streamCreator()` and `Environment#deleteSuperStream(String)`
66+
* the `add_super_stream` and `delete_super_stream` commands in `rabbitmq-streams` (CLI)
67+
* any AMQP 0.9.1 client library
68+
* the https://www.rabbitmq.com/management.html[management plugin]
69+
70+
The stream Java client and the dedicated CLI commands are easier to use as they take care of the topology details (exchange, streams, and bindings).
71+
72+
===== With the Client Library
73+
74+
Here is how to create an `invoices` super stream with 5 partitions:
75+
76+
.Creating a super stream by specifying the number of partitions
77+
[source,java,indent=0]
78+
--------
79+
include::{test-examples}/SuperStreamUsage.java[tag=creation-partitions]
80+
--------
81+
82+
The super stream partitions will be `invoices-0`, `invoices-1`, ..., `invoices-5`.
83+
We use this kind of topology when routing keys of outbound messages are hashed to pick the partition to publish them to.
84+
This way, if the routing key is the customer ID of the invoice, all the invoices for a given customer end up in the same partition, and they can be processed in the publishing order.
85+
86+
It is also possible to specify binding keys when creating a super stream:
87+
88+
.Creating a super stream by specifying the binding keys
89+
[source,java,indent=0]
90+
--------
91+
include::{test-examples}/SuperStreamUsage.java[tag=creation-binding-keys]
92+
--------
93+
94+
The super stream partitions will be `invoices-amer`, `invoices-emea` and `invoices-apac` in this case.
95+
96+
Using one type of topology or the other depends on the use cases, especially how messages are processed.
97+
See the next sections on publishing and consuming to find out more.
98+
99+
===== With the CLI
100+
101+
Here is how to create an `invoices` super stream with 5 partitions:
65102

66103
.Creating a super stream from the CLI
67104
----
68-
rabbitmq-streams add_super_stream invoices --partitions 3
105+
rabbitmq-streams add_super_stream invoices --partitions 5
69106
----
70107

71108
Use `rabbitmq-streams add_super_stream --help` to learn more about the command.

src/main/java/com/rabbitmq/stream/Constants.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -72,6 +72,8 @@ public final class Constants {
7272
public static final short COMMAND_CONSUMER_UPDATE = 26;
7373
public static final short COMMAND_EXCHANGE_COMMAND_VERSIONS = 27;
7474
public static final short COMMAND_STREAM_STATS = 28;
75+
public static final short COMMAND_CREATE_SUPER_STREAM = 29;
76+
public static final short COMMAND_DELETE_SUPER_STREAM = 30;
7577

7678
public static final short VERSION_1 = 1;
7779
public static final short VERSION_2 = 2;

src/main/java/com/rabbitmq/stream/Environment.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -58,9 +58,19 @@ static EnvironmentBuilder builder() {
5858
* Delete a stream
5959
*
6060
* @param stream the stream to delete
61+
* @since 0.15.0
6162
*/
6263
void deleteStream(String stream);
6364

65+
/**
66+
* Delete a super stream.
67+
*
68+
* <p>Requires RabbitMQ 3.13.0 or more.
69+
*
70+
* @param superStream the super stream to delete
71+
*/
72+
void deleteSuperStream(String superStream);
73+
6474
/**
6575
* Query statistics on a stream.
6676
*

src/main/java/com/rabbitmq/stream/StreamCreator.java

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -23,13 +23,24 @@ public interface StreamCreator {
2323
ByteCapacity MAX_SEGMENT_SIZE = ByteCapacity.from("3GB");
2424

2525
/**
26-
* The name of the stream
26+
* The name of the stream.
27+
*
28+
* <p>Alias for {@link #name(String)}.
2729
*
2830
* @param stream
2931
* @return this creator instance
3032
*/
3133
StreamCreator stream(String stream);
3234

35+
/**
36+
* The name of the (super) stream.
37+
*
38+
* @param name
39+
* @return this creator instance
40+
* @since 0.15.0
41+
*/
42+
StreamCreator name(String name);
43+
3344
/**
3445
* The maximum size of the stream before it gets truncated.
3546
*
@@ -80,6 +91,16 @@ public interface StreamCreator {
8091
*/
8192
StreamCreator filterSize(int size);
8293

94+
/**
95+
* Configure the super stream to create.
96+
*
97+
* <p>Requires RabbitMQ 3.13.0 or more.
98+
*
99+
* @return the super stream configuration
100+
* @since 0.15.0
101+
*/
102+
SuperStreamConfiguration superStream();
103+
83104
/**
84105
* Create the stream.
85106
*
@@ -142,4 +163,39 @@ public String value() {
142163
return this.value;
143164
}
144165
}
166+
167+
/**
168+
* Super stream configuration.
169+
*
170+
* @since 0.15.0
171+
*/
172+
interface SuperStreamConfiguration {
173+
174+
/**
175+
* The number of partitions of the super stream.
176+
*
177+
* <p>Mutually exclusive with {@link #bindingKeys(String...)}. Default is 3.
178+
*
179+
* @param partitions
180+
* @return this super stream configuration instance
181+
*/
182+
SuperStreamConfiguration partitions(int partitions);
183+
184+
/**
185+
* The binding keys to use when declaring the super stream partitions.
186+
*
187+
* <p>Mutually exclusive with {@link #partitions(int)}. Default is null.
188+
*
189+
* @param bindingKeys
190+
* @return this super stream configuration instance
191+
*/
192+
SuperStreamConfiguration bindingKeys(String... bindingKeys);
193+
194+
/**
195+
* Go back to the creator.
196+
*
197+
* @return the stream creator
198+
*/
199+
StreamCreator creator();
200+
}
145201
}

0 commit comments

Comments
 (0)