Skip to content

refactor: simplify handling of reused event sources #1518

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 5, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -242,13 +242,17 @@ public List<DependentResourceSpec> getDependentResources() {
throw new IllegalArgumentException(
"A DependentResource named '" + name + "' already exists: " + spec);
}

var eventSourceName = dependent.useEventSourceWithName();
eventSourceName = Constants.NO_VALUE_SET.equals(eventSourceName) ? null : eventSourceName;

final var context = "DependentResource of type '" + dependentType.getName() + "'";
spec = new DependentResourceSpec(dependentType, config, name,
Set.of(dependent.dependsOn()),
instantiateIfNotDefault(dependent.readyPostcondition(), Condition.class, context),
instantiateIfNotDefault(dependent.reconcilePrecondition(), Condition.class, context),
instantiateIfNotDefault(dependent.deletePostcondition(), Condition.class, context),
dependent.provideEventSource());
eventSourceName);
specsMap.put(name, spec);
}

Expand Down Expand Up @@ -287,7 +291,6 @@ private Object createKubernetesResourceConfig(Class<? extends DependentResource>
OnDeleteFilter<? extends HasMetadata> onDeleteFilter = null;
GenericFilter<? extends HasMetadata> genericFilter = null;
ResourceDiscriminator<?, ? extends HasMetadata> resourceDiscriminator = null;
String eventSourceNameToUse = null;
if (kubeDependent != null) {
if (!Arrays.equals(KubernetesDependent.DEFAULT_NAMESPACES,
kubeDependent.namespaces())) {
Expand All @@ -314,14 +317,12 @@ private Object createKubernetesResourceConfig(Class<? extends DependentResource>
resourceDiscriminator =
instantiateIfNotDefault(kubeDependent.resourceDiscriminator(),
ResourceDiscriminator.class, context);
eventSourceNameToUse = Constants.NO_VALUE_SET.equals(kubeDependent.eventSourceToUse()) ? null
: kubeDependent.eventSourceToUse();
}

config =
new KubernetesDependentResourceConfig(namespaces, labelSelector, configuredNS,
resourceDiscriminator, onAddFilter,
onUpdateFilter, onDeleteFilter, genericFilter, eventSourceNameToUse);
onUpdateFilter, onDeleteFilter, genericFilter);

return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private void replaceConfig(String name, Object newConfig, DependentResourceSpec<
namedDependentResourceSpecs.put(name,
new DependentResourceSpec<>(current.getDependentResourceClass(), newConfig, name,
current.getDependsOn(), current.getReadyCondition(), current.getReconcileCondition(),
current.getDeletePostCondition(), current.provideEventSource()));
current.getDeletePostCondition(), current.getUseEventSourceWithName().orElse(null)));
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -220,7 +220,8 @@ public ControllerConfiguration<R> build() {
KubernetesDependentResourceConfig c) {
return new DependentResourceSpec(spec.getDependentResourceClass(),
c.setNamespaces(namespaces), name, spec.getDependsOn(), spec.getReadyCondition(),
spec.getReconcileCondition(), spec.getDeletePostCondition(), spec.provideEventSource());
spec.getReconcileCondition(), spec.getDeletePostCondition(),
(String) spec.getUseEventSourceWithName().orElse(null));
}

public static <R extends HasMetadata> ControllerConfigurationOverrider<R> override(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@ public class DependentResourceSpec<T extends DependentResource<?, ?>, C> {

private final Condition<?, ?> deletePostCondition;

private final boolean provideEventSource;
private final String useEventSourceWithName;

public DependentResourceSpec(Class<T> dependentResourceClass, C dependentResourceConfig,
String name, Set<String> dependsOn, Condition<?, ?> readyCondition,
Condition<?, ?> reconcileCondition, Condition<?, ?> deletePostCondition,
boolean provideEventSource) {
String useEventSourceWithName) {
this.dependentResourceClass = dependentResourceClass;
this.dependentResourceConfig = dependentResourceConfig;
this.name = name;
this.dependsOn = dependsOn;
this.readyCondition = readyCondition;
this.reconcileCondition = reconcileCondition;
this.deletePostCondition = deletePostCondition;
this.provideEventSource = provideEventSource;
this.useEventSourceWithName = useEventSourceWithName;
}

public Class<T> getDependentResourceClass() {
Expand Down Expand Up @@ -94,7 +94,7 @@ public Condition getDeletePostCondition() {
return deletePostCondition;
}

public boolean provideEventSource() {
return provideEventSource;
public Optional<String> getUseEventSourceWithName() {
return Optional.ofNullable(useEventSourceWithName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.javaoperatorsdk.operator.api.reconciler.dependent;

import java.util.Optional;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;

public interface DeferrableEventSourceHolder<P extends HasMetadata> {

default void useEventSourceWithName(String name) {}

Optional<String> resolveEventSource(EventSourceRetriever<P> eventSourceRetriever);
Copy link
Collaborator Author

@csviri csviri Oct 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Optional part feels a little weird to me. IMO should rather throw something like EventSourceNotFoundException.
What do you think?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not have to be even collected, since this will always fail to start the operator, and needs code change. So it is not even a config error if an exception is thrown here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's a little awkward. I'd like to collect it because, as a user, I prefer to know all the mistakes I made at once if possible instead of failing, fixing one thing only to have an error on another thing and fail again…


}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.javaoperatorsdk.operator.api.reconciler.dependent;

import io.javaoperatorsdk.operator.api.reconciler.Constants;
import io.javaoperatorsdk.operator.processing.dependent.workflow.Condition;

import static io.javaoperatorsdk.operator.api.reconciler.Constants.NO_VALUE_SET;
Expand Down Expand Up @@ -59,11 +60,11 @@
String[] dependsOn() default {};

/**
* Setting this to false means that the event source provided by the dependent resource won't be
* used. This is helpful if more dependent resources created for the same type, and want to share
* a common event source. In that case an event source needs to be explicitly registered.
* Setting here a name of the event source means that dependent resource will use an event source
* registered with that name. So won't create one. This is helpful if more dependent resources
* created for the same type, and want to share a common event source.
*
* @return if the event source (if any) provided by the dependent resource should be used or not.
* @return event source name (if any) provided by the dependent resource should be used.
*/
boolean provideEventSource() default true;
String useEventSourceWithName() default NO_VALUE_SET;
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,6 @@ default Optional<ResourceEventSource<R, P>> eventSource(
return Optional.empty();
}

/**
* Calling this method, instructs the implementation to not provide an event source, even if it
* normally does.
*/
void doNotProvideEventSource();


default Optional<R> getSecondaryResource(P primary, Context<P> context) {
return Optional.empty();
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.javaoperatorsdk.operator.processing;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -32,7 +35,7 @@
import io.javaoperatorsdk.operator.api.reconciler.Ignore;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceAware;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DeferrableEventSourceHolder;
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceProvider;
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedDependentResourceContext;
import io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflow;
Expand Down Expand Up @@ -214,25 +217,35 @@ public void initAndRegisterEventSources(EventSourceContext<P> context) {
final var ownSources = provider.prepareEventSources(context);
ownSources.forEach(eventSourceManager::registerEventSource);
}
managedWorkflow
.getDependentResourcesByName().entrySet().stream()
.forEach(drEntry -> {
if (drEntry.getValue() instanceof EventSourceProvider) {
final var provider = (EventSourceProvider) drEntry.getValue();
final var source = provider.initEventSource(context);
eventSourceManager.registerEventSource(drEntry.getKey(), source);
} else {
Optional<ResourceEventSource> eventSource =
drEntry.getValue().eventSource(context);
eventSource.ifPresent(es -> {
eventSourceManager.registerEventSource(drEntry.getKey(), es);
});
}
});
managedWorkflow.getDependentResourcesByName().entrySet().stream().map(Map.Entry::getValue)
.filter(EventSourceAware.class::isInstance)
.forEach(dr -> ((EventSourceAware) dr)
.selectEventSources(eventSourceManager));

// register created event sources
final var dependentResourcesByName = managedWorkflow.getDependentResourcesByName();
final var size = dependentResourcesByName.size();
if (size > 0) {
dependentResourcesByName.forEach((key, value) -> {
if (value instanceof EventSourceProvider) {
final var provider = (EventSourceProvider) value;
final var source = provider.initEventSource(context);
eventSourceManager.registerEventSource(key, source);
} else {
Optional<ResourceEventSource> eventSource = value.eventSource(context);
eventSource.ifPresent(es -> eventSourceManager.registerEventSource(key, es));
}
});

// resolve event sources referenced by name for dependents that reuse an existing event source
final Map<String, List<DeferrableEventSourceHolder>> unresolvable = new HashMap<>(size);
dependentResourcesByName.values().stream()
.filter(DeferrableEventSourceHolder.class::isInstance)
.map(DeferrableEventSourceHolder.class::cast)
.forEach(dr -> ((DeferrableEventSourceHolder<P>) dr)
.resolveEventSource(eventSourceManager).ifPresent(unresolved -> unresolvable
.computeIfAbsent(unresolved, s -> new ArrayList<>()).add(dr)));
if (!unresolvable.isEmpty()) {
throw new IllegalStateException(
"Couldn't resolve referenced EventSources: " + unresolvable);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.Ignore;
import io.javaoperatorsdk.operator.api.reconciler.ResourceDiscriminator;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
import io.javaoperatorsdk.operator.api.reconciler.dependent.ReconcileResult;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;

@Ignore
public abstract class AbstractDependentResource<R, P extends HasMetadata>
Expand All @@ -29,7 +27,6 @@ public abstract class AbstractDependentResource<R, P extends HasMetadata>
protected Creator<R, P> creator;
protected Updater<R, P> updater;
protected BulkDependentResource<R, P> bulkDependentResource;
private boolean returnEventSource = true;

protected List<ResourceDiscriminator<R, P>> resourceDiscriminator = new ArrayList<>(1);

Expand All @@ -41,23 +38,6 @@ public AbstractDependentResource() {
bulkDependentResource = bulk ? (BulkDependentResource<R, P>) this : null;
}

@Override
public void doNotProvideEventSource() {
this.returnEventSource = false;
}

@Override
public Optional<ResourceEventSource<R, P>> eventSource(EventSourceContext<P> eventSourceContext) {
if (!returnEventSource) {
return Optional.empty();
} else {
return Optional.of(provideEventSource(eventSourceContext));
}
}

protected abstract ResourceEventSource<R, P> provideEventSource(
EventSourceContext<P> eventSourceContext);

@Override
public ReconcileResult<R> reconcile(P primary, Context<P> context) {
if (bulk) {
Expand Down Expand Up @@ -239,7 +219,4 @@ protected int lastKnownBulkSize() {
return resourceDiscriminator.size();
}

protected boolean getReturnEventSource() {
return returnEventSource;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.Ignore;
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceAware;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DeferrableEventSourceHolder;
import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller;
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
Expand All @@ -17,7 +17,7 @@

@Ignore
public abstract class AbstractEventSourceHolderDependentResource<R, P extends HasMetadata, T extends ResourceEventSource<R, P>>
extends AbstractDependentResource<R, P> implements EventSourceAware<P> {
extends AbstractDependentResource<R, P> implements DeferrableEventSourceHolder<P> {

private T eventSource;
private final Class<R> resourceType;
Expand All @@ -26,45 +26,51 @@ public abstract class AbstractEventSourceHolderDependentResource<R, P extends Ha
protected OnUpdateFilter<R> onUpdateFilter;
protected OnDeleteFilter<R> onDeleteFilter;
protected GenericFilter<R> genericFilter;
protected String eventSourceToUse;
protected String eventSourceNameToUse;

protected AbstractEventSourceHolderDependentResource(Class<R> resourceType) {
this.resourceType = resourceType;
}


public ResourceEventSource<R, P> provideEventSource(EventSourceContext<P> context) {
public Optional<ResourceEventSource<R, P>> eventSource(EventSourceContext<P> context) {
// some sub-classes (e.g. KubernetesDependentResource) can have their event source created
// before this method is called in the managed case, so only create the event source if it
// hasn't already been set.
// The filters are applied automatically only if event source is created automatically. So if an
// event source
// is shared between dependent resources this does not override the existing filters.
if (eventSource == null) {
if (eventSource == null && eventSourceNameToUse == null) {
setEventSource(createEventSource(context));
applyFilters();
}
return eventSource;
return Optional.ofNullable(eventSource);
}

@SuppressWarnings("unchecked")
@Override
public void selectEventSources(EventSourceRetriever<P> eventSourceRetriever) {
if (!getReturnEventSource()) {
if (eventSourceToUse != null) {
setEventSource(
(T) eventSourceRetriever.getResourceEventSourceFor(resourceType(), eventSourceToUse));
} else {
setEventSource((T) eventSourceRetriever.getResourceEventSourceFor(resourceType()));
public Optional<String> resolveEventSource(EventSourceRetriever<P> eventSourceRetriever) {
if (eventSourceNameToUse != null && eventSource == null) {
final var source =
eventSourceRetriever.getResourceEventSourceFor(resourceType(), eventSourceNameToUse);
if (source == null) {
return Optional.of(eventSourceNameToUse);
}
setEventSource((T) source);
}
return Optional.empty();
}

/** To make this backwards compatible even for respect of overriding */
@SuppressWarnings("unchecked")
public T initEventSource(EventSourceContext<P> context) {
return (T) eventSource(context).orElseThrow();
}

@Override
public void useEventSourceWithName(String name) {
this.eventSourceNameToUse = name;
}

@Override
public Class<R> resourceType() {
return resourceType;
Expand Down Expand Up @@ -116,10 +122,4 @@ public void setOnUpdateFilter(OnUpdateFilter<R> onUpdateFilter) {
public void setOnDeleteFilter(OnDeleteFilter<R> onDeleteFilter) {
this.onDeleteFilter = onDeleteFilter;
}

public AbstractEventSourceHolderDependentResource<R, P, T> setEventSourceToUse(
String eventSourceToUse) {
this.eventSourceToUse = eventSourceToUse;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,4 @@

Class<? extends ResourceDiscriminator> resourceDiscriminator() default ResourceDiscriminator.class;

String eventSourceToUse() default NO_VALUE_SET;
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ public void configureWith(KubernetesDependentResourceConfig<R> config) {
if (discriminator != null) {
setResourceDiscriminator(discriminator);
}
config.getEventSourceToUse().ifPresent(n -> {
doNotProvideEventSource();
setEventSourceToUse(n);
});
}

private void configureWith(String labelSelector, Set<String> namespaces,
Expand Down
Loading