From 389038f4086c5b00bcf55ca5ac2c0154f6afa2f3 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 15 Sep 2023 16:35:07 -0400 Subject: [PATCH 1/4] Fix KV Stores for same message in multiple groups If same message is stored into different groups with the same KV store, the removal of one group would lead to removal the message for the other one. * Improve KV Store to save message with the key including a group id * Respectively, refine the removal API to include group id into keys * Also change the `MESSAGE_GROUP_KEY_PREFIX` for group records to `GROUP_OF_MESSAGES_` since the `MESSAGE_` prefix includes group records as well for various operations based on key pattern --- .../store/AbstractKeyValueMessageStore.java | 61 +++++++++---- .../store/HazelcastMessageStoreTests.java | 46 ++++++++-- .../store/RedisMessageGroupStoreTests.java | 90 ++++++++++--------- 3 files changed, 133 insertions(+), 64 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java index 68505a830b9..c9aa6053a73 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ import java.util.stream.Stream; import org.springframework.jmx.export.annotation.ManagedAttribute; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.util.Assert; @@ -45,7 +46,7 @@ public abstract class AbstractKeyValueMessageStore extends AbstractMessageGroupS protected static final String MESSAGE_KEY_PREFIX = "MESSAGE_"; - protected static final String MESSAGE_GROUP_KEY_PREFIX = "MESSAGE_GROUP_"; + protected static final String MESSAGE_GROUP_KEY_PREFIX = "GROUP_OF_MESSAGES_"; private final String messagePrefix; @@ -57,9 +58,9 @@ protected AbstractKeyValueMessageStore() { /** * Construct an instance based on the provided prefix for keys to distinguish between - * different store instances in the same target key-value data base. Defaults to an + * different store instances in the same target key-value database. Defaults to an * empty string - no prefix. The actual prefix for messages is - * {@code prefix + MESSAGE_}; for message groups - {@code prefix + MESSAGE_GROUP_} + * {@code prefix + MESSAGE_}; for message groups - {@code prefix + GROUP_OF_MESSAGES_} * @param prefix the prefix to use * @since 4.3.12 */ @@ -71,7 +72,7 @@ protected AbstractKeyValueMessageStore(String prefix) { /** * Return the configured prefix for message keys to distinguish between different - * store instances in the same target key-value data base. Defaults to the + * store instances in the same target key-value database. Defaults to the * {@value MESSAGE_KEY_PREFIX} - without a custom prefix. * @return the prefix for keys * @since 4.3.12 @@ -82,7 +83,7 @@ protected String getMessagePrefix() { /** * Return the configured prefix for message group keys to distinguish between - * different store instances in the same target key-value data base. Defaults to the + * different store instances in the same target key-value database. Defaults to the * {@value MESSAGE_GROUP_KEY_PREFIX} - without custom prefix. * @return the prefix for keys * @since 4.3.12 @@ -140,10 +141,15 @@ public Message addMessage(Message message) { } protected void doAddMessage(Message message) { + doAddMessage(message, null); + } + + protected void doAddMessage(Message message, @Nullable Object groupId) { Assert.notNull(message, "'message' must not be null"); UUID messageId = message.getHeaders().getId(); Assert.notNull(messageId, "Cannot store messages without an ID header"); - doStoreIfAbsent(this.messagePrefix + messageId, new MessageHolder(message)); + String messageKey = this.messagePrefix + (groupId != null ? groupId.toString() + '_' : "") + messageId; + doStoreIfAbsent(messageKey, new MessageHolder(message)); } @Override @@ -211,7 +217,7 @@ public void addMessagesToGroup(Object groupId, Message... messages) { } for (Message message : messages) { - doAddMessage(message); + doAddMessage(message, groupId); if (metadata != null) { metadata.add(message.getHeaders().getId()); } @@ -253,7 +259,7 @@ public void removeMessagesFromGroup(Object groupId, Collection> messa List messageIds = new ArrayList<>(); for (UUID id : ids) { - messageIds.add(this.messagePrefix + id); + messageIds.add(this.messagePrefix + groupId + '_' + id); } doRemoveAll(messageIds); @@ -288,7 +294,7 @@ public void removeMessageGroup(Object groupId) { List messageIds = messageGroupMetadata.getMessageIds() .stream() - .map(id -> this.messagePrefix + id) + .map(id -> this.messagePrefix + groupId + '_' + id) .collect(Collectors.toList()); doRemoveAll(messageIds); @@ -326,24 +332,47 @@ public Message pollMessageFromGroup(Object groupId) { groupMetadata.remove(firstId); groupMetadata.setLastModified(System.currentTimeMillis()); doStore(this.groupPrefix + groupId, groupMetadata); - return removeMessage(firstId); + return removeMessageFromGroup(firstId, groupId); } } return null; } + private Message removeMessageFromGroup(UUID id, Object groupId) { + Assert.notNull(id, "'id' must not be null"); + Object object = doRemove(this.messagePrefix + groupId + '_' + id); + if (object != null) { + return extractMessage(object); + } + else { + return null; + } + } + @Override public Message getOneMessageFromGroup(Object groupId) { MessageGroupMetadata groupMetadata = getGroupMetadata(groupId); if (groupMetadata != null) { UUID messageId = groupMetadata.firstId(); if (messageId != null) { - return getMessage(messageId); + return getMessageFromGroup(messageId, groupId); } } return null; } + @Nullable + private Message getMessageFromGroup(UUID messageId, Object groupId) { + Assert.notNull(messageId, "'messageId' must not be null"); + Object object = doRetrieve(this.messagePrefix + groupId + '_' + messageId); + if (object != null) { + return extractMessage(object); + } + else { + return null; + } + } + @Override public Collection> getMessagesForGroup(Object groupId) { MessageGroupMetadata groupMetadata = getGroupMetadata(groupId); @@ -351,7 +380,7 @@ public Collection> getMessagesForGroup(Object groupId) { if (groupMetadata != null) { Iterator messageIds = groupMetadata.messageIdIterator(); while (messageIds.hasNext()) { - messages.add(getMessage(messageIds.next())); + messages.add(getMessageFromGroup(messageIds.next(), groupId)); } } return messages; @@ -362,7 +391,7 @@ public Stream> streamMessagesForGroup(Object groupId) { return getGroupMetadata(groupId) .getMessageIds() .stream() - .map(this::getMessage); + .map((messageId) -> getMessageFromGroup(messageId, groupId)); } @Override @@ -376,8 +405,8 @@ public Iterator iterator() { private Collection normalizeKeys(Collection keys) { Set normalizedKeys = new HashSet<>(); - for (Object key : keys) { - String strKey = (String) key; + for (String key : keys) { + String strKey = key; if (strKey.startsWith(this.groupPrefix)) { strKey = strKey.replace(this.groupPrefix, ""); } diff --git a/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/store/HazelcastMessageStoreTests.java b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/store/HazelcastMessageStoreTests.java index 23cd21d10cf..7ef8d61e178 100644 --- a/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/store/HazelcastMessageStoreTests.java +++ b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/store/HazelcastMessageStoreTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,10 +23,10 @@ import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.map.IMap; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.history.MessageHistory; @@ -50,19 +50,19 @@ public class HazelcastMessageStoreTests { private static IMap map; - @BeforeClass + @BeforeAll public static void init() { instance = Hazelcast.newHazelcastInstance(); map = instance.getMap("customTestsMessageStore"); store = new HazelcastMessageStore(map); } - @AfterClass + @AfterAll public static void destroy() { instance.shutdown(); } - @Before + @BeforeEach public void clean() { map.clear(); } @@ -145,4 +145,34 @@ public void messageStoreIterator() { assertThat(groupCount).isEqualTo(1); } + @Test + public void sameMessageInTwoGroupsNotRemovedByFirstGroup() { + GenericMessage testMessage = new GenericMessage<>("test data"); + + store.addMessageToGroup("1", testMessage); + store.addMessageToGroup("2", testMessage); + + store.removeMessageGroup("1"); + + assertThat(store.getMessageCount()).isEqualTo(1); + + store.removeMessageGroup("2"); + + assertThat(store.getMessageCount()).isEqualTo(0); + } + + @Test + public void removeMessagesFromGroupDontRemoveSameMessageInOtherGroup() { + GenericMessage testMessage = new GenericMessage<>("test data"); + + store.addMessageToGroup("1", testMessage); + store.addMessageToGroup("2", testMessage); + + store.removeMessagesFromGroup("1", testMessage); + + assertThat(store.getMessageCount()).isEqualTo(1); + assertThat(store.messageGroupSize("1")).isEqualTo(0); + assertThat(store.messageGroupSize("2")).isEqualTo(1); + } + } diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java index e70bf2c741c..bd84bf0184a 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2007-2022 the original author or authors. + * Copyright 2007-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,6 +56,7 @@ import org.springframework.messaging.support.GenericMessage; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.fail; /** @@ -65,6 +66,7 @@ * @author Artem Vozhdayenko */ class RedisMessageGroupStoreTests implements RedisContainerTest { + private static RedisConnectionFactory redisConnectionFactory; @BeforeAll @@ -74,17 +76,18 @@ static void setupConnection() { private final UUID groupId = UUID.randomUUID(); + RedisMessageStore store = new RedisMessageStore(redisConnectionFactory); + @BeforeEach @AfterEach void setUpTearDown() { StringRedisTemplate template = RedisContainerTest.createStringRedisTemplate(redisConnectionFactory); - template.delete(template.keys("MESSAGE_GROUP_*")); + template.delete(template.keys("GROUP_OF_MESSAGES_*")); + template.delete(template.keys("MESSAGE_*")); } @Test void testNonExistingEmptyMessageGroup() { - RedisMessageStore store = new RedisMessageStore(redisConnectionFactory); - MessageGroup messageGroup = store.getMessageGroup(this.groupId); assertThat(messageGroup).isNotNull(); assertThat(messageGroup).isInstanceOf(SimpleMessageGroup.class); @@ -93,8 +96,6 @@ void testNonExistingEmptyMessageGroup() { @Test void testMessageGroupUpdatedDateChangesWithEachAddedMessage() throws Exception { - RedisMessageStore store = new RedisMessageStore(redisConnectionFactory); - Message message = new GenericMessage<>("Hello"); MessageGroup messageGroup = store.addMessageToGroup(this.groupId, message); assertThat(messageGroup.size()).isEqualTo(1); @@ -117,8 +118,6 @@ void testMessageGroupUpdatedDateChangesWithEachAddedMessage() throws Exception { @Test void testMessageGroupWithAddedMessage() { - RedisMessageStore store = new RedisMessageStore(redisConnectionFactory); - Message message = new GenericMessage<>("Hello"); MessageGroup messageGroup = store.addMessageToGroup(this.groupId, message); assertThat(messageGroup.size()).isEqualTo(1); @@ -132,8 +131,6 @@ void testMessageGroupWithAddedMessage() { @Test void testRemoveMessageGroup() { - RedisMessageStore store = new RedisMessageStore(redisConnectionFactory); - MessageGroup messageGroup = store.getMessageGroup(this.groupId); Message message = new GenericMessage<>("Hello"); messageGroup = store.addMessageToGroup(messageGroup.getGroupId(), message); @@ -157,8 +154,6 @@ void testRemoveMessageGroup() { @Test void testCompleteMessageGroup() { - RedisMessageStore store = new RedisMessageStore(redisConnectionFactory); - MessageGroup messageGroup = store.getMessageGroup(this.groupId); Message message = new GenericMessage<>("Hello"); messageGroup = store.addMessageToGroup(messageGroup.getGroupId(), message); @@ -169,8 +164,6 @@ void testCompleteMessageGroup() { @Test void testLastReleasedSequenceNumber() { - RedisMessageStore store = new RedisMessageStore(redisConnectionFactory); - MessageGroup messageGroup = store.getMessageGroup(this.groupId); Message message = new GenericMessage<>("Hello"); messageGroup = store.addMessageToGroup(messageGroup.getGroupId(), message); @@ -181,8 +174,6 @@ void testLastReleasedSequenceNumber() { @Test void testRemoveMessageFromTheGroup() { - RedisMessageStore store = new RedisMessageStore(redisConnectionFactory); - MessageGroup messageGroup = store.getMessageGroup(this.groupId); Message message = new GenericMessage<>("2"); store.addMessagesToGroup(messageGroup.getGroupId(), new GenericMessage<>("1"), message); @@ -202,8 +193,6 @@ void testRemoveMessageFromTheGroup() { @Test void testWithMessageHistory() { - RedisMessageStore store = new RedisMessageStore(redisConnectionFactory); - Message message = new GenericMessage<>("Hello"); DirectChannel fooChannel = new DirectChannel(); fooChannel.setBeanName("fooChannel"); @@ -227,17 +216,16 @@ void testWithMessageHistory() { @Test void testRemoveNonExistingMessageFromTheGroup() { - RedisMessageStore store = new RedisMessageStore(redisConnectionFactory); - MessageGroup messageGroup = store.getMessageGroup(this.groupId); store.addMessagesToGroup(messageGroup.getGroupId(), new GenericMessage<>("1")); - store.removeMessagesFromGroup(this.groupId, new GenericMessage<>("2")); + assertThatNoException() + .isThrownBy(() -> store.removeMessagesFromGroup(this.groupId, new GenericMessage<>("2"))); } @Test void testRemoveNonExistingMessageFromNonExistingTheGroup() { - RedisMessageStore store = new RedisMessageStore(redisConnectionFactory); - store.removeMessagesFromGroup(this.groupId, new GenericMessage<>("2")); + assertThatNoException() + .isThrownBy(() -> store.removeMessagesFromGroup(this.groupId, new GenericMessage<>("2"))); } @@ -283,14 +271,9 @@ void testIteratorOfMessageGroups() { while (messageGroups.hasNext()) { MessageGroup group = messageGroups.next(); String groupId = (String) group.getGroupId(); - if (groupId.equals("1")) { - assertThat(group.getMessages().size()).isEqualTo(1); - } - else if (groupId.equals("2")) { - assertThat(group.getMessages().size()).isEqualTo(1); - } - else if (groupId.equals("3")) { - assertThat(group.getMessages().size()).isEqualTo(2); + switch (groupId) { + case "1", "2" -> assertThat(group.getMessages().size()).isEqualTo(1); + case "3" -> assertThat(group.getMessages().size()).isEqualTo(2); } counter++; } @@ -390,25 +373,22 @@ void testWithAggregatorWithShutdown() { @Test void testAddAndRemoveMessagesFromMessageGroup() { - RedisMessageStore messageStore = new RedisMessageStore(redisConnectionFactory); - List> messages = new ArrayList>(); + List> messages = new ArrayList<>(); for (int i = 0; i < 25; i++) { Message message = MessageBuilder.withPayload("foo").setCorrelationId(this.groupId).build(); - messageStore.addMessagesToGroup(this.groupId, message); + store.addMessagesToGroup(this.groupId, message); messages.add(message); } - MessageGroup group = messageStore.getMessageGroup(this.groupId); + MessageGroup group = store.getMessageGroup(this.groupId); assertThat(group.size()).isEqualTo(25); - messageStore.removeMessagesFromGroup(this.groupId, messages); - group = messageStore.getMessageGroup(this.groupId); + store.removeMessagesFromGroup(this.groupId, messages); + group = store.getMessageGroup(this.groupId); assertThat(group.size()).isZero(); - messageStore.removeMessageGroup(this.groupId); + store.removeMessageGroup(this.groupId); } @Test void testJsonSerialization() { - RedisMessageStore store = new RedisMessageStore(redisConnectionFactory); - ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper(); GenericJackson2JsonRedisSerializer serializer = new GenericJackson2JsonRedisSerializer(mapper); @@ -476,6 +456,36 @@ void testJsonSerialization() { assertThat(messageGroup.getMessages().iterator().next()).isEqualTo(fooMessage); } + @Test + public void sameMessageInTwoGroupsNotRemovedByFirstGroup() { + GenericMessage testMessage = new GenericMessage<>("test data"); + + store.addMessageToGroup("1", testMessage); + store.addMessageToGroup("2", testMessage); + + store.removeMessageGroup("1"); + + assertThat(store.getMessageCount()).isEqualTo(1); + + store.removeMessageGroup("2"); + + assertThat(store.getMessageCount()).isEqualTo(0); + } + + @Test + public void removeMessagesFromGroupDontRemoveSameMessageInOtherGroup() { + GenericMessage testMessage = new GenericMessage<>("test data"); + + store.addMessageToGroup("1", testMessage); + store.addMessageToGroup("2", testMessage); + + store.removeMessagesFromGroup("1", testMessage); + + assertThat(store.getMessageCount()).isEqualTo(1); + assertThat(store.messageGroupSize("1")).isEqualTo(0); + assertThat(store.messageGroupSize("2")).isEqualTo(1); + } + private static class Foo { private String foo; From dbcb8ca0e63fa0af11fcbc3bfb2a2b7aaf34fbea Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 18 Sep 2023 09:09:21 -0400 Subject: [PATCH 2/4] * Use `[^GROUP_]` in the pattern for messages count **Cherry-pick to `6.1.x`** --- .../integration/store/AbstractKeyValueMessageStore.java | 7 +++---- .../integration/hazelcast/store/HazelcastMessageStore.java | 6 +++--- .../hazelcast/store/HazelcastMessageStoreTests.java | 2 -- .../redis/store/RedisMessageGroupStoreTests.java | 1 - 4 files changed, 6 insertions(+), 10 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java index c9aa6053a73..2b9a9e11977 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java @@ -46,7 +46,7 @@ public abstract class AbstractKeyValueMessageStore extends AbstractMessageGroupS protected static final String MESSAGE_KEY_PREFIX = "MESSAGE_"; - protected static final String MESSAGE_GROUP_KEY_PREFIX = "GROUP_OF_MESSAGES_"; + protected static final String MESSAGE_GROUP_KEY_PREFIX = "MESSAGE_GROUP_"; private final String messagePrefix; @@ -77,7 +77,7 @@ protected AbstractKeyValueMessageStore(String prefix) { * @return the prefix for keys * @since 4.3.12 */ - protected String getMessagePrefix() { + public String getMessagePrefix() { return this.messagePrefix; } @@ -167,11 +167,10 @@ public Message removeMessage(UUID id) { @Override @ManagedAttribute public long getMessageCount() { - Collection messageIds = doListKeys(this.messagePrefix + '*'); + Collection messageIds = doListKeys(this.messagePrefix + "[^GROUP_]*"); return (messageIds != null) ? messageIds.size() : 0; } - // MessageGroupStore methods /** diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/store/HazelcastMessageStore.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/store/HazelcastMessageStore.java index a6e3cb51db9..c804cfc5df5 100644 --- a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/store/HazelcastMessageStore.java +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/store/HazelcastMessageStore.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -76,8 +76,8 @@ protected Object doRemove(Object id) { @Override protected Collection doListKeys(String keyPattern) { Assert.hasText(keyPattern, "'keyPattern' must not be empty"); - return this.map.keySet(Predicates.like(QueryConstants.KEY_ATTRIBUTE_NAME.value(), - keyPattern.replaceAll("\\*", "%"))); + return this.map.keySet(Predicates.regex(QueryConstants.KEY_ATTRIBUTE_NAME.value(), + keyPattern.replaceAll("\\*", ".+"))); } } diff --git a/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/store/HazelcastMessageStoreTests.java b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/store/HazelcastMessageStoreTests.java index 7ef8d61e178..d2b88a377e6 100644 --- a/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/store/HazelcastMessageStoreTests.java +++ b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/store/HazelcastMessageStoreTests.java @@ -69,7 +69,6 @@ public void clean() { @Test public void testWithMessageHistory() { - Message message = new GenericMessage<>("Hello"); DirectChannel fooChannel = new DirectChannel(); fooChannel.setBeanName("fooChannel"); @@ -107,7 +106,6 @@ public void testAddAndRemoveMessagesFromMessageGroup() { @Test public void addAndGetMessage() { - Message message = MessageBuilder.withPayload("test").build(); store.addMessage(message); Message retrieved = store.getMessage(message.getHeaders().getId()); diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java index bd84bf0184a..a74daa58941 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java @@ -82,7 +82,6 @@ static void setupConnection() { @AfterEach void setUpTearDown() { StringRedisTemplate template = RedisContainerTest.createStringRedisTemplate(redisConnectionFactory); - template.delete(template.keys("GROUP_OF_MESSAGES_*")); template.delete(template.keys("MESSAGE_*")); } From 9cf5c00d7906d5b0a46b590b15eed5bb5d7877fe Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 18 Sep 2023 10:01:10 -0400 Subject: [PATCH 3/4] * Fix MongoDB MS for message removal logic --- .../ConfigurableMongoDbMessageStore.java | 5 +++-- .../mongodb/store/MongoDbMessageStore.java | 9 ++++---- ...AbstractMongoDbMessageGroupStoreTests.java | 22 +++++++++++++++++++ 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java index cb6c809f8fb..df6ab0363cb 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2022 the original author or authors. + * Copyright 2013-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -101,7 +101,8 @@ public Message addMessage(Message message) { @Override public Message removeMessage(UUID id) { Assert.notNull(id, "'id' must not be null"); - Query query = Query.query(Criteria.where(MessageDocumentFields.MESSAGE_ID).is(id)); + Query query = Query.query(Criteria.where(MessageDocumentFields.MESSAGE_ID).is(id) + .and(MessageDocumentFields.GROUP_ID).exists(false)); MessageDocument document = getMongoTemplate().findAndRemove(query, MessageDocument.class, this.collectionName); return (document != null) ? document.getMessage() : null; } diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java index 9696ffa387e..81e68d6620a 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -258,14 +258,15 @@ public MessageMetadata getMessageMetadata(UUID id) { @Override @ManagedAttribute public long getMessageCount() { - return this.template.getCollection(this.collectionName).countDocuments(); + Query query = Query.query(Criteria.where("headers.id").exists(true).and(GROUP_ID_KEY).exists(false)); + return this.template.getCollection(this.collectionName).countDocuments(query.getQueryObject()); } @Override public Message removeMessage(UUID id) { Assert.notNull(id, "'id' must not be null"); - MessageWrapper messageWrapper = - this.template.findAndRemove(whereMessageIdIs(id), MessageWrapper.class, this.collectionName); + Query query = Query.query(Criteria.where("headers.id").is(id).and(GROUP_ID_KEY).exists(false)); + MessageWrapper messageWrapper = this.template.findAndRemove(query, MessageWrapper.class, this.collectionName); return (messageWrapper != null ? messageWrapper.getMessage() : null); } diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageGroupStoreTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageGroupStoreTests.java index d8385cc1781..0c94b46eb13 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageGroupStoreTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageGroupStoreTests.java @@ -536,6 +536,28 @@ void testWithMessageHistory() { .containsEntry("type", "channel"); } + @Test + public void removeMessageDoesntRemoveSameMessageInTheGroup() { + GenericMessage testMessage = new GenericMessage<>("test data"); + + MessageGroupStore store = getMessageGroupStore(); + + store.addMessageToGroup("1", testMessage); + + MessageStore messageStore = (MessageStore) store; + + messageStore.removeMessage(testMessage.getHeaders().getId()); + + assertThat(messageStore.getMessageCount()).isEqualTo(0); + assertThat(store.getMessageCountForAllMessageGroups()).isEqualTo(1); + assertThat(store.messageGroupSize("1")).isEqualTo(1); + + store.removeMessageGroup("1"); + + assertThat(store.getMessageCountForAllMessageGroups()).isEqualTo(0); + assertThat(store.messageGroupSize("1")).isEqualTo(0); + } + protected abstract MessageGroupStore getMessageGroupStore(); protected abstract MessageStore getMessageStore(); From cdcdecc8911a39bd3d450011afa33f0099308941 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 19 Sep 2023 10:14:27 -0400 Subject: [PATCH 4/4] * Bring back `GROUP_OF_MESSAGES_` prefix to avoid complex regexp and don't bother for edge cases, where even that regexp may fail --- .../integration/store/AbstractKeyValueMessageStore.java | 4 ++-- .../integration/hazelcast/store/HazelcastMessageStore.java | 4 ++-- .../integration/redis/store/RedisMessageGroupStoreTests.java | 1 + 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java index 2b9a9e11977..1f5eac29f68 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java @@ -46,7 +46,7 @@ public abstract class AbstractKeyValueMessageStore extends AbstractMessageGroupS protected static final String MESSAGE_KEY_PREFIX = "MESSAGE_"; - protected static final String MESSAGE_GROUP_KEY_PREFIX = "MESSAGE_GROUP_"; + protected static final String MESSAGE_GROUP_KEY_PREFIX = "GROUP_OF_MESSAGES_"; private final String messagePrefix; @@ -167,7 +167,7 @@ public Message removeMessage(UUID id) { @Override @ManagedAttribute public long getMessageCount() { - Collection messageIds = doListKeys(this.messagePrefix + "[^GROUP_]*"); + Collection messageIds = doListKeys(this.messagePrefix + '*'); return (messageIds != null) ? messageIds.size() : 0; } diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/store/HazelcastMessageStore.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/store/HazelcastMessageStore.java index c804cfc5df5..b58a20927cf 100644 --- a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/store/HazelcastMessageStore.java +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/store/HazelcastMessageStore.java @@ -76,8 +76,8 @@ protected Object doRemove(Object id) { @Override protected Collection doListKeys(String keyPattern) { Assert.hasText(keyPattern, "'keyPattern' must not be empty"); - return this.map.keySet(Predicates.regex(QueryConstants.KEY_ATTRIBUTE_NAME.value(), - keyPattern.replaceAll("\\*", ".+"))); + return this.map.keySet(Predicates.like(QueryConstants.KEY_ATTRIBUTE_NAME.value(), + keyPattern.replaceAll("\\*", "%"))); } } diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java index a74daa58941..1da003ac884 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java @@ -83,6 +83,7 @@ static void setupConnection() { void setUpTearDown() { StringRedisTemplate template = RedisContainerTest.createStringRedisTemplate(redisConnectionFactory); template.delete(template.keys("MESSAGE_*")); + template.delete(template.keys("GROUP_OF_MESSAGES_*")); } @Test