Skip to content

Commit 2edc462

Browse files
committed
Merge branch 'INT-3528'
2 parents 252c53d + dc2aee5 commit 2edc462

File tree

5 files changed

+27
-24
lines changed

5 files changed

+27
-24
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,11 @@
1313

1414
package org.springframework.integration.aggregator;
1515

16-
import java.util.ArrayList;
1716
import java.util.Collection;
1817
import java.util.Collections;
18+
import java.util.Comparator;
1919
import java.util.Date;
2020
import java.util.HashMap;
21-
import java.util.List;
2221
import java.util.Map;
2322
import java.util.UUID;
2423
import java.util.concurrent.ScheduledFuture;
@@ -81,6 +80,7 @@
8180
* @author Gary Russell
8281
* @author Artem Bilan
8382
* @author David Liu
83+
* @author Enrique Rodríguez
8484
* @since 2.0
8585
*/
8686
public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageProducingHandler
@@ -90,6 +90,8 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP
9090

9191
public static final long DEFAULT_SEND_TIMEOUT = 1000L;
9292

93+
private final Comparator<Message<?>> sequenceNumberComparator = new SequenceNumberComparator();
94+
9395
private final Map<UUID, ScheduledFuture<?>> expireGroupScheduledFutures = new HashMap<UUID, ScheduledFuture<?>>();
9496

9597
protected volatile MessageGroupStore messageStore;
@@ -588,11 +590,7 @@ void remove(MessageGroup group) {
588590
}
589591

590592
protected int findLastReleasedSequenceNumber(Object groupId, Collection<Message<?>> partialSequence) {
591-
List<Message<?>> sorted = new ArrayList<Message<?>>(partialSequence);
592-
Collections.sort(sorted, new SequenceNumberComparator());
593-
594-
Message<?> lastReleasedMessage = sorted.get(partialSequence.size() - 1);
595-
593+
Message<?> lastReleasedMessage = Collections.max(partialSequence, this.sequenceNumberComparator);
596594
return new IntegrationMessageHeaderAccessor(lastReleasedMessage).getSequenceNumber();
597595
}
598596

spring-integration-core/src/main/java/org/springframework/integration/aggregator/SequenceSizeReleaseStrategy.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,9 @@
1616

1717
package org.springframework.integration.aggregator;
1818

19-
import java.util.ArrayList;
2019
import java.util.Collection;
2120
import java.util.Collections;
2221
import java.util.Comparator;
23-
import java.util.List;
2422

2523
import org.apache.commons.logging.Log;
2624
import org.apache.commons.logging.LogFactory;
@@ -38,12 +36,13 @@
3836
* @author Dave Syer
3937
* @author Iwein Fuld
4038
* @author Oleg Zhurakousky
39+
* @author Enrique Rodríguez
4140
*/
4241
public class SequenceSizeReleaseStrategy implements ReleaseStrategy {
4342

4443
private static final Log logger = LogFactory.getLog(SequenceSizeReleaseStrategy.class);
4544

46-
private volatile Comparator<Message<?>> comparator = new SequenceNumberComparator();
45+
private final Comparator<Message<?>> comparator = new SequenceNumberComparator();
4746

4847
private volatile boolean releasePartialSequences;
4948

@@ -77,10 +76,9 @@ public boolean canRelease(MessageGroup messageGroup) {
7776
if (logger.isTraceEnabled()) {
7877
logger.trace("Considering partial release of group [" + messageGroup + "]");
7978
}
80-
List<Message<?>> sorted = new ArrayList<Message<?>>(messages);
81-
Collections.sort(sorted, comparator);
79+
Message<?> minMessage = Collections.min(messages, this.comparator);
8280

83-
int nextSequenceNumber = new IntegrationMessageHeaderAccessor(sorted.get(0)).getSequenceNumber();
81+
int nextSequenceNumber = new IntegrationMessageHeaderAccessor(minMessage).getSequenceNumber();
8482
int lastReleasedMessageSequence = messageGroup.getLastReleasedMessageSequenceNumber();
8583

8684
if (nextSequenceNumber - lastReleasedMessageSequence == 1){

spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.channel;
1818

1919
import java.util.Collections;
20+
import java.util.Comparator;
2021
import java.util.List;
2122
import java.util.concurrent.CopyOnWriteArrayList;
2223

@@ -50,12 +51,14 @@
5051
public abstract class AbstractMessageChannel extends IntegrationObjectSupport
5152
implements MessageChannel, TrackableComponent, ChannelInterceptorAware {
5253

54+
private final ChannelInterceptorList interceptors = new ChannelInterceptorList();
55+
56+
private final Comparator<Object> orderComparator = new OrderComparator();
57+
5358
private volatile boolean shouldTrack = false;
5459

5560
private volatile Class<?>[] datatypes = new Class<?>[0];
5661

57-
private final ChannelInterceptorList interceptors = new ChannelInterceptorList();
58-
5962
private volatile String fullChannelName;
6063

6164
private volatile MessageConverter messageConverter;
@@ -96,7 +99,7 @@ public void setDatatypes(Class<?>... datatypes) {
9699
*/
97100
@Override
98101
public void setInterceptors(List<ChannelInterceptor> interceptors) {
99-
Collections.sort(interceptors, new OrderComparator());
102+
Collections.sort(interceptors, this.orderComparator);
100103
this.interceptors.set(interceptors);
101104
}
102105

spring-integration-core/src/main/java/org/springframework/integration/config/GlobalChannelInterceptorProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ private void addMatchingInterceptors(ChannelInterceptorAware channel, String bea
153153
tempInterceptors.add(globalChannelInterceptorWrapper);
154154
}
155155
}
156-
Collections.sort(tempInterceptors, comparator);
156+
Collections.sort(tempInterceptors, this.comparator);
157157
if (!tempInterceptors.isEmpty()) {
158158
for (int i = tempInterceptors.size() - 1; i >= 0; i--) {
159159
ChannelInterceptor channelInterceptor = tempInterceptors.get(i).getChannelInterceptor();

spring-integration-redis/src/main/java/org/springframework/integration/redis/store/RedisChannelPriorityMessageStore.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16+
1617
package org.springframework.integration.redis.store;
1718

1819
import java.util.Collections;
@@ -45,6 +46,15 @@
4546
*/
4647
public class RedisChannelPriorityMessageStore extends RedisChannelMessageStore implements PriorityCapableChannelMessageStore {
4748

49+
private final Comparator<String> keysComparator = new Comparator<String>() {
50+
51+
@Override
52+
public int compare(String s1, String s2) {
53+
return s2.compareTo(s1);
54+
}
55+
56+
};
57+
4858
public RedisChannelPriorityMessageStore(RedisConnectionFactory connectionFactory) {
4959
super(connectionFactory);
5060
}
@@ -111,13 +121,7 @@ private List<String> sortedKeys(String groupId) {
111121
Assert.isInstanceOf(String.class, key);
112122
list.add((String) key);
113123
}
114-
Collections.sort(list, new Comparator<String>() {
115-
116-
@Override
117-
public int compare(String s1, String s2) {
118-
return s2.compareTo(s1);
119-
}
120-
});
124+
Collections.sort(list, this.keysComparator);
121125
return list;
122126
}
123127

0 commit comments

Comments
 (0)