Skip to content

Commit d0af219

Browse files
fix: bulk ingester might skip lister requests
1 parent bf86580 commit d0af219

File tree

2 files changed

+23
-13
lines changed

2 files changed

+23
-13
lines changed

java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -311,12 +311,6 @@ public void flush() {
311311
if (exec != null) {
312312
// A request was actually sent
313313
exec.futureResponse.handle((resp, thr) -> {
314-
315-
sendRequestCondition.signalIfReadyAfter(() -> {
316-
requestsInFlightCount--;
317-
closeCondition.signalAllIfReady();
318-
});
319-
320314
if (resp != null) {
321315
// Success
322316
if (listener != null) {
@@ -330,6 +324,11 @@ public void flush() {
330324
exec.contexts, thr));
331325
}
332326
}
327+
328+
sendRequestCondition.signalIfReadyAfter(() -> {
329+
requestsInFlightCount--;
330+
closeCondition.signalAllIfReady();
331+
});
333332
return null;
334333
});
335334
}

java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -89,26 +89,37 @@ private void printStats(TestTransport transport) {
8989
@Test
9090
public void basicTestFlush() throws Exception {
9191
// Prime numbers, so that we have leftovers to flush before shutting down
92-
multiThreadTest(7, 3, 5, 101);
92+
multiThreadTest(7, 3, 5, 101, true);
93+
}
94+
95+
@Test
96+
public void basicTestFlushWithInternalScheduler() throws Exception {
97+
// Prime numbers, so that we have leftovers to flush before shutting down
98+
multiThreadTest(7, 3, 5, 101, false);
9399
}
94100

95101
@Test
96102
public void basicTestNoFlush() throws Exception {
97103
// Will have nothing to flush on close.
98-
multiThreadTest(10, 3, 5, 100);
104+
multiThreadTest(10, 3, 5, 100, true);
99105
}
100106

101-
private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, int numOperations) throws Exception {
107+
private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, int numOperations, boolean externalScheduler) throws Exception {
102108

103109
CountingListener listener = new CountingListener();
104110
TestTransport transport = new TestTransport();
105111
ElasticsearchAsyncClient client = new ElasticsearchAsyncClient(transport);
106-
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
112+
ScheduledExecutorService scheduler;
113+
if (externalScheduler) {
114+
scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
107115
Thread t = Executors.defaultThreadFactory().newThread(r);
108-
t.setName("my-bulk-ingester-executor#" );
116+
t.setName("my-bulk-ingester-executor#");
109117
t.setDaemon(true);
110118
return t;
111-
});
119+
});
120+
} else {
121+
scheduler = null;
122+
}
112123

113124
BulkIngester<Void> ingester = BulkIngester.of(b -> b
114125
.client(client)
@@ -138,7 +149,7 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads,
138149

139150
ingester.close();
140151
transport.close();
141-
scheduler.shutdownNow();
152+
if (scheduler != null) scheduler.shutdownNow();
142153

143154
printStats(ingester);
144155
printStats(listener);

0 commit comments

Comments
 (0)