Skip to content

Commit a593444

Browse files
committed
Fix reactive save of Flux.
Original Pull Request #2581 Closes #2576 (cherry picked from commit d6b5540)
1 parent 89afa81 commit a593444

File tree

2 files changed

+42
-39
lines changed

2 files changed

+42
-39
lines changed

src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -225,42 +225,42 @@ public <T> Flux<T> save(Flux<T> entities, IndexCoordinates index, int bulkSize)
225225

226226
return Flux.defer(() -> {
227227
Sinks.Many<T> sink = Sinks.many().unicast().onBackpressureBuffer();
228-
entities //
229-
.bufferTimeout(bulkSize, Duration.ofMillis(200)) //
230-
.subscribe(new Subscriber<List<T>>() {
231-
private Subscription subscription;
232-
private AtomicBoolean upstreamComplete = new AtomicBoolean(false);
233-
234-
@Override
235-
public void onSubscribe(Subscription subscription) {
236-
this.subscription = subscription;
237-
subscription.request(1);
238-
}
239-
240-
@Override
241-
public void onNext(List<T> entityList) {
242-
saveAll(entityList, index) //
243-
.map(sink::tryEmitNext) //
244-
.doOnComplete(() -> {
245-
if (!upstreamComplete.get()) {
246-
subscription.request(1);
247-
} else {
248-
sink.tryEmitComplete();
249-
}
250-
}).subscribe();
251-
}
252-
253-
@Override
254-
public void onError(Throwable throwable) {
255-
subscription.cancel();
256-
sink.tryEmitError(throwable);
257-
}
258-
259-
@Override
260-
public void onComplete() {
261-
upstreamComplete.set(true);
262-
}
263-
});
228+
entities.window(bulkSize) //
229+
.concatMap(flux -> flux.collectList()) //
230+
.subscribe(new Subscriber<List<T>>() {
231+
private Subscription subscription;
232+
private AtomicBoolean upstreamComplete = new AtomicBoolean(false);
233+
234+
@Override
235+
public void onSubscribe(Subscription subscription) {
236+
this.subscription = subscription;
237+
subscription.request(1);
238+
}
239+
240+
@Override
241+
public void onNext(List<T> entityList) {
242+
saveAll(entityList, index) //
243+
.map(sink::tryEmitNext) //
244+
.doOnComplete(() -> {
245+
if (!upstreamComplete.get()) {
246+
subscription.request(1);
247+
} else {
248+
sink.tryEmitComplete();
249+
}
250+
}).subscribe();
251+
}
252+
253+
@Override
254+
public void onError(Throwable throwable) {
255+
subscription.cancel();
256+
sink.tryEmitError(throwable);
257+
}
258+
259+
@Override
260+
public void onComplete() {
261+
upstreamComplete.set(true);
262+
}
263+
});
264264
return sink.asFlux();
265265
});
266266

src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.lang.Integer;
2929
import java.lang.Long;
3030
import java.lang.Object;
31+
import java.time.Duration;
3132
import java.time.LocalDate;
3233
import java.time.format.DateTimeFormatter;
3334
import java.util.ArrayList;
@@ -1181,7 +1182,7 @@ void shouldWorkWithReadonlyId() {
11811182
}).verifyComplete();
11821183
}
11831184

1184-
@Test // #2496
1185+
@Test // #2496, #2576
11851186
@DisplayName("should save data from Flux and return saved data in a flux")
11861187
void shouldSaveDataFromFluxAndReturnSavedDataInAFlux() {
11871188

@@ -1190,9 +1191,11 @@ void shouldSaveDataFromFluxAndReturnSavedDataInAFlux() {
11901191
.mapToObj(SampleEntity::of) //
11911192
.collect(Collectors.toList());
11921193

1193-
var entityFlux = Flux.fromIterable(entityList);
1194+
// we add a random delay to make suure the underlying implementation handles irregular incoming data
1195+
var entities = Flux.fromIterable(entityList).concatMap(
1196+
entity -> Mono.just(entity).delay(Duration.ofMillis((long) (Math.random() * 10))).thenReturn(entity));
11941197

1195-
operations.save(entityFlux, SampleEntity.class).collectList() //
1198+
operations.save(entities, SampleEntity.class).collectList() //
11961199
.as(StepVerifier::create) //
11971200
.consumeNextWith(savedEntities -> {
11981201
assertThat(savedEntities).isEqualTo(entityList);

0 commit comments

Comments
 (0)