Skip to content

Commit 3ddd5e9

Browse files
committed
Fix KV Stores for same message in multiple groups
If same message is stored into different groups with the same KV store, the removal of one group would lead to removal the message for the other one. * Improve KV Store to save message with the key including a group id * Respectively, refine the removal API to include group id into keys * Also change the `MESSAGE_GROUP_KEY_PREFIX` for group records to `GROUP_OF_MESSAGES_` since the `MESSAGE_` prefix includes group records as well for various operations based on key pattern
1 parent 78367f2 commit 3ddd5e9

File tree

3 files changed

+133
-64
lines changed

3 files changed

+133
-64
lines changed

spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
2727
import java.util.stream.Stream;
2828

2929
import org.springframework.jmx.export.annotation.ManagedAttribute;
30+
import org.springframework.lang.Nullable;
3031
import org.springframework.messaging.Message;
3132
import org.springframework.util.Assert;
3233

@@ -45,7 +46,7 @@ public abstract class AbstractKeyValueMessageStore extends AbstractMessageGroupS
4546

4647
protected static final String MESSAGE_KEY_PREFIX = "MESSAGE_";
4748

48-
protected static final String MESSAGE_GROUP_KEY_PREFIX = "MESSAGE_GROUP_";
49+
protected static final String MESSAGE_GROUP_KEY_PREFIX = "GROUP_OF_MESSAGES_";
4950

5051
private final String messagePrefix;
5152

@@ -57,9 +58,9 @@ protected AbstractKeyValueMessageStore() {
5758

5859
/**
5960
* Construct an instance based on the provided prefix for keys to distinguish between
60-
* different store instances in the same target key-value data base. Defaults to an
61+
* different store instances in the same target key-value database. Defaults to an
6162
* empty string - no prefix. The actual prefix for messages is
62-
* {@code prefix + MESSAGE_}; for message groups - {@code prefix + MESSAGE_GROUP_}
63+
* {@code prefix + MESSAGE_}; for message groups - {@code prefix + GROUP_OF_MESSAGES_}
6364
* @param prefix the prefix to use
6465
* @since 4.3.12
6566
*/
@@ -71,7 +72,7 @@ protected AbstractKeyValueMessageStore(String prefix) {
7172

7273
/**
7374
* Return the configured prefix for message keys to distinguish between different
74-
* store instances in the same target key-value data base. Defaults to the
75+
* store instances in the same target key-value database. Defaults to the
7576
* {@value MESSAGE_KEY_PREFIX} - without a custom prefix.
7677
* @return the prefix for keys
7778
* @since 4.3.12
@@ -82,7 +83,7 @@ protected String getMessagePrefix() {
8283

8384
/**
8485
* Return the configured prefix for message group keys to distinguish between
85-
* different store instances in the same target key-value data base. Defaults to the
86+
* different store instances in the same target key-value database. Defaults to the
8687
* {@value MESSAGE_GROUP_KEY_PREFIX} - without custom prefix.
8788
* @return the prefix for keys
8889
* @since 4.3.12
@@ -140,10 +141,15 @@ public <T> Message<T> addMessage(Message<T> message) {
140141
}
141142

142143
protected void doAddMessage(Message<?> message) {
144+
doAddMessage(message, null);
145+
}
146+
147+
protected void doAddMessage(Message<?> message, @Nullable Object groupId) {
143148
Assert.notNull(message, "'message' must not be null");
144149
UUID messageId = message.getHeaders().getId();
145150
Assert.notNull(messageId, "Cannot store messages without an ID header");
146-
doStoreIfAbsent(this.messagePrefix + messageId, new MessageHolder(message));
151+
String messageKey = this.messagePrefix + (groupId != null ? groupId.toString() + '_' : "") + messageId;
152+
doStoreIfAbsent(messageKey, new MessageHolder(message));
147153
}
148154

149155
@Override
@@ -211,7 +217,7 @@ public void addMessagesToGroup(Object groupId, Message<?>... messages) {
211217
}
212218

213219
for (Message<?> message : messages) {
214-
doAddMessage(message);
220+
doAddMessage(message, groupId);
215221
if (metadata != null) {
216222
metadata.add(message.getHeaders().getId());
217223
}
@@ -253,7 +259,7 @@ public void removeMessagesFromGroup(Object groupId, Collection<Message<?>> messa
253259

254260
List<Object> messageIds = new ArrayList<>();
255261
for (UUID id : ids) {
256-
messageIds.add(this.messagePrefix + id);
262+
messageIds.add(this.messagePrefix + groupId + '_' + id);
257263
}
258264

259265
doRemoveAll(messageIds);
@@ -288,7 +294,7 @@ public void removeMessageGroup(Object groupId) {
288294
List<Object> messageIds =
289295
messageGroupMetadata.getMessageIds()
290296
.stream()
291-
.map(id -> this.messagePrefix + id)
297+
.map(id -> this.messagePrefix + groupId + '_' + id)
292298
.collect(Collectors.toList());
293299

294300
doRemoveAll(messageIds);
@@ -326,32 +332,55 @@ public Message<?> pollMessageFromGroup(Object groupId) {
326332
groupMetadata.remove(firstId);
327333
groupMetadata.setLastModified(System.currentTimeMillis());
328334
doStore(this.groupPrefix + groupId, groupMetadata);
329-
return removeMessage(firstId);
335+
return removeMessageFromGroup(firstId, groupId);
330336
}
331337
}
332338
return null;
333339
}
334340

341+
private Message<?> removeMessageFromGroup(UUID id, Object groupId) {
342+
Assert.notNull(id, "'id' must not be null");
343+
Object object = doRemove(this.messagePrefix + groupId + '_' + id);
344+
if (object != null) {
345+
return extractMessage(object);
346+
}
347+
else {
348+
return null;
349+
}
350+
}
351+
335352
@Override
336353
public Message<?> getOneMessageFromGroup(Object groupId) {
337354
MessageGroupMetadata groupMetadata = getGroupMetadata(groupId);
338355
if (groupMetadata != null) {
339356
UUID messageId = groupMetadata.firstId();
340357
if (messageId != null) {
341-
return getMessage(messageId);
358+
return getMessageFromGroup(messageId, groupId);
342359
}
343360
}
344361
return null;
345362
}
346363

364+
@Nullable
365+
private Message<?> getMessageFromGroup(UUID messageId, Object groupId) {
366+
Assert.notNull(messageId, "'messageId' must not be null");
367+
Object object = doRetrieve(this.messagePrefix + groupId + '_' + messageId);
368+
if (object != null) {
369+
return extractMessage(object);
370+
}
371+
else {
372+
return null;
373+
}
374+
}
375+
347376
@Override
348377
public Collection<Message<?>> getMessagesForGroup(Object groupId) {
349378
MessageGroupMetadata groupMetadata = getGroupMetadata(groupId);
350379
ArrayList<Message<?>> messages = new ArrayList<>();
351380
if (groupMetadata != null) {
352381
Iterator<UUID> messageIds = groupMetadata.messageIdIterator();
353382
while (messageIds.hasNext()) {
354-
messages.add(getMessage(messageIds.next()));
383+
messages.add(getMessageFromGroup(messageIds.next(), groupId));
355384
}
356385
}
357386
return messages;
@@ -362,7 +391,7 @@ public Stream<Message<?>> streamMessagesForGroup(Object groupId) {
362391
return getGroupMetadata(groupId)
363392
.getMessageIds()
364393
.stream()
365-
.map(this::getMessage);
394+
.map((messageId) -> getMessageFromGroup(messageId, groupId));
366395
}
367396

368397
@Override
@@ -376,8 +405,8 @@ public Iterator<MessageGroup> iterator() {
376405

377406
private Collection<String> normalizeKeys(Collection<String> keys) {
378407
Set<String> normalizedKeys = new HashSet<>();
379-
for (Object key : keys) {
380-
String strKey = (String) key;
408+
for (String key : keys) {
409+
String strKey = key;
381410
if (strKey.startsWith(this.groupPrefix)) {
382411
strKey = strKey.replace(this.groupPrefix, "");
383412
}

spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/store/HazelcastMessageStoreTests.java

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,10 +23,10 @@
2323
import com.hazelcast.core.Hazelcast;
2424
import com.hazelcast.core.HazelcastInstance;
2525
import com.hazelcast.map.IMap;
26-
import org.junit.AfterClass;
27-
import org.junit.Before;
28-
import org.junit.BeforeClass;
29-
import org.junit.Test;
26+
import org.junit.jupiter.api.AfterAll;
27+
import org.junit.jupiter.api.BeforeAll;
28+
import org.junit.jupiter.api.BeforeEach;
29+
import org.junit.jupiter.api.Test;
3030

3131
import org.springframework.integration.channel.DirectChannel;
3232
import org.springframework.integration.history.MessageHistory;
@@ -50,19 +50,19 @@ public class HazelcastMessageStoreTests {
5050

5151
private static IMap<Object, Object> map;
5252

53-
@BeforeClass
53+
@BeforeAll
5454
public static void init() {
5555
instance = Hazelcast.newHazelcastInstance();
5656
map = instance.getMap("customTestsMessageStore");
5757
store = new HazelcastMessageStore(map);
5858
}
5959

60-
@AfterClass
60+
@AfterAll
6161
public static void destroy() {
6262
instance.shutdown();
6363
}
6464

65-
@Before
65+
@BeforeEach
6666
public void clean() {
6767
map.clear();
6868
}
@@ -145,4 +145,34 @@ public void messageStoreIterator() {
145145
assertThat(groupCount).isEqualTo(1);
146146
}
147147

148+
@Test
149+
public void sameMessageInTwoGroupsNotRemovedByFirstGroup() {
150+
GenericMessage<String> testMessage = new GenericMessage<>("test data");
151+
152+
store.addMessageToGroup("1", testMessage);
153+
store.addMessageToGroup("2", testMessage);
154+
155+
store.removeMessageGroup("1");
156+
157+
assertThat(store.getMessageCount()).isEqualTo(1);
158+
159+
store.removeMessageGroup("2");
160+
161+
assertThat(store.getMessageCount()).isEqualTo(0);
162+
}
163+
164+
@Test
165+
public void removeMessagesFromGroupDontRemoveSameMessageInOtherGroup() {
166+
GenericMessage<String> testMessage = new GenericMessage<>("test data");
167+
168+
store.addMessageToGroup("1", testMessage);
169+
store.addMessageToGroup("2", testMessage);
170+
171+
store.removeMessagesFromGroup("1", testMessage);
172+
173+
assertThat(store.getMessageCount()).isEqualTo(1);
174+
assertThat(store.messageGroupSize("1")).isEqualTo(0);
175+
assertThat(store.messageGroupSize("2")).isEqualTo(1);
176+
}
177+
148178
}

0 commit comments

Comments
 (0)