diff --git a/src/main/java/com/rabbitmq/client/Channel.java b/src/main/java/com/rabbitmq/client/Channel.java index 5ab9123c12..6e72cb2fd9 100644 --- a/src/main/java/com/rabbitmq/client/Channel.java +++ b/src/main/java/com/rabbitmq/client/Channel.java @@ -89,7 +89,7 @@ public interface Channel extends ShutdownNotifier, AutoCloseable { * Forces the channel to close and waits for the close operation to complete. * Any encountered exceptions in the close operation are silently discarded. */ - void abort() throws IOException; + void abort(); /** * Abort this channel. @@ -97,7 +97,7 @@ public interface Channel extends ShutdownNotifier, AutoCloseable { * Forces the channel to close and waits for the close operation to complete. * Any encountered exceptions in the close operation are silently discarded. */ - void abort(int closeCode, String closeMessage) throws IOException; + void abort(int closeCode, String closeMessage); /** * Add a {@link ReturnListener}. diff --git a/src/main/java/com/rabbitmq/client/impl/ChannelN.java b/src/main/java/com/rabbitmq/client/impl/ChannelN.java index 428224a4bc..b922b53da0 100644 --- a/src/main/java/com/rabbitmq/client/impl/ChannelN.java +++ b/src/main/java/com/rabbitmq/client/impl/ChannelN.java @@ -545,7 +545,6 @@ public void close(int closeCode, String closeMessage) /** Public API - {@inheritDoc} */ @Override public void abort() - throws IOException { abort(AMQP.REPLY_SUCCESS, "OK"); } @@ -553,14 +552,11 @@ public void abort() /** Public API - {@inheritDoc} */ @Override public void abort(int closeCode, String closeMessage) - throws IOException { try { close(closeCode, closeMessage, true, null, true); - } catch (IOException _e) { - /* ignored */ - } catch (TimeoutException _e) { - /* ignored */ + } catch (IOException | TimeoutException _e) { + // abort() shall silently discard any exceptions } } diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java index cfba283dd2..0771e21fbb 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java @@ -80,31 +80,35 @@ public void close(int closeCode, String closeMessage) throws IOException, Timeou } @Override - public void abort() throws IOException { - try { - executeAndClean(() -> delegate.abort()); - } catch (TimeoutException e) { - // abort() ignores exceptions - } + public void abort() { + this.delegate.abort(); + this.clean(); } @Override - public void abort(int closeCode, String closeMessage) throws IOException { - try { - executeAndClean(() -> delegate.abort(closeCode, closeMessage)); - } catch (TimeoutException e) { - // abort() ignores exceptions + public void abort(int closeCode, String closeMessage) { + this.delegate.abort(closeCode, closeMessage != null ? closeMessage : ""); + this.clean(); + } + + /** + * Cleans up the channel in the following way: + *

+ * Removes every recorded consumer of the channel and finally unregisters the channel from + * the underlying connection to not process any further traffic. + */ + private void clean() { + for (String consumerTag : Utility.copy(consumerTags)) { + this.deleteRecordedConsumer(consumerTag); } + this.connection.unregisterChannel(this); } private void executeAndClean(IoTimeoutExceptionRunnable callback) throws IOException, TimeoutException { try { callback.run(); } finally { - for (String consumerTag : Utility.copy(consumerTags)) { - this.deleteRecordedConsumer(consumerTag); - } - this.connection.unregisterChannel(this); + this.clean(); } } diff --git a/src/test/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannelTest.java b/src/test/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannelTest.java new file mode 100644 index 0000000000..bd72f31e47 --- /dev/null +++ b/src/test/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannelTest.java @@ -0,0 +1,48 @@ +package com.rabbitmq.client.impl.recovery; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public final class AutorecoveringChannelTest { + + private AutorecoveringChannel channel; + + @Mock + private AutorecoveringConnection autorecoveringConnection; + + @Mock + private RecoveryAwareChannelN recoveryAwareChannelN; + + @BeforeEach + void setup() { + MockitoAnnotations.openMocks(this); + this.channel = new AutorecoveringChannel(autorecoveringConnection, recoveryAwareChannelN); + } + + @Test + void abort() { + this.channel.abort(); + verify(recoveryAwareChannelN, times(1)).abort(); + } + + @Test + void abortWithDetails() { + int closeCode = 1; + String closeMessage = "reason"; + this.channel.abort(closeCode, closeMessage); + verify(recoveryAwareChannelN, times(1)).abort(closeCode, closeMessage); + } + + @Test + void abortWithDetailsCloseMessageNull() { + int closeCode = 1; + this.channel.abort(closeCode, null); + verify(recoveryAwareChannelN, times(1)).abort(closeCode, ""); + } + +}