Skip to content

Commit 38b98f9

Browse files
committed
fix: bulk ingester might skip listener requests (#867)
* fix: bulk ingester might skip lister requests * minor: fix style * always waiting for listener to be done before closing --------- Co-authored-by: Laura Trotta <laura.trotta@elastic.co> Co-authored-by: Laura Trotta <153528055+l-trotta@users.noreply.github.com>
1 parent 004d77d commit 38b98f9

File tree

2 files changed

+57
-12
lines changed

2 files changed

+57
-12
lines changed

java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public class BulkIngester<Context> implements AutoCloseable {
7575
private final FnCondition addCondition = new FnCondition(lock, this::canAddOperation);
7676
private final FnCondition sendRequestCondition = new FnCondition(lock, this::canSendRequest);
7777
private final FnCondition closeCondition = new FnCondition(lock, this::closedAndFlushed);
78+
private AtomicInteger listenerInProgressCount = new AtomicInteger();
7879

7980
private static class RequestExecution<Context> {
8081
public final long id;
@@ -235,7 +236,7 @@ private boolean canAddOperation() {
235236
}
236237

237238
private boolean closedAndFlushed() {
238-
return isClosed && operations.isEmpty() && requestsInFlightCount == 0;
239+
return isClosed && operations.isEmpty() && requestsInFlightCount == 0 && listenerInProgressCount.get() == 0;
239240
}
240241

241242
//----- Ingester logic
@@ -308,23 +309,42 @@ public void flush() {
308309
if (exec != null) {
309310
// A request was actually sent
310311
exec.futureResponse.handle((resp, thr) -> {
311-
312-
sendRequestCondition.signalIfReadyAfter(() -> {
313-
requestsInFlightCount--;
314-
closeCondition.signalAllIfReady();
315-
});
316-
317312
if (resp != null) {
318313
// Success
319314
if (listener != null) {
320-
listener.afterBulk(exec.id, exec.request, exec.contexts, resp);
315+
listenerInProgressCount.incrementAndGet();
316+
scheduler.submit(() -> {
317+
try {
318+
listener.afterBulk(exec.id, exec.request, exec.contexts, resp);
319+
}
320+
finally {
321+
if(listenerInProgressCount.decrementAndGet() == 0){
322+
closeCondition.signalIfReady();
323+
}
324+
}
325+
});
321326
}
322327
} else {
323328
// Failure
324329
if (listener != null) {
325-
listener.afterBulk(exec.id, exec.request, exec.contexts, thr);
330+
listenerInProgressCount.incrementAndGet();
331+
scheduler.submit(() -> {
332+
try {
333+
listener.afterBulk(exec.id, exec.request, exec.contexts, thr);
334+
}
335+
finally {
336+
if(listenerInProgressCount.decrementAndGet() == 0){
337+
closeCondition.signalIfReady();
338+
}
339+
}
340+
});
326341
}
327342
}
343+
344+
sendRequestCondition.signalIfReadyAfter(() -> {
345+
requestsInFlightCount--;
346+
closeCondition.signalAllIfReady();
347+
});
328348
return null;
329349
});
330350
}

java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,20 +88,44 @@ private void printStats(TestTransport transport) {
8888
@Test
8989
public void basicTestFlush() throws Exception {
9090
// Prime numbers, so that we have leftovers to flush before shutting down
91-
multiThreadTest(7, 3, 5, 101);
91+
multiThreadTest(7, 3, 5, 101, true);
92+
}
93+
94+
@Test
95+
public void basicTestFlushWithInternalScheduler() throws Exception {
96+
// Prime numbers, so that we have leftovers to flush before shutting down
97+
multiThreadTest(7, 3, 5, 101, false);
9298
}
9399

94100
@Test
95101
public void basicTestNoFlush() throws Exception {
96102
// Will have nothing to flush on close.
97-
multiThreadTest(10, 3, 5, 100);
103+
multiThreadTest(10, 3, 5, 100, true);
104+
}
105+
106+
@Test
107+
public void basicTestNoFlushWithInternalScheduler() throws Exception {
108+
// Will have nothing to flush on close.
109+
multiThreadTest(10, 3, 5, 100, false);
98110
}
99111

100-
private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, int numOperations) throws Exception {
112+
private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, int numOperations,
113+
boolean externalScheduler) throws Exception {
101114

102115
CountingListener listener = new CountingListener();
103116
TestTransport transport = new TestTransport();
104117
ElasticsearchAsyncClient client = new ElasticsearchAsyncClient(transport);
118+
ScheduledExecutorService scheduler;
119+
if (externalScheduler) {
120+
scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
121+
Thread t = Executors.defaultThreadFactory().newThread(r);
122+
t.setName("my-bulk-ingester-executor#");
123+
t.setDaemon(true);
124+
return t;
125+
});
126+
} else {
127+
scheduler = null;
128+
}
105129

106130
BulkIngester<Void> ingester = BulkIngester.of(b -> b
107131
.client(client)
@@ -130,6 +154,7 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads,
130154

131155
ingester.close();
132156
transport.close();
157+
if (scheduler != null) scheduler.shutdownNow();
133158

134159
printStats(ingester);
135160
printStats(listener);

0 commit comments

Comments
 (0)