From 3b4bc1ec4058e5af67373bb8209256c09428ddde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 5 Mar 2024 09:01:05 +0100 Subject: [PATCH 1/5] fix: the executor service now uses SynchronousQueue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The current approach used a non-bound queue, therefore the thread were not added dynamically into the Threadpool Signed-off-by: Attila Mészáros --- .../operator/api/config/ExecutorServiceManager.java | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index 62f345426c..7abda2ef17 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -3,15 +3,7 @@ import java.time.Duration; import java.util.Collection; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -37,9 +29,8 @@ public class ExecutorServiceManager { public static ExecutorService newThreadPoolExecutor(int minThreads, int maxThreads) { minThreads = Utils.ensureValid(minThreads, "minimum number of threads", MIN_THREAD_NUMBER); maxThreads = Utils.ensureValid(maxThreads, "maximum number of threads", minThreads + 1); - return new ThreadPoolExecutor(minThreads, maxThreads, 1, TimeUnit.MINUTES, - new LinkedBlockingDeque<>()); + new SynchronousQueue<>()); } /** From 856a00a26c48bc374ac46aced536254573c01110 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 5 Mar 2024 10:16:29 +0100 Subject: [PATCH 2/5] docs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../api/config/ConfigurationService.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index 53bfc75df9..6efb6326f5 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -127,7 +127,7 @@ default boolean checkCRDAndValidateLocalModel() { return false; } - int DEFAULT_RECONCILIATION_THREADS_NUMBER = 200; + int DEFAULT_RECONCILIATION_THREADS_NUMBER = Integer.MAX_VALUE; int MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER = 10; /** @@ -190,6 +190,22 @@ default Metrics getMetrics() { return Metrics.NOOP; } + /** + * Use to provide custom executor service. By default, a + * {@link java.util.concurrent.ThreadPoolExecutor} is provided, that honors the values of + * concurrentReconciliationThreads and minConcurrentReconciliationThreads. When a controller + * starts, all the resources are reconciled, therefore there is a natural and expected burst on + * startup in general. According to this there are multiple options, using the ThreadPoolExecutor. + * Either: + *
    + *
  • Use a very high upper bound thread limit with + * {@link java.util.concurrent.SynchronousQueue}. This is the default approach.
  • + *
  • Use fixed number of threads with infinite queue, lik + * {@link java.util.concurrent.LinkedBlockingDeque}.
  • + *
  • In addition to that, could be further fine tuned using the + * {@link java.util.concurrent.ArrayBlockingQueue}.
  • + *
+ */ default ExecutorService getExecutorService() { return newThreadPoolExecutor(minConcurrentReconciliationThreads(), concurrentReconciliationThreads()); From 782a6a91e5f473f042023697c5447c1f421694e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 5 Mar 2024 10:32:36 +0100 Subject: [PATCH 3/5] docs improvements MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../operator/api/config/ConfigurationService.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index 6efb6326f5..e6054d664a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -191,11 +191,12 @@ default Metrics getMetrics() { } /** - * Use to provide custom executor service. By default, a - * {@link java.util.concurrent.ThreadPoolExecutor} is provided, that honors the values of + * Use to provide custom executor service.
+ * By default, a + * {@link java.util.concurrent.ThreadPoolExecutor} is used, that honors the values of * concurrentReconciliationThreads and minConcurrentReconciliationThreads. When a controller * starts, all the resources are reconciled, therefore there is a natural and expected burst on - * startup in general. According to this there are multiple options, using the ThreadPoolExecutor. + * startup. According to this there are multiple options, using the ThreadPoolExecutor. * Either: *
    *
  • Use a very high upper bound thread limit with From 1ae9e1bfc2d37d9253cc9631a305f7224119dfd1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 5 Mar 2024 10:49:59 +0100 Subject: [PATCH 4/5] format MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../api/config/ConfigurationService.java | 8 ++-- .../operator/ThreadPoolTest.java | 48 +++++++++++++++++++ 2 files changed, 51 insertions(+), 5 deletions(-) create mode 100644 operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ThreadPoolTest.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index e6054d664a..f61ceef812 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -192,12 +192,10 @@ default Metrics getMetrics() { /** * Use to provide custom executor service.
    - * By default, a - * {@link java.util.concurrent.ThreadPoolExecutor} is used, that honors the values of - * concurrentReconciliationThreads and minConcurrentReconciliationThreads. When a controller + * By default, a {@link java.util.concurrent.ThreadPoolExecutor} is used, that honors the values + * of concurrentReconciliationThreads and minConcurrentReconciliationThreads. When a controller * starts, all the resources are reconciled, therefore there is a natural and expected burst on - * startup. According to this there are multiple options, using the ThreadPoolExecutor. - * Either: + * startup. According to this there are multiple options, using the ThreadPoolExecutor. Either: *
      *
    • Use a very high upper bound thread limit with * {@link java.util.concurrent.SynchronousQueue}. This is the default approach.
    • diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ThreadPoolTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ThreadPoolTest.java new file mode 100644 index 0000000000..f1cc37270e --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ThreadPoolTest.java @@ -0,0 +1,48 @@ +package io.javaoperatorsdk.operator; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.concurrent.*; + +public class ThreadPoolTest { + + public static final int NUM = 20000; + + @Test + void test() { +// ThreadPoolExecutor es = (ThreadPoolExecutor) Executors.newCachedThreadPool(); + var es = new ThreadPoolExecutor(10, 50, 1, TimeUnit.MINUTES, + new LinkedBlockingDeque<>(300)); + var sumSubmit = 0; + var futures = new ArrayList>(); + for (int i = 0; i< NUM; i++) { + var now = System.nanoTime() ; + Future f = es.submit(() -> { + try { + Thread.sleep(200); + System.out.println("Finished " + Thread.currentThread()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + var duration = System.nanoTime() - now; + sumSubmit += duration; + System.out.println("submit duration in nanos: "+duration); + futures.add(f); + } + + futures.forEach(f-> { + try { + f.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + System.out.println("finished:"+es.getPoolSize()); + System.out.println("avg submit:"+sumSubmit / NUM); + } + + +} From 21dae29c0d5eb69c64f4aee467a98937a1aeebd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 5 Mar 2024 10:51:04 +0100 Subject: [PATCH 5/5] delete test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../operator/ThreadPoolTest.java | 48 ------------------- 1 file changed, 48 deletions(-) delete mode 100644 operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ThreadPoolTest.java diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ThreadPoolTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ThreadPoolTest.java deleted file mode 100644 index f1cc37270e..0000000000 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ThreadPoolTest.java +++ /dev/null @@ -1,48 +0,0 @@ -package io.javaoperatorsdk.operator; - -import org.junit.jupiter.api.Test; - -import java.util.ArrayList; -import java.util.concurrent.*; - -public class ThreadPoolTest { - - public static final int NUM = 20000; - - @Test - void test() { -// ThreadPoolExecutor es = (ThreadPoolExecutor) Executors.newCachedThreadPool(); - var es = new ThreadPoolExecutor(10, 50, 1, TimeUnit.MINUTES, - new LinkedBlockingDeque<>(300)); - var sumSubmit = 0; - var futures = new ArrayList>(); - for (int i = 0; i< NUM; i++) { - var now = System.nanoTime() ; - Future f = es.submit(() -> { - try { - Thread.sleep(200); - System.out.println("Finished " + Thread.currentThread()); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }); - var duration = System.nanoTime() - now; - sumSubmit += duration; - System.out.println("submit duration in nanos: "+duration); - futures.add(f); - } - - futures.forEach(f-> { - try { - f.get(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - - System.out.println("finished:"+es.getPoolSize()); - System.out.println("avg submit:"+sumSubmit / NUM); - } - - -}