Skip to content

Commit 2253017

Browse files
committed
Use atomic boolean to track state of RpcClient
Instead of using the consumer property. Avoids race conditions during closing. References #1033
1 parent 478979b commit 2253017

File tree

1 file changed

+13
-16
lines changed

1 file changed

+13
-16
lines changed

src/main/java/com/rabbitmq/client/RpcClient.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -13,12 +13,10 @@
1313
// If you have any questions regarding licensing, please contact us at
1414
// info@rabbitmq.com.
1515

16-
1716
package com.rabbitmq.client;
1817

1918
import java.io.ByteArrayInputStream;
2019
import java.io.ByteArrayOutputStream;
21-
import java.io.Closeable;
2220
import java.io.DataInputStream;
2321
import java.io.DataOutputStream;
2422
import java.io.EOFException;
@@ -28,6 +26,7 @@
2826
import java.util.Map;
2927
import java.util.Map.Entry;
3028
import java.util.concurrent.TimeoutException;
29+
import java.util.concurrent.atomic.AtomicBoolean;
3130
import java.util.function.Function;
3231
import java.util.function.Supplier;
3332

@@ -45,7 +44,7 @@
4544
* It simply provides a mechanism for sending a message to an exchange with a given routing key,
4645
* and waiting for a response.
4746
*/
48-
public class RpcClient implements Closeable {
47+
public class RpcClient implements AutoCloseable {
4948

5049
private static final Logger LOGGER = LoggerFactory.getLogger(RpcClient.class);
5150

@@ -63,6 +62,8 @@ public class RpcClient implements Closeable {
6362
protected final static int NO_TIMEOUT = -1;
6463
/** Whether to publish RPC requests with the mandatory flag or not. */
6564
private final boolean _useMandatory;
65+
/** closed flag */
66+
private final AtomicBoolean closed = new AtomicBoolean(false);
6667

6768
public final static Function<Object, Response> DEFAULT_REPLY_HANDLER = reply -> {
6869
if (reply instanceof ShutdownSignalException) {
@@ -96,7 +97,7 @@ public class RpcClient implements Closeable {
9697
private String lastCorrelationId = "0";
9798

9899
/** Consumer attached to our reply queue */
99-
private DefaultConsumer _consumer;
100+
private final DefaultConsumer _consumer;
100101

101102
/**
102103
* Construct a {@link RpcClient} with the passed-in {@link RpcClientParams}.
@@ -142,8 +143,8 @@ public RpcClient(RpcClientParams params) throws
142143
* Private API - ensures the RpcClient is correctly open.
143144
* @throws IOException if an error is encountered
144145
*/
145-
public void checkConsumer() throws IOException {
146-
if (_consumer == null) {
146+
private void checkNotClosed() throws IOException {
147+
if (this.closed.get()) {
147148
throw new EOFException("RpcClient is closed");
148149
}
149150
}
@@ -154,11 +155,8 @@ public void checkConsumer() throws IOException {
154155
*/
155156
@Override
156157
public void close() throws IOException {
157-
if (_consumer != null) {
158-
final String consumerTag = _consumer.getConsumerTag();
159-
// set it null before calling basicCancel to make this method idempotent in case of IOException
160-
_consumer = null;
161-
_channel.basicCancel(consumerTag);
158+
if (this.closed.compareAndSet(false, true)) {
159+
_channel.basicCancel(_consumer.getConsumerTag());
162160
}
163161
}
164162

@@ -176,16 +174,15 @@ public void handleShutdownSignal(String consumerTag,
176174
for (Entry<String, BlockingCell<Object>> entry : _continuationMap.entrySet()) {
177175
entry.getValue().set(signal);
178176
}
179-
_consumer = null;
177+
closed.set(true);
180178
}
181179
}
182180

183181
@Override
184182
public void handleDelivery(String consumerTag,
185183
Envelope envelope,
186184
AMQP.BasicProperties properties,
187-
byte[] body)
188-
throws IOException {
185+
byte[] body) {
189186
synchronized (_continuationMap) {
190187
String replyId = properties.getCorrelationId();
191188
BlockingCell<Object> blocker =_continuationMap.remove(replyId);
@@ -216,7 +213,7 @@ public Response doCall(AMQP.BasicProperties props, byte[] message)
216213

217214
public Response doCall(AMQP.BasicProperties props, byte[] message, int timeout)
218215
throws IOException, ShutdownSignalException, TimeoutException {
219-
checkConsumer();
216+
checkNotClosed();
220217
BlockingCell<Object> k = new BlockingCell<Object>();
221218
String replyId;
222219
synchronized (_continuationMap) {

0 commit comments

Comments
 (0)