Skip to content

Commit 53567ce

Browse files
committed
Enhance RpcClient: Provide access to msg metadata
This provides a simple way to get at the other parameters from the Consumer's `handleDelivery` method (e.g. consumerTag, envelope, properties). It is backwards compatible, and only those using the new interface `responseCall` will have access to the additional data.
1 parent bba45a8 commit 53567ce

File tree

1 file changed

+64
-6
lines changed

1 file changed

+64
-6
lines changed

src/main/java/com/rabbitmq/client/RpcClient.java

Lines changed: 64 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ public void handleDelivery(String consumerTag,
191191
String replyId = properties.getCorrelationId();
192192
BlockingCell<Object> blocker = _continuationMap.get(replyId);
193193
_continuationMap.remove(replyId);
194-
blocker.set(body);
194+
blocker.set(new Response(consumerTag, envelope, properties, body));
195195
}
196196
}
197197
};
@@ -205,16 +205,15 @@ public void publish(AMQP.BasicProperties props, byte[] message)
205205
_channel.basicPublish(_exchange, _routingKey, props, message);
206206
}
207207

208-
public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message)
209-
throws IOException, ShutdownSignalException, TimeoutException
210-
{
208+
public Response doCall(AMQP.BasicProperties props, byte[] message)
209+
throws IOException, ShutdownSignalException, TimeoutException {
211210
checkConsumer();
212211
BlockingCell<Object> k = new BlockingCell<Object>();
213212
synchronized (_continuationMap) {
214213
_correlationId++;
215214
String replyId = "" + _correlationId;
216215
props = ((props==null) ? new AMQP.BasicProperties.Builder() : props.builder())
217-
.correlationId(replyId).replyTo(_replyTo).build();
216+
.correlationId(replyId).replyTo(_replyTo).build();
218217
_continuationMap.put(replyId, k);
219218
}
220219
publish(props, message);
@@ -229,10 +228,16 @@ public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message)
229228
wrapper.initCause(sig);
230229
throw wrapper;
231230
} else {
232-
return (byte[]) reply;
231+
return (Response) reply;
233232
}
234233
}
235234

235+
public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message)
236+
throws IOException, ShutdownSignalException, TimeoutException
237+
{
238+
return doCall(props, message).getBody();
239+
}
240+
236241
/**
237242
* Perform a simple byte-array-based RPC roundtrip.
238243
* @param message the byte array request message to send
@@ -246,6 +251,21 @@ public byte[] primitiveCall(byte[] message)
246251
return primitiveCall(null, message);
247252
}
248253

254+
/**
255+
* Perform a simple byte-array-based RPC roundtrip
256+
*
257+
* Useful if you need to get at more than just the body of the message
258+
*
259+
* @param message the byte array request message to send
260+
* @return The response object is an envelope that contains all of the data provided to the `handleDelivery` consumer
261+
* @throws ShutdownSignalException if the connection dies during our wait
262+
* @throws IOException if an error is encountered
263+
* @throws TimeoutException if a response is not received within the configured timeout
264+
*/
265+
public Response responseCall(byte[] message) throws IOException, ShutdownSignalException, TimeoutException {
266+
return doCall(null, message);
267+
}
268+
249269
/**
250270
* Perform a simple string-based RPC roundtrip.
251271
* @param message the string request message to send
@@ -368,5 +388,43 @@ public int getCorrelationId() {
368388
public Consumer getConsumer() {
369389
return _consumer;
370390
}
391+
392+
/**
393+
* The response object is an envelope that contains all of the data provided to the `handleDelivery` consumer
394+
*/
395+
public static class Response {
396+
protected String consumerTag;
397+
protected Envelope envelope;
398+
protected AMQP.BasicProperties properties;
399+
protected byte[] body;
400+
401+
public Response() {
402+
}
403+
404+
public Response(
405+
final String consumerTag, final Envelope envelope, final AMQP.BasicProperties properties,
406+
final byte[] body) {
407+
this.consumerTag = consumerTag;
408+
this.envelope = envelope;
409+
this.properties = properties;
410+
this.body = body;
411+
}
412+
413+
public String getConsumerTag() {
414+
return consumerTag;
415+
}
416+
417+
public Envelope getEnvelope() {
418+
return envelope;
419+
}
420+
421+
public AMQP.BasicProperties getProperties() {
422+
return properties;
423+
}
424+
425+
public byte[] getBody() {
426+
return body;
427+
}
428+
}
371429
}
372430

0 commit comments

Comments
 (0)