Skip to content

feat: key based bulk resource creation #1521

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ public static <T> ReconcileResult<T> noOperation(T resource) {
return new ReconcileResult<>(resource, Operation.NONE);
}

@SafeVarargs
public static <T> ReconcileResult<T> aggregatedResult(ReconcileResult<T>... results) {
public static <T> ReconcileResult<T> aggregatedResult(List<ReconcileResult<T>> results) {
if (results == null) {
throw new IllegalArgumentException("Should provide results to aggregate");
}
if (results.length == 1) {
return results[0];
if (results.size() == 1) {
return results.get(0);
}
final Map<T, Operation> operations = new HashMap<>(results.length);
final Map<T, Operation> operations = new HashMap<>(results.size());
for (ReconcileResult<T> res : results) {
res.getSingleResource().ifPresent(r -> operations.put(r, res.getSingleOperation()));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.javaoperatorsdk.operator.processing.dependent;

import java.util.Optional;
import java.util.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -24,90 +24,92 @@ public abstract class AbstractDependentResource<R, P extends HasMetadata>

protected Creator<R, P> creator;
protected Updater<R, P> updater;
protected BulkDependentResource<R, P> bulkDependentResource;
@SuppressWarnings("rawtypes")
protected BulkDependentResource bulkDependentResource;
private ResourceDiscriminator<R, P> resourceDiscriminator;
private int currentCount;

@SuppressWarnings("unchecked")
public AbstractDependentResource() {
@SuppressWarnings({"unchecked", "rawtypes"})
protected AbstractDependentResource() {
creator = creatable ? (Creator<R, P>) this : null;
updater = updatable ? (Updater<R, P>) this : null;

bulkDependentResource = bulk ? (BulkDependentResource<R, P>) this : null;
bulkDependentResource = bulk ? (BulkDependentResource) this : null;
}

@SuppressWarnings("unchecked")
@Override
public ReconcileResult<R> reconcile(P primary, Context<P> context) {
if (bulk) {
final var count = bulkDependentResource.count(primary, context);
deleteBulkResourcesIfRequired(count, primary, context);
@SuppressWarnings("unchecked")
final ReconcileResult<R>[] results = new ReconcileResult[count];
for (int i = 0; i < count; i++) {
results[i] = reconcileIndexAware(primary, i, context);
final var targetKeys = bulkDependentResource.targetKeys(primary, context);
Map<Object, R> actualResources =
bulkDependentResource.getSecondaryResources(primary, context);

deleteBulkResourcesIfRequired(targetKeys, actualResources, primary, context);
final List<ReconcileResult<R>> results = new ArrayList<>(targetKeys.size());

for (Object key : targetKeys) {
results.add(reconcileIndexAware(primary, actualResources.get(key), key, context));
}
currentCount = count;
return ReconcileResult.aggregatedResult(results);
} else {
return reconcileIndexAware(primary, 0, context);
var actualResource = getSecondaryResource(primary, context);
return reconcileIndexAware(primary, actualResource.orElse(null), null, context);
}
}

protected void deleteBulkResourcesIfRequired(int targetCount, P primary, Context<P> context) {
if (targetCount >= currentCount) {
return;
}
for (int i = targetCount; i < currentCount; i++) {
var resource = bulkDependentResource.getSecondaryResource(primary, i, context);
var index = i;
resource.ifPresent(
r -> bulkDependentResource.deleteBulkResourceWithIndex(primary, r, index, context));
}
@SuppressWarnings({"unchecked", "rawtypes"})
protected void deleteBulkResourcesIfRequired(Set targetKeys, Map<Object, R> actualResources,
P primary, Context<P> context) {
actualResources.forEach((key, value) -> {
if (!targetKeys.contains(key)) {
bulkDependentResource.deleteBulkResource(primary, value, context);
}
});
}

protected ReconcileResult<R> reconcileIndexAware(P primary, int i, Context<P> context) {
Optional<R> maybeActual = bulk ? bulkDependentResource.getSecondaryResource(primary, i, context)
: getSecondaryResource(primary, context);
@SuppressWarnings("unchecked")
protected ReconcileResult<R> reconcileIndexAware(P primary, R resource, Object key,
Context<P> context) {
if (creatable || updatable) {
if (maybeActual.isEmpty()) {
if (resource == null) {
if (creatable) {
var desired = desiredIndexAware(primary, i, context);
var desired = desiredIndexAware(primary, key, context);
throwIfNull(desired, primary, "Desired");
logForOperation("Creating", primary, desired);
var createdResource = handleCreate(desired, primary, context);
return ReconcileResult.resourceCreated(createdResource);
}
} else {
final var actual = maybeActual.get();
if (updatable) {
final Matcher.Result<R> match;
if (bulk) {
match = bulkDependentResource.match(actual, primary, i, context);
match = bulkDependentResource.match(resource, primary, key, context);
} else {
match = updater.match(actual, primary, context);
match = updater.match(resource, primary, context);
}
if (!match.matched()) {
final var desired =
match.computedDesired().orElse(desiredIndexAware(primary, i, context));
match.computedDesired().orElse(desiredIndexAware(primary, key, context));
throwIfNull(desired, primary, "Desired");
logForOperation("Updating", primary, desired);
var updatedResource = handleUpdate(actual, desired, primary, context);
var updatedResource = handleUpdate(resource, desired, primary, context);
return ReconcileResult.resourceUpdated(updatedResource);
}
} else {
log.debug("Update skipped for dependent {} as it matched the existing one", actual);
log.debug("Update skipped for dependent {} as it matched the existing one", resource);
}
}
} else {
log.debug(
"Dependent {} is read-only, implement Creator and/or Updater interfaces to modify it",
getClass().getSimpleName());
}
return ReconcileResult.noOperation(maybeActual.orElse(null));
return ReconcileResult.noOperation(resource);
}

private R desiredIndexAware(P primary, int i, Context<P> context) {
return bulk ? desired(primary, i, context) : desired(primary, context);
private R desiredIndexAware(P primary, Object key, Context<P> context) {
return bulk ? (R) bulkDependentResource.desired(primary, key, context)
: desired(primary, context);
}

public Optional<R> getSecondaryResource(P primary, Context<P> context) {
Expand Down Expand Up @@ -172,13 +174,12 @@ protected R desired(P primary, Context<P> context) {
"desired method must be implemented if this DependentResource can be created and/or updated");
}

protected R desired(P primary, int index, Context<P> context) {
throw new IllegalStateException("Must be implemented for bulk DependentResource creation");
}

@SuppressWarnings("unchecked")
public void delete(P primary, Context<P> context) {
if (bulk) {
deleteBulkResourcesIfRequired(0, primary, context);
var actualResources = bulkDependentResource.getSecondaryResources(primary, context);
deleteBulkResourcesIfRequired(Collections.emptySet(), actualResources, primary, context);
} else {
handleDelete(primary, context);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.javaoperatorsdk.operator.processing.dependent;

import java.util.Optional;
import java.util.Map;
import java.util.Set;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.Context;
Expand All @@ -13,24 +14,27 @@
* {@link Creator} and {@link Deleter} interfaces out of the box. A concrete dependent resource can
* implement additionally also {@link Updater}.
*/
public interface BulkDependentResource<R, P extends HasMetadata> extends Creator<R, P>, Deleter<P> {
public interface BulkDependentResource<R, P extends HasMetadata, T>
extends Creator<R, P>, Deleter<P> {

/**
* @return number of resources to create
*/
int count(P primary, Context<P> context);
Set<T> targetKeys(P primary, Context<P> context);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternative would be here to provide not the keys but the resources, like the desired map: Map<T,R> desiredResources()
That would however need an adjustment in AbstractDependentResource, won't play nice with current matchers: making the simpler macher appraoch would make it again a little nicer. (Comparing just desired vs actual state)
@metacosm

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might take a look on it as a separate PR


R desired(P primary, int index, Context<P> context);
Map<T, R> getSecondaryResources(P primary, Context<P> context);

R desired(P primary, T key, Context<P> context);

// todo add back key?
/**
* Used to delete resource if the desired count is lower than the actual count of a resource.
*
* @param primary resource
* @param resource actual resource from the cache for the index
* @param i index of the resource
* @param context actual context
*/
void deleteBulkResourceWithIndex(P primary, R resource, int i, Context<P> context);
void deleteBulkResource(P primary, R resource, Context<P> context);

/**
* Determines whether the specified secondary resource matches the desired state with target index
Expand All @@ -45,8 +49,6 @@ public interface BulkDependentResource<R, P extends HasMetadata> extends Creator
* convenience methods ({@link Result#nonComputed(boolean)} and
* {@link Result#computed(boolean, Object)})
*/
Result<R> match(R actualResource, P primary, int index, Context<P> context);

Optional<R> getSecondaryResource(P primary, int index, Context<P> context);
Result<R> match(R actualResource, P primary, T index, Context<P> context);

}
Original file line number Diff line number Diff line change
Expand Up @@ -139,18 +139,20 @@ public Result<R> match(R actualResource, P primary, Context<P> context) {
return matcher.match(actualResource, primary, context);
}

public Result<R> match(R actualResource, P primary, int index, Context<P> context) {
final var desired = desired(primary, index, context);
return GenericKubernetesResourceMatcher.match(desired, actualResource, false);
public Result<R> match(R actualResource, P primary, Object key, Context<P> context) {
final var desired = bulkDependentResource.desired(primary, key, context);
return GenericKubernetesResourceMatcher.match((R) desired, actualResource, false);
}

protected void handleDelete(P primary, Context<P> context) {
var resource = getSecondaryResource(primary, context);
resource.ifPresent(r -> client.resource(r).delete());
}

public void deleteBulkResourceWithIndex(P primary, R resource, int i, Context<P> context) {

public void deleteBulkResource(P primary, R resource, Context<P> context) {
client.resource(resource).delete();

}

protected Resource<R> prepare(R desired, P primary, String actionName) {
Expand Down Expand Up @@ -229,11 +231,6 @@ protected R desired(P primary, Context<P> context) {
return super.desired(primary, context);
}

@Override
protected R desired(P primary, int index, Context<P> context) {
return super.desired(primary, index, context);
}

private void prepareEventFiltering(R desired, ResourceID resourceID) {
((InformerEventSource<R, P>) eventSource().orElseThrow())
.prepareForCreateOrUpdateEventFiltering(resourceID, desired);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package io.javaoperatorsdk.operator.sample.bulkdependent;

import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.*;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter;
import io.javaoperatorsdk.operator.processing.dependent.BulkDependentResource;
import io.javaoperatorsdk.operator.processing.dependent.Creator;
import io.javaoperatorsdk.operator.processing.dependent.Matcher;
import io.javaoperatorsdk.operator.processing.dependent.Updater;
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource;

Expand All @@ -22,48 +21,62 @@ public class ConfigMapDeleterBulkDependentResource
implements Creator<ConfigMap, BulkDependentTestCustomResource>,
Updater<ConfigMap, BulkDependentTestCustomResource>,
Deleter<BulkDependentTestCustomResource>,
BulkDependentResource<ConfigMap, BulkDependentTestCustomResource> {
BulkDependentResource<ConfigMap, BulkDependentTestCustomResource, Integer> {

public static final String LABEL_KEY = "bulk";
public static final String LABEL_VALUE = "true";
public static final String ADDITIONAL_DATA_KEY = "additionalData";
public static final String INDEX_DELIMITER = "-";

public ConfigMapDeleterBulkDependentResource() {
super(ConfigMap.class);
}

@Override
public ConfigMap desired(BulkDependentTestCustomResource primary,
int index, Context<BulkDependentTestCustomResource> context) {
public Set<Integer> targetKeys(BulkDependentTestCustomResource primary,
Context<BulkDependentTestCustomResource> context) {
var number = primary.getSpec().getNumberOfResources();
Set<Integer> res = new HashSet<>();
for (int i = 0; i < number; i++) {
res.add(i);
}
return res;
}

@Override
public ConfigMap desired(BulkDependentTestCustomResource primary, Integer key,
Context<BulkDependentTestCustomResource> context) {
ConfigMap configMap = new ConfigMap();
configMap.setMetadata(new ObjectMetaBuilder()
.withName(primary.getMetadata().getName() + "-" + index)
.withName(primary.getMetadata().getName() + INDEX_DELIMITER + key)
.withNamespace(primary.getMetadata().getNamespace())
.withLabels(Map.of(LABEL_KEY, LABEL_VALUE))
.build());
configMap.setData(
Map.of("number", "" + index, ADDITIONAL_DATA_KEY, primary.getSpec().getAdditionalData()));
Map.of("number", "" + key, ADDITIONAL_DATA_KEY, primary.getSpec().getAdditionalData()));
return configMap;
}

// todo fix generics?
@Override
public int count(BulkDependentTestCustomResource primary,
Context<BulkDependentTestCustomResource> context) {
return primary.getSpec().getNumberOfResources();
public Matcher.Result<ConfigMap> match(ConfigMap actualResource,
BulkDependentTestCustomResource primary,
Integer index, Context<BulkDependentTestCustomResource> context) {
return super.match(actualResource, primary, index, context);
}

@Override
public Optional<ConfigMap> getSecondaryResource(BulkDependentTestCustomResource primary,
int index, Context<BulkDependentTestCustomResource> context) {
var resources = context.getSecondaryResources(resourceType()).stream()
.filter(r -> r.getMetadata().getName().endsWith("-" + index))
.collect(Collectors.toList());
if (resources.isEmpty()) {
return Optional.empty();
} else if (resources.size() > 1) {
throw new IllegalStateException("More than one resource found for index:" + index);
} else {
return Optional.of(resources.get(0));
}
public Map<Integer, ConfigMap> getSecondaryResources(BulkDependentTestCustomResource primary,
Context<BulkDependentTestCustomResource> context) {
var configMaps = context.getSecondaryResources(ConfigMap.class);
Map<Integer, ConfigMap> result = new HashMap<>(configMaps.size());
configMaps.forEach(cm -> {
String name = cm.getMetadata().getName();
if (name.startsWith(primary.getMetadata().getName())) {
String key = name.substring(name.lastIndexOf(INDEX_DELIMITER) + 1);
result.put(Integer.parseInt(key), cm);
}
});
return result;
}
}
Loading