Skip to content

Commit 9c26ff5

Browse files
committed
Remove QueueingConsumer usages in some tests
References #213
1 parent c9d6670 commit 9c26ff5

File tree

3 files changed

+36
-19
lines changed

3 files changed

+36
-19
lines changed

src/test/java/com/rabbitmq/client/test/functional/NoRequeueOnCancel.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@
1818

1919
import static org.junit.Assert.assertNotNull;
2020
import static org.junit.Assert.assertNull;
21+
import static org.junit.Assert.assertTrue;
2122

2223
import java.io.IOException;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.TimeUnit;
2326

27+
import com.rabbitmq.client.*;
2428
import org.junit.Test;
2529

26-
import com.rabbitmq.client.QueueingConsumer;
2730
import com.rabbitmq.client.test.BrokerTestCase;
2831

2932
public class NoRequeueOnCancel extends BrokerTestCase
@@ -43,11 +46,16 @@ protected void releaseResources() throws IOException {
4346
{
4447
channel.basicPublish("", Q, null, "1".getBytes());
4548

46-
QueueingConsumer c;
47-
48-
c = new QueueingConsumer(channel);
49+
final CountDownLatch latch = new CountDownLatch(1);
50+
Consumer c = new DefaultConsumer(channel) {
51+
@Override
52+
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
53+
latch.countDown();
54+
}
55+
};
4956
String consumerTag = channel.basicConsume(Q, false, c);
50-
c.nextDelivery();
57+
assertTrue(latch.await(5, TimeUnit.SECONDS));
58+
5159
channel.basicCancel(consumerTag);
5260

5361
assertNull(channel.basicGet(Q, true));

src/test/java/com/rabbitmq/client/test/functional/QueueLifecycle.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,18 @@
1616

1717
package com.rabbitmq.client.test.functional;
1818

19-
import static org.junit.Assert.fail;
19+
import com.rabbitmq.client.AMQP;
20+
import com.rabbitmq.client.Consumer;
21+
import com.rabbitmq.client.DefaultConsumer;
22+
import com.rabbitmq.client.test.BrokerTestCase;
23+
import org.junit.Test;
2024

2125
import java.io.IOException;
2226
import java.util.HashMap;
2327
import java.util.Map;
2428
import java.util.concurrent.TimeoutException;
2529

26-
import org.junit.Test;
27-
28-
import com.rabbitmq.client.AMQP;
29-
import com.rabbitmq.client.QueueingConsumer;
30-
import com.rabbitmq.client.test.BrokerTestCase;
30+
import static org.junit.Assert.fail;
3131

3232
/**
3333
* Test queue auto-delete and exclusive semantics.
@@ -125,7 +125,7 @@ void verifyNotEquivalent(boolean durable, boolean exclusive,
125125
channel.queueDeclare(name, false, false, true, null);
126126
// now it's there
127127
verifyQueue(name, false, false, true, null);
128-
QueueingConsumer consumer = new QueueingConsumer(channel);
128+
Consumer consumer = new DefaultConsumer(channel);
129129
String consumerTag = channel.basicConsume(name, consumer);
130130
channel.basicCancel(consumerTag);
131131
// now it's not .. we hope
@@ -143,7 +143,7 @@ void verifyNotEquivalent(boolean durable, boolean exclusive,
143143
channel.queueDeclare(name, false, true, false, null);
144144
// now it's there
145145
verifyQueue(name, false, true, false, null);
146-
QueueingConsumer consumer = new QueueingConsumer(channel);
146+
Consumer consumer = new DefaultConsumer(channel);
147147
String consumerTag = channel.basicConsume(name, consumer);
148148
channel.basicCancel(consumerTag);
149149
// and still there, because exclusive no longer implies autodelete

src/test/java/com/rabbitmq/client/test/functional/Recover.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,13 @@
2323

2424
import java.io.IOException;
2525
import java.util.Arrays;
26+
import java.util.concurrent.CountDownLatch;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicReference;
2629

30+
import com.rabbitmq.client.*;
2731
import org.junit.Test;
2832

29-
import com.rabbitmq.client.AMQP;
30-
import com.rabbitmq.client.Channel;
31-
import com.rabbitmq.client.QueueingConsumer;
3233
import com.rabbitmq.client.test.BrokerTestCase;
3334

3435
public class Recover extends BrokerTestCase {
@@ -67,12 +68,20 @@ void verifyRedeliverOnRecover(RecoverCallback call)
6768

6869
void verifyNoRedeliveryWithAutoAck(RecoverCallback call)
6970
throws IOException, InterruptedException {
70-
QueueingConsumer consumer = new QueueingConsumer(channel);
71+
final CountDownLatch latch = new CountDownLatch(1);
72+
final AtomicReference<byte[]> bodyReference = new AtomicReference<byte[]>();
73+
Consumer consumer = new DefaultConsumer(channel) {
74+
@Override
75+
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
76+
bodyReference.set(body);
77+
latch.countDown();
78+
}
79+
};
7180
channel.basicConsume(queue, true, consumer); // auto ack.
7281
channel.basicPublish("", queue, new AMQP.BasicProperties.Builder().build(), body);
73-
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
82+
assertTrue(latch.await(5, TimeUnit.SECONDS));
7483
assertTrue("consumed message body not as sent",
75-
Arrays.equals(body, delivery.getBody()));
84+
Arrays.equals(body, bodyReference.get()));
7685
call.recover(channel);
7786
assertNull("should be no message available", channel.basicGet(queue, true));
7887
}

0 commit comments

Comments
 (0)