From 7d56cb8bf751eea94d822a2c52ad49a0a3e8833e Mon Sep 17 00:00:00 2001 From: abilan Date: Mon, 10 Jul 2023 14:57:45 -0400 Subject: [PATCH 1/2] Fix ImapIdleChA for scheduling race condition The IMAP IDLE is long-lived process and can be blocked waiting for any reply from the server. This way it is not suited to be used in a `TaskScheduler` especially when it has only one thread in its pool in Spring Boot by default. Another concurrent scheduled task is exactly an `ImapMailReceiver.IdleCanceller`. With a single thread in a `TaskScheduler` pool it cannot be reached therefore we never cancel and IDLE task and cannot react to the connection loss properly * Rework the `ImapIdleChannelAdapter` logic to use a regular `Executor` and `while()` loop with a `Thread.sleep()` when we lose connection * Clean up the `ImapMailReceiverTests` from `TaskScheduler` not used anymore. * Expose new `taskExecutor` option in the `ImapIdleChannelAdapterSpec` for Java DSL * Enable `ImapMailReceiverTests.testIdleWithMessageMapping()` with an attempt to see if this fix covers an unclear problem exposed before --- .../mail/ImapIdleChannelAdapter.java | 208 +++++++----------- .../mail/dsl/ImapIdleChannelAdapterSpec.java | 12 + .../mail/ImapMailReceiverTests.java | 16 +- 3 files changed, 99 insertions(+), 137 deletions(-) diff --git a/spring-integration-mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java b/spring-integration-mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java index 5cc04f86738..8782442c06c 100755 --- a/spring-integration-mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java +++ b/spring-integration-mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java @@ -17,10 +17,9 @@ package org.springframework.integration.mail; import java.io.Serial; -import java.time.Instant; import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import jakarta.mail.Folder; @@ -31,15 +30,13 @@ import org.springframework.beans.factory.BeanClassLoaderAware; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.integration.mail.event.MailIntegrationEvent; import org.springframework.integration.transaction.IntegrationResourceHolder; import org.springframework.integration.transaction.IntegrationResourceHolderSynchronization; import org.springframework.integration.transaction.TransactionSynchronizationFactory; import org.springframework.messaging.MessagingException; -import org.springframework.scheduling.TaskScheduler; -import org.springframework.scheduling.Trigger; -import org.springframework.scheduling.TriggerContext; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.Assert; @@ -63,12 +60,10 @@ public class ImapIdleChannelAdapter extends MessageProducerSupport implements Be private static final int DEFAULT_RECONNECT_DELAY = 10000; - private final ExceptionAwarePeriodicTrigger receivingTaskTrigger = new ExceptionAwarePeriodicTrigger(); - - private final IdleTask idleTask = new IdleTask(); - private final ImapMailReceiver mailReceiver; + private Executor taskExecutor; + private TransactionSynchronizationFactory transactionSynchronizationFactory; private ClassLoader classLoader; @@ -100,6 +95,16 @@ public void setAdviceChain(List adviceChain) { this.adviceChain = adviceChain; } + /** + * Provide a managed {@link Executor} to schedule a receiving IDLE task. + * @param taskExecutor the {@link Executor} to use. + * @since 6.2 + */ + public void setTaskExecutor(Executor taskExecutor) { + Assert.notNull(taskExecutor, "'taskExecutor' must not be null"); + this.taskExecutor = taskExecutor; + } + /** * Specify whether the IDLE task should reconnect automatically after * catching a {@link jakarta.mail.MessagingException} while waiting for messages. The @@ -139,6 +144,10 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv protected void onInit() { super.onInit(); + if (this.taskExecutor == null) { + this.taskExecutor = new SimpleAsyncTaskExecutor(getBeanName() + "-"); + } + Consumer messageSenderToUse = new MessageSender(); if (!CollectionUtils.isEmpty(this.adviceChain)) { @@ -153,16 +162,9 @@ protected void onInit() { this.messageSender = (Consumer) messageSenderToUse; } - - /* - * Lifecycle implementation - */ - - @Override // guarded by super#lifecycleLock + @Override protected void doStart() { - TaskScheduler scheduler = getTaskScheduler(); - Assert.notNull(scheduler, "'taskScheduler' must not be null"); - this.receivingTask = scheduler.schedule(new ReceivingTask(), this.receivingTaskTrigger); + this.taskExecutor.execute(this::callIdle); } @Override @@ -190,6 +192,70 @@ private void publishException(Exception ex) { } } + private void callIdle() { + while (isActive()) { + try { + processIdle(); + logger.debug("Task completed successfully. Re-scheduling it again right away."); + } + catch (Exception ex) { + publishException(ex); + if (this.shouldReconnectAutomatically + && ex.getCause() instanceof jakarta.mail.MessagingException messagingException) { + + //run again after a delay + logger.info(messagingException, + () -> "Failed to execute IDLE task. Will attempt to resubmit in " + + this.reconnectDelay + " milliseconds."); + delayNextIdleCall(); + } + else { + logger.warn(ex, + "Failed to execute IDLE task. " + + "Won't resubmit since not a 'shouldReconnectAutomatically' " + + "or not a 'jakarta.mail.MessagingException'"); + break; + } + } + } + } + + private void processIdle() { + try { + logger.debug("waiting for mail"); + this.mailReceiver.waitForNewMessages(); + Folder folder = this.mailReceiver.getFolder(); + if (folder != null && folder.isOpen() && isRunning()) { + Object[] mailMessages = this.mailReceiver.receive(); + logger.debug(() -> "received " + mailMessages.length + " mail messages"); + for (Object mailMessage : mailMessages) { + if (isRunning()) { + this.messageSender.accept(mailMessage); + } + } + } + } + catch (jakarta.mail.MessagingException ex) { + logger.warn(ex, "error occurred in idle task"); + if (this.shouldReconnectAutomatically) { + throw new IllegalStateException("Failure in 'idle' task. Will resubmit.", ex); + } + else { + throw new MessagingException("Failure in 'idle' task. Will NOT resubmit.", ex); + } + } + } + + private void delayNextIdleCall() { + try { + Thread.sleep(this.reconnectDelay); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(ex); + } + } + private class MessageSender implements Consumer { MessageSender() { @@ -227,112 +293,6 @@ public void accept(Object mailMessage) { } - private class ReceivingTask implements Runnable { - - ReceivingTask() { - } - - @Override - public void run() { - if (isRunning()) { - try { - ImapIdleChannelAdapter.this.idleTask.run(); - logger.debug("Task completed successfully. Re-scheduling it again right away."); - } - catch (Exception ex) { - if (ImapIdleChannelAdapter.this.shouldReconnectAutomatically - && ex.getCause() instanceof jakarta.mail.MessagingException messagingException) { - - //run again after a delay - logger.info(messagingException, - () -> "Failed to execute IDLE task. Will attempt to resubmit in " - + ImapIdleChannelAdapter.this.reconnectDelay + " milliseconds."); - ImapIdleChannelAdapter.this.receivingTaskTrigger.delayNextExecution(); - } - else { - logger.warn(ex, - "Failed to execute IDLE task. " + - "Won't resubmit since not a 'shouldReconnectAutomatically' " + - "or not a 'jakarta.mail.MessagingException'"); - ImapIdleChannelAdapter.this.receivingTaskTrigger.stop(); - } - publishException(ex); - } - } - } - - } - - - private class IdleTask implements Runnable { - - IdleTask() { - } - - @Override - public void run() { - if (isRunning()) { - try { - logger.debug("waiting for mail"); - ImapIdleChannelAdapter.this.mailReceiver.waitForNewMessages(); - Folder folder = ImapIdleChannelAdapter.this.mailReceiver.getFolder(); - if (folder != null && folder.isOpen() && isRunning()) { - Object[] mailMessages = ImapIdleChannelAdapter.this.mailReceiver.receive(); - logger.debug(() -> "received " + mailMessages.length + " mail messages"); - for (Object mailMessage : mailMessages) { - if (isRunning()) { - ImapIdleChannelAdapter.this.messageSender.accept(mailMessage); - } - } - } - } - catch (jakarta.mail.MessagingException ex) { - logger.warn(ex, "error occurred in idle task"); - if (ImapIdleChannelAdapter.this.shouldReconnectAutomatically) { - throw new IllegalStateException("Failure in 'idle' task. Will resubmit.", ex); - } - else { - throw new MessagingException("Failure in 'idle' task. Will NOT resubmit.", ex); - } - } - } - } - - } - - private class ExceptionAwarePeriodicTrigger implements Trigger { - - private final AtomicBoolean delayNextExecution = new AtomicBoolean(); - - private final AtomicBoolean stop = new AtomicBoolean(); - - - ExceptionAwarePeriodicTrigger() { - } - - @Override - public Instant nextExecution(TriggerContext triggerContext) { - if (this.stop.getAndSet(false)) { - return null; - } - if (this.delayNextExecution.getAndSet(false)) { - return Instant.now().plusMillis(ImapIdleChannelAdapter.this.reconnectDelay); - } - else { - return Instant.now(); - } - } - - void delayNextExecution() { - this.delayNextExecution.set(true); - } - - void stop() { - this.stop.set(true); - } - - } - public class ImapIdleExceptionEvent extends MailIntegrationEvent { @Serial diff --git a/spring-integration-mail/src/main/java/org/springframework/integration/mail/dsl/ImapIdleChannelAdapterSpec.java b/spring-integration-mail/src/main/java/org/springframework/integration/mail/dsl/ImapIdleChannelAdapterSpec.java index ba10128c551..37e8e80e9e1 100644 --- a/spring-integration-mail/src/main/java/org/springframework/integration/mail/dsl/ImapIdleChannelAdapterSpec.java +++ b/spring-integration-mail/src/main/java/org/springframework/integration/mail/dsl/ImapIdleChannelAdapterSpec.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.Executor; import java.util.function.Consumer; import java.util.function.Function; @@ -395,6 +396,17 @@ public ImapIdleChannelAdapterSpec simpleContent(boolean simpleContent) { return _this(); } + /** + * Provide a managed {@link Executor} to schedule a receiving IDLE task. + * @param taskExecutor the {@link Executor} to use. + * @return the spec. + * @since 6.2 + */ + public ImapIdleChannelAdapterSpec taskExecutor(Executor taskExecutor) { + this.target.setTaskExecutor(taskExecutor); + return this; + } + @Override public Map getComponentsToRegister() { return this.componentsToRegister; diff --git a/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java b/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java index f62de485902..a993a4c56b2 100644 --- a/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java +++ b/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java @@ -59,7 +59,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -187,7 +186,6 @@ public void testIdleWithServerDefaultSearch() throws Exception { } @Test - @Disabled("GreenMail server closes socket for some reason") public void testIdleWithMessageMapping() throws Exception { ImapMailReceiver receiver = new ImapMailReceiver("imap://user:pw@localhost:" + imapIdleServer.getImap().getPort() + "/INBOX"); @@ -228,7 +226,6 @@ public void testIdleWithServerGuts(ImapMailReceiver receiver, boolean mapped, bo ImapIdleChannelAdapter adapter = new ImapIdleChannelAdapter(receiver); QueueChannel channel = new QueueChannel(); adapter.setOutputChannel(channel); - adapter.setTaskScheduler(taskScheduler); adapter.setReconnectDelay(10); adapter.afterPropertiesSet(); adapter.start(); @@ -781,17 +778,14 @@ public void testConnectionException() throws Exception { theEvent.set(event); latch.countDown(); }); - ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); - taskScheduler.initialize(); - adapter.setTaskScheduler(taskScheduler); adapter.setReconnectDelay(10); + adapter.afterPropertiesSet(); adapter.start(); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(theEvent.get().toString()) .endsWith("cause=java.lang.IllegalStateException: Failure in 'idle' task. Will resubmit.]"); adapter.stop(); - taskScheduler.destroy(); } @Test // see INT-1801 @@ -967,19 +961,15 @@ public void testIdleReconnects() throws Exception { i.callRealMethod(); throw new FolderClosedException(folder, "test"); }).given(receiver).waitForNewMessages(); - ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); - taskScheduler.initialize(); - adapter.setTaskScheduler(taskScheduler); adapter.setReconnectDelay(10); - adapter.afterPropertiesSet(); - final CountDownLatch latch = new CountDownLatch(3); + CountDownLatch latch = new CountDownLatch(3); adapter.setApplicationEventPublisher(e -> latch.countDown()); + adapter.afterPropertiesSet(); adapter.start(); assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); verify(store, atLeast(3)).connect(); adapter.stop(); - taskScheduler.shutdown(); } private void setUpScheduler(ImapMailReceiver mailReceiver, ThreadPoolTaskScheduler taskScheduler) { From 5c44a69bcd3575d94579704060a166b88519c5a0 Mon Sep 17 00:00:00 2001 From: abilan Date: Mon, 10 Jul 2023 15:21:51 -0400 Subject: [PATCH 2/2] * The `testIdleWithMessageMapping()` still fails on GH actions --- .../springframework/integration/mail/ImapMailReceiverTests.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java b/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java index a993a4c56b2..d7167bb696e 100644 --- a/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java +++ b/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java @@ -59,6 +59,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -186,6 +187,7 @@ public void testIdleWithServerDefaultSearch() throws Exception { } @Test + @Disabled("GreenMail server closes socket for some reason") public void testIdleWithMessageMapping() throws Exception { ImapMailReceiver receiver = new ImapMailReceiver("imap://user:pw@localhost:" + imapIdleServer.getImap().getPort() + "/INBOX");