Skip to content

Commit 88d639d

Browse files
mp911dechristophstrobl
authored andcommitted
Switch to Flux.flatMapSequential(…) to prevent backpressure shaping.
We now use Flux.flatMapSequential(…) instead of concatMap as concatMap reduces the request size to 1. The change in backpressure/request size reduces parallelism and impacts the batch size by fetching 2 documents instead of considering the actual backpressure. flatMapSequential doesn't tamper the requested amount while retaining the sequence order. Closes: #4543 Original Pull Request: #4550
1 parent faccaf0 commit 88d639d

File tree

4 files changed

+53
-14
lines changed

4 files changed

+53
-14
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ private Mono<BulkWriteResult> bulkWriteTo(MongoCollection<Document> collection)
216216
collection = collection.withWriteConcern(defaultWriteConcern);
217217
}
218218

219-
Flux<SourceAwareWriteModelHolder> concat = Flux.concat(models).flatMap(it -> {
219+
Flux<SourceAwareWriteModelHolder> concat = Flux.concat(models).flatMapSequential(it -> {
220220

221221
if (it.model()instanceof InsertOneModel<Document> iom) {
222222

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,7 +1041,7 @@ private <O> Flux<O> aggregateAndMap(MongoCollection<Document> collection, List<D
10411041
return (isOutOrMerge ? Flux.from(cursor.toCollection()) : Flux.from(cursor.first())).thenMany(Mono.empty());
10421042
}
10431043

1044-
return Flux.from(cursor).concatMap(readCallback::doWith);
1044+
return Flux.from(cursor).flatMapSequential(readCallback::doWith);
10451045
}
10461046

10471047
@Override
@@ -1088,7 +1088,7 @@ protected <T> Flux<GeoResult<T>> geoNear(NearQuery near, Class<?> entityClass, S
10881088
.withOptions(optionsBuilder.build());
10891089

10901090
return aggregate($geoNear, collection, Document.class) //
1091-
.concatMap(callback::doWith);
1091+
.flatMapSequential(callback::doWith);
10921092
}
10931093

10941094
@Override
@@ -1314,7 +1314,7 @@ public <T> Flux<T> insertAll(Mono<? extends Collection<? extends T>> batchToSave
13141314

13151315
Assert.notNull(batchToSave, "Batch to insert must not be null");
13161316

1317-
return Flux.from(batchToSave).flatMap(collection -> insert(collection, collectionName));
1317+
return Flux.from(batchToSave).flatMapSequential(collection -> insert(collection, collectionName));
13181318
}
13191319

13201320
@Override
@@ -1382,7 +1382,7 @@ public <T> Flux<T> insertAll(Collection<? extends T> objectsToSave) {
13821382

13831383
@Override
13841384
public <T> Flux<T> insertAll(Mono<? extends Collection<? extends T>> objectsToSave) {
1385-
return Flux.from(objectsToSave).flatMap(this::insertAll);
1385+
return Flux.from(objectsToSave).flatMapSequential(this::insertAll);
13861386
}
13871387

13881388
protected <T> Flux<T> doInsertAll(Collection<? extends T> listToSave, MongoWriter<Object> writer) {
@@ -1433,7 +1433,7 @@ protected <T> Flux<T> doInsertBatch(String collectionName, Collection<? extends
14331433
return insertDocumentList(collectionName, documents).thenMany(Flux.fromIterable(tuples));
14341434
});
14351435

1436-
return insertDocuments.flatMap(tuple -> {
1436+
return insertDocuments.flatMapSequential(tuple -> {
14371437

14381438
Document document = tuple.getT2();
14391439
Object id = MappedDocument.of(document).getId();
@@ -1590,7 +1590,7 @@ protected Flux<ObjectId> insertDocumentList(String collectionName, List<Document
15901590

15911591
return collectionToUse.insertMany(documents);
15921592

1593-
}).flatMap(s -> {
1593+
}).flatMapSequential(s -> {
15941594

15951595
return Flux.fromStream(documents.stream() //
15961596
.map(MappedDocument::of) //
@@ -2147,7 +2147,7 @@ public <T> Flux<T> mapReduce(Query filterQuery, Class<?> domainType, String inpu
21472147
publisher = collation.map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher);
21482148

21492149
return Flux.from(publisher)
2150-
.concatMap(new ReadDocumentCallback<>(mongoConverter, resultType, inputCollectionName)::doWith);
2150+
.flatMapSequential(new ReadDocumentCallback<>(mongoConverter, resultType, inputCollectionName)::doWith);
21512151
});
21522152
}
21532153

@@ -2215,7 +2215,7 @@ protected <T> Flux<T> doFindAndDelete(String collectionName, Query query, Class<
22152215

22162216
return Flux.from(flux).collectList().filter(it -> !it.isEmpty())
22172217
.flatMapMany(list -> Flux.from(remove(operations.getByIdInQuery(list), entityClass, collectionName))
2218-
.flatMap(deleteResult -> Flux.fromIterable(list)));
2218+
.flatMapSequential(deleteResult -> Flux.fromIterable(list)));
22192219
}
22202220

22212221
/**
@@ -2674,7 +2674,7 @@ private <T> Flux<T> executeFindMultiInternal(ReactiveCollectionQueryCallback<Doc
26742674

26752675
return createFlux(collectionName, collection -> {
26762676
return Flux.from(preparer.initiateFind(collection, collectionCallback::doInCollection))
2677-
.concatMap(objectCallback::doWith);
2677+
.flatMapSequential(objectCallback::doWith);
26782678
});
26792679
}
26802680

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
109109

110110
Assert.notNull(entityStream, "The given Publisher of entities must not be null");
111111

112-
return Flux.from(entityStream).flatMap(entity -> entityInformation.isNew(entity) ? //
112+
return Flux.from(entityStream).flatMapSequential(entity -> entityInformation.isNew(entity) ? //
113113
mongoOperations.insert(entity, entityInformation.getCollectionName()) : //
114114
mongoOperations.save(entity, entityInformation.getCollectionName()));
115115
}
@@ -167,7 +167,7 @@ public Flux<T> findAllById(Publisher<ID> ids) {
167167

168168
Assert.notNull(ids, "The given Publisher of Id's must not be null");
169169

170-
return Flux.from(ids).buffer().flatMap(this::findAllById);
170+
return Flux.from(ids).buffer().flatMapSequential(this::findAllById);
171171
}
172172

173173
@Override
@@ -297,7 +297,8 @@ public <S extends T> Flux<S> insert(Publisher<S> entities) {
297297

298298
Assert.notNull(entities, "The given Publisher of entities must not be null");
299299

300-
return Flux.from(entities).flatMap(entity -> mongoOperations.insert(entity, entityInformation.getCollectionName()));
300+
return Flux.from(entities)
301+
.flatMapSequential(entity -> mongoOperations.insert(entity, entityInformation.getCollectionName()));
301302
}
302303

303304
// -------------------------------------------------------------------------

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.List;
3737
import java.util.Map;
3838
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.atomic.AtomicLong;
3940
import java.util.stream.Collectors;
4041

4142
import org.assertj.core.api.Assertions;
@@ -698,6 +699,28 @@ void aggreateShouldUseReadReadPreference() {
698699
verify(collection).withReadPreference(ReadPreference.primaryPreferred());
699700
}
700701

702+
@Test // GH-4543
703+
void aggregateDoesNotLimitBackpressure() {
704+
705+
reset(collection);
706+
707+
AtomicLong request = new AtomicLong();
708+
Publisher<Document> realPublisher = Flux.just(new Document()).doOnRequest(request::addAndGet);
709+
710+
doAnswer(invocation -> {
711+
Subscriber<Document> subscriber = invocation.getArgument(0);
712+
realPublisher.subscribe(subscriber);
713+
return null;
714+
}).when(aggregatePublisher).subscribe(any());
715+
716+
when(collection.aggregate(anyList())).thenReturn(aggregatePublisher);
717+
when(collection.aggregate(anyList(), any(Class.class))).thenReturn(aggregatePublisher);
718+
719+
template.aggregate(newAggregation(Sith.class, project("id")), AutogenerateableId.class, Document.class).subscribe();
720+
721+
assertThat(request).hasValueGreaterThan(128);
722+
}
723+
701724
@Test // DATAMONGO-1854
702725
void aggreateShouldUseCollationFromOptionsEvenIfDefaultCollationIsPresent() {
703726

@@ -1262,6 +1285,17 @@ void findShouldInvokeAfterConvertCallbacks() {
12621285
assertThat(results.get(0).id).isEqualTo("after-convert");
12631286
}
12641287

1288+
@Test // GH-4543
1289+
void findShouldNotLimitBackpressure() {
1290+
1291+
AtomicLong request = new AtomicLong();
1292+
stubFindSubscribe(new Document(), request);
1293+
1294+
template.find(new Query(), Person.class).subscribe();
1295+
1296+
assertThat(request).hasValueGreaterThan(128);
1297+
}
1298+
12651299
@Test // DATAMONGO-2479
12661300
void findByIdShouldInvokeAfterConvertCallbacks() {
12671301

@@ -1626,8 +1660,12 @@ void changeStreamOptionFullDocumentBeforeChangeShouldBeApplied() {
16261660
}
16271661

16281662
private void stubFindSubscribe(Document document) {
1663+
stubFindSubscribe(document, new AtomicLong());
1664+
}
1665+
1666+
private void stubFindSubscribe(Document document, AtomicLong request) {
16291667

1630-
Publisher<Document> realPublisher = Flux.just(document);
1668+
Publisher<Document> realPublisher = Flux.just(document).doOnRequest(request::addAndGet);
16311669

16321670
doAnswer(invocation -> {
16331671
Subscriber<Document> subscriber = invocation.getArgument(0);

0 commit comments

Comments
 (0)