Skip to content

Commit ccaa853

Browse files
artembilangaryrussell
authored andcommitted
GH-8732 Don't remove JDBC message if other groups (#8733)
* GH-8732 Don't remove JDBC message if other groups Fixes #8732 When same message is added into different groups, its record in the `INT_MESSAGE` must remain until the last group is removed * Improve `JdbcMessageStore.DELETE_MESSAGES_FROM_GROUP` SQL to ignore those messages for removal which has other group records in the `INT_GROUP_TO_MESSAGE` table **Cherry-pick to `6.1.x`, `6.0.x` & `5.5.x`** * * Fix `JdbcMessageStore.removeMessage()` and `removeMessagesFromGroup()` to remove from `INT_MESSAGE` only if there is no other records in `INT_GROUP_TO_MESSAGE` * * Improve `MessageStore.removeMessage()` Javadoc mentioning `MessageGroupStore` specifics
1 parent ee7ad14 commit ccaa853

File tree

3 files changed

+56
-12
lines changed

3 files changed

+56
-12
lines changed

spring-integration-core/src/main/java/org/springframework/integration/store/MessageStore.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -62,11 +62,12 @@ public interface MessageStore {
6262
<T> Message<T> addMessage(Message<T> message);
6363

6464
/**
65-
* Remove the Message with the given id from the MessageStore, if present, and return it. If no Message with that id
66-
* is present in the store, this will return <i>null</i>.
67-
*
68-
* @param id THe message identifier.
69-
* @return The message.
65+
* Remove the Message with the given id from the MessageStore, if present, and return it.
66+
* If no Message with that id is present in the store, this will return {@code null}.
67+
* If this method is implemented on a {@link MessageGroupStore},
68+
* the message is removed from the store only if no groups holding this message.
69+
* @param id the message identifier.
70+
* @return the message (if any).
7071
*/
7172
Message<?> removeMessage(UUID id);
7273

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,9 @@ SELECT COUNT(MESSAGE_ID)
164164
DELETE_MESSAGE("""
165165
DELETE from %PREFIX%MESSAGE
166166
where MESSAGE_ID=? and REGION=?
167+
and MESSAGE_ID not in (
168+
SELECT MESSAGE_ID from %PREFIX%GROUP_TO_MESSAGE
169+
where MESSAGE_ID=? and REGION = ?)
167170
"""),
168171

169172
CREATE_MESSAGE("""
@@ -191,7 +194,12 @@ SELECT COUNT(GROUP_KEY)
191194

192195
DELETE_MESSAGES_FROM_GROUP("""
193196
DELETE from %PREFIX%MESSAGE
194-
where MESSAGE_ID in (SELECT MESSAGE_ID from %PREFIX%GROUP_TO_MESSAGE where GROUP_KEY = ? and REGION = ?)
197+
where MESSAGE_ID in (
198+
SELECT MESSAGE_ID from %PREFIX%GROUP_TO_MESSAGE where GROUP_KEY = ? and REGION = ?
199+
and MESSAGE_ID not in (
200+
SELECT MESSAGE_ID from %PREFIX%GROUP_TO_MESSAGE
201+
where GROUP_KEY != ? and REGION = ?)
202+
)
195203
and REGION = ?
196204
"""),
197205

@@ -337,7 +345,8 @@ public Message<?> removeMessage(UUID id) {
337345
if (message == null) {
338346
return null;
339347
}
340-
int updated = this.jdbcTemplate.update(getQuery(Query.DELETE_MESSAGE), getKey(id), this.region);
348+
String key = getKey(id);
349+
int updated = this.jdbcTemplate.update(getQuery(Query.DELETE_MESSAGE), key, this.region, key, this.region);
341350
if (updated != 0) {
342351
return message;
343352
}
@@ -528,15 +537,18 @@ public void removeMessagesFromGroup(Object groupId, Collection<Message<?>> messa
528537
(ps, messageToRemove) -> {
529538
ps.setString(1, groupKey); // NOSONAR - magic number
530539
ps.setString(2, getKey(messageToRemove.getHeaders().getId())); // NOSONAR - magic number
531-
ps.setString(3, JdbcMessageStore.this.region); // NOSONAR - magic number
540+
ps.setString(3, this.region); // NOSONAR - magic number
532541
});
533542

534543
this.jdbcTemplate.batchUpdate(getQuery(Query.DELETE_MESSAGE),
535544
messages,
536545
getRemoveBatchSize(),
537546
(ps, messageToRemove) -> {
538-
ps.setString(1, getKey(messageToRemove.getHeaders().getId())); // NOSONAR - magic number
539-
ps.setString(2, JdbcMessageStore.this.region); // NOSONAR - magic number
547+
String key = getKey(messageToRemove.getHeaders().getId());
548+
ps.setString(1, key); // NOSONAR - magic number
549+
ps.setString(2, this.region); // NOSONAR - magic number
550+
ps.setString(3, key); // NOSONAR - magic number
551+
ps.setString(4, this.region); // NOSONAR - magic number
540552
});
541553

542554
updateMessageGroup(groupKey);
@@ -546,7 +558,8 @@ public void removeMessagesFromGroup(Object groupId, Collection<Message<?>> messa
546558
public void removeMessageGroup(Object groupId) {
547559
String groupKey = getKey(groupId);
548560

549-
this.jdbcTemplate.update(getQuery(Query.DELETE_MESSAGES_FROM_GROUP), groupKey, this.region, this.region);
561+
this.jdbcTemplate.update(getQuery(Query.DELETE_MESSAGES_FROM_GROUP),
562+
groupKey, this.region, groupKey, this.region, this.region);
550563

551564
if (logger.isDebugEnabled()) {
552565
logger.debug("Removing relationships for the group with group key=" + groupKey);

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/mysql/MySqlJdbcMessageStoreTests.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,36 @@ public void testMessageGroupCondition() {
498498
assertThat(this.messageStore.getMessageGroup(groupId).getCondition()).isEqualTo("testCondition");
499499
}
500500

501+
@Test
502+
public void sameMessageInTwoGroupsNotRemovedByFirstGroup() {
503+
GenericMessage<String> testMessage = new GenericMessage<>("test data");
504+
505+
messageStore.addMessageToGroup("1", testMessage);
506+
messageStore.addMessageToGroup("2", testMessage);
507+
508+
messageStore.removeMessageGroup("1");
509+
510+
assertThat(messageStore.getMessageCount()).isEqualTo(1);
511+
512+
messageStore.removeMessageGroup("2");
513+
514+
assertThat(messageStore.getMessageCount()).isEqualTo(0);
515+
}
516+
517+
@Test
518+
public void removeMessagesFromGroupDontRemoveSameMessageInOtherGroup() {
519+
GenericMessage<String> testMessage = new GenericMessage<>("test data");
520+
521+
messageStore.addMessageToGroup("1", testMessage);
522+
messageStore.addMessageToGroup("2", testMessage);
523+
524+
messageStore.removeMessagesFromGroup("1", testMessage);
525+
526+
assertThat(messageStore.getMessageCount()).isEqualTo(1);
527+
assertThat(messageStore.messageGroupSize("1")).isEqualTo(0);
528+
assertThat(messageStore.messageGroupSize("2")).isEqualTo(1);
529+
}
530+
501531
@Configuration
502532
public static class Config {
503533

0 commit comments

Comments
 (0)