Skip to content

Commit 9cf5c00

Browse files
committed
* Fix MongoDB MS for message removal logic
1 parent dbcb8ca commit 9cf5c00

File tree

3 files changed

+30
-6
lines changed

3 files changed

+30
-6
lines changed

spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2022 the original author or authors.
2+
* Copyright 2013-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -101,7 +101,8 @@ public <T> Message<T> addMessage(Message<T> message) {
101101
@Override
102102
public Message<?> removeMessage(UUID id) {
103103
Assert.notNull(id, "'id' must not be null");
104-
Query query = Query.query(Criteria.where(MessageDocumentFields.MESSAGE_ID).is(id));
104+
Query query = Query.query(Criteria.where(MessageDocumentFields.MESSAGE_ID).is(id)
105+
.and(MessageDocumentFields.GROUP_ID).exists(false));
105106
MessageDocument document = getMongoTemplate().findAndRemove(query, MessageDocument.class, this.collectionName);
106107
return (document != null) ? document.getMessage() : null;
107108
}

spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -258,14 +258,15 @@ public MessageMetadata getMessageMetadata(UUID id) {
258258
@Override
259259
@ManagedAttribute
260260
public long getMessageCount() {
261-
return this.template.getCollection(this.collectionName).countDocuments();
261+
Query query = Query.query(Criteria.where("headers.id").exists(true).and(GROUP_ID_KEY).exists(false));
262+
return this.template.getCollection(this.collectionName).countDocuments(query.getQueryObject());
262263
}
263264

264265
@Override
265266
public Message<?> removeMessage(UUID id) {
266267
Assert.notNull(id, "'id' must not be null");
267-
MessageWrapper messageWrapper =
268-
this.template.findAndRemove(whereMessageIdIs(id), MessageWrapper.class, this.collectionName);
268+
Query query = Query.query(Criteria.where("headers.id").is(id).and(GROUP_ID_KEY).exists(false));
269+
MessageWrapper messageWrapper = this.template.findAndRemove(query, MessageWrapper.class, this.collectionName);
269270
return (messageWrapper != null ? messageWrapper.getMessage() : null);
270271
}
271272

spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageGroupStoreTests.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,28 @@ void testWithMessageHistory() {
536536
.containsEntry("type", "channel");
537537
}
538538

539+
@Test
540+
public void removeMessageDoesntRemoveSameMessageInTheGroup() {
541+
GenericMessage<String> testMessage = new GenericMessage<>("test data");
542+
543+
MessageGroupStore store = getMessageGroupStore();
544+
545+
store.addMessageToGroup("1", testMessage);
546+
547+
MessageStore messageStore = (MessageStore) store;
548+
549+
messageStore.removeMessage(testMessage.getHeaders().getId());
550+
551+
assertThat(messageStore.getMessageCount()).isEqualTo(0);
552+
assertThat(store.getMessageCountForAllMessageGroups()).isEqualTo(1);
553+
assertThat(store.messageGroupSize("1")).isEqualTo(1);
554+
555+
store.removeMessageGroup("1");
556+
557+
assertThat(store.getMessageCountForAllMessageGroups()).isEqualTo(0);
558+
assertThat(store.messageGroupSize("1")).isEqualTo(0);
559+
}
560+
539561
protected abstract MessageGroupStore getMessageGroupStore();
540562

541563
protected abstract MessageStore getMessageStore();

0 commit comments

Comments
 (0)