diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java index 245cd6951e..52f71501a9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java @@ -164,7 +164,7 @@ public

InformerConfigurationBuilder withNamespacesInh } /** - * Whether or not the associated informer should track changes made to the parent + * Whether the associated informer should track changes made to the parent * {@link io.javaoperatorsdk.operator.processing.Controller}'s namespaces configuration. * * @param followChanges {@code true} to reconfigure the associated informer when the parent diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index fc68e6e413..d091d442f6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -17,6 +17,7 @@ import io.javaoperatorsdk.operator.processing.Controller; import io.javaoperatorsdk.operator.processing.LifecycleAware; import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority; import io.javaoperatorsdk.operator.processing.event.source.ResourceEventAware; import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource; import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource; @@ -63,7 +64,12 @@ public void postProcessDefaultEventSourcesAfterProcessorInitializer() { @Override public synchronized void start() { startEventSource(eventSources.namedControllerResourceEventSource()); - eventSources.additionalNamedEventSources().parallel().forEach(this::startEventSource); + eventSources.additionalNamedEventSources() + .filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER)) + .parallel().forEach(this::startEventSource); + eventSources.additionalNamedEventSources() + .filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT)) + .parallel().forEach(this::startEventSource); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java index 92d1f40096..5822711cc5 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java @@ -4,6 +4,7 @@ import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority; class NamedEventSource implements EventSource { @@ -57,4 +58,9 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(original, name); } + + @Override + public EventSourceStartPriority priority() { + return original.priority(); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractEventSource.java index a6666ba81d..4eaee91add 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractEventSource.java @@ -6,6 +6,7 @@ public abstract class AbstractEventSource implements EventSource { private EventHandler handler; private volatile boolean running = false; + private EventSourceStartPriority eventSourceStartPriority = EventSourceStartPriority.DEFAULT; protected EventHandler getEventHandler() { return handler; @@ -29,4 +30,15 @@ public void start() throws OperatorException { public void stop() throws OperatorException { running = false; } + + @Override + public EventSourceStartPriority priority() { + return eventSourceStartPriority; + } + + public AbstractEventSource setEventSourcePriority( + EventSourceStartPriority eventSourceStartPriority) { + this.eventSourceStartPriority = eventSourceStartPriority; + return this; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java index 9a5afe6f40..76c8ca164d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java @@ -20,5 +20,7 @@ public interface EventSource extends LifecycleAware { */ void setEventHandler(EventHandler handler); - + default EventSourceStartPriority priority() { + return EventSourceStartPriority.DEFAULT; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSourceStartPriority.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSourceStartPriority.java new file mode 100644 index 0000000000..d1d758bdb4 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSourceStartPriority.java @@ -0,0 +1,17 @@ +package io.javaoperatorsdk.operator.processing.event.source; + +public enum EventSourceStartPriority { + + /** + * Event Sources with this priority are started and synced before the event source with DEFAULT + * priority. The use case to use this, if the event source holds an information regarding the + * state of a resource. For example a ConfigMap would store an ID of an external resource, in this + * case an event source that tracks the external resource might need this ID (event before the + * reconciliation) to check the state of the external resource. The only way to ensure that the ID + * is already cached is to start/sync related event source before the event source of the external + * resource. + */ + RESOURCE_STATE_LOADER, DEFAULT + + +} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java index 2782ed52d1..aa9bccde09 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java @@ -12,6 +12,7 @@ import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.processing.Controller; import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority; import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource; import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource; @@ -59,7 +60,9 @@ public void closeShouldCascadeToEventSources() { @Test public void startCascadesToEventSources() { EventSource eventSource = mock(EventSource.class); + when(eventSource.priority()).thenReturn(EventSourceStartPriority.DEFAULT); EventSource eventSource2 = mock(TimerEventSource.class); + when(eventSource2.priority()).thenReturn(EventSourceStartPriority.DEFAULT); eventSourceManager.registerEventSource(eventSource); eventSourceManager.registerEventSource(eventSource2); diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/ExternalStateIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/ExternalStateIT.java new file mode 100644 index 0000000000..ad81713239 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/ExternalStateIT.java @@ -0,0 +1,77 @@ +package io.javaoperatorsdk.operator; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; +import io.javaoperatorsdk.operator.sample.externalstate.ExternalStateCustomResource; +import io.javaoperatorsdk.operator.sample.externalstate.ExternalStateReconciler; +import io.javaoperatorsdk.operator.sample.externalstate.ExternalStateSpec; +import io.javaoperatorsdk.operator.support.ExternalIDGenServiceMock; + +import static io.javaoperatorsdk.operator.sample.externalstate.ExternalStateReconciler.ID_KEY; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +class ExternalStateIT { + + private static final String TEST_RESOURCE_NAME = "test1"; + + public static final String INITIAL_TEST_DATA = "initialTestData"; + public static final String UPDATED_DATA = "updatedData"; + + private ExternalIDGenServiceMock externalService = ExternalIDGenServiceMock.getInstance(); + + @RegisterExtension + LocallyRunOperatorExtension operator = + LocallyRunOperatorExtension.builder().withReconciler(ExternalStateReconciler.class) + .build(); + + @Test + public void reconcilesResourceWithPersistentState() { + var resource = operator.create(testResource()); + assertResourcesCreated(resource, INITIAL_TEST_DATA); + + resource.getSpec().setData(UPDATED_DATA); + operator.replace(resource); + assertResourcesCreated(resource, UPDATED_DATA); + + operator.delete(resource); + assertResourcesDeleted(resource); + } + + private void assertResourcesDeleted(ExternalStateCustomResource resource) { + await().untilAsserted(() -> { + var cm = operator.get(ConfigMap.class, resource.getMetadata().getName()); + var resources = externalService.listResources(); + assertThat(cm).isNull(); + assertThat(resources).isEmpty(); + }); + } + + private void assertResourcesCreated(ExternalStateCustomResource resource, + String initialTestData) { + await().untilAsserted(() -> { + var cm = operator.get(ConfigMap.class, resource.getMetadata().getName()); + var resources = externalService.listResources(); + assertThat(resources).hasSize(1); + var extRes = externalService.listResources().get(0); + assertThat(extRes.getData()).isEqualTo(initialTestData); + assertThat(cm).isNotNull(); + assertThat(cm.getData().get(ID_KEY)).isEqualTo(extRes.getId()); + }); + } + + private ExternalStateCustomResource testResource() { + var res = new ExternalStateCustomResource(); + res.setMetadata(new ObjectMetaBuilder() + .withName(TEST_RESOURCE_NAME) + .build()); + + res.setSpec(new ExternalStateSpec()); + res.getSpec().setData(INITIAL_TEST_DATA); + return res; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/externalstate/ExternalStateCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/externalstate/ExternalStateCustomResource.java new file mode 100644 index 0000000000..70a51ecc72 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/externalstate/ExternalStateCustomResource.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.sample.externalstate; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("ess") +public class ExternalStateCustomResource + extends CustomResource + implements Namespaced { +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/externalstate/ExternalStateReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/externalstate/ExternalStateReconciler.java new file mode 100644 index 0000000000..3d202e1d52 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/externalstate/ExternalStateReconciler.java @@ -0,0 +1,135 @@ +package io.javaoperatorsdk.operator.sample.externalstate; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ConfigMapBuilder; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.*; +import io.javaoperatorsdk.operator.junit.KubernetesClientAware; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority; +import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; +import io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingEventSource; +import io.javaoperatorsdk.operator.support.ExternalIDGenServiceMock; +import io.javaoperatorsdk.operator.support.ExternalResource; +import io.javaoperatorsdk.operator.support.TestExecutionInfoProvider; + +@ControllerConfiguration() +public class ExternalStateReconciler + implements Reconciler, Cleaner, + EventSourceInitializer, KubernetesClientAware, + TestExecutionInfoProvider { + + public static final String ID_KEY = "id"; + private final AtomicInteger numberOfExecutions = new AtomicInteger(0); + + private ExternalIDGenServiceMock externalService = ExternalIDGenServiceMock.getInstance(); + private KubernetesClient client; + + InformerEventSource configMapEventSource; + PerResourcePollingEventSource externalResourceEventSource; + + @Override + public UpdateControl reconcile( + ExternalStateCustomResource resource, Context context) { + numberOfExecutions.addAndGet(1); + + var externalResource = context.getSecondaryResource(ExternalResource.class); + externalResource.ifPresentOrElse(r -> { + if (!r.getData().equals(resource.getSpec().getData())) { + updateExternalResource(resource, r); + } + }, () -> { + if (externalResource.isEmpty()) { + createExternalResource(resource); + } + }); + + + return UpdateControl.noUpdate(); + } + + private void updateExternalResource(ExternalStateCustomResource resource, + ExternalResource externalResource) { + var newResource = new ExternalResource(externalResource.getId(), resource.getSpec().getData()); + externalService.update(newResource); + externalResourceEventSource.handleRecentResourceUpdate(ResourceID.fromResource(resource), + newResource, externalResource); + } + + private void createExternalResource(ExternalStateCustomResource resource) { + var createdResource = + externalService.create(new ExternalResource(resource.getSpec().getData())); + var configMap = new ConfigMapBuilder() + .withMetadata(new ObjectMetaBuilder() + .withName(resource.getMetadata().getName()) + .withNamespace(resource.getMetadata().getNamespace()) + .build()) + .withData(Map.of(ID_KEY, createdResource.getId())) + .build(); + configMap.addOwnerReference(resource); + client.configMaps().resource(configMap).create(); + + var primaryID = ResourceID.fromResource(resource); + // Making sure that the created resources are in the cache for the next reconciliation. + // This is critical in this case, since on next reconciliation if it would not be in the cache + // it would be created again. + configMapEventSource.handleRecentResourceCreate(primaryID, configMap); + externalResourceEventSource.handleRecentResourceCreate(primaryID, createdResource); + } + + @Override + public DeleteControl cleanup(ExternalStateCustomResource resource, + Context context) { + var externalResource = context.getSecondaryResource(ExternalResource.class); + externalResource.ifPresent(er -> { + externalService.delete(er.getId()); + }); + client.configMaps().inNamespace(resource.getMetadata().getNamespace()) + .withName(resource.getMetadata().getName()).delete(); + return DeleteControl.defaultDelete(); + } + + public int getNumberOfExecutions() { + return numberOfExecutions.get(); + } + + @Override + public Map prepareEventSources( + EventSourceContext context) { + + configMapEventSource = new InformerEventSource<>( + InformerConfiguration.from(ConfigMap.class, context).build(), context); + configMapEventSource.setEventSourcePriority(EventSourceStartPriority.RESOURCE_STATE_LOADER); + + externalResourceEventSource = new PerResourcePollingEventSource<>(primaryResource -> { + var configMap = configMapEventSource.getSecondaryResource(primaryResource).orElse(null); + if (configMap == null) { + return Collections.emptySet(); + } + var id = configMap.getData().get(ID_KEY); + var externalResource = externalService.read(id); + return externalResource.map(er -> Set.of(er)).orElse(Collections.emptySet()); + }, context.getPrimaryCache(), 300L, ExternalResource.class); + + return EventSourceInitializer.nameEventSources(configMapEventSource, + externalResourceEventSource); + } + + @Override + public KubernetesClient getKubernetesClient() { + return client; + } + + @Override + public void setKubernetesClient(KubernetesClient kubernetesClient) { + this.client = kubernetesClient; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/externalstate/ExternalStateSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/externalstate/ExternalStateSpec.java new file mode 100644 index 0000000000..eda0a887eb --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/externalstate/ExternalStateSpec.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.sample.externalstate; + +public class ExternalStateSpec { + + private String data; + + public String getData() { + return data; + } + + public ExternalStateSpec setData(String data) { + this.data = data; + return this; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/support/ExternalIDGenServiceMock.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/support/ExternalIDGenServiceMock.java new file mode 100644 index 0000000000..76d11c3044 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/support/ExternalIDGenServiceMock.java @@ -0,0 +1,42 @@ +package io.javaoperatorsdk.operator.support; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +public class ExternalIDGenServiceMock { + + private static ExternalIDGenServiceMock serviceMock = new ExternalIDGenServiceMock(); + + private Map resourceMap = new ConcurrentHashMap<>(); + + public ExternalResource create(ExternalResource externalResource) { + if (externalResource.getId() != null) { + throw new IllegalArgumentException("ID provided for external resource"); + } + String id = UUID.randomUUID().toString(); + var newResource = new ExternalResource(id, externalResource.getData()); + resourceMap.put(id, newResource); + return newResource; + } + + public Optional read(String id) { + return Optional.ofNullable(resourceMap.get(id)); + } + + public ExternalResource update(ExternalResource externalResource) { + return resourceMap.put(externalResource.getId(), externalResource); + } + + public Optional delete(String id) { + return Optional.ofNullable(resourceMap.remove(id)); + } + + public List listResources() { + return new ArrayList<>(resourceMap.values()); + } + + public static ExternalIDGenServiceMock getInstance() { + return serviceMock; + } + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/support/ExternalResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/support/ExternalResource.java index cb8d4b74e5..2d897bbdc7 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/support/ExternalResource.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/support/ExternalResource.java @@ -12,6 +12,15 @@ public class ExternalResource { private String id; private String data; + /** + * For the case that ide is generated by server + * + * @param data to store + */ + public ExternalResource(String data) { + this.data = data; + } + public ExternalResource(String id, String data) { this.id = id; this.data = data; diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/support/ExternalServiceMock.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/support/ExternalServiceMock.java index eea26637fc..d939f253ac 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/support/ExternalServiceMock.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/support/ExternalServiceMock.java @@ -13,6 +13,9 @@ public class ExternalServiceMock { private Map resourceMap = new ConcurrentHashMap<>(); public ExternalResource create(ExternalResource externalResource) { + if (externalResource.getId() == null) { + throw new IllegalArgumentException("id of the resource is null"); + } resourceMap.put(externalResource.getId(), externalResource); return externalResource; }