Skip to content

Commit 9753b88

Browse files
committed
Add deliver/shutdown lambda-based consume methods
References #247
1 parent b9d07e8 commit 9753b88

File tree

8 files changed

+310
-17
lines changed

8 files changed

+310
-17
lines changed

src/main/java/com/rabbitmq/client/CancelCallback.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,16 @@
1616
package com.rabbitmq.client;
1717

1818
import java.io.IOException;
19+
import java.util.Map;
1920

2021
/**
2122
* Callback interface to be notified of the cancellation of a consumer.
2223
* Prefer it over {@link Consumer} for a lambda-oriented syntax,
2324
* if you don't need to implement all the application callbacks.
24-
* @since 5.0
2525
* @see DeliverCallback
26+
* @see ConsumerShutdownSignalCallback
2627
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback)
28+
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, ConsumerShutdownSignalCallback)
2729
* @since 5.0
2830
*/
2931
@FunctionalInterface

src/main/java/com/rabbitmq/client/Channel.java

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -816,6 +816,26 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
816816
*/
817817
String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
818818

819+
/**
820+
* Start a non-nolocal, non-exclusive consumer, with
821+
* explicit acknowledgement and a server-generated consumerTag.
822+
* Provide access only to <code>basic.deliver</code> and
823+
* shutdown signal callbacks (which is sufficient
824+
* for most cases). See methods with a {@link Consumer} argument
825+
* to have access to all the application callbacks.
826+
* @param queue the name of the queue
827+
* @param deliverCallback callback when a message is delivered
828+
* @param shutdownSignalCallback callback when the channel/connection is shut down
829+
* @return the consumerTag generated by the server
830+
* @throws IOException if an error is encountered
831+
* @see com.rabbitmq.client.AMQP.Basic.Consume
832+
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
833+
* @see #basicAck
834+
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
835+
* @since 5.0
836+
*/
837+
String basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
838+
819839
/**
820840
* Start a non-nolocal, non-exclusive consumer, with
821841
* a server-generated consumerTag.
@@ -855,6 +875,29 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
855875
*/
856876
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
857877

878+
/**
879+
* Start a non-nolocal, non-exclusive consumer, with
880+
* a server-generated consumerTag.
881+
* Provide access only to <code>basic.deliver</code> and
882+
* shutdown signal callbacks (which is sufficient
883+
* for most cases). See methods with a {@link Consumer} argument
884+
* to have access to all the application callbacks.
885+
* @param queue the name of the queue
886+
* @param autoAck true if the server should consider messages
887+
* acknowledged once delivered; false if the server should expect
888+
* explicit acknowledgements
889+
* @param deliverCallback callback when a message is delivered
890+
* @param shutdownSignalCallback callback when the channel/connection is shut down
891+
* @return the consumerTag generated by the server
892+
* @throws IOException if an error is encountered
893+
* @see com.rabbitmq.client.AMQP.Basic.Consume
894+
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
895+
* @see #basicAck
896+
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
897+
* @since 5.0
898+
*/
899+
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
900+
858901
/**
859902
* Start a non-nolocal, non-exclusive consumer, with
860903
* a server-generated consumerTag and specified arguments.
@@ -896,6 +939,30 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
896939
*/
897940
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
898941

942+
/**
943+
* Start a non-nolocal, non-exclusive consumer, with
944+
* a server-generated consumerTag and specified arguments.
945+
* Provide access only to <code>basic.deliver</code> and
946+
* shutdown signal callbacks (which is sufficient
947+
* for most cases). See methods with a {@link Consumer} argument
948+
* to have access to all the application callbacks.
949+
* @param queue the name of the queue
950+
* @param autoAck true if the server should consider messages
951+
* acknowledged once delivered; false if the server should expect
952+
* explicit acknowledgements
953+
* @param arguments a set of arguments for the consume
954+
* @param deliverCallback callback when a message is delivered
955+
* @param shutdownSignalCallback callback when the channel/connection is shut down
956+
* @return the consumerTag generated by the server
957+
* @throws IOException if an error is encountered
958+
* @see com.rabbitmq.client.AMQP.Basic.Consume
959+
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
960+
* @see #basicAck
961+
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
962+
* @since 5.0
963+
*/
964+
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
965+
899966
/**
900967
* Start a non-nolocal, non-exclusive consumer.
901968
* @param queue the name of the queue
@@ -934,6 +1001,28 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
9341001
*/
9351002
String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
9361003

1004+
/**
1005+
* Start a non-nolocal, non-exclusive consumer.
1006+
* Provide access only to <code>basic.deliver</code> and
1007+
* shutdown signal callbacks (which is sufficient
1008+
* for most cases). See methods with a {@link Consumer} argument
1009+
* to have access to all the application callbacks.
1010+
* @param queue the name of the queue
1011+
* @param autoAck true if the server should consider messages
1012+
* acknowledged once delivered; false if the server should expect
1013+
* explicit acknowledgements
1014+
* @param consumerTag a client-generated consumer tag to establish context
1015+
* @param deliverCallback callback when a message is delivered
1016+
* @param shutdownSignalCallback callback when the channel/connection is shut down
1017+
* @return the consumerTag associated with the new consumer
1018+
* @throws java.io.IOException if an error is encountered
1019+
* @see com.rabbitmq.client.AMQP.Basic.Consume
1020+
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
1021+
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
1022+
* @since 5.0
1023+
*/
1024+
String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
1025+
9371026
/**
9381027
* Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk}
9391028
* method.
@@ -980,6 +1069,32 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
9801069
*/
9811070
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
9821071

1072+
/**
1073+
* Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk}
1074+
* method.
1075+
* Provide access only to <code>basic.deliver</code> and
1076+
* shutdown signal callbacks (which is sufficient
1077+
* for most cases). See methods with a {@link Consumer} argument
1078+
* to have access to all the application callbacks.
1079+
* @param queue the name of the queue
1080+
* @param autoAck true if the server should consider messages
1081+
* acknowledged once delivered; false if the server should expect
1082+
* explicit acknowledgements
1083+
* @param consumerTag a client-generated consumer tag to establish context
1084+
* @param noLocal true if the server should not deliver to this consumer
1085+
* messages published on this channel's connection
1086+
* @param exclusive true if this is an exclusive consumer
1087+
* @param arguments a set of arguments for the consume
1088+
* @param deliverCallback callback when a message is delivered
1089+
* @param shutdownSignalCallback callback when the channel/connection is shut down
1090+
* @return the consumerTag associated with the new consumer
1091+
* @throws java.io.IOException if an error is encountered
1092+
* @see com.rabbitmq.client.AMQP.Basic.Consume
1093+
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
1094+
* @since 5.0
1095+
*/
1096+
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
1097+
9831098
/**
9841099
* Cancel a consumer. Calls the consumer's {@link Consumer#handleCancelOk}
9851100
* method.

src/main/java/com/rabbitmq/client/Consumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@
3333
* because this will delay dispatch of messages to other {@link Consumer}s on the same
3434
* {@link Channel}.
3535
*
36-
* For a lambda-oriented syntax, use {@link DeliverCallback} and
37-
* {@link CancelCallback}.
36+
* For a lambda-oriented syntax, use {@link DeliverCallback},
37+
* {@link CancelCallback}, and {@link ConsumerShutdownSignalCallback}.
3838
*
3939
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, java.util.Map, Consumer)
4040
* @see Channel#basicCancel
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client;
17+
18+
import java.util.Map;
19+
20+
/**
21+
* Callback interface to be notified when either the consumer channel
22+
* or the underlying connection has been shut down.
23+
* Prefer it over {@link Consumer} for a lambda-oriented syntax,
24+
* if you don't need to implement all the application callbacks.
25+
* @see CancelCallback
26+
* @see DeliverCallback
27+
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback)
28+
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, ConsumerShutdownSignalCallback)
29+
* @since 5.0
30+
*/
31+
@FunctionalInterface
32+
public interface ConsumerShutdownSignalCallback {
33+
34+
/**
35+
* Called when either the channel or the underlying connection has been shut down.
36+
* @param consumerTag the <i>consumer tag</i> associated with the consumer
37+
* @param sig a {@link ShutdownSignalException} indicating the reason for the shut down
38+
*/
39+
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
40+
41+
}

src/main/java/com/rabbitmq/client/DeliverCallback.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
14
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
25
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
36
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
@@ -20,7 +23,9 @@
2023
* Prefer it over {@link Consumer} for a lambda-oriented syntax,
2124
* if you don't need to implement all the application callbacks.
2225
* @see CancelCallback
26+
* @see ConsumerShutdownSignalCallback
2327
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback)
28+
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, ConsumerShutdownSignalCallback)
2429
* @since 5.0
2530
*/
2631
@FunctionalInterface

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1192,7 +1192,13 @@ public String basicConsume(String queue, Consumer callback)
11921192
/** Public API - {@inheritDoc} */
11931193
@Override
11941194
public String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException {
1195-
return basicConsume(queue, consumerFromCallbacks(deliverCallback, cancelCallback));
1195+
return basicConsume(queue, consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback));
1196+
}
1197+
1198+
/** Public API - {@inheritDoc} */
1199+
@Override
1200+
public String basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
1201+
return basicConsume(queue, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback));
11961202
}
11971203

11981204
/** Public API - {@inheritDoc} */
@@ -1203,10 +1209,17 @@ public String basicConsume(String queue, boolean autoAck, Consumer callback)
12031209
return basicConsume(queue, autoAck, "", callback);
12041210
}
12051211

1212+
/** Public API - {@inheritDoc} */
1213+
@Override
1214+
public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
1215+
throws IOException {
1216+
return basicConsume(queue, autoAck, "", consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback));
1217+
}
1218+
12061219
/** Public API - {@inheritDoc} */
12071220
@Override
12081221
public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException {
1209-
return basicConsume(queue, autoAck, "", consumerFromCallbacks(deliverCallback, cancelCallback));
1222+
return basicConsume(queue, autoAck, "", consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback));
12101223
}
12111224

12121225
/** Public API - {@inheritDoc} */
@@ -1222,7 +1235,14 @@ public String basicConsume(String queue, boolean autoAck, Map<String, Object> ar
12221235
@Override
12231236
public String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback)
12241237
throws IOException {
1225-
return basicConsume(queue, autoAck, "", false, false, arguments, consumerFromCallbacks(deliverCallback, cancelCallback));
1238+
return basicConsume(queue, autoAck, "", false, false, arguments, consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback));
1239+
}
1240+
1241+
/** Public API - {@inheritDoc} */
1242+
@Override
1243+
public String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback,
1244+
ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
1245+
return basicConsume(queue, autoAck, "", false, false, arguments, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback));
12261246
}
12271247

12281248
/** Public API - {@inheritDoc} */
@@ -1238,14 +1258,28 @@ public String basicConsume(String queue, boolean autoAck, String consumerTag,
12381258
@Override
12391259
public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback)
12401260
throws IOException {
1241-
return basicConsume(queue, autoAck, consumerTag, false, false, null, consumerFromCallbacks(deliverCallback, cancelCallback));
1261+
return basicConsume(queue, autoAck, consumerTag, false, false, null, consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback));
1262+
}
1263+
1264+
/** Public API - {@inheritDoc} */
1265+
@Override
1266+
public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback,
1267+
ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
1268+
return basicConsume(queue, autoAck, consumerTag, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback));
12421269
}
12431270

12441271
/** Public API - {@inheritDoc} */
12451272
@Override
12461273
public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments,
12471274
DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException {
1248-
return basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumerFromCallbacks(deliverCallback, cancelCallback));
1275+
return basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback));
1276+
}
1277+
1278+
/** Public API - {@inheritDoc} */
1279+
@Override
1280+
public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments,
1281+
DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
1282+
return basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback));
12491283
}
12501284

12511285
/** Public API - {@inheritDoc} */
@@ -1286,7 +1320,7 @@ public String transformReply(AMQCommand replyCommand) {
12861320
}
12871321
}
12881322

1289-
private Consumer consumerFromCallbacks(final DeliverCallback deliverCallback, final CancelCallback cancelCallback) {
1323+
private Consumer consumerFromDeliverCancelCallbacks(final DeliverCallback deliverCallback, final CancelCallback cancelCallback) {
12901324
return new Consumer() {
12911325

12921326
@Override
@@ -1313,6 +1347,32 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
13131347
};
13141348
}
13151349

1350+
private Consumer consumerFromDeliverShutdownCallbacks(final DeliverCallback deliverCallback, final ConsumerShutdownSignalCallback shutdownSignalCallback) {
1351+
return new Consumer() {
1352+
@Override
1353+
public void handleConsumeOk(String consumerTag) { }
1354+
1355+
@Override
1356+
public void handleCancelOk(String consumerTag) { }
1357+
1358+
@Override
1359+
public void handleCancel(String consumerTag) throws IOException { }
1360+
1361+
@Override
1362+
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
1363+
shutdownSignalCallback.handleShutdownSignal(consumerTag, sig);
1364+
}
1365+
1366+
@Override
1367+
public void handleRecoverOk(String consumerTag) { }
1368+
1369+
@Override
1370+
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
1371+
deliverCallback.handle(consumerTag, new Delivery(envelope, properties, body));
1372+
}
1373+
};
1374+
}
1375+
13161376
/** Public API - {@inheritDoc} */
13171377
@Override
13181378
public void basicCancel(final String consumerTag)

0 commit comments

Comments
 (0)