Skip to content

Commit d6b5540

Browse files
authored
Fix reactive save of Flux.
Original Pull Request #2581 Closes #2576
1 parent 11fc225 commit d6b5540

File tree

2 files changed

+42
-39
lines changed

2 files changed

+42
-39
lines changed

src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -225,42 +225,42 @@ public <T> Flux<T> save(Flux<T> entities, IndexCoordinates index, int bulkSize)
225225

226226
return Flux.defer(() -> {
227227
Sinks.Many<T> sink = Sinks.many().unicast().onBackpressureBuffer();
228-
entities //
229-
.bufferTimeout(bulkSize, Duration.ofMillis(200)) //
230-
.subscribe(new Subscriber<List<T>>() {
231-
private Subscription subscription;
232-
private AtomicBoolean upstreamComplete = new AtomicBoolean(false);
233-
234-
@Override
235-
public void onSubscribe(Subscription subscription) {
236-
this.subscription = subscription;
237-
subscription.request(1);
238-
}
239-
240-
@Override
241-
public void onNext(List<T> entityList) {
242-
saveAll(entityList, index) //
243-
.map(sink::tryEmitNext) //
244-
.doOnComplete(() -> {
245-
if (!upstreamComplete.get()) {
246-
subscription.request(1);
247-
} else {
248-
sink.tryEmitComplete();
249-
}
250-
}).subscribe();
251-
}
252-
253-
@Override
254-
public void onError(Throwable throwable) {
255-
subscription.cancel();
256-
sink.tryEmitError(throwable);
257-
}
258-
259-
@Override
260-
public void onComplete() {
261-
upstreamComplete.set(true);
262-
}
263-
});
228+
entities.window(bulkSize) //
229+
.concatMap(flux -> flux.collectList()) //
230+
.subscribe(new Subscriber<List<T>>() {
231+
private Subscription subscription;
232+
private AtomicBoolean upstreamComplete = new AtomicBoolean(false);
233+
234+
@Override
235+
public void onSubscribe(Subscription subscription) {
236+
this.subscription = subscription;
237+
subscription.request(1);
238+
}
239+
240+
@Override
241+
public void onNext(List<T> entityList) {
242+
saveAll(entityList, index) //
243+
.map(sink::tryEmitNext) //
244+
.doOnComplete(() -> {
245+
if (!upstreamComplete.get()) {
246+
subscription.request(1);
247+
} else {
248+
sink.tryEmitComplete();
249+
}
250+
}).subscribe();
251+
}
252+
253+
@Override
254+
public void onError(Throwable throwable) {
255+
subscription.cancel();
256+
sink.tryEmitError(throwable);
257+
}
258+
259+
@Override
260+
public void onComplete() {
261+
upstreamComplete.set(true);
262+
}
263+
});
264264
return sink.asFlux();
265265
});
266266

src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.lang.Integer;
2929
import java.lang.Long;
3030
import java.lang.Object;
31+
import java.time.Duration;
3132
import java.time.LocalDate;
3233
import java.time.format.DateTimeFormatter;
3334
import java.util.ArrayList;
@@ -1171,7 +1172,7 @@ void shouldWorkWithReadonlyId() {
11711172
}).verifyComplete();
11721173
}
11731174

1174-
@Test // #2496
1175+
@Test // #2496, #2576
11751176
@DisplayName("should save data from Flux and return saved data in a flux")
11761177
void shouldSaveDataFromFluxAndReturnSavedDataInAFlux() {
11771178

@@ -1180,9 +1181,11 @@ void shouldSaveDataFromFluxAndReturnSavedDataInAFlux() {
11801181
.mapToObj(SampleEntity::of) //
11811182
.collect(Collectors.toList());
11821183

1183-
var entityFlux = Flux.fromIterable(entityList);
1184+
// we add a random delay to make suure the underlying implementation handles irregular incoming data
1185+
var entities = Flux.fromIterable(entityList).concatMap(
1186+
entity -> Mono.just(entity).delay(Duration.ofMillis((long) (Math.random() * 10))).thenReturn(entity));
11841187

1185-
operations.save(entityFlux, SampleEntity.class).collectList() //
1188+
operations.save(entities, SampleEntity.class).collectList() //
11861189
.as(StepVerifier::create) //
11871190
.consumeNextWith(savedEntities -> {
11881191
assertThat(savedEntities).isEqualTo(entityList);

0 commit comments

Comments
 (0)