Skip to content

Commit 286c421

Browse files
artembilangaryrussell
authored andcommitted
INT-3387: MessageGroupStore Improvements
JIRA: https://jira.spring.io/browse/INT-3387, https://jira.spring.io/browse/INT-3806 * Introduce ``` MessageGroupStore void addMessagesToGroup(Object groupId, Message<?>... messages); ``` And implement it in all stores. * Use new `addMessagesToGroup` where it is reasonable, e.g. `DelayHandler` * Optimize test-case to use a new store method (where it is possible) * Fix timing delays in the `JdbcMessageStoreTests` * Introduce `PersistentMessageGroup` * Add `AbstractMessageGroupStore#proxyMessageGroupForLazyLoad` to wrap the raw `MessageGroup` to the `PersistentMessageGroup` for lazy-load * Rework `MessageGroupMetadata` do not be `immutable` and allow to store/restore in the `AbstractKeyValueMessageStore` only the `MessageGroupMetadata` * Refactor `ResequencingMessageHandler` and `SequenceSizeReleaseStrategy` a bit for better performance when interact with the `MessageGroup` * Add `AbstractMessageGroupStore#setLazyLoadMessageGroups` to switch off the `lazy-load` behavior and restore the previous full `MessageGroup` logic * Add `What's New` note and `message-store.adoc` paragraph for the lazy-load functionality `GroupType.PERSISTENT` and not lazy by default PR Comments Fix `JdbcMessageStoreTests` timing issues Address PR comments * Add performance test to the `ConfigurableMongoDbMessageGroupStoreTests` * Add JavaDocs for the `MessageGroupFactory` methods * Add `log4j.properties` into the `test` MongoDB module for better traceability * Fix `JdbcMessageStore#getOneMessageFromGroup()` over the `doPollForMessage()` delegation. The `jdbcTemplate.queryForObject()` requires exactly one and only one raw in `resultSet` * Add performance test results into the `message-store.adoc`
1 parent 4e4763d commit 286c421

File tree

34 files changed

+1084
-504
lines changed

34 files changed

+1084
-504
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/CorrelatingMessageBarrier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ protected void handleMessageInternal(Message<?> message) throws Exception {
9595
Object correlationKey = this.correlationStrategy.getCorrelationKey(message);
9696
Object lock = getLock(correlationKey);
9797
synchronized (lock) {
98-
this.store.addMessageToGroup(correlationKey, message);
98+
this.store.addMessagesToGroup(correlationKey, message);
9999
}
100100
if (log.isDebugEnabled()) {
101101
log.debug(String.format("Handled message for key [%s]: %s.", correlationKey, message));

spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import java.util.Collection;
2020

21-
import org.springframework.integration.IntegrationMessageHeaderAccessor;
2221
import org.springframework.integration.store.MessageGroup;
2322
import org.springframework.integration.store.MessageGroupStore;
2423
import org.springframework.integration.store.SimpleMessageStore;
@@ -74,12 +73,9 @@ protected void afterRelease(MessageGroup messageGroup, Collection<Message<?>> co
7473

7574
@Override
7675
protected void afterRelease(MessageGroup messageGroup, Collection<Message<?>> completedMessages, boolean timeout) {
77-
int size = messageGroup.getMessages().size();
78-
int sequenceSize = 0;
79-
Message<?> message = messageGroup.getOne();
80-
if (message != null) {
81-
sequenceSize = new IntegrationMessageHeaderAccessor(message).getSequenceSize();
82-
}
76+
int size = messageGroup.size();
77+
int sequenceSize = messageGroup.getSequenceSize();
78+
8379
// If there is no sequence then it must be incomplete or unbounded
8480
if (sequenceSize > 0 && sequenceSize == size) {
8581
remove(messageGroup);

spring-integration-core/src/main/java/org/springframework/integration/aggregator/SequenceSizeReleaseStrategy.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
* @author Dave Syer
3737
* @author Iwein Fuld
3838
* @author Oleg Zhurakousky
39+
* @author Artem Bilan
3940
* @author Enrique Rodríguez
4041
*/
4142
public class SequenceSizeReleaseStrategy implements ReleaseStrategy {
@@ -55,9 +56,9 @@ public SequenceSizeReleaseStrategy(boolean releasePartialSequences) {
5556
}
5657

5758
/**
58-
* Flag that determines if partial sequences are allowed. If true then as soon as enough messages arrive that can be
59-
* ordered they will be released, provided they all have sequence numbers greater than those already released.
60-
*
59+
* Flag that determines if partial sequences are allowed. If true then as soon as
60+
* enough messages arrive that can be ordered they will be released, provided they
61+
* all have sequence numbers greater than those already released.
6162
* @param releasePartialSequences true when partial sequences should be released.
6263
*/
6364
public void setReleasePartialSequences(boolean releasePartialSequences) {
@@ -69,13 +70,12 @@ public boolean canRelease(MessageGroup messageGroup) {
6970

7071
boolean canRelease = false;
7172

72-
Collection<Message<?>> messages = messageGroup.getMessages();
73-
74-
if (this.releasePartialSequences && !messages.isEmpty()) {
75-
73+
int size = messageGroup.size();
74+
if (this.releasePartialSequences && size > 0) {
7675
if (logger.isTraceEnabled()) {
7776
logger.trace("Considering partial release of group [" + messageGroup + "]");
7877
}
78+
Collection<Message<?>> messages = messageGroup.getMessages();
7979
Message<?> minMessage = Collections.min(messages, this.comparator);
8080

8181
int nextSequenceNumber = new IntegrationMessageHeaderAccessor(minMessage).getSequenceNumber();
@@ -86,13 +86,11 @@ public boolean canRelease(MessageGroup messageGroup) {
8686
}
8787
}
8888
else {
89-
int size = messages.size();
90-
9189
if (size == 0) {
9290
canRelease = true;
9391
}
9492
else {
95-
int sequenceSize = new IntegrationMessageHeaderAccessor(messageGroup.getOne()).getSequenceSize();
93+
int sequenceSize = messageGroup.getSequenceSize();
9694
// If there is no sequence then it must be incomplete....
9795
if (sequenceSize == size) {
9896
canRelease = true;

spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java

Lines changed: 105 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -94,38 +94,62 @@ public long getMessageCount() {
9494
*/
9595
@Override
9696
public MessageGroup getMessageGroup(Object groupId) {
97-
return buildMessageGroup(groupId, false);
98-
}
97+
MessageGroupMetadata metadata = getGroupMetadata(groupId);
98+
if (metadata != null) {
9999

100+
MessageGroup messageGroup = getMessageGroupFactory()
101+
.create(this, groupId, metadata.getTimestamp(), metadata.isComplete());
102+
messageGroup.setLastModified(metadata.getLastModified());
103+
messageGroup.setLastReleasedMessageSequenceNumber(metadata.getLastReleasedMessageSequenceNumber());
104+
return messageGroup;
105+
}
106+
else {
107+
return new SimpleMessageGroup(groupId);
108+
}
109+
}
100110

101-
/**
102-
* Add a Message to the group with the provided group ID.
103-
*/
104111
@Override
105-
public MessageGroup addMessageToGroup(Object groupId, Message<?> message) {
112+
public MessageGroupMetadata getGroupMetadata(Object groupId) {
106113
Assert.notNull(groupId, "'groupId' must not be null");
107-
Assert.notNull(message, "'message' must not be null");
114+
Object mgm = this.doRetrieve(MESSAGE_GROUP_KEY_PREFIX + groupId);
115+
if (mgm != null) {
116+
Assert.isInstanceOf(MessageGroupMetadata.class, mgm);
117+
return (MessageGroupMetadata) mgm;
118+
}
119+
return null;
120+
}
108121

109-
// add message as is to the MG accessible by the caller
110-
MessageGroup messageGroup = getMessageGroup(groupId);
122+
@Override
123+
public void addMessagesToGroup(Object groupId, Message<?>... messages) {
124+
Assert.notNull(groupId, "'groupId' must not be null");
125+
Assert.notNull(messages, "'messages' must not be null");
111126

112-
messageGroup.add(message);
127+
MessageGroupMetadata metadata = getGroupMetadata(groupId);
128+
SimpleMessageGroup group = null;
129+
if (metadata == null) {
130+
group = new SimpleMessageGroup(groupId);
131+
}
113132

114-
// enrich Message with additional headers and add it to MS
115-
Message<?> enrichedMessage = enrichMessage(message);
133+
for (Message<?> message : messages) {
134+
// enrich Message with additional headers and add it to MS
135+
Message<?> enrichedMessage = enrichMessage(message);
136+
addMessage(enrichedMessage);
137+
if (metadata != null) {
138+
metadata.add(enrichedMessage.getHeaders().getId());
139+
}
140+
else {
141+
group.add(enrichedMessage);
142+
}
143+
}
116144

117-
addMessage(enrichedMessage);
145+
if (group != null) {
146+
metadata = new MessageGroupMetadata(group);
147+
}
118148

119-
// build raw MessageGroup and add enriched Message to it
120-
MessageGroup rawGroup = buildMessageGroup(groupId, true);
121-
rawGroup.setLastModified(System.currentTimeMillis());
122-
rawGroup.add(enrichedMessage);
149+
metadata.setLastModified(System.currentTimeMillis());
123150

124151
// store MessageGroupMetadata built from enriched MG
125-
doStore(MESSAGE_GROUP_KEY_PREFIX + groupId, new MessageGroupMetadata(rawGroup));
126-
127-
// return clean MG
128-
return getMessageGroup(groupId);
152+
doStore(MESSAGE_GROUP_KEY_PREFIX + groupId, metadata);
129153
}
130154

131155
/**
@@ -137,32 +161,17 @@ public MessageGroup removeMessageFromGroup(Object groupId, Message<?> messageToR
137161
Assert.notNull(groupId, "'groupId' must not be null");
138162
Assert.notNull(messageToRemove, "'messageToRemove' must not be null");
139163

140-
// build raw MG
141-
MessageGroup rawGroup = buildMessageGroup(groupId, true);
142-
143-
// create a clean instance of
144-
MessageGroup messageGroup = normalizeSimpleMessageGroup(rawGroup);
145-
146-
147-
Message<?> actualMessageToRemove = null;
148-
149-
for (Message<?> message : rawGroup.getMessages()) {
150-
if (message.getHeaders().getId().equals(messageToRemove.getHeaders().getId())) {
151-
actualMessageToRemove = message;
152-
break;
153-
}
154-
}
155-
156-
if (actualMessageToRemove != null) {
157-
rawGroup.remove(actualMessageToRemove);
158-
removeMessage(messageToRemove.getHeaders().getId());
159-
rawGroup.setLastModified(System.currentTimeMillis());
164+
UUID id = messageToRemove.getHeaders().getId();
165+
removeMessage(id);
160166

161-
doStore(MESSAGE_GROUP_KEY_PREFIX + groupId, new MessageGroupMetadata(rawGroup));
162-
messageGroup = getMessageGroup(groupId);
167+
MessageGroupMetadata metadata = getGroupMetadata(groupId);
168+
if (metadata != null) {
169+
metadata.remove(id);
170+
metadata.setLastModified(System.currentTimeMillis());
171+
doStore(MESSAGE_GROUP_KEY_PREFIX + groupId, metadata);
163172
}
164173

165-
return messageGroup;
174+
return getMessageGroup(groupId);
166175
}
167176

168177

@@ -188,10 +197,12 @@ public void removeMessagesFromGroup(Object groupId, Collection<Message<?>> messa
188197
@Override
189198
public void completeGroup(Object groupId) {
190199
Assert.notNull(groupId, "'groupId' must not be null");
191-
MessageGroup messageGroup = buildMessageGroup(groupId, true);
192-
messageGroup.complete();
193-
messageGroup.setLastModified(System.currentTimeMillis());
194-
doStore(MESSAGE_GROUP_KEY_PREFIX + groupId, new MessageGroupMetadata(messageGroup));
200+
MessageGroupMetadata metadata = getGroupMetadata(groupId);
201+
if (metadata != null) {
202+
metadata.complete();
203+
metadata.setLastModified(System.currentTimeMillis());
204+
doStore(MESSAGE_GROUP_KEY_PREFIX + groupId, metadata);
205+
}
195206
}
196207

197208
/**
@@ -215,37 +226,62 @@ public void removeMessageGroup(Object groupId) {
215226
@Override
216227
public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) {
217228
Assert.notNull(groupId, "'groupId' must not be null");
218-
MessageGroup messageGroup = buildMessageGroup(groupId, true);
219-
messageGroup.setLastReleasedMessageSequenceNumber(sequenceNumber);
220-
messageGroup.setLastModified(System.currentTimeMillis());
221-
doStore(MESSAGE_GROUP_KEY_PREFIX + groupId, new MessageGroupMetadata(messageGroup));
229+
MessageGroupMetadata metadata = getGroupMetadata(groupId);
230+
if (metadata == null) {
231+
SimpleMessageGroup messageGroup = new SimpleMessageGroup(groupId);
232+
metadata = new MessageGroupMetadata(messageGroup);
233+
}
234+
metadata.setLastReleasedMessageSequenceNumber(sequenceNumber);
235+
metadata.setLastModified(System.currentTimeMillis());
236+
doStore(MESSAGE_GROUP_KEY_PREFIX + groupId, metadata);
222237
}
223238

224239
@Override
225240
public Message<?> pollMessageFromGroup(Object groupId) {
226-
Assert.notNull(groupId, "'groupId' must not be null");
227-
Object mgm = doRetrieve(MESSAGE_GROUP_KEY_PREFIX + groupId);
228-
if (mgm != null) {
229-
Assert.isInstanceOf(MessageGroupMetadata.class, mgm);
230-
MessageGroupMetadata messageGroupMetadata = (MessageGroupMetadata) mgm;
231-
232-
UUID firstId = messageGroupMetadata.firstId();
241+
MessageGroupMetadata groupMetadata = getGroupMetadata(groupId);
242+
if (groupMetadata != null) {
243+
UUID firstId = groupMetadata.firstId();
233244
if (firstId != null) {
234-
messageGroupMetadata.remove(firstId);
235-
messageGroupMetadata.setLastModified(System.currentTimeMillis());
236-
doStore(MESSAGE_GROUP_KEY_PREFIX + groupId, messageGroupMetadata);
245+
groupMetadata.remove(firstId);
246+
groupMetadata.setLastModified(System.currentTimeMillis());
247+
doStore(MESSAGE_GROUP_KEY_PREFIX + groupId, groupMetadata);
237248
return removeMessage(firstId);
238249
}
239250
}
240251
return null;
241252
}
242253

254+
@Override
255+
public Message<?> getOneMessageFromGroup(Object groupId) {
256+
MessageGroupMetadata groupMetadata = getGroupMetadata(groupId);
257+
if (groupMetadata != null) {
258+
UUID messageId = groupMetadata.firstId();
259+
if (messageId != null) {
260+
return getMessage(messageId);
261+
}
262+
}
263+
return null;
264+
}
265+
266+
@Override
267+
public Collection<Message<?>> getMessagesForGroup(Object groupId) {
268+
MessageGroupMetadata groupMetadata = getGroupMetadata(groupId);
269+
ArrayList<Message<?>> messages = new ArrayList<Message<?>>();
270+
if (groupMetadata != null) {
271+
Iterator<UUID> messageIds = groupMetadata.messageIdIterator();
272+
while (messageIds.hasNext()) {
273+
messages.add(getMessage(messageIds.next()));
274+
}
275+
}
276+
return messages;
277+
}
278+
243279
@Override
244280
@SuppressWarnings("unchecked")
245281
public Iterator<MessageGroup> iterator() {
246282
final Iterator<?> idIterator = normalizeKeys(
247283
(Collection<String>) doListKeys(MESSAGE_GROUP_KEY_PREFIX + "*"))
248-
.iterator();
284+
.iterator();
249285
return new MessageGroupIterator(idIterator);
250286
}
251287

@@ -266,13 +302,13 @@ else if (strKey.startsWith(MESSAGE_KEY_PREFIX)) {
266302

267303
@Override
268304
public int messageGroupSize(Object groupId) {
269-
Object mgm = doRetrieve(MESSAGE_GROUP_KEY_PREFIX + groupId);
305+
MessageGroupMetadata mgm = getGroupMetadata(groupId);
270306
if (mgm != null) {
271-
Assert.isInstanceOf(MessageGroupMetadata.class, mgm);
272-
MessageGroupMetadata messageGroupMetadata = (MessageGroupMetadata) mgm;
273-
return messageGroupMetadata.size();
307+
return mgm.size();
308+
}
309+
else {
310+
return 0;
274311
}
275-
return 0;
276312
}
277313

278314
protected abstract Object doRetrieve(Object id);
@@ -308,48 +344,6 @@ private Message<?> enrichMessage(Message<?> message) {
308344
return enrichedMessage;
309345
}
310346

311-
private MessageGroup buildMessageGroup(Object groupId, boolean raw) {
312-
Assert.notNull(groupId, "'groupId' must not be null");
313-
Object mgm = doRetrieve(MESSAGE_GROUP_KEY_PREFIX + groupId);
314-
if (mgm != null) {
315-
Assert.isInstanceOf(MessageGroupMetadata.class, mgm);
316-
MessageGroupMetadata messageGroupMetadata = (MessageGroupMetadata) mgm;
317-
ArrayList<Message<?>> messages = new ArrayList<Message<?>>();
318-
319-
Iterator<UUID> messageIds = messageGroupMetadata.messageIdIterator();
320-
while (messageIds.hasNext()) {
321-
UUID next = messageIds.next();
322-
if (next != null) {
323-
if (raw) {
324-
messages.add(getRawMessage(next));
325-
}
326-
else {
327-
messages.add(getMessage(next));
328-
}
329-
}
330-
}
331-
332-
MessageGroup messageGroup = getMessageGroupFactory()
333-
.create(messages, groupId, messageGroupMetadata.getTimestamp(), messageGroupMetadata.isComplete());
334-
messageGroup.setLastModified(messageGroupMetadata.getLastModified());
335-
messageGroup.setLastReleasedMessageSequenceNumber(
336-
messageGroupMetadata.getLastReleasedMessageSequenceNumber());
337-
return messageGroup;
338-
}
339-
else {
340-
return getMessageGroupFactory().create(groupId);
341-
}
342-
}
343-
344-
private MessageGroup normalizeSimpleMessageGroup(MessageGroup messageGroup) {
345-
MessageGroup normalizedGroup = getMessageGroupFactory().create(messageGroup.getGroupId());
346-
for (Message<?> message : messageGroup.getMessages()) {
347-
Message<?> normalizedMessage = normalizeMessage(message);
348-
normalizedGroup.add(normalizedMessage);
349-
}
350-
return normalizedGroup;
351-
}
352-
353347
private Message<?> getRawMessage(UUID id) {
354348
Assert.notNull(id, "'id' must not be null");
355349
Object message = doRetrieve(MESSAGE_KEY_PREFIX + id);
@@ -380,4 +374,5 @@ public void remove() {
380374
throw new UnsupportedOperationException();
381375
}
382376
}
377+
383378
}

0 commit comments

Comments
 (0)