Skip to content

Commit e4c1851

Browse files
authored
Fix ImapIdleChA for scheduling race condition (#8670)
* Fix ImapIdleChA for scheduling race condition The IMAP IDLE is long-lived process and can be blocked waiting for any reply from the server. This way it is not suited to be used in a `TaskScheduler` especially when it has only one thread in its pool in Spring Boot by default. Another concurrent scheduled task is exactly an `ImapMailReceiver.IdleCanceller`. With a single thread in a `TaskScheduler` pool it cannot be reached therefore we never cancel and IDLE task and cannot react to the connection loss properly * Rework the `ImapIdleChannelAdapter` logic to use a regular `Executor` and `while()` loop with a `Thread.sleep()` when we lose connection * Clean up the `ImapMailReceiverTests` from `TaskScheduler` not used anymore. * Expose new `taskExecutor` option in the `ImapIdleChannelAdapterSpec` for Java DSL * Enable `ImapMailReceiverTests.testIdleWithMessageMapping()` with an attempt to see if this fix covers an unclear problem exposed before * * The `testIdleWithMessageMapping()` still fails on GH actions
1 parent 9597b7a commit e4c1851

File tree

3 files changed

+99
-135
lines changed

3 files changed

+99
-135
lines changed

spring-integration-mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java

Lines changed: 84 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@
1717
package org.springframework.integration.mail;
1818

1919
import java.io.Serial;
20-
import java.time.Instant;
2120
import java.util.List;
21+
import java.util.concurrent.Executor;
2222
import java.util.concurrent.ScheduledFuture;
23-
import java.util.concurrent.atomic.AtomicBoolean;
2423
import java.util.function.Consumer;
2524

2625
import jakarta.mail.Folder;
@@ -31,15 +30,13 @@
3130
import org.springframework.beans.factory.BeanClassLoaderAware;
3231
import org.springframework.context.ApplicationEventPublisher;
3332
import org.springframework.context.ApplicationEventPublisherAware;
33+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
3434
import org.springframework.integration.endpoint.MessageProducerSupport;
3535
import org.springframework.integration.mail.event.MailIntegrationEvent;
3636
import org.springframework.integration.transaction.IntegrationResourceHolder;
3737
import org.springframework.integration.transaction.IntegrationResourceHolderSynchronization;
3838
import org.springframework.integration.transaction.TransactionSynchronizationFactory;
3939
import org.springframework.messaging.MessagingException;
40-
import org.springframework.scheduling.TaskScheduler;
41-
import org.springframework.scheduling.Trigger;
42-
import org.springframework.scheduling.TriggerContext;
4340
import org.springframework.transaction.support.TransactionSynchronization;
4441
import org.springframework.transaction.support.TransactionSynchronizationManager;
4542
import org.springframework.util.Assert;
@@ -63,12 +60,10 @@ public class ImapIdleChannelAdapter extends MessageProducerSupport implements Be
6360

6461
private static final int DEFAULT_RECONNECT_DELAY = 10000;
6562

66-
private final ExceptionAwarePeriodicTrigger receivingTaskTrigger = new ExceptionAwarePeriodicTrigger();
67-
68-
private final IdleTask idleTask = new IdleTask();
69-
7063
private final ImapMailReceiver mailReceiver;
7164

65+
private Executor taskExecutor;
66+
7267
private TransactionSynchronizationFactory transactionSynchronizationFactory;
7368

7469
private ClassLoader classLoader;
@@ -100,6 +95,16 @@ public void setAdviceChain(List<Advice> adviceChain) {
10095
this.adviceChain = adviceChain;
10196
}
10297

98+
/**
99+
* Provide a managed {@link Executor} to schedule a receiving IDLE task.
100+
* @param taskExecutor the {@link Executor} to use.
101+
* @since 6.2
102+
*/
103+
public void setTaskExecutor(Executor taskExecutor) {
104+
Assert.notNull(taskExecutor, "'taskExecutor' must not be null");
105+
this.taskExecutor = taskExecutor;
106+
}
107+
103108
/**
104109
* Specify whether the IDLE task should reconnect automatically after
105110
* catching a {@link jakarta.mail.MessagingException} while waiting for messages. The
@@ -139,6 +144,10 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
139144
protected void onInit() {
140145
super.onInit();
141146

147+
if (this.taskExecutor == null) {
148+
this.taskExecutor = new SimpleAsyncTaskExecutor(getBeanName() + "-");
149+
}
150+
142151
Consumer<?> messageSenderToUse = new MessageSender();
143152

144153
if (!CollectionUtils.isEmpty(this.adviceChain)) {
@@ -153,16 +162,9 @@ protected void onInit() {
153162
this.messageSender = (Consumer<Object>) messageSenderToUse;
154163
}
155164

156-
157-
/*
158-
* Lifecycle implementation
159-
*/
160-
161-
@Override // guarded by super#lifecycleLock
165+
@Override
162166
protected void doStart() {
163-
TaskScheduler scheduler = getTaskScheduler();
164-
Assert.notNull(scheduler, "'taskScheduler' must not be null");
165-
this.receivingTask = scheduler.schedule(new ReceivingTask(), this.receivingTaskTrigger);
167+
this.taskExecutor.execute(this::callIdle);
166168
}
167169

168170
@Override
@@ -190,6 +192,70 @@ private void publishException(Exception ex) {
190192
}
191193
}
192194

195+
private void callIdle() {
196+
while (isActive()) {
197+
try {
198+
processIdle();
199+
logger.debug("Task completed successfully. Re-scheduling it again right away.");
200+
}
201+
catch (Exception ex) {
202+
publishException(ex);
203+
if (this.shouldReconnectAutomatically
204+
&& ex.getCause() instanceof jakarta.mail.MessagingException messagingException) {
205+
206+
//run again after a delay
207+
logger.info(messagingException,
208+
() -> "Failed to execute IDLE task. Will attempt to resubmit in "
209+
+ this.reconnectDelay + " milliseconds.");
210+
delayNextIdleCall();
211+
}
212+
else {
213+
logger.warn(ex,
214+
"Failed to execute IDLE task. " +
215+
"Won't resubmit since not a 'shouldReconnectAutomatically' " +
216+
"or not a 'jakarta.mail.MessagingException'");
217+
break;
218+
}
219+
}
220+
}
221+
}
222+
223+
private void processIdle() {
224+
try {
225+
logger.debug("waiting for mail");
226+
this.mailReceiver.waitForNewMessages();
227+
Folder folder = this.mailReceiver.getFolder();
228+
if (folder != null && folder.isOpen() && isRunning()) {
229+
Object[] mailMessages = this.mailReceiver.receive();
230+
logger.debug(() -> "received " + mailMessages.length + " mail messages");
231+
for (Object mailMessage : mailMessages) {
232+
if (isRunning()) {
233+
this.messageSender.accept(mailMessage);
234+
}
235+
}
236+
}
237+
}
238+
catch (jakarta.mail.MessagingException ex) {
239+
logger.warn(ex, "error occurred in idle task");
240+
if (this.shouldReconnectAutomatically) {
241+
throw new IllegalStateException("Failure in 'idle' task. Will resubmit.", ex);
242+
}
243+
else {
244+
throw new MessagingException("Failure in 'idle' task. Will NOT resubmit.", ex);
245+
}
246+
}
247+
}
248+
249+
private void delayNextIdleCall() {
250+
try {
251+
Thread.sleep(this.reconnectDelay);
252+
}
253+
catch (InterruptedException ex) {
254+
Thread.currentThread().interrupt();
255+
throw new IllegalStateException(ex);
256+
}
257+
}
258+
193259
private class MessageSender implements Consumer<Object> {
194260

195261
MessageSender() {
@@ -227,112 +293,6 @@ public void accept(Object mailMessage) {
227293

228294
}
229295

230-
private class ReceivingTask implements Runnable {
231-
232-
ReceivingTask() {
233-
}
234-
235-
@Override
236-
public void run() {
237-
if (isRunning()) {
238-
try {
239-
ImapIdleChannelAdapter.this.idleTask.run();
240-
logger.debug("Task completed successfully. Re-scheduling it again right away.");
241-
}
242-
catch (Exception ex) {
243-
if (ImapIdleChannelAdapter.this.shouldReconnectAutomatically
244-
&& ex.getCause() instanceof jakarta.mail.MessagingException messagingException) {
245-
246-
//run again after a delay
247-
logger.info(messagingException,
248-
() -> "Failed to execute IDLE task. Will attempt to resubmit in "
249-
+ ImapIdleChannelAdapter.this.reconnectDelay + " milliseconds.");
250-
ImapIdleChannelAdapter.this.receivingTaskTrigger.delayNextExecution();
251-
}
252-
else {
253-
logger.warn(ex,
254-
"Failed to execute IDLE task. " +
255-
"Won't resubmit since not a 'shouldReconnectAutomatically' " +
256-
"or not a 'jakarta.mail.MessagingException'");
257-
ImapIdleChannelAdapter.this.receivingTaskTrigger.stop();
258-
}
259-
publishException(ex);
260-
}
261-
}
262-
}
263-
264-
}
265-
266-
267-
private class IdleTask implements Runnable {
268-
269-
IdleTask() {
270-
}
271-
272-
@Override
273-
public void run() {
274-
if (isRunning()) {
275-
try {
276-
logger.debug("waiting for mail");
277-
ImapIdleChannelAdapter.this.mailReceiver.waitForNewMessages();
278-
Folder folder = ImapIdleChannelAdapter.this.mailReceiver.getFolder();
279-
if (folder != null && folder.isOpen() && isRunning()) {
280-
Object[] mailMessages = ImapIdleChannelAdapter.this.mailReceiver.receive();
281-
logger.debug(() -> "received " + mailMessages.length + " mail messages");
282-
for (Object mailMessage : mailMessages) {
283-
if (isRunning()) {
284-
ImapIdleChannelAdapter.this.messageSender.accept(mailMessage);
285-
}
286-
}
287-
}
288-
}
289-
catch (jakarta.mail.MessagingException ex) {
290-
logger.warn(ex, "error occurred in idle task");
291-
if (ImapIdleChannelAdapter.this.shouldReconnectAutomatically) {
292-
throw new IllegalStateException("Failure in 'idle' task. Will resubmit.", ex);
293-
}
294-
else {
295-
throw new MessagingException("Failure in 'idle' task. Will NOT resubmit.", ex);
296-
}
297-
}
298-
}
299-
}
300-
301-
}
302-
303-
private class ExceptionAwarePeriodicTrigger implements Trigger {
304-
305-
private final AtomicBoolean delayNextExecution = new AtomicBoolean();
306-
307-
private final AtomicBoolean stop = new AtomicBoolean();
308-
309-
310-
ExceptionAwarePeriodicTrigger() {
311-
}
312-
313-
@Override
314-
public Instant nextExecution(TriggerContext triggerContext) {
315-
if (this.stop.getAndSet(false)) {
316-
return null;
317-
}
318-
if (this.delayNextExecution.getAndSet(false)) {
319-
return Instant.now().plusMillis(ImapIdleChannelAdapter.this.reconnectDelay);
320-
}
321-
else {
322-
return Instant.now();
323-
}
324-
}
325-
326-
void delayNextExecution() {
327-
this.delayNextExecution.set(true);
328-
}
329-
330-
void stop() {
331-
this.stop.set(true);
332-
}
333-
334-
}
335-
336296
public class ImapIdleExceptionEvent extends MailIntegrationEvent {
337297

338298
@Serial

spring-integration-mail/src/main/java/org/springframework/integration/mail/dsl/ImapIdleChannelAdapterSpec.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323
import java.util.Map;
2424
import java.util.Properties;
25+
import java.util.concurrent.Executor;
2526
import java.util.function.Consumer;
2627
import java.util.function.Function;
2728

@@ -395,6 +396,17 @@ public ImapIdleChannelAdapterSpec simpleContent(boolean simpleContent) {
395396
return _this();
396397
}
397398

399+
/**
400+
* Provide a managed {@link Executor} to schedule a receiving IDLE task.
401+
* @param taskExecutor the {@link Executor} to use.
402+
* @return the spec.
403+
* @since 6.2
404+
*/
405+
public ImapIdleChannelAdapterSpec taskExecutor(Executor taskExecutor) {
406+
this.target.setTaskExecutor(taskExecutor);
407+
return this;
408+
}
409+
398410
@Override
399411
public Map<Object, String> getComponentsToRegister() {
400412
return this.componentsToRegister;

spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,6 @@ public void testIdleWithServerGuts(ImapMailReceiver receiver, boolean mapped, bo
228228
ImapIdleChannelAdapter adapter = new ImapIdleChannelAdapter(receiver);
229229
QueueChannel channel = new QueueChannel();
230230
adapter.setOutputChannel(channel);
231-
adapter.setTaskScheduler(taskScheduler);
232231
adapter.setReconnectDelay(10);
233232
adapter.afterPropertiesSet();
234233
adapter.start();
@@ -781,17 +780,14 @@ public void testConnectionException() throws Exception {
781780
theEvent.set(event);
782781
latch.countDown();
783782
});
784-
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
785-
taskScheduler.initialize();
786-
adapter.setTaskScheduler(taskScheduler);
787783
adapter.setReconnectDelay(10);
784+
adapter.afterPropertiesSet();
788785
adapter.start();
789786
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
790787
assertThat(theEvent.get().toString())
791788
.endsWith("cause=java.lang.IllegalStateException: Failure in 'idle' task. Will resubmit.]");
792789

793790
adapter.stop();
794-
taskScheduler.destroy();
795791
}
796792

797793
@Test // see INT-1801
@@ -967,19 +963,15 @@ public void testIdleReconnects() throws Exception {
967963
i.callRealMethod();
968964
throw new FolderClosedException(folder, "test");
969965
}).given(receiver).waitForNewMessages();
970-
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
971-
taskScheduler.initialize();
972-
adapter.setTaskScheduler(taskScheduler);
973966
adapter.setReconnectDelay(10);
974-
adapter.afterPropertiesSet();
975-
final CountDownLatch latch = new CountDownLatch(3);
967+
CountDownLatch latch = new CountDownLatch(3);
976968
adapter.setApplicationEventPublisher(e -> latch.countDown());
969+
adapter.afterPropertiesSet();
977970
adapter.start();
978971
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
979972
verify(store, atLeast(3)).connect();
980973

981974
adapter.stop();
982-
taskScheduler.shutdown();
983975
}
984976

985977
private void setUpScheduler(ImapMailReceiver mailReceiver, ThreadPoolTaskScheduler taskScheduler) {

0 commit comments

Comments
 (0)