Skip to content

Commit a0e3434

Browse files
committed
INT-3529 Polishing
Move group completion to the Resequencer's `afterRelease()` when the group is timed out.
1 parent d0d215a commit a0e3434

File tree

2 files changed

+20
-2
lines changed

2 files changed

+20
-2
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,17 @@ private void discardMessage(Message<?> message) {
483483
*/
484484
protected abstract void afterRelease(MessageGroup group, Collection<Message<?>> completedMessages);
485485

486+
/**
487+
* Subclasses may override if special action is needed because the group was released or discarded
488+
* due to a timeout. By default, {@link #afterRelease(MessageGroup, Collection)} is invoked.
489+
* @param group The group.
490+
* @param completedMessages The completed messages.
491+
* @param timeout True if the release/discard was due to a timeout.
492+
*/
493+
protected void afterRelease(MessageGroup group, Collection<Message<?>> completedMessages, boolean timeout) {
494+
afterRelease(group, completedMessages);
495+
}
496+
486497
protected void forceComplete(MessageGroup group) {
487498

488499
Object correlationKey = group.getGroupId();
@@ -533,9 +544,8 @@ protected void forceComplete(MessageGroup group) {
533544
expireGroup(correlationKey, groupNow);
534545
}
535546
if (!this.expireGroupsUponTimeout) {
536-
afterRelease(groupNow, groupNow.getMessages());
547+
afterRelease(groupNow, groupNow.getMessages(), true);
537548
removeGroup = false;
538-
this.messageStore.completeGroup(correlationKey); // late messages immediately discarded
539549
}
540550
}
541551
else {

spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) {
6363

6464
@Override
6565
protected void afterRelease(MessageGroup messageGroup, Collection<Message<?>> completedMessages) {
66+
afterRelease(messageGroup, completedMessages, false);
67+
}
68+
69+
@Override
70+
protected void afterRelease(MessageGroup messageGroup, Collection<Message<?>> completedMessages, boolean timeout) {
6671

6772
int size = messageGroup.getMessages().size();
6873
int sequenceSize = 0;
@@ -82,6 +87,9 @@ protected void afterRelease(MessageGroup messageGroup, Collection<Message<?>> co
8287
this.messageStore.removeMessageFromGroup(messageGroup.getGroupId(), msg);
8388
}
8489
}
90+
if (timeout) {
91+
this.messageStore.completeGroup(messageGroup.getGroupId());
92+
}
8593
}
8694
}
8795

0 commit comments

Comments
 (0)