diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java index b996eeead9..4e4f945515 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java @@ -20,7 +20,6 @@ import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.dependent.Dependent; import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; -import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.AnnotationDependentResourceConfigurator; import io.javaoperatorsdk.operator.processing.dependent.workflow.Condition; import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter; @@ -121,19 +120,17 @@ public ResourceEventFilter
getEventFilter() {
Class filter = filterType.getConstructor().newInstance();
-
- if (answer == null) {
- answer = filter;
- } else {
- answer = answer.and(filter);
- }
- } catch (Exception e) {
- throw new IllegalArgumentException(e);
+ for (var filterType : filterTypes) {
+ try {
+ ResourceEventFilter filter = filterType.getConstructor().newInstance();
+
+ if (answer == null) {
+ answer = filter;
+ } else {
+ answer = answer.and(filter);
}
+ } catch (Exception e) {
+ throw new IllegalArgumentException(e);
}
}
return answer != null ? answer : ResourceEventFilters.passthrough();
@@ -177,22 +174,6 @@ private void configureFromAnnotatedReconciler(Object instance) {
}
}
- @SuppressWarnings("unchecked")
- private void configureFromCustomAnnotation(Object instance) {
- if (instance instanceof AnnotationDependentResourceConfigurator) {
- AnnotationDependentResourceConfigurator configurator =
- (AnnotationDependentResourceConfigurator) instance;
- final Class extends Annotation> configurationClass =
- (Class extends Annotation>) Utils.getFirstTypeArgumentFromInterface(
- instance.getClass(), AnnotationDependentResourceConfigurator.class);
- final var configAnnotation = instance.getClass().getAnnotation(configurationClass);
- // always called even if the annotation is null so that implementations can provide default
- // values
- final var config = configurator.configFrom(configAnnotation, this);
- configurator.configureWith(config);
- }
- }
-
@Override
@SuppressWarnings("unchecked")
public Optional
private final boolean contextInitializer;
private final boolean isCleaner;
private final Metrics metrics;
- private final ManagedWorkflow managedWorkflow;
+ private final Workflow managedWorkflow;
private final GroupVersionKind associatedGVK;
private final EventProcessor eventProcessor;
@@ -83,8 +83,9 @@ public Controller(Reconciler reconciler,
this.metrics = Optional.ofNullable(configurationService.getMetrics()).orElse(Metrics.NOOP);
contextInitializer = reconciler instanceof ContextInitializer;
isCleaner = reconciler instanceof Cleaner;
- managedWorkflow = configurationService.getWorkflowFactory().workflowFor(configuration);
- managedWorkflow.resolve(kubernetesClient, configuration);
+
+ final var managed = configurationService.getWorkflowFactory().workflowFor(configuration);
+ managedWorkflow = managed.resolve(kubernetesClient, configuration);
eventSourceManager = new EventSourceManager<>(this);
eventProcessor = new EventProcessor<>(eventSourceManager);
@@ -135,7 +136,7 @@ public Map execute() throws Exception {
initContextIfNeeded(resource, context);
- if (!managedWorkflow.isEmptyWorkflow()) {
+ if (!managedWorkflow.isEmpty()) {
var res = managedWorkflow.reconcile(resource, context);
((DefaultManagedDependentResourceContext) context.managedDependentResourceContext())
.setWorkflowExecutionResult(res);
@@ -180,7 +181,7 @@ public Map context) {
final var dependentResourcesByName = managedWorkflow.getDependentResourcesByName();
final var size = dependentResourcesByName.size();
if (size > 0) {
- dependentResourcesByName.forEach((key, value) -> {
- if (value instanceof EventSourceProvider) {
- final var provider = (EventSourceProvider) value;
+ dependentResourcesByName.forEach((key, dependentResource) -> {
+ if (dependentResource instanceof EventSourceProvider) {
+ final var provider = (EventSourceProvider) dependentResource;
final var source = provider.initEventSource(context);
eventSourceManager.registerEventSource(key, source);
} else {
- Optional implements ManagedWorkflow {
- private final Workflow workflow;
- private final boolean isEmptyWorkflow;
- private boolean resolved;
+ private final Set workflow) {
- isEmptyWorkflow = dependentResourceSpecs.isEmpty();
- this.workflow = workflow;
+ @Override
+ @SuppressWarnings("unused")
+ public List context) {
- checkIfResolved();
- return workflow.reconcile(primary, context);
+ protected Set context) {
- checkIfResolved();
- return workflow.cleanup(primary, context);
+ protected Set resolve(KubernetesClient client,
+ @SuppressWarnings("unchecked")
+ public Workflow resolve(KubernetesClient client,
ControllerConfiguration configuration) {
- if (!resolved) {
- workflow.resolve(client, configuration);
- resolved = true;
+ final var alreadyResolved = new HashMap configuration) {
+ final DependentResource primary resource
+ */
+@SuppressWarnings("rawtypes")
+public class DefaultWorkflow implements Workflow {
+
+ private final Map context) {
+ WorkflowReconcileExecutor workflowReconcileExecutor =
+ new WorkflowReconcileExecutor<>(this, primary, context);
+ var result = workflowReconcileExecutor.reconcile();
+ if (throwExceptionAutomatically) {
+ result.throwAggregateExceptionIfErrorsPresent();
+ }
+ return result;
+ }
+
+ @Override
+ public WorkflowCleanupResult cleanup(P primary, Context context) {
+ WorkflowCleanupExecutor workflowCleanupExecutor =
+ new WorkflowCleanupExecutor<>(this, primary, context);
+ var result = workflowCleanupExecutor.cleanup();
+ if (throwExceptionAutomatically) {
+ result.throwAggregateExceptionIfErrorsPresent();
+ }
+ return result;
+ }
+
+ @Override
+ public Set configuration) {}
+
+ public Optional {
- ManagedWorkflow noOpWorkflow = new ManagedWorkflow() {
- @Override
- public WorkflowReconcileResult reconcile(HasMetadata primary, Context context) {
- throw new IllegalStateException("Shouldn't be called");
- }
+ @SuppressWarnings("unused")
+ default List context);
-
- WorkflowCleanupResult cleanup(P primary, Context context);
-
- boolean isCleaner();
-
- boolean isEmptyWorkflow();
-
- Map resolve(KubernetesClient client, ControllerConfiguration configuration);
+ Workflow resolve(KubernetesClient client, ControllerConfiguration configuration);
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowFactory.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowFactory.java
index 5b5135e95a..4923084de8 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowFactory.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowFactory.java
@@ -1,22 +1,35 @@
package io.javaoperatorsdk.operator.processing.dependent.workflow;
+import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
-import static io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflow.noOpWorkflow;
-
-public interface ManagedWorkflowFactory {
+public interface ManagedWorkflowFactory Workflow createWorkflow(
+
+ public ManagedWorkflow createWorkflow(
List DefaultManagedWorkflow createAsDefault(
+ List configuration) {
- final var spec = configuration.getDependentResources().stream()
- .filter(drs -> drs.getName().equals(getName()))
- .findFirst().orElseThrow();
-
- final DependentResource primary resource
- */
-@SuppressWarnings("rawtypes")
-public class Workflow {
+public interface Workflow {
- public static final boolean THROW_EXCEPTION_AUTOMATICALLY_DEFAULT = true;
+ boolean THROW_EXCEPTION_AUTOMATICALLY_DEFAULT = true;
- private final Map context) {
+ throw new UnsupportedOperationException("Implement this");
}
- public DependentResource getDependentResourceFor(DependentResourceNode node) {
- throwIfUnresolved();
- return dependentResource(node);
+ default WorkflowCleanupResult cleanup(P primary, Context context) {
+ throw new UnsupportedOperationException("Implement this");
}
- private DependentResource dependentResource(DependentResourceNode node) {
- return ((AbstractDependentResourceNode) dependentResourceNodes.get(node.getName()))
- .getDependentResource();
+ @SuppressWarnings("rawtypes")
+ default Set context) {
- throwIfUnresolved();
- WorkflowReconcileExecutor workflowReconcileExecutor =
- new WorkflowReconcileExecutor<>(this, primary, context);
- var result = workflowReconcileExecutor.reconcile();
- if (throwExceptionAutomatically) {
- result.throwAggregateExceptionIfErrorsPresent();
- }
- return result;
+ default boolean hasCleaner() {
+ return false;
}
- public WorkflowCleanupResult cleanup(P primary, Context context) {
- throwIfUnresolved();
- WorkflowCleanupExecutor workflowCleanupExecutor =
- new WorkflowCleanupExecutor<>(this, primary, context);
- var result = workflowCleanupExecutor.cleanup();
- if (throwExceptionAutomatically) {
- result.throwAggregateExceptionIfErrorsPresent();
- }
- return result;
- }
-
- Set configuration) {
- if (!resolved) {
- dependentResourceNodes.values().forEach(drn -> drn.resolve(client, configuration));
- resolved = true;
- }
- }
-
- boolean hasCleaner() {
- return hasCleaner;
+ default boolean isEmpty() {
+ return true;
}
- static boolean isDeletable(Class extends DependentResource> drClass) {
- final var isDeleter = Deleter.class.isAssignableFrom(drClass);
- if (!isDeleter) {
- return false;
- }
-
- if (KubernetesDependentResource.class.isAssignableFrom(drClass)) {
- return !GarbageCollected.class.isAssignableFrom(drClass);
- }
- return true;
+ @SuppressWarnings("rawtypes")
+ default Map {
- private final Map addDependentResource(DependentResource dependentResource) {
- currentNode = new DefaultDependentResourceNode<>(dependentResource);
+ currentNode = new DependentResourceNode<>(dependentResource);
isCleaner = dependentResource.isDeletable();
final var name = currentNode.getName();
dependentResourceNodes.put(name, currentNode);
@@ -64,7 +61,7 @@ public WorkflowBuilder withDeletePostcondition(Condition deletePostcondition)
DependentResourceNode getNodeByDependentResource(DependentResource, ?> dependentResource) {
// first check by name
final var node =
- dependentResourceNodes.get(DefaultDependentResourceNode.getNameFor(dependentResource));
+ dependentResourceNodes.get(DependentResourceNode.getNameFor(dependentResource));
if (node != null) {
return node;
} else {
@@ -75,26 +72,13 @@ DependentResourceNode getNodeByDependentResource(DependentResource, ?> depende
}
}
- public boolean isThrowExceptionAutomatically() {
- return throwExceptionAutomatically;
- }
-
public WorkflowBuilder withThrowExceptionFurther(boolean throwExceptionFurther) {
this.throwExceptionAutomatically = throwExceptionFurther;
return this;
}
public Workflow build() {
- return build(ExecutorServiceManager.instance().workflowExecutorService());
- }
-
- public Workflow build(int parallelism) {
- return build(Executors.newFixedThreadPool(parallelism));
- }
-
- public Workflow build(ExecutorService executorService) {
- // workflow has been built from dependent resources so it is already resolved
- return new Workflow(new HashSet<>(dependentResourceNodes.values()), executorService,
- throwExceptionAutomatically, true, isCleaner);
+ return new DefaultWorkflow(new HashSet<>(dependentResourceNodes.values()),
+ throwExceptionAutomatically, isCleaner);
}
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowCleanupExecutor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowCleanupExecutor.java
index 12656c2a8a..3a65d8b112 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowCleanupExecutor.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowCleanupExecutor.java
@@ -10,6 +10,7 @@
import org.slf4j.LoggerFactory;
import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
@@ -52,7 +53,7 @@ private synchronized void handleCleanup(DependentResourceNode dependentResourceN
return;
}
- Future> nodeFuture = workflow.getExecutorService()
+ Future> nodeFuture = ExecutorServiceManager.instance().workflowExecutorService()
.submit(new CleanupExecutor<>(dependentResourceNode));
markAsExecuting(dependentResourceNode, nodeFuture);
log.debug("Submitted for cleanup: {}", dependentResourceNode);
@@ -116,10 +117,10 @@ private boolean hasErroredDependent(DependentResourceNode dependentResourceNode)
private WorkflowCleanupResult createCleanupResult() {
final var erroredDependents = getErroredDependents();
final var postConditionNotMet = postDeleteConditionNotMet.stream()
- .map(workflow::getDependentResourceFor)
+ .map(DependentResourceNode::getDependentResource)
.collect(Collectors.toList());
final var deleteCalled = this.deleteCalled.stream()
- .map(workflow::getDependentResourceFor)
+ .map(DependentResourceNode::getDependentResource)
.collect(Collectors.toList());
return new WorkflowCleanupResult(erroredDependents, postConditionNotMet, deleteCalled);
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowReconcileExecutor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowReconcileExecutor.java
index 98abe41977..773bb0332f 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowReconcileExecutor.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowReconcileExecutor.java
@@ -11,6 +11,7 @@
import org.slf4j.LoggerFactory;
import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
@@ -63,11 +64,11 @@ private synchronized