Skip to content

Fix ImapIdleChA for scheduling race condition #8670

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
package org.springframework.integration.mail;

import java.io.Serial;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import jakarta.mail.Folder;
Expand All @@ -31,15 +30,13 @@
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mail.event.MailIntegrationEvent;
import org.springframework.integration.transaction.IntegrationResourceHolder;
import org.springframework.integration.transaction.IntegrationResourceHolderSynchronization;
import org.springframework.integration.transaction.TransactionSynchronizationFactory;
import org.springframework.messaging.MessagingException;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
Expand All @@ -63,12 +60,10 @@ public class ImapIdleChannelAdapter extends MessageProducerSupport implements Be

private static final int DEFAULT_RECONNECT_DELAY = 10000;

private final ExceptionAwarePeriodicTrigger receivingTaskTrigger = new ExceptionAwarePeriodicTrigger();

private final IdleTask idleTask = new IdleTask();

private final ImapMailReceiver mailReceiver;

private Executor taskExecutor;

private TransactionSynchronizationFactory transactionSynchronizationFactory;

private ClassLoader classLoader;
Expand Down Expand Up @@ -100,6 +95,16 @@ public void setAdviceChain(List<Advice> adviceChain) {
this.adviceChain = adviceChain;
}

/**
* Provide a managed {@link Executor} to schedule a receiving IDLE task.
* @param taskExecutor the {@link Executor} to use.
* @since 6.2
*/
public void setTaskExecutor(Executor taskExecutor) {
Assert.notNull(taskExecutor, "'taskExecutor' must not be null");
this.taskExecutor = taskExecutor;
}

/**
* Specify whether the IDLE task should reconnect automatically after
* catching a {@link jakarta.mail.MessagingException} while waiting for messages. The
Expand Down Expand Up @@ -139,6 +144,10 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
protected void onInit() {
super.onInit();

if (this.taskExecutor == null) {
this.taskExecutor = new SimpleAsyncTaskExecutor(getBeanName() + "-");
}

Consumer<?> messageSenderToUse = new MessageSender();

if (!CollectionUtils.isEmpty(this.adviceChain)) {
Expand All @@ -153,16 +162,9 @@ protected void onInit() {
this.messageSender = (Consumer<Object>) messageSenderToUse;
}


/*
* Lifecycle implementation
*/

@Override // guarded by super#lifecycleLock
@Override
protected void doStart() {
TaskScheduler scheduler = getTaskScheduler();
Assert.notNull(scheduler, "'taskScheduler' must not be null");
this.receivingTask = scheduler.schedule(new ReceivingTask(), this.receivingTaskTrigger);
this.taskExecutor.execute(this::callIdle);
}

@Override
Expand Down Expand Up @@ -190,6 +192,70 @@ private void publishException(Exception ex) {
}
}

private void callIdle() {
while (isActive()) {
try {
processIdle();
logger.debug("Task completed successfully. Re-scheduling it again right away.");
}
catch (Exception ex) {
publishException(ex);
if (this.shouldReconnectAutomatically
&& ex.getCause() instanceof jakarta.mail.MessagingException messagingException) {

//run again after a delay
logger.info(messagingException,
() -> "Failed to execute IDLE task. Will attempt to resubmit in "
+ this.reconnectDelay + " milliseconds.");
delayNextIdleCall();
}
else {
logger.warn(ex,
"Failed to execute IDLE task. " +
"Won't resubmit since not a 'shouldReconnectAutomatically' " +
"or not a 'jakarta.mail.MessagingException'");
break;
}
}
}
}

private void processIdle() {
try {
logger.debug("waiting for mail");
this.mailReceiver.waitForNewMessages();
Folder folder = this.mailReceiver.getFolder();
if (folder != null && folder.isOpen() && isRunning()) {
Object[] mailMessages = this.mailReceiver.receive();
logger.debug(() -> "received " + mailMessages.length + " mail messages");
for (Object mailMessage : mailMessages) {
if (isRunning()) {
this.messageSender.accept(mailMessage);
}
}
}
}
catch (jakarta.mail.MessagingException ex) {
logger.warn(ex, "error occurred in idle task");
if (this.shouldReconnectAutomatically) {
throw new IllegalStateException("Failure in 'idle' task. Will resubmit.", ex);
}
else {
throw new MessagingException("Failure in 'idle' task. Will NOT resubmit.", ex);
}
}
}

private void delayNextIdleCall() {
try {
Thread.sleep(this.reconnectDelay);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IllegalStateException(ex);
}
}

private class MessageSender implements Consumer<Object> {

MessageSender() {
Expand Down Expand Up @@ -227,112 +293,6 @@ public void accept(Object mailMessage) {

}

private class ReceivingTask implements Runnable {

ReceivingTask() {
}

@Override
public void run() {
if (isRunning()) {
try {
ImapIdleChannelAdapter.this.idleTask.run();
logger.debug("Task completed successfully. Re-scheduling it again right away.");
}
catch (Exception ex) {
if (ImapIdleChannelAdapter.this.shouldReconnectAutomatically
&& ex.getCause() instanceof jakarta.mail.MessagingException messagingException) {

//run again after a delay
logger.info(messagingException,
() -> "Failed to execute IDLE task. Will attempt to resubmit in "
+ ImapIdleChannelAdapter.this.reconnectDelay + " milliseconds.");
ImapIdleChannelAdapter.this.receivingTaskTrigger.delayNextExecution();
}
else {
logger.warn(ex,
"Failed to execute IDLE task. " +
"Won't resubmit since not a 'shouldReconnectAutomatically' " +
"or not a 'jakarta.mail.MessagingException'");
ImapIdleChannelAdapter.this.receivingTaskTrigger.stop();
}
publishException(ex);
}
}
}

}


private class IdleTask implements Runnable {

IdleTask() {
}

@Override
public void run() {
if (isRunning()) {
try {
logger.debug("waiting for mail");
ImapIdleChannelAdapter.this.mailReceiver.waitForNewMessages();
Folder folder = ImapIdleChannelAdapter.this.mailReceiver.getFolder();
if (folder != null && folder.isOpen() && isRunning()) {
Object[] mailMessages = ImapIdleChannelAdapter.this.mailReceiver.receive();
logger.debug(() -> "received " + mailMessages.length + " mail messages");
for (Object mailMessage : mailMessages) {
if (isRunning()) {
ImapIdleChannelAdapter.this.messageSender.accept(mailMessage);
}
}
}
}
catch (jakarta.mail.MessagingException ex) {
logger.warn(ex, "error occurred in idle task");
if (ImapIdleChannelAdapter.this.shouldReconnectAutomatically) {
throw new IllegalStateException("Failure in 'idle' task. Will resubmit.", ex);
}
else {
throw new MessagingException("Failure in 'idle' task. Will NOT resubmit.", ex);
}
}
}
}

}

private class ExceptionAwarePeriodicTrigger implements Trigger {

private final AtomicBoolean delayNextExecution = new AtomicBoolean();

private final AtomicBoolean stop = new AtomicBoolean();


ExceptionAwarePeriodicTrigger() {
}

@Override
public Instant nextExecution(TriggerContext triggerContext) {
if (this.stop.getAndSet(false)) {
return null;
}
if (this.delayNextExecution.getAndSet(false)) {
return Instant.now().plusMillis(ImapIdleChannelAdapter.this.reconnectDelay);
}
else {
return Instant.now();
}
}

void delayNextExecution() {
this.delayNextExecution.set(true);
}

void stop() {
this.stop.set(true);
}

}

public class ImapIdleExceptionEvent extends MailIntegrationEvent {

@Serial
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

Expand Down Expand Up @@ -395,6 +396,17 @@ public ImapIdleChannelAdapterSpec simpleContent(boolean simpleContent) {
return _this();
}

/**
* Provide a managed {@link Executor} to schedule a receiving IDLE task.
* @param taskExecutor the {@link Executor} to use.
* @return the spec.
* @since 6.2
*/
public ImapIdleChannelAdapterSpec taskExecutor(Executor taskExecutor) {
this.target.setTaskExecutor(taskExecutor);
return this;
}

@Override
public Map<Object, String> getComponentsToRegister() {
return this.componentsToRegister;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ public void testIdleWithServerGuts(ImapMailReceiver receiver, boolean mapped, bo
ImapIdleChannelAdapter adapter = new ImapIdleChannelAdapter(receiver);
QueueChannel channel = new QueueChannel();
adapter.setOutputChannel(channel);
adapter.setTaskScheduler(taskScheduler);
adapter.setReconnectDelay(10);
adapter.afterPropertiesSet();
adapter.start();
Expand Down Expand Up @@ -781,17 +780,14 @@ public void testConnectionException() throws Exception {
theEvent.set(event);
latch.countDown();
});
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.initialize();
adapter.setTaskScheduler(taskScheduler);
adapter.setReconnectDelay(10);
adapter.afterPropertiesSet();
adapter.start();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(theEvent.get().toString())
.endsWith("cause=java.lang.IllegalStateException: Failure in 'idle' task. Will resubmit.]");

adapter.stop();
taskScheduler.destroy();
}

@Test // see INT-1801
Expand Down Expand Up @@ -967,19 +963,15 @@ public void testIdleReconnects() throws Exception {
i.callRealMethod();
throw new FolderClosedException(folder, "test");
}).given(receiver).waitForNewMessages();
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.initialize();
adapter.setTaskScheduler(taskScheduler);
adapter.setReconnectDelay(10);
adapter.afterPropertiesSet();
final CountDownLatch latch = new CountDownLatch(3);
CountDownLatch latch = new CountDownLatch(3);
adapter.setApplicationEventPublisher(e -> latch.countDown());
adapter.afterPropertiesSet();
adapter.start();
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
verify(store, atLeast(3)).connect();

adapter.stop();
taskScheduler.shutdown();
}

private void setUpScheduler(ImapMailReceiver mailReceiver, ThreadPoolTaskScheduler taskScheduler) {
Expand Down