Skip to content

fix: shut down operator when failing to start #1138

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/snapshot-releases.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ concurrency:
cancel-in-progress: true
on:
push:
branches: [ main, v1, next ]
branches: [ main, v1, v2, next ]
workflow_dispatch:
jobs:
test:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,26 @@ public List<Controller> getControllers() {
* and start the cluster monitoring processes.
*/
public void start() {
controllers.shouldStart();

final var version = configurationService.getVersion();
log.info(
"Operator SDK {} (commit: {}) built on {} starting...",
version.getSdkVersion(),
version.getCommit(),
version.getBuiltTime());

final var clientVersion = Version.clientVersion();
log.info("Client version: {}", clientVersion);

ExecutorServiceManager.init(configurationService);
controllers.start();
try {
controllers.shouldStart();

final var version = configurationService.getVersion();
log.info(
"Operator SDK {} (commit: {}) built on {} starting...",
version.getSdkVersion(),
version.getCommit(),
version.getBuiltTime());

final var clientVersion = Version.clientVersion();
log.info("Client version: {}", clientVersion);

ExecutorServiceManager.init(configurationService);
controllers.start();
} catch (Exception e) {
log.error("Error starting operator", e);
stop();
throw e;
}
}

@Override
Expand Down Expand Up @@ -166,10 +172,6 @@ public synchronized void start() {
}

public synchronized void stop() {
if (!started) {
return;
}

this.controllers.values().parallelStream().forEach(closeable -> {
log.debug("closing {}", closeable);
closeable.stop();
Expand All @@ -178,6 +180,7 @@ public synchronized void stop() {
started = false;
}

@SuppressWarnings("unchecked")
public synchronized void add(Controller controller) {
final var configuration = controller.getConfiguration();
final var resourceTypeName = ReconcilerUtils
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package io.javaoperatorsdk.operator;

import java.util.Arrays;
import java.util.Locale;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.javaoperatorsdk.operator.api.reconciler.Constants;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
Expand Down Expand Up @@ -98,4 +103,49 @@ public static String getDefaultReconcilerName(String reconcilerClassName) {
}
return reconcilerClassName.toLowerCase(Locale.ROOT);
}

public static void handleKubernetesClientException(Exception e, String resourceTypeName) {
if (e instanceof MissingCRDException) {
throw ((MissingCRDException) e);
}

if (e instanceof KubernetesClientException) {
KubernetesClientException ke = (KubernetesClientException) e;
if (404 == ke.getCode()) {
// only throw MissingCRDException if the 404 error occurs on the target CRD
if (resourceTypeName.equals(ke.getFullResourceName())
|| matchesResourceType(resourceTypeName, ke)) {
throw new MissingCRDException(resourceTypeName, null, e.getMessage(), e);
}
}
}
}

private static boolean matchesResourceType(String resourceTypeName,
KubernetesClientException exception) {
final var fullResourceName = exception.getFullResourceName();
if (fullResourceName != null) {
return resourceTypeName.equals(fullResourceName);
} else {
// extract matching information from URI in the message if available
final var message = exception.getMessage();
final var regex = Pattern
.compile(".*http(s?)://[^/]*/api(s?)/(\\S*).*") // NOSONAR: input is controlled
.matcher(message);
if (regex.matches()) {
var group = regex.group(3);
if (group.endsWith(".")) {
group = group.substring(0, group.length() - 1);
}
final var segments = Arrays.stream(group.split("/")).filter(Predicate.not(String::isEmpty))
.collect(Collectors.toUnmodifiableList());
if (segments.size() != 3) {
return false;
}
final var targetResourceName = segments.get(2) + "." + segments.get(0);
return resourceTypeName.equals(targetResourceName);
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public void start() throws OperatorException {

eventSourceManager.start();
} catch (MissingCRDException e) {
stop();
throwMissingCRDException(crdName, specVersion, controllerName);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public void start() {
try {
eventSource.start();
} catch (Exception e) {
log.warn("Error starting {}", eventSource, e);
log.warn("Error starting {} -> {}", eventSource, e);
throw e;
}
}
eventProcessor.start();
Expand Down Expand Up @@ -118,6 +119,7 @@ public final void registerEventSource(EventSource eventSource)
}
}

@SuppressWarnings("unchecked")
public void broadcastOnResourceEvent(ResourceAction action, R resource, R oldResource) {
for (var eventSource : eventSources) {
if (eventSource instanceof ResourceEventAware) {
Expand Down Expand Up @@ -194,8 +196,9 @@ public Iterator<EventSource> iterator() {
}

public Set<EventSource> all() {
return new LinkedHashSet<>(sources.values().stream().flatMap(Collection::stream)
.collect(Collectors.toList()));
return sources.values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toCollection(LinkedHashSet::new));
}

public void clear() {
Expand All @@ -219,6 +222,7 @@ public void add(EventSource eventSource) {
sources.computeIfAbsent(keyFor(eventSource), k -> new ArrayList<>()).add(eventSource);
}

@SuppressWarnings("rawtypes")
private Class getDependentType(EventSource source) {
return source instanceof ResourceEventSource
? ((ResourceEventSource) source).getResourceClass()
Expand Down Expand Up @@ -248,6 +252,7 @@ private String keyFor(Class<?> dependentType) {
return key;
}

@SuppressWarnings("unchecked")
public <S> ResourceEventSource<R, S> get(Class<S> dependentType, String name) {
final var sourcesForType = sources.get(keyFor(dependentType));
if (sourcesForType == null || sourcesForType.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.javaoperatorsdk.operator.MissingCRDException;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.processing.Controller;
Expand All @@ -26,6 +24,7 @@
import io.javaoperatorsdk.operator.processing.event.source.AbstractResourceEventSource;
import io.javaoperatorsdk.operator.processing.event.source.ResourceCache;

import static io.javaoperatorsdk.operator.ReconcilerUtils.handleKubernetesClientException;
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName;
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID;
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;
Expand All @@ -46,6 +45,7 @@ public class ControllerResourceEventSource<T extends HasMetadata>
private final ControllerResourceCache<T> cache;
private final TemporaryResourceCache<T> temporaryResourceCache;

@SuppressWarnings("unchecked")
public ControllerResourceEventSource(Controller<T> controller) {
super(controller.getConfiguration().getResourceClass());
this.controller = controller;
Expand Down Expand Up @@ -87,9 +87,7 @@ public void start() {
});
}
} catch (Exception e) {
if (e instanceof KubernetesClientException) {
handleKubernetesClientException(e);
}
handleKubernetesClientException(e, controller.getConfiguration().getResourceTypeName());
throw e;
}
super.start();
Expand Down Expand Up @@ -193,17 +191,6 @@ public SharedIndexInformer<T> getInformer(String namespace) {
return getInformers().get(Objects.requireNonNullElse(namespace, ANY_NAMESPACE_MAP_KEY));
}

private void handleKubernetesClientException(Exception e) {
KubernetesClientException ke = (KubernetesClientException) e;
if (404 == ke.getCode()) {
// only throw MissingCRDException if the 404 error occurs on the target CRD
final var targetCRDName = controller.getConfiguration().getResourceTypeName();
if (targetCRDName.equals(ke.getFullResourceName())) {
throw new MissingCRDException(targetCRDName, null, e.getMessage(), e);
}
}
}

@Override
public Optional<T> getAssociated(T primary) {
return get(ResourceID.fromResource(primary));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedInformer;
import io.fabric8.kubernetes.client.informers.cache.Store;
import io.javaoperatorsdk.operator.ReconcilerUtils;
import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
Expand Down Expand Up @@ -142,7 +143,13 @@ private void propagateEvent(T object) {

@Override
public void start() {
sharedInformer.run();
try {
sharedInformer.run();
} catch (Exception e) {
ReconcilerUtils.handleKubernetesClientException(e,
HasMetadata.getFullResourceName(sharedInformer.getApiTypeClass()));
throw e;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class TimerEventSource<R extends HasMetadata>
implements ResourceEventAware<R> {
private static final Logger log = LoggerFactory.getLogger(TimerEventSource.class);

private final Timer timer = new Timer();
private final Timer timer = new Timer(true);
private final AtomicBoolean running = new AtomicBoolean();
private final Map<ResourceID, EventProducerTimeTask> onceTasks = new ConcurrentHashMap<>();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.javaoperatorsdk.operator;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.V1ApiextensionAPIGroupDSL;
import io.fabric8.kubernetes.client.dsl.ApiextensionsAPIGroupDSL;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.FilterWatchListMultiDeletable;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@SuppressWarnings("unchecked")
public class MockKubernetesClient {

public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz) {
final var client = mock(KubernetesClient.class);
MixedOperation<T, KubernetesResourceList<T>, Resource<T>> resources =
mock(MixedOperation.class);
NonNamespaceOperation<T, KubernetesResourceList<T>, Resource<T>> nonNamespaceOperation =
mock(NonNamespaceOperation.class);
FilterWatchListMultiDeletable<T, KubernetesResourceList<T>> inAnyNamespace = mock(
FilterWatchListMultiDeletable.class);
FilterWatchListDeletable<T, KubernetesResourceList<T>> filterable =
mock(FilterWatchListDeletable.class);
when(resources.inNamespace(anyString())).thenReturn(nonNamespaceOperation);
when(nonNamespaceOperation.withLabelSelector(nullable(String.class))).thenReturn(filterable);
when(resources.inAnyNamespace()).thenReturn(inAnyNamespace);
when(inAnyNamespace.withLabelSelector(nullable(String.class))).thenReturn(filterable);
SharedIndexInformer<T> informer = mock(SharedIndexInformer.class);
when(filterable.runnableInformer(anyLong())).thenReturn(informer);
when(client.resources(clazz)).thenReturn(resources);

final var apiGroupDSL = mock(ApiextensionsAPIGroupDSL.class);
when(client.apiextensions()).thenReturn(apiGroupDSL);
final var v1 = mock(V1ApiextensionAPIGroupDSL.class);
when(apiGroupDSL.v1()).thenReturn(v1);
final var operation = mock(NonNamespaceOperation.class);
when(v1.customResourceDefinitions()).thenReturn(operation);
when(operation.withName(any())).thenReturn(mock(Resource.class));

return client;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,25 @@

import org.junit.jupiter.api.Test;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.ShortNames;
import io.fabric8.kubernetes.model.annotation.Version;
import io.javaoperatorsdk.operator.api.reconciler.Constants;
import io.javaoperatorsdk.operator.sample.simple.TestCustomReconciler;
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;

import static io.javaoperatorsdk.operator.ReconcilerUtils.getDefaultFinalizerName;
import static io.javaoperatorsdk.operator.ReconcilerUtils.getDefaultNameFor;
import static io.javaoperatorsdk.operator.ReconcilerUtils.getDefaultReconcilerName;
import static io.javaoperatorsdk.operator.ReconcilerUtils.handleKubernetesClientException;
import static io.javaoperatorsdk.operator.ReconcilerUtils.isFinalizerValid;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

class ReconcilerUtilsTest {
Expand Down Expand Up @@ -39,4 +48,20 @@ void defaultFinalizerShouldWork() {
void noFinalizerMarkerShouldWork() {
assertTrue(isFinalizerValid(Constants.NO_FINALIZER));
}

@Test
void handleKubernetesExceptionShouldThrowMissingCRDExceptionWhenAppropriate() {
assertThrows(MissingCRDException.class, () -> handleKubernetesClientException(
new KubernetesClientException(
"Failure executing: GET at: https://kubernetes.docker.internal:6443/apis/tomcatoperator.io/v1/tomcats. Message: Not Found.",
404, null),
HasMetadata.getFullResourceName(Tomcat.class)));
}

@Group("tomcatoperator.io")
@Version("v1")
@ShortNames("tc")
private static class Tomcat extends CustomResource<Void, Void> implements Namespaced {
}

}
Loading