From d9e80fba11d3b06a26a5cc9f5871706850453b0d Mon Sep 17 00:00:00 2001 From: Fabrizio Fortino Date: Tue, 20 Aug 2024 16:22:00 +0200 Subject: [PATCH 1/3] fix: bulk ingester might skip lister requests --- .../_helpers/bulk/BulkIngester.java | 11 ++++---- .../_helpers/bulk/BulkIngesterTest.java | 25 +++++++++++++------ 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index 5e6b8addc..af5401baa 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -311,12 +311,6 @@ public void flush() { if (exec != null) { // A request was actually sent exec.futureResponse.handle((resp, thr) -> { - - sendRequestCondition.signalIfReadyAfter(() -> { - requestsInFlightCount--; - closeCondition.signalAllIfReady(); - }); - if (resp != null) { // Success if (listener != null) { @@ -330,6 +324,11 @@ public void flush() { exec.contexts, thr)); } } + + sendRequestCondition.signalIfReadyAfter(() -> { + requestsInFlightCount--; + closeCondition.signalAllIfReady(); + }); return null; }); } diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java index d6aee8cc5..fb7290399 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java @@ -90,26 +90,37 @@ private void printStats(TestTransport transport) { @Test public void basicTestFlush() throws Exception { // Prime numbers, so that we have leftovers to flush before shutting down - multiThreadTest(7, 3, 5, 101); + multiThreadTest(7, 3, 5, 101, true); + } + + @Test + public void basicTestFlushWithInternalScheduler() throws Exception { + // Prime numbers, so that we have leftovers to flush before shutting down + multiThreadTest(7, 3, 5, 101, false); } @Test public void basicTestNoFlush() throws Exception { // Will have nothing to flush on close. - multiThreadTest(10, 3, 5, 100); + multiThreadTest(10, 3, 5, 100, true); } - private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, int numOperations) throws Exception { + private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, int numOperations, boolean externalScheduler) throws Exception { CountingListener listener = new CountingListener(); TestTransport transport = new TestTransport(); ElasticsearchAsyncClient client = new ElasticsearchAsyncClient(transport); - ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> { + ScheduledExecutorService scheduler; + if (externalScheduler) { + scheduler = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = Executors.defaultThreadFactory().newThread(r); - t.setName("my-bulk-ingester-executor#" ); + t.setName("my-bulk-ingester-executor#"); t.setDaemon(true); return t; - }); + }); + } else { + scheduler = null; + } BulkIngester ingester = BulkIngester.of(b -> b .client(client) @@ -139,7 +150,7 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, ingester.close(); transport.close(); - scheduler.shutdownNow(); + if (scheduler != null) scheduler.shutdownNow(); printStats(ingester); printStats(listener); From ac086fff804969a1d70448c23b1d8ac2c26ee11d Mon Sep 17 00:00:00 2001 From: Fabrizio Fortino Date: Tue, 20 Aug 2024 16:38:10 +0200 Subject: [PATCH 2/3] minor: fix style --- .../clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java index fb7290399..4f93429a6 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java @@ -105,7 +105,8 @@ public void basicTestNoFlush() throws Exception { multiThreadTest(10, 3, 5, 100, true); } - private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, int numOperations, boolean externalScheduler) throws Exception { + private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, int numOperations, + boolean externalScheduler) throws Exception { CountingListener listener = new CountingListener(); TestTransport transport = new TestTransport(); From 324af0a8957bc86fe6d6a37a0665ffa74a7bdcb7 Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Mon, 26 Aug 2024 15:51:16 +0200 Subject: [PATCH 3/3] always waiting for listener to be done before closing --- .../_helpers/bulk/BulkIngester.java | 29 +++++++++++++++---- .../_helpers/bulk/BulkIngesterTest.java | 6 ++++ 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index af5401baa..91d194844 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -76,6 +76,7 @@ public class BulkIngester implements AutoCloseable { private final FnCondition addCondition = new FnCondition(lock, this::canAddOperation); private final FnCondition sendRequestCondition = new FnCondition(lock, this::canSendRequest); private final FnCondition closeCondition = new FnCondition(lock, this::closedAndFlushed); + private AtomicInteger listenerInProgressCount = new AtomicInteger(); private static class RequestExecution { public final long id; @@ -235,7 +236,7 @@ private boolean canAddOperation() { } private boolean closedAndFlushed() { - return isClosed && operations.isEmpty() && requestsInFlightCount == 0; + return isClosed && operations.isEmpty() && requestsInFlightCount == 0 && listenerInProgressCount.get() == 0; } //----- Ingester logic @@ -314,14 +315,32 @@ public void flush() { if (resp != null) { // Success if (listener != null) { - scheduler.submit(() -> listener.afterBulk(exec.id, exec.request, - exec.contexts, resp)); + listenerInProgressCount.incrementAndGet(); + scheduler.submit(() -> { + try { + listener.afterBulk(exec.id, exec.request, exec.contexts, resp); + } + finally { + if(listenerInProgressCount.decrementAndGet() == 0){ + closeCondition.signalIfReady(); + } + } + }); } } else { // Failure if (listener != null) { - scheduler.submit(() -> listener.afterBulk(exec.id, exec.request, - exec.contexts, thr)); + listenerInProgressCount.incrementAndGet(); + scheduler.submit(() -> { + try { + listener.afterBulk(exec.id, exec.request, exec.contexts, thr); + } + finally { + if(listenerInProgressCount.decrementAndGet() == 0){ + closeCondition.signalIfReady(); + } + } + }); } } diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java index 4f93429a6..a76f3f75f 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java @@ -105,6 +105,12 @@ public void basicTestNoFlush() throws Exception { multiThreadTest(10, 3, 5, 100, true); } + @Test + public void basicTestNoFlushWithInternalScheduler() throws Exception { + // Will have nothing to flush on close. + multiThreadTest(10, 3, 5, 100, false); + } + private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, int numOperations, boolean externalScheduler) throws Exception {