Skip to content

Commit 64f8ed5

Browse files
authored
Fix KV Stores for same message in multiple groups (#8737)
* Fix KV Stores for same message in multiple groups If same message is stored into different groups with the same KV store, the removal of one group would lead to removal the message for the other one. * Improve KV Store to save message with the key including a group id * Respectively, refine the removal API to include group id into keys * Also change the `MESSAGE_GROUP_KEY_PREFIX` for group records to `GROUP_OF_MESSAGES_` since the `MESSAGE_` prefix includes group records as well for various operations based on key pattern * * Use `[^GROUP_]` in the pattern for messages count **Cherry-pick to `6.1.x`** * * Fix MongoDB MS for message removal logic * * Bring back `GROUP_OF_MESSAGES_` prefix to avoid complex regexp and don't bother for edge cases, where even that regexp may fail
1 parent 9ad2b8a commit 64f8ed5

File tree

7 files changed

+165
-75
lines changed

7 files changed

+165
-75
lines changed

spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
2727
import java.util.stream.Stream;
2828

2929
import org.springframework.jmx.export.annotation.ManagedAttribute;
30+
import org.springframework.lang.Nullable;
3031
import org.springframework.messaging.Message;
3132
import org.springframework.util.Assert;
3233

@@ -45,7 +46,7 @@ public abstract class AbstractKeyValueMessageStore extends AbstractMessageGroupS
4546

4647
protected static final String MESSAGE_KEY_PREFIX = "MESSAGE_";
4748

48-
protected static final String MESSAGE_GROUP_KEY_PREFIX = "MESSAGE_GROUP_";
49+
protected static final String MESSAGE_GROUP_KEY_PREFIX = "GROUP_OF_MESSAGES_";
4950

5051
private final String messagePrefix;
5152

@@ -57,9 +58,9 @@ protected AbstractKeyValueMessageStore() {
5758

5859
/**
5960
* Construct an instance based on the provided prefix for keys to distinguish between
60-
* different store instances in the same target key-value data base. Defaults to an
61+
* different store instances in the same target key-value database. Defaults to an
6162
* empty string - no prefix. The actual prefix for messages is
62-
* {@code prefix + MESSAGE_}; for message groups - {@code prefix + MESSAGE_GROUP_}
63+
* {@code prefix + MESSAGE_}; for message groups - {@code prefix + GROUP_OF_MESSAGES_}
6364
* @param prefix the prefix to use
6465
* @since 4.3.12
6566
*/
@@ -71,18 +72,18 @@ protected AbstractKeyValueMessageStore(String prefix) {
7172

7273
/**
7374
* Return the configured prefix for message keys to distinguish between different
74-
* store instances in the same target key-value data base. Defaults to the
75+
* store instances in the same target key-value database. Defaults to the
7576
* {@value MESSAGE_KEY_PREFIX} - without a custom prefix.
7677
* @return the prefix for keys
7778
* @since 4.3.12
7879
*/
79-
protected String getMessagePrefix() {
80+
public String getMessagePrefix() {
8081
return this.messagePrefix;
8182
}
8283

8384
/**
8485
* Return the configured prefix for message group keys to distinguish between
85-
* different store instances in the same target key-value data base. Defaults to the
86+
* different store instances in the same target key-value database. Defaults to the
8687
* {@value MESSAGE_GROUP_KEY_PREFIX} - without custom prefix.
8788
* @return the prefix for keys
8889
* @since 4.3.12
@@ -140,10 +141,15 @@ public <T> Message<T> addMessage(Message<T> message) {
140141
}
141142

142143
protected void doAddMessage(Message<?> message) {
144+
doAddMessage(message, null);
145+
}
146+
147+
protected void doAddMessage(Message<?> message, @Nullable Object groupId) {
143148
Assert.notNull(message, "'message' must not be null");
144149
UUID messageId = message.getHeaders().getId();
145150
Assert.notNull(messageId, "Cannot store messages without an ID header");
146-
doStoreIfAbsent(this.messagePrefix + messageId, new MessageHolder(message));
151+
String messageKey = this.messagePrefix + (groupId != null ? groupId.toString() + '_' : "") + messageId;
152+
doStoreIfAbsent(messageKey, new MessageHolder(message));
147153
}
148154

149155
@Override
@@ -165,7 +171,6 @@ public long getMessageCount() {
165171
return (messageIds != null) ? messageIds.size() : 0;
166172
}
167173

168-
169174
// MessageGroupStore methods
170175

171176
/**
@@ -211,7 +216,7 @@ public void addMessagesToGroup(Object groupId, Message<?>... messages) {
211216
}
212217

213218
for (Message<?> message : messages) {
214-
doAddMessage(message);
219+
doAddMessage(message, groupId);
215220
if (metadata != null) {
216221
metadata.add(message.getHeaders().getId());
217222
}
@@ -253,7 +258,7 @@ public void removeMessagesFromGroup(Object groupId, Collection<Message<?>> messa
253258

254259
List<Object> messageIds = new ArrayList<>();
255260
for (UUID id : ids) {
256-
messageIds.add(this.messagePrefix + id);
261+
messageIds.add(this.messagePrefix + groupId + '_' + id);
257262
}
258263

259264
doRemoveAll(messageIds);
@@ -288,7 +293,7 @@ public void removeMessageGroup(Object groupId) {
288293
List<Object> messageIds =
289294
messageGroupMetadata.getMessageIds()
290295
.stream()
291-
.map(id -> this.messagePrefix + id)
296+
.map(id -> this.messagePrefix + groupId + '_' + id)
292297
.collect(Collectors.toList());
293298

294299
doRemoveAll(messageIds);
@@ -326,32 +331,55 @@ public Message<?> pollMessageFromGroup(Object groupId) {
326331
groupMetadata.remove(firstId);
327332
groupMetadata.setLastModified(System.currentTimeMillis());
328333
doStore(this.groupPrefix + groupId, groupMetadata);
329-
return removeMessage(firstId);
334+
return removeMessageFromGroup(firstId, groupId);
330335
}
331336
}
332337
return null;
333338
}
334339

340+
private Message<?> removeMessageFromGroup(UUID id, Object groupId) {
341+
Assert.notNull(id, "'id' must not be null");
342+
Object object = doRemove(this.messagePrefix + groupId + '_' + id);
343+
if (object != null) {
344+
return extractMessage(object);
345+
}
346+
else {
347+
return null;
348+
}
349+
}
350+
335351
@Override
336352
public Message<?> getOneMessageFromGroup(Object groupId) {
337353
MessageGroupMetadata groupMetadata = getGroupMetadata(groupId);
338354
if (groupMetadata != null) {
339355
UUID messageId = groupMetadata.firstId();
340356
if (messageId != null) {
341-
return getMessage(messageId);
357+
return getMessageFromGroup(messageId, groupId);
342358
}
343359
}
344360
return null;
345361
}
346362

363+
@Nullable
364+
private Message<?> getMessageFromGroup(UUID messageId, Object groupId) {
365+
Assert.notNull(messageId, "'messageId' must not be null");
366+
Object object = doRetrieve(this.messagePrefix + groupId + '_' + messageId);
367+
if (object != null) {
368+
return extractMessage(object);
369+
}
370+
else {
371+
return null;
372+
}
373+
}
374+
347375
@Override
348376
public Collection<Message<?>> getMessagesForGroup(Object groupId) {
349377
MessageGroupMetadata groupMetadata = getGroupMetadata(groupId);
350378
ArrayList<Message<?>> messages = new ArrayList<>();
351379
if (groupMetadata != null) {
352380
Iterator<UUID> messageIds = groupMetadata.messageIdIterator();
353381
while (messageIds.hasNext()) {
354-
messages.add(getMessage(messageIds.next()));
382+
messages.add(getMessageFromGroup(messageIds.next(), groupId));
355383
}
356384
}
357385
return messages;
@@ -362,7 +390,7 @@ public Stream<Message<?>> streamMessagesForGroup(Object groupId) {
362390
return getGroupMetadata(groupId)
363391
.getMessageIds()
364392
.stream()
365-
.map(this::getMessage);
393+
.map((messageId) -> getMessageFromGroup(messageId, groupId));
366394
}
367395

368396
@Override
@@ -376,8 +404,8 @@ public Iterator<MessageGroup> iterator() {
376404

377405
private Collection<String> normalizeKeys(Collection<String> keys) {
378406
Set<String> normalizedKeys = new HashSet<>();
379-
for (Object key : keys) {
380-
String strKey = (String) key;
407+
for (String key : keys) {
408+
String strKey = key;
381409
if (strKey.startsWith(this.groupPrefix)) {
382410
strKey = strKey.replace(this.groupPrefix, "");
383411
}

spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/store/HazelcastMessageStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/store/HazelcastMessageStoreTests.java

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,10 +23,10 @@
2323
import com.hazelcast.core.Hazelcast;
2424
import com.hazelcast.core.HazelcastInstance;
2525
import com.hazelcast.map.IMap;
26-
import org.junit.AfterClass;
27-
import org.junit.Before;
28-
import org.junit.BeforeClass;
29-
import org.junit.Test;
26+
import org.junit.jupiter.api.AfterAll;
27+
import org.junit.jupiter.api.BeforeAll;
28+
import org.junit.jupiter.api.BeforeEach;
29+
import org.junit.jupiter.api.Test;
3030

3131
import org.springframework.integration.channel.DirectChannel;
3232
import org.springframework.integration.history.MessageHistory;
@@ -50,26 +50,25 @@ public class HazelcastMessageStoreTests {
5050

5151
private static IMap<Object, Object> map;
5252

53-
@BeforeClass
53+
@BeforeAll
5454
public static void init() {
5555
instance = Hazelcast.newHazelcastInstance();
5656
map = instance.getMap("customTestsMessageStore");
5757
store = new HazelcastMessageStore(map);
5858
}
5959

60-
@AfterClass
60+
@AfterAll
6161
public static void destroy() {
6262
instance.shutdown();
6363
}
6464

65-
@Before
65+
@BeforeEach
6666
public void clean() {
6767
map.clear();
6868
}
6969

7070
@Test
7171
public void testWithMessageHistory() {
72-
7372
Message<?> message = new GenericMessage<>("Hello");
7473
DirectChannel fooChannel = new DirectChannel();
7574
fooChannel.setBeanName("fooChannel");
@@ -107,7 +106,6 @@ public void testAddAndRemoveMessagesFromMessageGroup() {
107106

108107
@Test
109108
public void addAndGetMessage() {
110-
111109
Message<?> message = MessageBuilder.withPayload("test").build();
112110
store.addMessage(message);
113111
Message<?> retrieved = store.getMessage(message.getHeaders().getId());
@@ -145,4 +143,34 @@ public void messageStoreIterator() {
145143
assertThat(groupCount).isEqualTo(1);
146144
}
147145

146+
@Test
147+
public void sameMessageInTwoGroupsNotRemovedByFirstGroup() {
148+
GenericMessage<String> testMessage = new GenericMessage<>("test data");
149+
150+
store.addMessageToGroup("1", testMessage);
151+
store.addMessageToGroup("2", testMessage);
152+
153+
store.removeMessageGroup("1");
154+
155+
assertThat(store.getMessageCount()).isEqualTo(1);
156+
157+
store.removeMessageGroup("2");
158+
159+
assertThat(store.getMessageCount()).isEqualTo(0);
160+
}
161+
162+
@Test
163+
public void removeMessagesFromGroupDontRemoveSameMessageInOtherGroup() {
164+
GenericMessage<String> testMessage = new GenericMessage<>("test data");
165+
166+
store.addMessageToGroup("1", testMessage);
167+
store.addMessageToGroup("2", testMessage);
168+
169+
store.removeMessagesFromGroup("1", testMessage);
170+
171+
assertThat(store.getMessageCount()).isEqualTo(1);
172+
assertThat(store.messageGroupSize("1")).isEqualTo(0);
173+
assertThat(store.messageGroupSize("2")).isEqualTo(1);
174+
}
175+
148176
}

spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2022 the original author or authors.
2+
* Copyright 2013-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -101,7 +101,8 @@ public <T> Message<T> addMessage(Message<T> message) {
101101
@Override
102102
public Message<?> removeMessage(UUID id) {
103103
Assert.notNull(id, "'id' must not be null");
104-
Query query = Query.query(Criteria.where(MessageDocumentFields.MESSAGE_ID).is(id));
104+
Query query = Query.query(Criteria.where(MessageDocumentFields.MESSAGE_ID).is(id)
105+
.and(MessageDocumentFields.GROUP_ID).exists(false));
105106
MessageDocument document = getMongoTemplate().findAndRemove(query, MessageDocument.class, this.collectionName);
106107
return (document != null) ? document.getMessage() : null;
107108
}

spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -258,14 +258,15 @@ public MessageMetadata getMessageMetadata(UUID id) {
258258
@Override
259259
@ManagedAttribute
260260
public long getMessageCount() {
261-
return this.template.getCollection(this.collectionName).countDocuments();
261+
Query query = Query.query(Criteria.where("headers.id").exists(true).and(GROUP_ID_KEY).exists(false));
262+
return this.template.getCollection(this.collectionName).countDocuments(query.getQueryObject());
262263
}
263264

264265
@Override
265266
public Message<?> removeMessage(UUID id) {
266267
Assert.notNull(id, "'id' must not be null");
267-
MessageWrapper messageWrapper =
268-
this.template.findAndRemove(whereMessageIdIs(id), MessageWrapper.class, this.collectionName);
268+
Query query = Query.query(Criteria.where("headers.id").is(id).and(GROUP_ID_KEY).exists(false));
269+
MessageWrapper messageWrapper = this.template.findAndRemove(query, MessageWrapper.class, this.collectionName);
269270
return (messageWrapper != null ? messageWrapper.getMessage() : null);
270271
}
271272

spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageGroupStoreTests.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,28 @@ void testWithMessageHistory() {
536536
.containsEntry("type", "channel");
537537
}
538538

539+
@Test
540+
public void removeMessageDoesntRemoveSameMessageInTheGroup() {
541+
GenericMessage<String> testMessage = new GenericMessage<>("test data");
542+
543+
MessageGroupStore store = getMessageGroupStore();
544+
545+
store.addMessageToGroup("1", testMessage);
546+
547+
MessageStore messageStore = (MessageStore) store;
548+
549+
messageStore.removeMessage(testMessage.getHeaders().getId());
550+
551+
assertThat(messageStore.getMessageCount()).isEqualTo(0);
552+
assertThat(store.getMessageCountForAllMessageGroups()).isEqualTo(1);
553+
assertThat(store.messageGroupSize("1")).isEqualTo(1);
554+
555+
store.removeMessageGroup("1");
556+
557+
assertThat(store.getMessageCountForAllMessageGroups()).isEqualTo(0);
558+
assertThat(store.messageGroupSize("1")).isEqualTo(0);
559+
}
560+
539561
protected abstract MessageGroupStore getMessageGroupStore();
540562

541563
protected abstract MessageStore getMessageStore();

0 commit comments

Comments
 (0)