From 720c07848bc40f5853d5824d7518b9190ed5b2a8 Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Wed, 5 Jun 2024 16:38:17 +0200 Subject: [PATCH 1/5] running listener code in separate thread --- .../_helpers/bulk/BulkIngester.java | 78 ++++++++++++------- 1 file changed, 50 insertions(+), 28 deletions(-) diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index a6e61367e..ab6789301 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -82,7 +82,8 @@ private static class RequestExecution { public final List contexts; public final CompletionStage futureResponse; - RequestExecution(long id, BulkRequest request, List contexts, CompletionStage futureResponse) { + RequestExecution(long id, BulkRequest request, List contexts, + CompletionStage futureResponse) { this.id = id; this.request = request; this.contexts = contexts; @@ -99,19 +100,18 @@ private BulkIngester(Builder builder) { this.maxOperations = builder.bulkOperations < 0 ? Integer.MAX_VALUE : builder.bulkOperations; this.listener = builder.listener; this.flushIntervalMillis = builder.flushIntervalMillis; - - if (flushIntervalMillis != null) { - long flushInterval = flushIntervalMillis; - // Create a scheduler if needed - ScheduledExecutorService scheduler; + // Create a scheduler if needed + ScheduledExecutorService scheduler = null; + if (flushIntervalMillis != null || listener != null) { + if (builder.scheduler == null) { - scheduler = Executors.newSingleThreadScheduledExecutor((r) -> { - Thread t = Executors.defaultThreadFactory().newThread(r); - t.setName("bulk-ingester-flusher#" + ingesterId); - t.setDaemon(true); - return t; - }); + scheduler = Executors.newScheduledThreadPool(maxRequests + 1, (r) -> { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setName("bulk-ingester-executor#" + ingesterId); + t.setDaemon(true); + return t; + }); // Keep it, we'll have to close it. this.scheduler = scheduler; @@ -119,7 +119,11 @@ private BulkIngester(Builder builder) { // It's not ours, we will not close it. scheduler = builder.scheduler; } - + + } + + if (flushIntervalMillis != null) { + long flushInterval = flushIntervalMillis; this.flushTask = scheduler.scheduleWithFixedDelay( this::failsafeFlush, flushInterval, flushInterval, @@ -221,7 +225,7 @@ public long requestCount() { * @see Builder#maxConcurrentRequests */ public long requestContentionsCount() { - return this.sendRequestCondition.contentions(); + return this.sendRequestCondition.contentions(); } //----- Predicates for the condition variables @@ -265,7 +269,7 @@ private BulkRequest.Builder newRequest() { private void failsafeFlush() { try { flush(); - } catch(Throwable thr) { + } catch (Throwable thr) { // Log the error and continue logger.error("Error in background flush", thr); } @@ -280,7 +284,8 @@ public void flush() { () -> { // Build the request BulkRequest request = newRequest().operations(operations).build(); - List requestContexts = contexts == null ? Collections.nCopies(operations.size(), null) : contexts; + List requestContexts = contexts == null ? Collections.nCopies(operations.size(), + null) : contexts; // Prepare for next round operations = new ArrayList<>(); @@ -291,7 +296,8 @@ public void flush() { long id = sendRequestCondition.invocations(); if (listener != null) { - listener.beforeBulk(id, request, requestContexts); + BulkRequest finalRequest = request; + scheduler.submit(() -> listener.beforeBulk(id, finalRequest, requestContexts)); } CompletionStage result = client.bulk(request); @@ -303,7 +309,7 @@ public void flush() { } return new RequestExecution<>(id, request, requestContexts, result); - }); + }); if (exec != null) { // A request was actually sent @@ -317,12 +323,14 @@ public void flush() { if (resp != null) { // Success if (listener != null) { - listener.afterBulk(exec.id, exec.request, exec.contexts, resp); + scheduler.submit(() -> listener.afterBulk(exec.id, exec.request, + exec.contexts, resp)); } } else { // Failure if (listener != null) { - listener.afterBulk(exec.id, exec.request, exec.contexts, thr); + scheduler.submit(() -> listener.afterBulk(exec.id, exec.request, + exec.contexts, thr)); } } return null; @@ -383,7 +391,8 @@ public void close() { // Flush buffered operations flush(); // and wait for all requests to be completed - closeCondition.whenReady(() -> {}); + closeCondition.whenReady(() -> { + }); if (flushTask != null) { flushTask.cancel(false); @@ -404,7 +413,7 @@ public static class Builder implements ObjectBuilder listener; @@ -424,7 +433,8 @@ public Builder client(ElasticsearchClient client) { } /** - * Sets when to flush a new bulk request based on the number of operations currently added. Defaults to + * Sets when to flush a new bulk request based on the number of operations currently added. + * Defaults to * {@code 1000}. Can be set to {@code -1} to disable it. * * @throws IllegalArgumentException if less than -1. @@ -438,7 +448,8 @@ public Builder maxOperations(int count) { } /** - * Sets when to flush a new bulk request based on the size in bytes of actions currently added. A request is sent + * Sets when to flush a new bulk request based on the size in bytes of actions currently added. A + * request is sent * once that size has been exceeded. Defaults to 5 megabytes. Can be set to {@code -1} to disable it. * * @throws IllegalArgumentException if less than -1. @@ -452,7 +463,8 @@ public Builder maxSize(long bytes) { } /** - * Sets the number of concurrent requests allowed to be executed. A value of 1 means 1 request is allowed to be executed + * Sets the number of concurrent requests allowed to be executed. A value of 1 means 1 request is + * allowed to be executed * while accumulating new bulk requests. Defaults to {@code 1}. * * @throws IllegalArgumentException if less than 1. @@ -468,7 +480,8 @@ public Builder maxConcurrentRequests(int max) { /** * Sets an interval flushing any bulk actions pending if the interval passes. Defaults to not set. *

- * Flushing is still subject to the maximum number of requests set with {@link #maxConcurrentRequests}. + * Flushing is still subject to the maximum number of requests set with + * {@link #maxConcurrentRequests}. * * @throws IllegalArgumentException if not a positive duration. */ @@ -483,13 +496,21 @@ public Builder flushInterval(long value, TimeUnit unit) { /** * Sets an interval flushing any bulk actions pending if the interval passes. Defaults to not set. *

- * Flushing is still subject to the maximum number of requests set with {@link #maxConcurrentRequests}. + * Flushing is still subject to the maximum number of requests set with + * {@link #maxConcurrentRequests}. + * Deprecated in favor of {@link #scheduler} */ + @Deprecated public Builder flushInterval(long value, TimeUnit unit, ScheduledExecutorService scheduler) { this.scheduler = scheduler; return flushInterval(value, unit); } + public Builder scheduler(ScheduledExecutorService scheduler) { + this.scheduler = scheduler; + return this; + } + public Builder listener(BulkListener listener) { this.listener = listener; return this; @@ -518,7 +539,8 @@ public Builder globalSettings(Function build() { // Ensure some chunking criteria are defined - boolean hasCriteria = this.bulkOperations >= 0 || this.bulkSize >= 0 || this.flushIntervalMillis != null; + boolean hasCriteria = + this.bulkOperations >= 0 || this.bulkSize >= 0 || this.flushIntervalMillis != null; if (!hasCriteria) { throw new IllegalStateException("No bulk operation chunking criteria have been set."); From 1ea316490ac514b98665cfba3d7b005ced58b874 Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Wed, 5 Jun 2024 16:57:03 +0200 Subject: [PATCH 2/5] thread name --- .../clients/elasticsearch/_helpers/bulk/BulkIngester.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index ab6789301..b5aff6a4e 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -108,7 +108,7 @@ private BulkIngester(Builder builder) { if (builder.scheduler == null) { scheduler = Executors.newScheduledThreadPool(maxRequests + 1, (r) -> { Thread t = Executors.defaultThreadFactory().newThread(r); - t.setName("bulk-ingester-executor#" + ingesterId); + t.setName("bulk-ingester-executor#" + ingesterId + "#" + t.getId()); t.setDaemon(true); return t; }); From 3618e9d7fcc9dff7757714a8ab3598e6461dd7b7 Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Wed, 5 Jun 2024 17:19:46 +0200 Subject: [PATCH 3/5] closing logic --- .../_helpers/bulk/BulkIngester.java | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index b5aff6a4e..4c61909e4 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -62,6 +62,7 @@ public class BulkIngester implements AutoCloseable { private @Nullable ScheduledFuture flushTask; private @Nullable ScheduledExecutorService scheduler; + private boolean isExternalScheduler = false; // Current state private List operations = new ArrayList<>(); @@ -101,25 +102,20 @@ private BulkIngester(Builder builder) { this.listener = builder.listener; this.flushIntervalMillis = builder.flushIntervalMillis; - // Create a scheduler if needed - ScheduledExecutorService scheduler = null; if (flushIntervalMillis != null || listener != null) { - + // Create a scheduler if needed if (builder.scheduler == null) { - scheduler = Executors.newScheduledThreadPool(maxRequests + 1, (r) -> { + this.scheduler = Executors.newScheduledThreadPool(maxRequests + 1, (r) -> { Thread t = Executors.defaultThreadFactory().newThread(r); t.setName("bulk-ingester-executor#" + ingesterId + "#" + t.getId()); t.setDaemon(true); return t; }); - - // Keep it, we'll have to close it. - this.scheduler = scheduler; } else { // It's not ours, we will not close it. - scheduler = builder.scheduler; + this.scheduler = builder.scheduler; + this.isExternalScheduler = true; } - } if (flushIntervalMillis != null) { @@ -398,7 +394,7 @@ public void close() { flushTask.cancel(false); } - if (scheduler != null) { + if (scheduler != null && !isExternalScheduler) { scheduler.shutdownNow(); } } From aed254c17429f009a95c2f0c4b669edf665126a9 Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Wed, 5 Jun 2024 17:29:26 +0200 Subject: [PATCH 4/5] comment --- .../clients/elasticsearch/_helpers/bulk/BulkIngester.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index 4c61909e4..4a7c95c01 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -502,6 +502,10 @@ public Builder flushInterval(long value, TimeUnit unit, ScheduledExecut return flushInterval(value, unit); } + /** + * Sets a custom scheduler to run the flush thread and the listener logic. A default one is used if + * not set. + */ public Builder scheduler(ScheduledExecutorService scheduler) { this.scheduler = scheduler; return this; From 6e251a42e8514218fa6d1a11aa07635acb2e8501 Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Mon, 17 Jun 2024 18:59:36 +0200 Subject: [PATCH 5/5] javadoc fixes --- .../clients/elasticsearch/_helpers/bulk/BulkIngester.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index 4a7c95c01..69e937838 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -429,8 +429,7 @@ public Builder client(ElasticsearchClient client) { } /** - * Sets when to flush a new bulk request based on the number of operations currently added. - * Defaults to + * Sets when to flush a new bulk request based on the number of operations currently added. Defaults to * {@code 1000}. Can be set to {@code -1} to disable it. * * @throws IllegalArgumentException if less than -1. @@ -494,7 +493,7 @@ public Builder flushInterval(long value, TimeUnit unit) { *

* Flushing is still subject to the maximum number of requests set with * {@link #maxConcurrentRequests}. - * Deprecated in favor of {@link #scheduler} + * @deprecated use {@link #scheduler(ScheduledExecutorService)} */ @Deprecated public Builder flushInterval(long value, TimeUnit unit, ScheduledExecutorService scheduler) {