From 5194ccf0faa3e57c9e864bacfdcb5d1a84d68111 Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Thu, 20 Jun 2024 18:20:53 +0200 Subject: [PATCH] before bulk synch, unit test fix --- .../elasticsearch/_helpers/bulk/BulkIngester.java | 3 ++- .../_helpers/bulk/BulkIngesterTest.java | 13 +++++++++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index 69e937838..c6fe93ac3 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -293,7 +293,8 @@ public void flush() { if (listener != null) { BulkRequest finalRequest = request; - scheduler.submit(() -> listener.beforeBulk(id, finalRequest, requestContexts)); + // synchronous execution to make sure it actually runs before + listener.beforeBulk(id, finalRequest, requestContexts); } CompletionStage result = client.bulk(request); diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java index 2a0b22416..a72473eda 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java @@ -47,6 +47,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -102,11 +103,18 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, CountingListener listener = new CountingListener(); TestTransport transport = new TestTransport(); ElasticsearchAsyncClient client = new ElasticsearchAsyncClient(transport); + ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setName("my-bulk-ingester-executor#" ); + t.setDaemon(true); + return t; + }); BulkIngester ingester = BulkIngester.of(b -> b .client(client) .maxOperations(maxOperations) .maxConcurrentRequests(maxRequests) + .scheduler(scheduler) .listener(listener) ); @@ -130,6 +138,7 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, ingester.close(); transport.close(); + scheduler.shutdownNow(); printStats(ingester); printStats(listener); @@ -181,7 +190,7 @@ public void periodicFlushTest() throws Exception { // Disable other flushing limits .maxSize(-1) .maxOperations(-1) - .maxConcurrentRequests(Integer.MAX_VALUE) + .maxConcurrentRequests(Integer.MAX_VALUE-1) ); // Add an operation every 100 ms to give time @@ -242,7 +251,7 @@ public void afterBulk(long executionId, BulkRequest request, List contexts // Disable other flushing limits .maxSize(-1) .maxOperations(-1) - .maxConcurrentRequests(Integer.MAX_VALUE) + .maxConcurrentRequests(Integer.MAX_VALUE - 1) .listener(listener) );