diff --git a/pom.xml b/pom.xml index 0c2b25e8ef..a0acdb05db 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-mongodb-parent - 4.4.0-SNAPSHOT + 4.4.x-GH-4804-SNAPSHOT pom Spring Data MongoDB diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml index acdc13437d..f327c700c4 100644 --- a/spring-data-mongodb-distribution/pom.xml +++ b/spring-data-mongodb-distribution/pom.xml @@ -15,7 +15,7 @@ org.springframework.data spring-data-mongodb-parent - 4.4.0-SNAPSHOT + 4.4.x-GH-4804-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml index 2013d103d5..f9f3407008 100644 --- a/spring-data-mongodb/pom.xml +++ b/spring-data-mongodb/pom.xml @@ -13,7 +13,7 @@ org.springframework.data spring-data-mongodb-parent - 4.4.0-SNAPSHOT + 4.4.x-GH-4804-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index 10ae7a9ead..94d4c9cd18 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -1413,7 +1413,7 @@ protected Flux doInsertAll(Collection listToSave, MongoWrite }); return Flux.fromIterable(elementsByCollection.keySet()) - .flatMap(collectionName -> doInsertBatch(collectionName, elementsByCollection.get(collectionName), writer)); + .concatMap(collectionName -> doInsertBatch(collectionName, elementsByCollection.get(collectionName), writer)); } protected Flux doInsertBatch(String collectionName, Collection batchToSave, diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java index 41f9210b33..cfcc5cb88b 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java @@ -113,8 +113,8 @@ public Flux saveAll(Iterable entities) { Streamable source = Streamable.of(entities); return source.stream().allMatch(entityInformation::isNew) ? // - mongoOperations.insert(source.stream().collect(Collectors.toList()), entityInformation.getCollectionName()) : // - Flux.fromIterable(entities).flatMap(this::save); + insert(entities) : + Flux.fromIterable(entities).concatMap(this::save); } @Override @@ -122,7 +122,7 @@ public Flux saveAll(Publisher entityStream) { Assert.notNull(entityStream, "The given Publisher of entities must not be null"); - return Flux.from(entityStream).flatMapSequential(entity -> entityInformation.isNew(entity) ? // + return Flux.from(entityStream).concatMap(entity -> entityInformation.isNew(entity) ? // mongoOperations.insert(entity, entityInformation.getCollectionName()) : // mongoOperations.save(entity, entityInformation.getCollectionName())); } @@ -296,7 +296,7 @@ public Mono deleteAll(Publisher entityStream) { Optional readPreference = getReadPreference(); return Flux.from(entityStream)// .map(entityInformation::getRequiredId)// - .flatMap(id -> deleteById(id, readPreference))// + .concatMap(id -> deleteById(id, readPreference))// .then(); } @@ -337,8 +337,7 @@ public Flux insert(Iterable entities) { Assert.notNull(entities, "The given Iterable of entities must not be null"); Collection source = toCollection(entities); - - return source.isEmpty() ? Flux.empty() : mongoOperations.insertAll(source); + return source.isEmpty() ? Flux.empty() : mongoOperations.insert(source, entityInformation.getCollectionName()); } @Override @@ -346,8 +345,7 @@ public Flux insert(Publisher entities) { Assert.notNull(entities, "The given Publisher of entities must not be null"); - return Flux.from(entities) - .flatMapSequential(entity -> mongoOperations.insert(entity, entityInformation.getCollectionName())); + return Flux.from(entities).concatMap(this::insert); } // ------------------------------------------------------------------------- diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java index f4d918f31e..88157024ba 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java @@ -453,7 +453,7 @@ public Flux saveWithLogs(Person person) { TransactionalOperator transactionalOperator = TransactionalOperator.create(manager, new DefaultTransactionDefinition()); - return Flux.merge(operations.save(new EventLog(new ObjectId(), "beforeConvert")), // + return Flux.concat(operations.save(new EventLog(new ObjectId(), "beforeConvert")), // operations.save(new EventLog(new ObjectId(), "afterConvert")), // operations.save(new EventLog(new ObjectId(), "beforeInsert")), // operations.save(person), //