From 7253c941d96a8057c6b32f3ff45d3e1c7fac0164 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9rgio=20Faria?= Date: Thu, 8 Jun 2023 19:47:08 +0100 Subject: [PATCH 1/2] Do not confirmSelect more than once per channel In order to avoid unnecessary blocking RPC calls and conform to best practices, the Channel now checks if it is already activated confirm mode before sending a confirm.select RPC call. If confirm mode is already activated, calling confirmSelect() again returns immediately without sending an RPC call. Closes #1056 --- .../com/rabbitmq/client/impl/ChannelN.java | 11 ++++++- .../rabbitmq/client/test/ChannelNTest.java | 31 +++++++++++++++++-- 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/impl/ChannelN.java b/src/main/java/com/rabbitmq/client/impl/ChannelN.java index b922b53da0..57a15d62fe 100644 --- a/src/main/java/com/rabbitmq/client/impl/ChannelN.java +++ b/src/main/java/com/rabbitmq/client/impl/ChannelN.java @@ -82,6 +82,9 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel private final SortedSet unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet()); + /** Whether the confirm select method has been successfully activated */ + private boolean confirmSelectActivated = false; + /** Whether any nacks have been received since the last waitForConfirms(). */ private volatile boolean onlyAcksReceived = true; @@ -1553,10 +1556,16 @@ public Tx.RollbackOk txRollback() public Confirm.SelectOk confirmSelect() throws IOException { + if (confirmSelectActivated) { + return new Confirm.SelectOk(); + } + if (nextPublishSeqNo == 0) nextPublishSeqNo = 1; - return (Confirm.SelectOk) + Confirm.SelectOk result = (Confirm.SelectOk) exnWrappingRpc(new Confirm.Select(false)).getMethod(); + confirmSelectActivated = true; + return result; } /** Public API - {@inheritDoc} */ diff --git a/src/test/java/com/rabbitmq/client/test/ChannelNTest.java b/src/test/java/com/rabbitmq/client/test/ChannelNTest.java index 76d44c816e..461f5d712e 100644 --- a/src/test/java/com/rabbitmq/client/test/ChannelNTest.java +++ b/src/test/java/com/rabbitmq/client/test/ChannelNTest.java @@ -15,17 +15,20 @@ package com.rabbitmq.client.test; +import com.rabbitmq.client.Command; import com.rabbitmq.client.Method; +import com.rabbitmq.client.TrafficListener; import com.rabbitmq.client.impl.*; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.io.IOException; +import java.util.concurrent.*; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertNotNull; public class ChannelNTest { @@ -81,6 +84,30 @@ public TestConfig(int value, Consumer call) { .forEach(config -> assertThatThrownBy(() -> config.call.apply(config.value)).isInstanceOf(IllegalArgumentException.class)); } + @Test + public void confirmSelectOnlySendsRPCCallOnce() throws Exception { + AMQConnection connection = Mockito.mock(AMQConnection.class); + TrafficListener trafficListener = Mockito.mock(TrafficListener.class); + + Mockito.when(connection.getTrafficListener()).thenReturn(trafficListener); + + ChannelN channel = new ChannelN(connection, 1, consumerWorkService); + + Future future = executorService.submit(() -> { + try { + return channel.confirmSelect(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + channel.handleCompleteInboundCommand(new AMQCommand(new AMQImpl.Confirm.SelectOk())); + + assertNotNull(future.get(1, TimeUnit.SECONDS)); + assertNotNull(channel.confirmSelect()); + Mockito.verify(trafficListener, Mockito.times(1)).write(Mockito.any(Command.class)); + } + interface Consumer { void apply(int value) throws Exception; From f5c26a7975b22a8b73cbde98be9b8cb63c233275 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9rgio=20Faria?= Date: Fri, 9 Jun 2023 10:05:47 +0100 Subject: [PATCH 2/2] Fix flaky test --- .../com/rabbitmq/client/test/ChannelNTest.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/test/java/com/rabbitmq/client/test/ChannelNTest.java b/src/test/java/com/rabbitmq/client/test/ChannelNTest.java index 461f5d712e..24b6d1e941 100644 --- a/src/test/java/com/rabbitmq/client/test/ChannelNTest.java +++ b/src/test/java/com/rabbitmq/client/test/ChannelNTest.java @@ -24,8 +24,8 @@ import org.junit.jupiter.api.Test; import org.mockito.Mockito; -import java.io.IOException; -import java.util.concurrent.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -93,17 +93,16 @@ public void confirmSelectOnlySendsRPCCallOnce() throws Exception { ChannelN channel = new ChannelN(connection, 1, consumerWorkService); - Future future = executorService.submit(() -> { + new Thread(() -> { try { - return channel.confirmSelect(); - } catch (IOException e) { + Thread.sleep(15); + channel.handleCompleteInboundCommand(new AMQCommand(new AMQImpl.Confirm.SelectOk())); + } catch (Exception e) { throw new RuntimeException(e); } - }); + }).start(); - channel.handleCompleteInboundCommand(new AMQCommand(new AMQImpl.Confirm.SelectOk())); - - assertNotNull(future.get(1, TimeUnit.SECONDS)); + assertNotNull(channel.confirmSelect()); assertNotNull(channel.confirmSelect()); Mockito.verify(trafficListener, Mockito.times(1)).write(Mockito.any(Command.class)); }