Skip to content

improve: caching executor for startup tasks and remove parallel() #1755

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.processing.Controller;

/**
Expand All @@ -32,20 +33,27 @@ public synchronized void shouldStart() {
}

public synchronized void start(boolean startEventProcessor) {
controllers().parallelStream().forEach(c -> c.start(startEventProcessor));
ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(controllers().stream(), c -> {
c.start(startEventProcessor);
return null;
}, c -> "Controller Starter for: " + c.getConfiguration().getName());
started = true;
}

public synchronized void stop() {
controllers().parallelStream().forEach(closeable -> {
log.debug("closing {}", closeable);
closeable.stop();
});
ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(controllers().stream(), c -> {
log.debug("closing {}", c);
c.stop();
return null;
}, c -> "Controller Stopper for: " + c.getConfiguration().getName());
started = false;
}

public synchronized void startEventProcessing() {
controllers().parallelStream().forEach(Controller::startEventProcessing);
ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(controllers().stream(), c -> {
c.startEventProcessing();
return null;
}, c -> "Event processor starter for: " + c.getConfiguration().getName());
}

@SuppressWarnings({"unchecked", "rawtypes"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public synchronized void start() {
if (started) {
return;
}
ExecutorServiceManager.init();
controllerManager.shouldStart();
final var version = ConfigurationServiceProvider.instance().getVersion();
log.info(
Expand All @@ -114,7 +115,6 @@ public synchronized void start() {

final var clientVersion = Version.clientVersion();
log.info("Client version: {}", clientVersion);
ExecutorServiceManager.init();
// first start the controller manager before leader election,
// the leader election would start subsequently the processor if on
controllerManager.start(!leaderElectionManager.isLeaderElectionEnabled());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ public class ExecutorServiceManager {
private static ExecutorServiceManager instance;
private final ExecutorService executor;
private final ExecutorService workflowExecutor;
private final ExecutorService cachingExecutorService;
private final int terminationTimeoutSeconds;

private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor,
int terminationTimeoutSeconds) {
this.cachingExecutorService = Executors.newCachedThreadPool();
this.executor = new InstrumentedExecutorService(executor);
this.workflowExecutor = new InstrumentedExecutorService(workflowExecutor);
this.terminationTimeoutSeconds = terminationTimeoutSeconds;
Expand All @@ -49,7 +51,7 @@ public static void init() {
}
}

public synchronized static void stop() {
public static synchronized void stop() {
if (instance != null) {
instance.doStop();
}
Expand All @@ -66,13 +68,26 @@ public synchronized static ExecutorServiceManager instance() {
return instance;
}

public static <T> void executeAndWaitForAllToComplete(Stream<T> stream,
/**
* Uses cachingExecutorService from this manager. Use this only for tasks, that don't have dynamic
* nature, in sense that won't grow with the number of inputs (thus kubernetes resources)
*
* @param stream of elements
* @param task to call on stream elements
* @param threadNamer for naming thread
* @param <T> type
*/
public static <T> void boundedExecuteAndWaitForAllToComplete(Stream<T> stream,
Function<T, Void> task, Function<T, String> threadNamer) {
final var instrumented = new InstrumentedExecutorService(
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
executeAndWaitForAllToComplete(stream, task, threadNamer, instance().cachingExecutorService());
}

public static <T> void executeAndWaitForAllToComplete(Stream<T> stream,
Function<T, Void> task, Function<T, String> threadNamer,
ExecutorService executorService) {
final var instrumented = new InstrumentedExecutorService(executorService);
try {
instrumented.invokeAll(stream.parallel().map(item -> (Callable<Void>) () -> {
instrumented.invokeAll(stream.map(item -> (Callable<Void>) () -> {
// change thread name for easier debugging
final var thread = Thread.currentThread();
final var name = thread.getName();
Expand All @@ -91,11 +106,12 @@ public static <T> void executeAndWaitForAllToComplete(Stream<T> stream,
} catch (ExecutionException e) {
throw new OperatorException(e.getCause());
} catch (InterruptedException e) {
log.warn("Interrupted.", e);
Thread.currentThread().interrupt();
}
});
shutdown(instrumented);
} catch (InterruptedException e) {
log.warn("Interrupted.", e);
Thread.currentThread().interrupt();
}
}
Expand All @@ -108,11 +124,16 @@ public ExecutorService workflowExecutorService() {
return workflowExecutor;
}

public ExecutorService cachingExecutorService() {
return cachingExecutorService;
}

private void doStop() {
try {
log.debug("Closing executor");
shutdown(executor);
shutdown(workflowExecutor);
shutdown(cachingExecutorService);
} catch (InterruptedException e) {
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ public void postProcessDefaultEventSourcesAfterProcessorInitializer() {
public synchronized void start() {
startEventSource(eventSources.namedControllerResourceEventSource());

ExecutorServiceManager.executeAndWaitForAllToComplete(
ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(
eventSources.additionalNamedEventSources()
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER)),
this::startEventSource,
getThreadNamer("start"));

ExecutorServiceManager.executeAndWaitForAllToComplete(
ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(
eventSources.additionalNamedEventSources()
.filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT)),
this::startEventSource,
Expand All @@ -86,10 +86,14 @@ private static Function<NamedEventSource, String> getThreadNamer(String stage) {
};
}

private static Function<NamespaceChangeable, String> getEventSourceThreadNamer(String stage) {
return es -> stage + " -> " + es;
}

@Override
public synchronized void stop() {
stopEventSource(eventSources.namedControllerResourceEventSource());
ExecutorServiceManager.executeAndWaitForAllToComplete(
ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(
eventSources.additionalNamedEventSources(),
this::stopEventSource,
getThreadNamer("stop"));
Expand Down Expand Up @@ -179,13 +183,15 @@ public void broadcastOnResourceEvent(ResourceAction action, P resource, P oldRes
public void changeNamespaces(Set<String> namespaces) {
eventSources.controllerResourceEventSource()
.changeNamespaces(namespaces);
eventSources
ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(eventSources
.additionalEventSources()
.filter(NamespaceChangeable.class::isInstance)
.map(NamespaceChangeable.class::cast)
.filter(NamespaceChangeable::allowsNamespaceChanges)
.parallel()
.forEach(ies -> ies.changeNamespaces(namespaces));
.filter(NamespaceChangeable::allowsNamespaceChanges), e -> {
e.changeNamespaces(namespaces);
return null;
},
getEventSourceThreadNamer("changeNamespace"));
}

public Set<EventSource> getRegisteredEventSources() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public InformerManager(MixedOperation<T, KubernetesResourceList<T>, Resource<T>>
public void start() throws OperatorException {
initSources();
// make sure informers are all started before proceeding further
ExecutorServiceManager.executeAndWaitForAllToComplete(sources.values().stream(),
ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(sources.values().stream(),
iw -> {
iw.start();
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;

import static com.google.common.truth.Truth.assertThat;
import static io.javaoperatorsdk.operator.api.config.ConfigurationService.log;
import static org.awaitility.Awaitility.await;

class MultiVersionCRDIT {
Expand Down Expand Up @@ -72,9 +73,15 @@ public void onStop(SharedIndexInformer informer, Throwable ex) {
acceptOnlyIfUnsetOrEqualToAlreadySet(errorMessage, watcherEx.getCause().getMessage());
}
final var apiTypeClass = informer.getApiTypeClass();

log.debug("Current resourceClassName: " + resourceClassName);

resourceClassName =
acceptOnlyIfUnsetOrEqualToAlreadySet(resourceClassName, apiTypeClass.getName());
System.out.println("Informer for " + HasMetadata.getFullResourceName(apiTypeClass)

log.debug("API Type Class: " + apiTypeClass.getName()
+ " - resource class name: " + resourceClassName);
log.info("Informer for " + HasMetadata.getFullResourceName(apiTypeClass)
+ " stopped due to: " + ex.getMessage());
}

Expand Down Expand Up @@ -132,7 +139,7 @@ void invalidEventsShouldStopInformerAndCallInformerStoppedHandler() {
operator.create(v1res);

await()
.atMost(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(10))
.pollInterval(Duration.ofMillis(50))
.untilAsserted(() -> {
// v1 is the stored version so trying to create a v2 version should fail because we cannot
Expand Down