Skip to content

Commit d9e80fb

Browse files
fabriziofortinol-trotta
authored andcommitted
fix: bulk ingester might skip lister requests
1 parent aca5752 commit d9e80fb

File tree

2 files changed

+23
-13
lines changed

2 files changed

+23
-13
lines changed

java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -311,12 +311,6 @@ public void flush() {
311311
if (exec != null) {
312312
// A request was actually sent
313313
exec.futureResponse.handle((resp, thr) -> {
314-
315-
sendRequestCondition.signalIfReadyAfter(() -> {
316-
requestsInFlightCount--;
317-
closeCondition.signalAllIfReady();
318-
});
319-
320314
if (resp != null) {
321315
// Success
322316
if (listener != null) {
@@ -330,6 +324,11 @@ public void flush() {
330324
exec.contexts, thr));
331325
}
332326
}
327+
328+
sendRequestCondition.signalIfReadyAfter(() -> {
329+
requestsInFlightCount--;
330+
closeCondition.signalAllIfReady();
331+
});
333332
return null;
334333
});
335334
}

java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,26 +90,37 @@ private void printStats(TestTransport transport) {
9090
@Test
9191
public void basicTestFlush() throws Exception {
9292
// Prime numbers, so that we have leftovers to flush before shutting down
93-
multiThreadTest(7, 3, 5, 101);
93+
multiThreadTest(7, 3, 5, 101, true);
94+
}
95+
96+
@Test
97+
public void basicTestFlushWithInternalScheduler() throws Exception {
98+
// Prime numbers, so that we have leftovers to flush before shutting down
99+
multiThreadTest(7, 3, 5, 101, false);
94100
}
95101

96102
@Test
97103
public void basicTestNoFlush() throws Exception {
98104
// Will have nothing to flush on close.
99-
multiThreadTest(10, 3, 5, 100);
105+
multiThreadTest(10, 3, 5, 100, true);
100106
}
101107

102-
private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, int numOperations) throws Exception {
108+
private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, int numOperations, boolean externalScheduler) throws Exception {
103109

104110
CountingListener listener = new CountingListener();
105111
TestTransport transport = new TestTransport();
106112
ElasticsearchAsyncClient client = new ElasticsearchAsyncClient(transport);
107-
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
113+
ScheduledExecutorService scheduler;
114+
if (externalScheduler) {
115+
scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
108116
Thread t = Executors.defaultThreadFactory().newThread(r);
109-
t.setName("my-bulk-ingester-executor#" );
117+
t.setName("my-bulk-ingester-executor#");
110118
t.setDaemon(true);
111119
return t;
112-
});
120+
});
121+
} else {
122+
scheduler = null;
123+
}
113124

114125
BulkIngester<Void> ingester = BulkIngester.of(b -> b
115126
.client(client)
@@ -139,7 +150,7 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads,
139150

140151
ingester.close();
141152
transport.close();
142-
scheduler.shutdownNow();
153+
if (scheduler != null) scheduler.shutdownNow();
143154

144155
printStats(ingester);
145156
printStats(listener);

0 commit comments

Comments
 (0)