diff --git a/pom.xml b/pom.xml
index 0c2b25e8ef..a0acdb05db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
org.springframework.data
spring-data-mongodb-parent
- 4.4.0-SNAPSHOT
+ 4.4.x-GH-4804-SNAPSHOT
pom
Spring Data MongoDB
diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml
index acdc13437d..f327c700c4 100644
--- a/spring-data-mongodb-distribution/pom.xml
+++ b/spring-data-mongodb-distribution/pom.xml
@@ -15,7 +15,7 @@
org.springframework.data
spring-data-mongodb-parent
- 4.4.0-SNAPSHOT
+ 4.4.x-GH-4804-SNAPSHOT
../pom.xml
diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml
index 2013d103d5..f9f3407008 100644
--- a/spring-data-mongodb/pom.xml
+++ b/spring-data-mongodb/pom.xml
@@ -13,7 +13,7 @@
org.springframework.data
spring-data-mongodb-parent
- 4.4.0-SNAPSHOT
+ 4.4.x-GH-4804-SNAPSHOT
../pom.xml
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
index 10ae7a9ead..94d4c9cd18 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
@@ -1413,7 +1413,7 @@ protected Flux doInsertAll(Collection extends T> listToSave, MongoWrite
});
return Flux.fromIterable(elementsByCollection.keySet())
- .flatMap(collectionName -> doInsertBatch(collectionName, elementsByCollection.get(collectionName), writer));
+ .concatMap(collectionName -> doInsertBatch(collectionName, elementsByCollection.get(collectionName), writer));
}
protected Flux doInsertBatch(String collectionName, Collection extends T> batchToSave,
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java
index 41f9210b33..cfcc5cb88b 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java
@@ -113,8 +113,8 @@ public Flux saveAll(Iterable entities) {
Streamable source = Streamable.of(entities);
return source.stream().allMatch(entityInformation::isNew) ? //
- mongoOperations.insert(source.stream().collect(Collectors.toList()), entityInformation.getCollectionName()) : //
- Flux.fromIterable(entities).flatMap(this::save);
+ insert(entities) :
+ Flux.fromIterable(entities).concatMap(this::save);
}
@Override
@@ -122,7 +122,7 @@ public Flux saveAll(Publisher entityStream) {
Assert.notNull(entityStream, "The given Publisher of entities must not be null");
- return Flux.from(entityStream).flatMapSequential(entity -> entityInformation.isNew(entity) ? //
+ return Flux.from(entityStream).concatMap(entity -> entityInformation.isNew(entity) ? //
mongoOperations.insert(entity, entityInformation.getCollectionName()) : //
mongoOperations.save(entity, entityInformation.getCollectionName()));
}
@@ -296,7 +296,7 @@ public Mono deleteAll(Publisher extends T> entityStream) {
Optional readPreference = getReadPreference();
return Flux.from(entityStream)//
.map(entityInformation::getRequiredId)//
- .flatMap(id -> deleteById(id, readPreference))//
+ .concatMap(id -> deleteById(id, readPreference))//
.then();
}
@@ -337,8 +337,7 @@ public Flux insert(Iterable entities) {
Assert.notNull(entities, "The given Iterable of entities must not be null");
Collection source = toCollection(entities);
-
- return source.isEmpty() ? Flux.empty() : mongoOperations.insertAll(source);
+ return source.isEmpty() ? Flux.empty() : mongoOperations.insert(source, entityInformation.getCollectionName());
}
@Override
@@ -346,8 +345,7 @@ public Flux insert(Publisher entities) {
Assert.notNull(entities, "The given Publisher of entities must not be null");
- return Flux.from(entities)
- .flatMapSequential(entity -> mongoOperations.insert(entity, entityInformation.getCollectionName()));
+ return Flux.from(entities).concatMap(this::insert);
}
// -------------------------------------------------------------------------
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java
index f4d918f31e..88157024ba 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java
@@ -453,7 +453,7 @@ public Flux saveWithLogs(Person person) {
TransactionalOperator transactionalOperator = TransactionalOperator.create(manager,
new DefaultTransactionDefinition());
- return Flux.merge(operations.save(new EventLog(new ObjectId(), "beforeConvert")), //
+ return Flux.concat(operations.save(new EventLog(new ObjectId(), "beforeConvert")), //
operations.save(new EventLog(new ObjectId(), "afterConvert")), //
operations.save(new EventLog(new ObjectId(), "beforeInsert")), //
operations.save(person), //