Skip to content

feat: external resource with generated id support #1527

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public <P extends HasMetadata> InformerConfigurationBuilder<R> withNamespacesInh
}

/**
* Whether or not the associated informer should track changes made to the parent
* Whether the associated informer should track changes made to the parent
* {@link io.javaoperatorsdk.operator.processing.Controller}'s namespaces configuration.
*
* @param followChanges {@code true} to reconfigure the associated informer when the parent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventAware;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource;
Expand Down Expand Up @@ -63,7 +64,12 @@ public void postProcessDefaultEventSourcesAfterProcessorInitializer() {
@Override
public synchronized void start() {
startEventSource(eventSources.namedControllerResourceEventSource());
eventSources.additionalNamedEventSources().parallel().forEach(this::startEventSource);
eventSources.additionalNamedEventSources()
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER))
.parallel().forEach(this::startEventSource);
eventSources.additionalNamedEventSources()
.filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT))
.parallel().forEach(this::startEventSource);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority;

class NamedEventSource implements EventSource {

Expand Down Expand Up @@ -57,4 +58,9 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(original, name);
}

@Override
public EventSourceStartPriority priority() {
return original.priority();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
public abstract class AbstractEventSource implements EventSource {
private EventHandler handler;
private volatile boolean running = false;
private EventSourceStartPriority eventSourceStartPriority = EventSourceStartPriority.DEFAULT;

protected EventHandler getEventHandler() {
return handler;
Expand All @@ -29,4 +30,15 @@ public void start() throws OperatorException {
public void stop() throws OperatorException {
running = false;
}

@Override
public EventSourceStartPriority priority() {
return eventSourceStartPriority;
}

public AbstractEventSource setEventSourcePriority(
EventSourceStartPriority eventSourceStartPriority) {
this.eventSourceStartPriority = eventSourceStartPriority;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,7 @@ public interface EventSource extends LifecycleAware {
*/
void setEventHandler(EventHandler handler);


default EventSourceStartPriority priority() {
return EventSourceStartPriority.DEFAULT;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.javaoperatorsdk.operator.processing.event.source;

public enum EventSourceStartPriority {

/**
* Event Sources with this priority are started and synced before the event source with DEFAULT
* priority. The use case to use this, if the event source holds an information regarding the
* state of a resource. For example a ConfigMap would store an ID of an external resource, in this
* case an event source that tracks the external resource might need this ID (event before the
* reconciliation) to check the state of the external resource. The only way to ensure that the ID
* is already cached is to start/sync related event source before the event source of the external
* resource.
*/
RESOURCE_STATE_LOADER, DEFAULT


}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority;
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
Expand Down Expand Up @@ -59,7 +60,9 @@ public void closeShouldCascadeToEventSources() {
@Test
public void startCascadesToEventSources() {
EventSource eventSource = mock(EventSource.class);
when(eventSource.priority()).thenReturn(EventSourceStartPriority.DEFAULT);
EventSource eventSource2 = mock(TimerEventSource.class);
when(eventSource2.priority()).thenReturn(EventSourceStartPriority.DEFAULT);
eventSourceManager.registerEventSource(eventSource);
eventSourceManager.registerEventSource(eventSource2);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package io.javaoperatorsdk.operator;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
import io.javaoperatorsdk.operator.sample.externalstate.ExternalStateCustomResource;
import io.javaoperatorsdk.operator.sample.externalstate.ExternalStateReconciler;
import io.javaoperatorsdk.operator.sample.externalstate.ExternalStateSpec;
import io.javaoperatorsdk.operator.support.ExternalIDGenServiceMock;

import static io.javaoperatorsdk.operator.sample.externalstate.ExternalStateReconciler.ID_KEY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

class ExternalStateIT {

private static final String TEST_RESOURCE_NAME = "test1";

public static final String INITIAL_TEST_DATA = "initialTestData";
public static final String UPDATED_DATA = "updatedData";

private ExternalIDGenServiceMock externalService = ExternalIDGenServiceMock.getInstance();

@RegisterExtension
LocallyRunOperatorExtension operator =
LocallyRunOperatorExtension.builder().withReconciler(ExternalStateReconciler.class)
.build();

@Test
public void reconcilesResourceWithPersistentState() {
var resource = operator.create(testResource());
assertResourcesCreated(resource, INITIAL_TEST_DATA);

resource.getSpec().setData(UPDATED_DATA);
operator.replace(resource);
assertResourcesCreated(resource, UPDATED_DATA);

operator.delete(resource);
assertResourcesDeleted(resource);
}

private void assertResourcesDeleted(ExternalStateCustomResource resource) {
await().untilAsserted(() -> {
var cm = operator.get(ConfigMap.class, resource.getMetadata().getName());
var resources = externalService.listResources();
assertThat(cm).isNull();
assertThat(resources).isEmpty();
});
}

private void assertResourcesCreated(ExternalStateCustomResource resource,
String initialTestData) {
await().untilAsserted(() -> {
var cm = operator.get(ConfigMap.class, resource.getMetadata().getName());
var resources = externalService.listResources();
assertThat(resources).hasSize(1);
var extRes = externalService.listResources().get(0);
assertThat(extRes.getData()).isEqualTo(initialTestData);
assertThat(cm).isNotNull();
assertThat(cm.getData().get(ID_KEY)).isEqualTo(extRes.getId());
});
}

private ExternalStateCustomResource testResource() {
var res = new ExternalStateCustomResource();
res.setMetadata(new ObjectMetaBuilder()
.withName(TEST_RESOURCE_NAME)
.build());

res.setSpec(new ExternalStateSpec());
res.getSpec().setData(INITIAL_TEST_DATA);
return res;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.javaoperatorsdk.operator.sample.externalstate;

import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.ShortNames;
import io.fabric8.kubernetes.model.annotation.Version;

@Group("sample.javaoperatorsdk")
@Version("v1")
@ShortNames("ess")
public class ExternalStateCustomResource
extends CustomResource<ExternalStateSpec, Void>
implements Namespaced {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package io.javaoperatorsdk.operator.sample.externalstate;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.*;
import io.javaoperatorsdk.operator.junit.KubernetesClientAware;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority;
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingEventSource;
import io.javaoperatorsdk.operator.support.ExternalIDGenServiceMock;
import io.javaoperatorsdk.operator.support.ExternalResource;
import io.javaoperatorsdk.operator.support.TestExecutionInfoProvider;

@ControllerConfiguration()
public class ExternalStateReconciler
implements Reconciler<ExternalStateCustomResource>, Cleaner<ExternalStateCustomResource>,
EventSourceInitializer<ExternalStateCustomResource>, KubernetesClientAware,
TestExecutionInfoProvider {

public static final String ID_KEY = "id";
private final AtomicInteger numberOfExecutions = new AtomicInteger(0);

private ExternalIDGenServiceMock externalService = ExternalIDGenServiceMock.getInstance();
private KubernetesClient client;

InformerEventSource<ConfigMap, ExternalStateCustomResource> configMapEventSource;
PerResourcePollingEventSource<ExternalResource, ExternalStateCustomResource> externalResourceEventSource;

@Override
public UpdateControl<ExternalStateCustomResource> reconcile(
ExternalStateCustomResource resource, Context<ExternalStateCustomResource> context) {
numberOfExecutions.addAndGet(1);

var externalResource = context.getSecondaryResource(ExternalResource.class);
externalResource.ifPresentOrElse(r -> {
if (!r.getData().equals(resource.getSpec().getData())) {
updateExternalResource(resource, r);
}
}, () -> {
if (externalResource.isEmpty()) {
createExternalResource(resource);
}
});


return UpdateControl.noUpdate();
}

private void updateExternalResource(ExternalStateCustomResource resource,
ExternalResource externalResource) {
var newResource = new ExternalResource(externalResource.getId(), resource.getSpec().getData());
externalService.update(newResource);
externalResourceEventSource.handleRecentResourceUpdate(ResourceID.fromResource(resource),
newResource, externalResource);
}

private void createExternalResource(ExternalStateCustomResource resource) {
var createdResource =
externalService.create(new ExternalResource(resource.getSpec().getData()));
var configMap = new ConfigMapBuilder()
.withMetadata(new ObjectMetaBuilder()
.withName(resource.getMetadata().getName())
.withNamespace(resource.getMetadata().getNamespace())
.build())
.withData(Map.of(ID_KEY, createdResource.getId()))
.build();
configMap.addOwnerReference(resource);
client.configMaps().resource(configMap).create();

var primaryID = ResourceID.fromResource(resource);
// Making sure that the created resources are in the cache for the next reconciliation.
// This is critical in this case, since on next reconciliation if it would not be in the cache
// it would be created again.
configMapEventSource.handleRecentResourceCreate(primaryID, configMap);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method shouldn't be called directly, imo. It shouldn't even be part of the public interface.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case it is needed, since we want to be strongly consistent regarding the event source.
(For long term we might want to implement these features directly to the client, similarly as in go)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, but that's not up to the user to ensure that consistency. If the user has to enforce that, then we need to redesign things because the user shouldn't even be aware of that part as it's internal to how the SDK works.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dependent resources will hide this, but some might do it on lower level. The thing is that without that it just don't work, (In all cases). We need probably do some work on these levels in the future. But this is how it is now, hiding that layer would won't solve the problem. And dependent resources using that, as other abstractions might use that user implement itself, so definitelly would not hide it.

externalResourceEventSource.handleRecentResourceCreate(primaryID, createdResource);
}

@Override
public DeleteControl cleanup(ExternalStateCustomResource resource,
Context<ExternalStateCustomResource> context) {
var externalResource = context.getSecondaryResource(ExternalResource.class);
externalResource.ifPresent(er -> {
externalService.delete(er.getId());
});
client.configMaps().inNamespace(resource.getMetadata().getNamespace())
.withName(resource.getMetadata().getName()).delete();
return DeleteControl.defaultDelete();
}

public int getNumberOfExecutions() {
return numberOfExecutions.get();
}

@Override
public Map<String, EventSource> prepareEventSources(
EventSourceContext<ExternalStateCustomResource> context) {

configMapEventSource = new InformerEventSource<>(
InformerConfiguration.from(ConfigMap.class, context).build(), context);
configMapEventSource.setEventSourcePriority(EventSourceStartPriority.RESOURCE_STATE_LOADER);

externalResourceEventSource = new PerResourcePollingEventSource<>(primaryResource -> {
var configMap = configMapEventSource.getSecondaryResource(primaryResource).orElse(null);
if (configMap == null) {
return Collections.emptySet();
}
var id = configMap.getData().get(ID_KEY);
var externalResource = externalService.read(id);
return externalResource.map(er -> Set.of(er)).orElse(Collections.emptySet());
}, context.getPrimaryCache(), 300L, ExternalResource.class);

return EventSourceInitializer.nameEventSources(configMapEventSource,
externalResourceEventSource);
}

@Override
public KubernetesClient getKubernetesClient() {
return client;
}

@Override
public void setKubernetesClient(KubernetesClient kubernetesClient) {
this.client = kubernetesClient;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.javaoperatorsdk.operator.sample.externalstate;

public class ExternalStateSpec {

private String data;

public String getData() {
return data;
}

public ExternalStateSpec setData(String data) {
this.data = data;
return this;
}
}
Loading